Files
go-cart-actor/cmd/checkout/pool-server.go
matst80 0ef29596c1
All checks were successful
Build and Publish / BuildAndDeployAmd64 (push) Successful in 46s
Build and Publish / BuildAndDeployArm64 (push) Successful in 4m25s
update
2025-12-04 14:19:00 +01:00

543 lines
16 KiB
Go

package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"log"
"net/http"
"os"
"strconv"
"time"
"git.k6n.net/go-cart-actor/pkg/actor"
"git.k6n.net/go-cart-actor/pkg/cart"
"git.k6n.net/go-cart-actor/pkg/checkout"
messages "git.k6n.net/go-cart-actor/proto/checkout"
adyen "github.com/adyen/adyen-go-api-library/v21/src/adyen"
"github.com/matst80/go-redis-inventory/pkg/inventory"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)
// Prometheus counters for checkout grain activity. They are created through
// promauto when the package initializes; nothing in this part of the file
// increments them, so the call sites are presumably elsewhere.
var (
	// grainMutations is exported as checkout_grain_mutations_total.
	grainMutations = promauto.NewCounter(prometheus.CounterOpts{
		Name: "checkout_grain_mutations_total",
		Help: "The total number of mutations",
	})
	// grainLookups is exported as checkout_grain_lookups_total.
	grainLookups = promauto.NewCounter(prometheus.CounterOpts{
		Name: "checkout_grain_lookups_total",
		Help: "The total number of lookups",
	})
)
// CheckoutPoolServer serves the checkout HTTP API on top of a pool of
// checkout grains. The pool is embedded, so its methods are promoted onto the
// server; the remaining fields are the external clients the handlers call.
type CheckoutPoolServer struct {
	actor.GrainPool[*checkout.CheckoutGrain]
	// pod_name is echoed to clients in the X-Pod-Name response header.
	pod_name string
	// klarnaClient creates and updates Klarna orders.
	klarnaClient *KlarnaClient
	// adyenClient creates Adyen checkout sessions.
	adyenClient *adyen.APIClient
	// cartClient fetches cart state when a checkout is started.
	cartClient *CartClient
	// inventoryService is not set by NewCheckoutPoolServer and is not read in
	// this part of the file. NOTE(review): confirm where it is assigned.
	inventoryService *inventory.RedisInventoryService
}
// NewCheckoutPoolServer builds a CheckoutPoolServer around the given grain
// pool and payment/cart clients. pod_name is the value later reported in the
// X-Pod-Name response header. The inventoryService field is left at its zero
// value.
func NewCheckoutPoolServer(pool actor.GrainPool[*checkout.CheckoutGrain], pod_name string, klarnaClient *KlarnaClient, cartClient *CartClient, adyenClient *adyen.APIClient) *CheckoutPoolServer {
	return &CheckoutPoolServer{
		GrainPool:    pool,
		pod_name:     pod_name,
		klarnaClient: klarnaClient,
		cartClient:   cartClient,
		adyenClient:  adyenClient,
	}
}
// ApplyLocal applies the given mutations to the checkout grain identified by
// id through this server's own Apply method and returns its result unchanged.
// The typed checkout id is converted to the uint64 key that Apply takes.
func (s *CheckoutPoolServer) ApplyLocal(ctx context.Context, id checkout.CheckoutId, mutation ...proto.Message) (*actor.MutationResult[*checkout.CheckoutGrain], error) {
	grainKey := uint64(id)
	return s.Apply(ctx, grainKey, mutation...)
}
// GetCheckoutHandler looks up the checkout grain for id and writes it to the
// response as JSON. A lookup error is returned to the caller without writing
// a response.
func (s *CheckoutPoolServer) GetCheckoutHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
	state, lookupErr := s.Get(r.Context(), uint64(id))
	if lookupErr != nil {
		return lookupErr
	}
	return s.WriteResult(w, state)
}
// SetDeliveryHandler decodes the JSON request body into a SetDelivery
// message, applies it to the checkout identified by id, and writes the
// mutation's Result as JSON. Decode and apply errors are returned as-is.
func (s *CheckoutPoolServer) SetDeliveryHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
	req := new(messages.SetDelivery)
	if decodeErr := json.NewDecoder(r.Body).Decode(req); decodeErr != nil {
		return decodeErr
	}
	applied, err := s.ApplyLocal(r.Context(), id, req)
	if err != nil {
		return err
	}
	return s.WriteResult(w, applied.Result)
}
// RemoveDeliveryHandler removes the delivery named by the {id} path segment
// from the checkout identified by id and writes the mutation's Result as
// JSON.
//
// It returns the parse error when the path value is not a base-10 unsigned
// integer that fits in 32 bits, and the apply error when the mutation fails.
func (s *CheckoutPoolServer) RemoveDeliveryHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
	// RemoveDelivery.Id is a uint32. Parsing with bitSize 32 makes
	// out-of-range ids fail here instead of being silently truncated by the
	// conversion below, which could remove a different delivery.
	deliveryID, err := strconv.ParseUint(r.PathValue("id"), 10, 32)
	if err != nil {
		return err
	}
	msg := &messages.RemoveDelivery{
		Id: uint32(deliveryID),
	}
	result, err := s.ApplyLocal(r.Context(), id, msg)
	if err != nil {
		return err
	}
	return s.WriteResult(w, result.Result)
}
// SetPickupPointHandler decodes the JSON request body into a SetPickupPoint
// message, applies it to the checkout identified by id, and writes the
// mutation's Result as JSON. Decode and apply errors are returned as-is.
func (s *CheckoutPoolServer) SetPickupPointHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
	req := new(messages.SetPickupPoint)
	if decodeErr := json.NewDecoder(r.Body).Decode(req); decodeErr != nil {
		return decodeErr
	}
	applied, err := s.ApplyLocal(r.Context(), id, req)
	if err != nil {
		return err
	}
	return s.WriteResult(w, applied.Result)
}
// InitializeCheckoutHandler decodes the JSON request body into an
// InitializeCheckout message, applies it to the checkout identified by id,
// and writes the mutation's Result as JSON. Its route registration in Serve
// is currently commented out.
func (s *CheckoutPoolServer) InitializeCheckoutHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
	req := new(messages.InitializeCheckout)
	if decodeErr := json.NewDecoder(r.Body).Decode(req); decodeErr != nil {
		return decodeErr
	}
	applied, err := s.ApplyLocal(r.Context(), id, req)
	if err != nil {
		return err
	}
	return s.WriteResult(w, applied.Result)
}
// InventoryReservedHandler decodes the JSON request body into an
// InventoryReserved message, applies it to the checkout identified by id,
// and writes the mutation's Result as JSON. Its route registration in Serve
// is currently commented out.
func (s *CheckoutPoolServer) InventoryReservedHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
	req := new(messages.InventoryReserved)
	if decodeErr := json.NewDecoder(r.Body).Decode(req); decodeErr != nil {
		return decodeErr
	}
	applied, err := s.ApplyLocal(r.Context(), id, req)
	if err != nil {
		return err
	}
	return s.WriteResult(w, applied.Result)
}
// OrderCreatedHandler decodes the JSON request body into an OrderCreated
// message, applies it to the checkout identified by id, and writes the
// mutation's Result as JSON. Its route registration in Serve is currently
// commented out.
func (s *CheckoutPoolServer) OrderCreatedHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
	req := new(messages.OrderCreated)
	if decodeErr := json.NewDecoder(r.Body).Decode(req); decodeErr != nil {
		return decodeErr
	}
	applied, err := s.ApplyLocal(r.Context(), id, req)
	if err != nil {
		return err
	}
	return s.WriteResult(w, applied.Result)
}
// ConfirmationViewedHandler decodes the JSON request body into a
// ConfirmationViewed message, applies it to the checkout identified by id,
// and writes the mutation's Result as JSON. Its route registration in Serve
// is currently commented out.
func (s *CheckoutPoolServer) ConfirmationViewedHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
	req := new(messages.ConfirmationViewed)
	if decodeErr := json.NewDecoder(r.Body).Decode(req); decodeErr != nil {
		return decodeErr
	}
	applied, err := s.ApplyLocal(r.Context(), id, req)
	if err != nil {
		return err
	}
	return s.WriteResult(w, applied.Result)
}
// ContactDetailsUpdatedHandler decodes the JSON request body into a
// ContactDetailsUpdated message, applies it to the checkout identified by
// id, and writes the mutation's Result as JSON. Decode and apply errors are
// returned as-is.
func (s *CheckoutPoolServer) ContactDetailsUpdatedHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error {
	req := new(messages.ContactDetailsUpdated)
	if decodeErr := json.NewDecoder(r.Body).Decode(req); decodeErr != nil {
		return decodeErr
	}
	applied, err := s.ApplyLocal(r.Context(), id, req)
	if err != nil {
		return err
	}
	return s.WriteResult(w, applied.Result)
}
// StartCheckoutHandler starts (or restarts) a checkout for the cart named by
// the {cartid} path segment.
//
// It fetches the cart from the cart service, snapshots it as JSON, and applies
// an InitializeCheckout mutation carrying that snapshot. The checkout id is
// taken from the checkout cookie when it holds a parseable id; otherwise a
// freshly generated id is used. On success the checkout cookie is set and the
// mutation's Result is written as JSON; on failure an HTTP error is written
// and, if the mutation itself failed, the cookie is reset with id 0.
func (s *CheckoutPoolServer) StartCheckoutHandler(w http.ResponseWriter, r *http.Request) {
	rawCartID := r.PathValue("cartid")
	if rawCartID == "" {
		http.Error(w, "cart id required", http.StatusBadRequest)
		return
	}
	cartId, valid := cart.ParseCartId(rawCartID)
	if !valid {
		http.Error(w, "invalid cart id", http.StatusBadRequest)
		return
	}

	ctx := r.Context()
	secure := r.TLS != nil

	// Pull the current cart state from the cart service.
	cartGrain, err := s.cartClient.getCartGrain(ctx, cartId)
	if err != nil {
		logger.Error("failed to fetch cart", "error", err, "cartId", cartId)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// The snapshot is JSON, not protobuf wire format, even though it travels
	// inside an Any. NOTE(review): confirm the consumer decodes it as JSON.
	snapshot, err := json.Marshal(cartGrain)
	if err != nil {
		logger.Error("failed to marshal cart state", "error", err)
		http.Error(w, "failed to process cart state", http.StatusInternalServerError)
		return
	}

	// Default to a new id; reuse the id from the cookie when it parses.
	var checkoutId checkout.CheckoutId = cart.MustNewCartId()
	if existing, cookieErr := r.Cookie(checkoutCookieName); cookieErr == nil {
		if reused, ok := cart.ParseCartId(existing.Value); ok {
			checkoutId = reused
		}
	}

	initMsg := &messages.InitializeCheckout{
		OrderId: "",
		CartId:  uint64(cartId),
		Version: uint32(cartGrain.Version),
		CartState: &anypb.Any{
			TypeUrl: "type.googleapis.com/cart.CartGrain",
			Value:   snapshot,
		},
	}
	applied, err := s.ApplyLocal(ctx, checkoutId, initMsg)
	if err != nil {
		setCheckoutCookie(w, 0, secure)
		logger.Error("failed to initialize checkout", "error", err)
		http.Error(w, "failed to initialize checkout", http.StatusInternalServerError)
		return
	}

	// The cookie must be set before the body is written so the header is sent.
	setCheckoutCookie(w, checkoutId, secure)
	if writeErr := s.WriteResult(w, applied.Result); writeErr != nil {
		logger.Error("failed to write result", "error", writeErr)
	}
}
// WriteResult writes result to w as a JSON response.
//
// It always sets the Content-Type, Cache-Control, Access-Control-Allow-Origin
// and X-Pod-Name headers. A nil result produces a 500 status with no body and
// a nil error; otherwise a 200 status is sent followed by the JSON encoding
// of result, and any encoding error is returned.
func (s *CheckoutPoolServer) WriteResult(w http.ResponseWriter, result any) error {
	hdr := w.Header()
	hdr.Set("Content-Type", "application/json")
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Access-Control-Allow-Origin", "*")
	hdr.Set("X-Pod-Name", s.pod_name)

	// Only an untyped nil is caught here; a nil pointer wrapped in the
	// interface is non-nil and goes on to be encoded.
	if result == nil {
		w.WriteHeader(http.StatusInternalServerError)
		return nil
	}

	w.WriteHeader(http.StatusOK)
	return json.NewEncoder(w).Encode(result)
}
// CreateOrUpdateCheckout builds a Klarna order payload from grain and the
// request's checkout metadata, then sends it to Klarna.
//
// When orderId is non-nil and grain has a payment for it with a non-empty
// PaymentId, the existing Klarna order is updated; in every other case a new
// order is created. A payload build error is returned before Klarna is
// contacted.
func (s *CheckoutPoolServer) CreateOrUpdateCheckout(r *http.Request, grain *checkout.CheckoutGrain, orderId *string) (*CheckoutOrder, error) {
	body, _, err := BuildCheckoutOrderPayload(grain, GetCheckoutMetaFromRequest(r))
	if err != nil {
		return nil, err
	}

	ctx := r.Context()
	if orderId != nil {
		// The found flag is ignored: a missing payment comes back nil and
		// falls through to the create path.
		if existing, _ := grain.FindPayment(*orderId); existing != nil && existing.PaymentId != "" {
			return s.klarnaClient.UpdateOrder(ctx, existing.PaymentId, bytes.NewReader(body))
		}
	}
	return s.klarnaClient.CreateOrder(ctx, bytes.NewReader(body))
}
// ApplyKlarnaPaymentStarted records on the checkout identified by id that a
// Klarna payment has started. The PaymentStarted mutation takes its payment
// id, amount and currency from klarnaOrder, uses provider "klarna" and method
// "checkout", and is stamped with the current time.
func (s *CheckoutPoolServer) ApplyKlarnaPaymentStarted(ctx context.Context, klarnaOrder *CheckoutOrder, id checkout.CheckoutId) (*actor.MutationResult[*checkout.CheckoutGrain], error) {
	paymentMethod := "checkout"
	started := &messages.PaymentStarted{
		PaymentId: klarnaOrder.ID,
		Amount:    int64(klarnaOrder.OrderAmount),
		Currency:  klarnaOrder.PurchaseCurrency,
		Provider:  "klarna",
		Method:    &paymentMethod,
		StartedAt: timestamppb.New(time.Now()),
	}
	return s.ApplyLocal(ctx, id, started)
}
// func (s *CheckoutPoolServer) CheckoutHandler(fn func(order *CheckoutOrder, w http.ResponseWriter) error) func(w http.ResponseWriter, r *http.Request) {
// return CheckoutIdHandler(s.ProxyHandler(func(w http.ResponseWriter, r *http.Request, checkoutId checkout.CheckoutId) error {
// orderId := r.URL.Query().Get("order_id")
// if orderId == "" {
// order, err := s.CreateOrUpdateCheckout(r, checkoutId)
// if err != nil {
// logger.Error("unable to create klarna session", "error", err)
// return err
// }
// s.ApplyKlarnaPaymentStarted(r.Context(), order, checkoutId)
// return fn(order, w)
// }
// order, err := s.klarnaClient.GetOrder(r.Context(), orderId)
// if err != nil {
// return err
// }
// return fn(order, w)
// }))
// }
// Package-level telemetry handles and configuration. name is declared
// elsewhere in the package.
var (
	// tracer is the OpenTelemetry tracer for this service.
	tracer = otel.Tracer(name)
	// hmacKey is read from the ADYEN_HMAC environment variable at package
	// initialization. NOTE(review): presumably used to verify Adyen webhook
	// signatures — confirm at the use site.
	hmacKey = os.Getenv("ADYEN_HMAC")
	// meter is the OpenTelemetry meter used to create instruments.
	meter = otel.Meter(name)
	// logger is a structured logger bridged to OpenTelemetry.
	logger = otelslog.NewLogger(name)
	// proxyCalls is created in init as the "proxy.calls" counter.
	proxyCalls metric.Int64Counter
)
// init creates the "proxy.calls" counter on the package meter and stores it
// in proxyCalls. It panics if the instrument cannot be created.
func init() {
	counter, err := meter.Int64Counter(
		"proxy.calls",
		metric.WithDescription("Number of proxy calls"),
		metric.WithUnit("{calls}"),
	)
	if err != nil {
		panic(err)
	}
	proxyCalls = counter
}
// applyAnywhere applies msgs to grain id on whichever host OwnerHost reports
// for it. When no owner host is reported, the mutation is applied through
// this server's own Apply. Only the error is returned; the mutation result is
// discarded.
func (s *CheckoutPoolServer) applyAnywhere(ctx context.Context, id uint64, msgs ...proto.Message) error {
	if owner, ok := s.OwnerHost(id); ok {
		_, err := owner.Apply(ctx, id, msgs...)
		return err
	}
	_, err := s.Apply(ctx, id, msgs...)
	return err
}
// getAnywhere returns the checkout grain for id. When OwnerHost reports a
// host for the id, the state is fetched from that host into a fresh grain
// value; otherwise it is read through this server's own Get.
func (s *CheckoutPoolServer) getAnywhere(ctx context.Context, id uint64) (*checkout.CheckoutGrain, error) {
	owner, ok := s.OwnerHost(id)
	if !ok {
		return s.Get(ctx, id)
	}
	remote := &checkout.CheckoutGrain{}
	if err := owner.Get(ctx, id, remote); err != nil {
		return nil, err
	}
	return remote, nil
}
// StartPayment is the JSON request body decoded by StartPaymentHandler.
type StartPayment struct {
	// Provider selects the payment provider; StartPaymentHandler accepts
	// "adyen" and "klarna".
	Provider string `json:"provider"`
	// Method is optional and is passed through as the Method of the
	// PaymentStarted mutation.
	Method string `json:"method,omitempty"`
}
// GetPaymentSessionHandler returns a provider session for the payment named
// by the {id} path segment on the checkout identified by checkoutId.
//
// A payment that is not on the grain yields a 404 response and a nil error.
// For "adyen" a new checkout session is created from the current grain state
// and written as JSON; for "klarna" the order is created or updated and
// written as JSON. Any other provider returns an error.
func (s *CheckoutPoolServer) GetPaymentSessionHandler(w http.ResponseWriter, r *http.Request, checkoutId checkout.CheckoutId) error {
	ctx := r.Context()
	grain, err := s.Get(ctx, uint64(checkoutId))
	if err != nil {
		return err
	}
	payment, found := grain.FindPayment(r.PathValue("id"))
	if !found {
		http.Error(w, "payment not found", http.StatusNotFound)
		return nil
	}

	switch payment.Provider {
	case "klarna":
		order, err := s.CreateOrUpdateCheckout(r, grain, &payment.PaymentId)
		if err != nil {
			return err
		}
		return s.WriteResult(w, order)
	case "adyen":
		// Adyen sessions are rebuilt from the current grain on every call
		// rather than looked up, so the client always gets fresh session data.
		sessionReq, err := BuildAdyenCheckoutSession(grain, GetCheckoutMetaFromRequest(r))
		if err != nil {
			logger.Error("unable to build adyen session", "error", err)
			return err
		}
		checkoutAPI := s.adyenClient.Checkout()
		input := checkoutAPI.PaymentsApi.SessionsInput().CreateCheckoutSessionRequest(*sessionReq)
		session, _, err := checkoutAPI.PaymentsApi.Sessions(ctx, input)
		if err != nil {
			logger.Error("unable to create adyen session", "error", err)
			return err
		}
		return s.WriteResult(w, session)
	default:
		return errors.New("unsupported payment provider")
	}
}
// StartPaymentHandler starts a payment for the checkout identified by
// checkoutId using the provider named in the JSON request body (see
// StartPayment).
//
// For "adyen" it creates a checkout session, and for "klarna" it creates an
// order; in both cases it then applies a PaymentStarted mutation and writes
// the mutation result as JSON. An unsupported provider yields a 400 response
// and a nil error. Any failure to load the grain, decode the body, reach the
// provider, or apply the mutation is returned as an error.
func (s *CheckoutPoolServer) StartPaymentHandler(w http.ResponseWriter, r *http.Request, checkoutId checkout.CheckoutId) error {
	grain, err := s.Get(r.Context(), uint64(checkoutId))
	if err != nil {
		return err
	}
	payload := &StartPayment{}
	if err := json.NewDecoder(r.Body).Decode(payload); err != nil {
		return err
	}

	switch payload.Provider {
	case "adyen":
		meta := GetCheckoutMetaFromRequest(r)
		sessionData, err := BuildAdyenCheckoutSession(grain, meta)
		if err != nil {
			logger.Error("unable to build adyen session", "error", err)
			return err
		}
		service := s.adyenClient.Checkout()
		req := service.PaymentsApi.SessionsInput().CreateCheckoutSessionRequest(*sessionData)
		session, _, err := service.PaymentsApi.Sessions(r.Context(), req)
		if err != nil {
			logger.Error("unable to create adyen session", "error", err)
			return err
		}

		res, err := s.ApplyLocal(r.Context(), checkoutId, &messages.PaymentStarted{
			PaymentId: session.Id,
			Amount:    session.Amount.Value,
			Currency:  session.Amount.Currency,
			Provider:  "adyen",
			Method:    &payload.Method,
			StartedAt: timestamppb.New(time.Now()),
		})
		if err != nil {
			logger.Error("unable to apply payment started mutation", "error", err)
			return err
		}
		return s.WriteResult(w, res)

	case "klarna":
		order, err := s.CreateOrUpdateCheckout(r, grain, nil)
		if err != nil {
			logger.Error("unable to create klarna session", "error", err)
			return err
		}

		res, err := s.ApplyLocal(r.Context(), checkoutId, &messages.PaymentStarted{
			PaymentId: order.ID,
			Amount:    int64(order.OrderAmount),
			Currency:  order.PurchaseCurrency,
			Provider:  "klarna",
			Method:    &payload.Method,
			StartedAt: timestamppb.New(time.Now()),
		})
		// Without this check a failed mutation would be answered with a
		// 200 and a nil result even though the payment was never recorded.
		if err != nil {
			logger.Error("unable to apply payment started mutation", "error", err)
			return err
		}
		return s.WriteResult(w, res)

	default:
		http.Error(w, "unsupported payment provider", http.StatusBadRequest)
		return nil
	}
}
// Serve registers the checkout and payment routes on mux.
//
// Every route is wrapped so that the span found in the request context is
// renamed to the route pattern and both the span and the otelhttp labeler
// carry an http.route attribute. Between the provider callback routes and the
// API routes it connects to RabbitMQ and defines the order queue; the process
// exits via log.Fatalf if that connection cannot be established.
func (s *CheckoutPoolServer) Serve(mux *http.ServeMux) {
	register := func(pattern string, next func(http.ResponseWriter, *http.Request)) {
		routeAttr := attribute.String("http.route", pattern)
		mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
			ctx := r.Context()
			span := trace.SpanFromContext(ctx)
			span.SetName(pattern)
			span.SetAttributes(routeAttr)
			labeler, _ := otelhttp.LabelerFromContext(ctx)
			labeler.Add(routeAttr)
			next(w, r)
		})
	}

	// Provider callbacks (no checkout cookie involved).
	// Disabled: register("/payment/adyen/session", CookieCheckoutIdHandler(s.AdyenSessionHandler))
	register("/payment/adyen/push", s.AdyenHookHandler)
	register("/payment/adyen/return", s.AdyenReturnHandler)
	// Disabled: register("/payment/adyen/cancel", s.AdyenCancelHandler)
	register("/payment/klarna/validate", s.KlarnaValidationHandler)
	register("/payment/klarna/push", s.KlarnaPushHandler)
	register("/payment/klarna/notification", s.KlarnaNotificationHandler)

	amqpConn, err := amqp.Dial(amqpUrl)
	if err != nil {
		log.Fatalf("failed to connect to RabbitMQ: %v", err)
	}
	orders := NewAmqpOrderHandler(amqpConn)
	orders.DefineQueue()

	// Checkout API; the checkout id comes from the cookie and the request is
	// passed through ProxyHandler before reaching the handler.
	register("POST /api/checkout/start/{cartid}", s.StartCheckoutHandler)
	register("GET /api/checkout", CookieCheckoutIdHandler(s.ProxyHandler(s.GetCheckoutHandler)))
	register("POST /api/checkout/delivery", CookieCheckoutIdHandler(s.ProxyHandler(s.SetDeliveryHandler)))
	register("DELETE /api/checkout/delivery/{id}", CookieCheckoutIdHandler(s.ProxyHandler(s.RemoveDeliveryHandler)))
	register("POST /api/checkout/pickup-point", CookieCheckoutIdHandler(s.ProxyHandler(s.SetPickupPointHandler)))
	register("POST /api/checkout/contact-details", CookieCheckoutIdHandler(s.ProxyHandler(s.ContactDetailsUpdatedHandler)))
	register("POST /payment", CookieCheckoutIdHandler(s.ProxyHandler(s.StartPaymentHandler)))
	register("GET /payment/{id}/session", CookieCheckoutIdHandler(s.ProxyHandler(s.GetPaymentSessionHandler)))
	// Disabled: register("POST /api/checkout/initialize", CookieCheckoutIdHandler(s.ProxyHandler(s.InitializeCheckoutHandler)))
	// Disabled: register("POST /api/checkout/inventory-reserved", CookieCheckoutIdHandler(s.ProxyHandler(s.InventoryReservedHandler)))
	// Disabled: register("POST /api/checkout/order-created", CookieCheckoutIdHandler(s.ProxyHandler(s.OrderCreatedHandler)))
	// Disabled: register("POST /api/checkout/confirmation-viewed", CookieCheckoutIdHandler(s.ProxyHandler(s.ConfirmationViewedHandler)))
	// Disabled: register("GET /payment/klarna/session", CookieCheckoutIdHandler(s.ProxyHandler(s.KlarnaSessionHandler)))
	// Disabled: register("GET /payment/klarna/checkout", CookieCheckoutIdHandler(s.ProxyHandler(s.KlarnaHtmlCheckoutHandler)))
	register("GET /payment/klarna/confirmation/{order_id}", s.KlarnaConfirmationHandler)
}