Files
go-cart-actor/cmd/checkout/pool-server.go
Mats Törnberg 8f2a354f9d
All checks were successful
Build and Publish / BuildAndDeployAmd64 (push) Successful in 43s
Build and Publish / BuildAndDeployArm64 (push) Successful in 4m28s
update
2025-12-03 18:27:37 +01:00

405 lines
12 KiB
Go

package main
import (
"bytes"
"context"
"encoding/json"
"log"
"net/http"
"os"
"strconv"
"time"
"git.k6n.net/go-cart-actor/pkg/actor"
"git.k6n.net/go-cart-actor/pkg/cart"
"git.k6n.net/go-cart-actor/pkg/checkout"
messages "git.k6n.net/go-cart-actor/proto/checkout"
adyen "github.com/adyen/adyen-go-api-library/v21/src/adyen"
"github.com/matst80/go-redis-inventory/pkg/inventory"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)
var (
grainMutations = promauto.NewCounter(prometheus.CounterOpts{
Name: "checkout_grain_mutations_total",
Help: "The total number of mutations",
})
grainLookups = promauto.NewCounter(prometheus.CounterOpts{
Name: "checkout_grain_lookups_total",
Help: "The total number of lookups",
})
)
type CheckoutPoolServer struct {
actor.GrainPool[*checkout.CheckoutGrain]
pod_name string
klarnaClient *KlarnaClient
adyenClient *adyen.APIClient
cartClient *CartClient
inventoryService *inventory.RedisInventoryService
}
func NewCheckoutPoolServer(pool actor.GrainPool[*checkout.CheckoutGrain], pod_name string, klarnaClient *KlarnaClient, cartClient *CartClient, adyenClient *adyen.APIClient) *CheckoutPoolServer {
srv := &CheckoutPoolServer{
GrainPool: pool,
pod_name: pod_name,
klarnaClient: klarnaClient,
cartClient: cartClient,
adyenClient: adyenClient,
}
return srv
}
func (s *CheckoutPoolServer) ApplyLocal(ctx context.Context, id checkout.CheckoutId, mutation ...proto.Message) (*actor.MutationResult[*checkout.CheckoutGrain], error) {
return s.Apply(ctx, uint64(id), mutation...)
}
func (s *CheckoutPoolServer) GetCheckoutHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
grain, err := s.Get(r.Context(), uint64(id))
if err != nil {
return err
}
return s.WriteResult(w, grain)
}
func (s *CheckoutPoolServer) SetDeliveryHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
var msg messages.SetDelivery
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
return err
}
result, err := s.ApplyLocal(r.Context(), id, &msg)
if err != nil {
return err
}
return s.WriteResult(w, result.Result)
}
func (s *CheckoutPoolServer) RemoveDeliveryHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
deliveryId := r.PathValue("id")
uintDeliveryId, err := strconv.ParseUint(deliveryId, 10, 64)
if err != nil {
return err
}
msg := &messages.RemoveDelivery{
Id: uint32(uintDeliveryId),
}
result, err := s.ApplyLocal(r.Context(), id, msg)
if err != nil {
return err
}
return s.WriteResult(w, result.Result)
}
func (s *CheckoutPoolServer) SetPickupPointHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
var msg messages.SetPickupPoint
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
return err
}
result, err := s.ApplyLocal(r.Context(), id, &msg)
if err != nil {
return err
}
return s.WriteResult(w, result.Result)
}
func (s *CheckoutPoolServer) InitializeCheckoutHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
var msg messages.InitializeCheckout
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
return err
}
result, err := s.ApplyLocal(r.Context(), id, &msg)
if err != nil {
return err
}
return s.WriteResult(w, result.Result)
}
func (s *CheckoutPoolServer) InventoryReservedHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
var msg messages.InventoryReserved
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
return err
}
result, err := s.ApplyLocal(r.Context(), id, &msg)
if err != nil {
return err
}
return s.WriteResult(w, result.Result)
}
func (s *CheckoutPoolServer) OrderCreatedHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
var msg messages.OrderCreated
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
return err
}
result, err := s.ApplyLocal(r.Context(), id, &msg)
if err != nil {
return err
}
return s.WriteResult(w, result.Result)
}
func (s *CheckoutPoolServer) ConfirmationViewedHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
var msg messages.ConfirmationViewed
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
return err
}
result, err := s.ApplyLocal(r.Context(), id, &msg)
if err != nil {
return err
}
return s.WriteResult(w, result.Result)
}
func (s *CheckoutPoolServer) StartCheckoutHandler(w http.ResponseWriter, r *http.Request) {
cartIdStr := r.PathValue("cartid")
if cartIdStr == "" {
http.Error(w, "cart id required", http.StatusBadRequest)
return
}
cartId, ok := cart.ParseCartId(cartIdStr)
if !ok {
http.Error(w, "invalid cart id", http.StatusBadRequest)
return
}
// Fetch cart state from cart service
cartGrain, err := s.cartClient.getCartGrain(r.Context(), cartId)
if err != nil {
logger.Error("failed to fetch cart", "error", err, "cartId", cartId)
http.Error(w, "failed to fetch cart", http.StatusInternalServerError)
return
}
// Serialize cart state to Any
cartStateBytes, err := json.Marshal(cartGrain)
if err != nil {
logger.Error("failed to marshal cart state", "error", err)
http.Error(w, "failed to process cart state", http.StatusInternalServerError)
return
}
// Create checkout with same ID as cart
checkoutId := checkout.CheckoutId(cartId)
// Initialize checkout with cart state wrapped in Any
cartStateAny := &messages.InitializeCheckout{
OrderId: "",
CartId: uint64(cartId),
Version: uint32(cartGrain.Version),
CartState: &anypb.Any{
TypeUrl: "type.googleapis.com/cart.CartGrain",
Value: cartStateBytes,
},
}
result, err := s.ApplyLocal(r.Context(), checkoutId, cartStateAny)
if err != nil {
logger.Error("failed to initialize checkout", "error", err)
http.Error(w, "failed to initialize checkout", http.StatusInternalServerError)
return
}
// Set checkout cookie
setCheckoutCookie(w, result.Result.Id, r.TLS != nil)
if err := s.WriteResult(w, result.Result); err != nil {
logger.Error("failed to write result", "error", err)
}
}
func (s *CheckoutPoolServer) WriteResult(w http.ResponseWriter, result any) error {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("X-Pod-Name", s.pod_name)
if result == nil {
w.WriteHeader(http.StatusInternalServerError)
return nil
}
w.WriteHeader(http.StatusOK)
enc := json.NewEncoder(w)
err := enc.Encode(result)
return err
}
func (s *CheckoutPoolServer) CreateOrUpdateCheckout(r *http.Request, id checkout.CheckoutId) (*CheckoutOrder, error) {
// Get cart state from cart service
cartGrain, err := s.Get(r.Context(), uint64(id))
if err != nil {
return nil, err
}
meta := GetCheckoutMetaFromRequest(r)
payload, _, err := BuildCheckoutOrderPayload(cartGrain, meta)
if err != nil {
return nil, err
}
grain, err := s.Get(r.Context(), uint64(id))
if err != nil {
return nil, err
}
if grain.OrderId != nil {
return s.klarnaClient.UpdateOrder(r.Context(), *grain.OrderId, bytes.NewReader(payload))
} else {
return s.klarnaClient.CreateOrder(r.Context(), bytes.NewReader(payload))
}
}
func (s *CheckoutPoolServer) ApplyKlarnaPaymentStarted(ctx context.Context, klarnaOrder *CheckoutOrder, id checkout.CheckoutId) (*actor.MutationResult[*checkout.CheckoutGrain], error) {
method := "checkout"
return s.ApplyLocal(ctx, id, &messages.PaymentStarted{
PaymentId: klarnaOrder.ID,
Amount: int64(klarnaOrder.OrderAmount),
Currency: klarnaOrder.PurchaseCurrency,
Provider: "klarna",
Method: &method,
StartedAt: timestamppb.New(time.Now()),
})
}
// func (s *CheckoutPoolServer) CheckoutHandler(fn func(order *CheckoutOrder, w http.ResponseWriter) error) func(w http.ResponseWriter, r *http.Request) {
// return CheckoutIdHandler(s.ProxyHandler(func(w http.ResponseWriter, r *http.Request, checkoutId checkout.CheckoutId) error {
// orderId := r.URL.Query().Get("order_id")
// if orderId == "" {
// order, err := s.CreateOrUpdateCheckout(r, checkoutId)
// if err != nil {
// logger.Error("unable to create klarna session", "error", err)
// return err
// }
// s.ApplyKlarnaPaymentStarted(r.Context(), order, checkoutId)
// return fn(order, w)
// }
// order, err := s.klarnaClient.GetOrder(r.Context(), orderId)
// if err != nil {
// return err
// }
// return fn(order, w)
// }))
// }
var (
tracer = otel.Tracer(name)
hmacKey = os.Getenv("ADYEN_HMAC")
meter = otel.Meter(name)
logger = otelslog.NewLogger(name)
proxyCalls metric.Int64Counter
)
func init() {
var err error
proxyCalls, err = meter.Int64Counter("proxy.calls",
metric.WithDescription("Number of proxy calls"),
metric.WithUnit("{calls}"))
if err != nil {
panic(err)
}
}
func (s *CheckoutPoolServer) applyAnywhere(ctx context.Context, id uint64, msgs ...proto.Message) error {
host, found := s.OwnerHost(id)
if !found {
_, err := s.Apply(ctx, id, msgs...)
return err
}
_, err := host.Apply(ctx, id, msgs...)
return err
}
func (s *CheckoutPoolServer) getAnywhere(ctx context.Context, id uint64) (*checkout.CheckoutGrain, error) {
host, found := s.OwnerHost(id)
if !found {
grain, err := s.Get(ctx, id)
return grain, err
}
ret := &checkout.CheckoutGrain{}
err := host.Get(ctx, id, ret)
if err != nil {
return nil, err
}
return ret, nil
}
func (s *CheckoutPoolServer) Serve(mux *http.ServeMux) {
handleFunc := func(pattern string, handlerFunc func(http.ResponseWriter, *http.Request)) {
attr := attribute.String("http.route", pattern)
mux.HandleFunc(pattern, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
span := trace.SpanFromContext(r.Context())
span.SetName(pattern)
span.SetAttributes(attr)
labeler, _ := otelhttp.LabelerFromContext(r.Context())
labeler.Add(attr)
handlerFunc(w, r)
}))
}
handleFunc("/payment/adyen/session", CookieCheckoutIdHandler(s.AdyenSessionHandler))
handleFunc("/payment/adyen/push", s.AdyenHookHandler)
handleFunc("/payment/adyen/return", s.AdyenReturnHandler)
//handleFunc("/payment/adyen/cancel", s.AdyenCancelHandler)
handleFunc("/payment/klarna/validate", s.KlarnaValidationHandler)
handleFunc("/payment/klarna/push", s.KlarnaPushHandler)
handleFunc("/payment/klarna/notification", s.KlarnaNotificationHandler)
conn, err := amqp.Dial(amqpUrl)
if err != nil {
log.Fatalf("failed to connect to RabbitMQ: %v", err)
}
orderHandler := NewAmqpOrderHandler(conn)
orderHandler.DefineQueue()
handleFunc("POST /api/checkout/start/{cartid}", s.StartCheckoutHandler)
handleFunc("GET /api/checkout", CookieCheckoutIdHandler(s.ProxyHandler(s.GetCheckoutHandler)))
handleFunc("POST /api/checkout/delivery", CookieCheckoutIdHandler(s.ProxyHandler(s.SetDeliveryHandler)))
handleFunc("DELETE /api/checkout/delivery/{id}", CookieCheckoutIdHandler(s.ProxyHandler(s.RemoveDeliveryHandler)))
handleFunc("POST /api/checkout/pickup-point", CookieCheckoutIdHandler(s.ProxyHandler(s.SetPickupPointHandler)))
// handleFunc("POST /api/checkout/initialize", CookieCheckoutIdHandler(s.ProxyHandler(s.InitializeCheckoutHandler)))
// handleFunc("POST /api/checkout/inventory-reserved", CookieCheckoutIdHandler(s.ProxyHandler(s.InventoryReservedHandler)))
// handleFunc("POST /api/checkout/order-created", CookieCheckoutIdHandler(s.ProxyHandler(s.OrderCreatedHandler)))
// handleFunc("POST /api/checkout/confirmation-viewed", CookieCheckoutIdHandler(s.ProxyHandler(s.ConfirmationViewedHandler)))
handleFunc("GET /payment/klarna/session", CookieCheckoutIdHandler(s.ProxyHandler(s.KlarnaSessionHandler)))
handleFunc("GET /payment/klarna/checkout", CookieCheckoutIdHandler(s.ProxyHandler(s.KlarnaHtmlCheckoutHandler)))
handleFunc("GET /payment/klarna/confirmation/{order_id}", s.KlarnaConfirmationHandler)
}