diff --git a/cmd/cart/amqp-order-handler.go b/cmd/cart/amqp-order-handler.go index dddaac9..674d249 100644 --- a/cmd/cart/amqp-order-handler.go +++ b/cmd/cart/amqp-order-handler.go @@ -9,7 +9,8 @@ import ( ) type AmqpOrderHandler struct { - conn *amqp.Connection + conn *amqp.Connection + queue *amqp.Queue } func NewAmqpOrderHandler(conn *amqp.Connection) *AmqpOrderHandler { @@ -18,26 +19,25 @@ func NewAmqpOrderHandler(conn *amqp.Connection) *AmqpOrderHandler { } } -func (h *AmqpOrderHandler) DefineTopics() error { +func (h *AmqpOrderHandler) DefineQueue() error { ch, err := h.conn.Channel() if err != nil { return fmt.Errorf("failed to open a channel: %w", err) } defer ch.Close() - err = ch.ExchangeDeclare( - "orders", // name - "direct", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments + queue, err := ch.QueueDeclare( + "order-queue", // name + false, + false, + false, + false, + nil, ) if err != nil { return fmt.Errorf("failed to declare an exchange: %w", err) } - + h.queue = &queue return nil } @@ -51,11 +51,12 @@ func (h *AmqpOrderHandler) OrderCompleted(body []byte) error { defer cancel() return ch.PublishWithContext(ctx, - "orders", // exchange - "new", // routing key - false, // mandatory - false, // immediate + "", // exchange + h.queue.Name, // routing key + false, // mandatory + false, // immediate amqp.Publishing{ + //DeliveryMode: amqp., ContentType: "application/json", Body: body, }) diff --git a/cmd/cart/checkout_builder.go b/cmd/cart/checkout_builder.go index 84db784..aca2ec9 100644 --- a/cmd/cart/checkout_builder.go +++ b/cmd/cart/checkout_builder.go @@ -70,7 +70,7 @@ func BuildCheckoutOrderPayload(grain *cart.CartGrain, meta *CheckoutMeta) ([]byt Name: it.Meta.Name, Quantity: it.Quantity, UnitPrice: int(it.Price.IncVat), - TaxRate: 2500, // TODO: derive if variable tax rates are introduced + TaxRate: it.Tax, // TODO: derive if variable tax rates are introduced QuantityUnit: "st", TotalAmount: int(it.TotalPrice.IncVat), TotalTaxAmount: int(it.TotalPrice.TotalVat()), diff --git a/cmd/cart/checkout_server.go b/cmd/cart/checkout_server.go index 6dd6f76..dca800f 100644 --- a/cmd/cart/checkout_server.go +++ b/cmd/cart/checkout_server.go @@ -9,10 +9,37 @@ import ( "git.tornberg.me/go-cart-actor/pkg/actor" "git.tornberg.me/go-cart-actor/pkg/cart" + "git.tornberg.me/mats/go-redis-inventory/pkg/inventory" amqp "github.com/rabbitmq/amqp091-go" ) -func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux) { +var tpl = ` + + + + + s10r testing - checkout + + + + %s + + +` + +func (a *App) getGrainFromOrder(order *CheckoutOrder) (*cart.CartGrain, error) { + cartId, ok := cart.ParseCartId(order.MerchantReference1) + if !ok { + return nil, fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1) + } + grain, err := a.pool.Get(uint64(cartId)) + if err != nil { + return nil, fmt.Errorf("failed to get cart grain: %w", err) + } + return grain, nil +} + +func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux, inventoryService inventory.InventoryService) { conn, err := amqp.Dial(amqpUrl) if err != nil { log.Fatalf("failed to connect to RabbitMQ: %v", err) @@ -27,10 +54,10 @@ func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux) { amqpListener.DefineTopics() a.pool.AddListener(amqpListener) orderHandler := NewAmqpOrderHandler(conn) - orderHandler.DefineTopics() - - mux.HandleFunc("POST /push", func(w http.ResponseWriter, r *http.Request) { + orderHandler.DefineQueue() + mux.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) { + log.Printf("Klarna order confirmation push, method: %s", r.Method) if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return @@ -104,6 +131,7 @@ func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux) { }) mux.HandleFunc("/notification", func(w http.ResponseWriter, r *http.Request) { log.Printf("Klarna order notification, method: %s", r.Method) + logger.InfoContext(r.Context(), "Klarna order notification received", "method", r.Method) if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return @@ -114,7 +142,23 @@ func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux) { w.WriteHeader(http.StatusBadRequest) } log.Printf("Klarna order notification: %s", order.ID) + logger.InfoContext(r.Context(), "Klarna order notification received", "order_id", order.ID) + grain, err := a.getGrainFromOrder(order) + if err != nil { + logger.ErrorContext(r.Context(), "Unable to get grain from klarna order", "error", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + return + } + if inventoryService != nil { + inventoryRequests := getInventoryRequests(grain.Items) + err = inventoryService.ReserveInventory(inventoryRequests...) + if err != nil { + logger.WarnContext(r.Context(), "placeorder inventory reservation failed") + w.WriteHeader(http.StatusNotAcceptable) + return + } + } w.WriteHeader(http.StatusOK) }) mux.HandleFunc("POST /validate", func(w http.ResponseWriter, r *http.Request) { @@ -128,7 +172,22 @@ func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux) { if err != nil { w.WriteHeader(http.StatusBadRequest) } - log.Printf("Klarna order validation: %s", order.ID) + logger.InfoContext(r.Context(), "Klarna order validation received", "order_id", order.ID, "cart_id", order.MerchantReference1) + grain, err := a.getGrainFromOrder(order) + if err != nil { + logger.ErrorContext(r.Context(), "Unable to get grain from klarna order", "error", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + return + } + if inventoryService != nil { + inventoryRequests := getInventoryRequests(grain.Items) + _, err = inventoryService.ReservationCheck(inventoryRequests...) + if err != nil { + logger.WarnContext(r.Context(), "placeorder inventory check failed") + w.WriteHeader(http.StatusNotAcceptable) + return + } + } w.WriteHeader(http.StatusOK) }) diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 30d515f..66da5b7 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -19,9 +19,11 @@ import ( "git.tornberg.me/go-cart-actor/pkg/promotions" "git.tornberg.me/go-cart-actor/pkg/proxy" "git.tornberg.me/go-cart-actor/pkg/voucher" + "git.tornberg.me/mats/go-redis-inventory/pkg/inventory" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/redis/go-redis/v9" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) @@ -45,20 +47,8 @@ type App struct { var podIp = os.Getenv("POD_IP") var name = os.Getenv("POD_NAME") var amqpUrl = os.Getenv("AMQP_URL") - -var tpl = ` - - - - - s10r testing - checkout - - - - %s - - -` +var redisAddress = os.Getenv("REDIS_ADDRESS") +var redisPassword = os.Getenv("REDIS_PASSWORD") func getCountryFromHost(host string) string { if strings.Contains(strings.ToLower(host), "-no") { @@ -133,8 +123,17 @@ func main() { } klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) + rdb := redis.NewClient(&redis.Options{ + Addr: redisAddress, + Password: redisPassword, + DB: 0, + }) + inventoryService, err := inventory.NewRedisInventoryService(rdb, context.Background()) + if err != nil { + log.Fatalf("Error creating inventory service: %v\n", err) + } - syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient) + syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient, inventoryService) app := &App{ pool: pool, @@ -148,7 +147,7 @@ func main() { if amqpUrl == "" { log.Printf("no connection to amqp defined") } else { - app.HandleCheckoutRequests(amqpUrl, mux) + app.HandleCheckoutRequests(amqpUrl, mux, inventoryService) } grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool) diff --git a/cmd/cart/pool-server.go b/cmd/cart/pool-server.go index 026b344..5257c99 100644 --- a/cmd/cart/pool-server.go +++ b/cmd/cart/pool-server.go @@ -15,6 +15,7 @@ import ( "git.tornberg.me/go-cart-actor/pkg/cart" messages "git.tornberg.me/go-cart-actor/pkg/messages" "git.tornberg.me/go-cart-actor/pkg/voucher" + "git.tornberg.me/mats/go-redis-inventory/pkg/inventory" "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -40,15 +41,17 @@ var ( type PoolServer struct { actor.GrainPool[*cart.CartGrain] - pod_name string - klarnaClient *KlarnaClient + pod_name string + klarnaClient *KlarnaClient + inventoryService inventory.InventoryService } -func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient) *PoolServer { +func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService) *PoolServer { return &PoolServer{ - GrainPool: pool, - pod_name: pod_name, - klarnaClient: klarnaClient, + GrainPool: pool, + pod_name: pod_name, + klarnaClient: klarnaClient, + inventoryService: inventoryService, } } @@ -310,15 +313,37 @@ func getLocale(country string) string { return "sv-se" } +func getLocationId(item *cart.CartItem) inventory.LocationID { + if item.StoreId == nil || *item.StoreId == "" { + return "se" + } + return inventory.LocationID(*item.StoreId) +} + +func getInventoryRequests(items []*cart.CartItem) []inventory.ReserveRequest { + var requests []inventory.ReserveRequest + for _, item := range items { + if item == nil { + continue + } + requests = append(requests, inventory.ReserveRequest{ + SKU: inventory.SKU(item.Sku), + LocationID: getLocationId(item), + Quantity: uint32(item.Quantity), + }) + } + return requests +} + func (s *PoolServer) CreateOrUpdateCheckout(ctx context.Context, host string, id cart.CartId) (*CheckoutOrder, error) { country := getCountryFromHost(host) meta := &CheckoutMeta{ Terms: fmt.Sprintf("https://%s/terms", host), Checkout: fmt.Sprintf("https://%s/checkout?order_id={checkout.order.id}", host), Confirmation: fmt.Sprintf("https://%s/confirmation/{checkout.order.id}", host), - Validation: fmt.Sprintf("https://%s/validate", host), - Push: fmt.Sprintf("https://%s/push?order_id={checkout.order.id}", host), Notification: "https://cart.tornberg.me/notification", + Validation: "https://cart.tornberg.me/validate", + Push: "https://cart.tornberg.me/push?order_id={checkout.order.id}", Country: country, Currency: getCurrency(country), Locale: getLocale(country), @@ -329,6 +354,14 @@ func (s *PoolServer) CreateOrUpdateCheckout(ctx context.Context, host string, id if err != nil { return nil, err } + if s.inventoryService != nil { + inventoryRequests := getInventoryRequests(grain.Items) + failingRequest, err := s.inventoryService.ReservationCheck(inventoryRequests...) + if err != nil { + logger.WarnContext(ctx, "inventory check failed", string(failingRequest.SKU), string(failingRequest.LocationID)) + return nil, err + } + } // Build pure checkout payload payload, _, err := BuildCheckoutOrderPayload(grain, meta) @@ -553,6 +586,7 @@ func (s *PoolServer) CheckoutHandler(fn func(order *CheckoutOrder, w http.Respon if orderId == "" { order, err := s.CreateOrUpdateCheckout(r.Context(), r.Host, cartId) if err != nil { + logger.Error("unable to create klarna session: %v", err) return err } s.ApplyCheckoutStarted(order, cartId) diff --git a/cmd/inventory/main.go b/cmd/inventory/main.go index db5bf1a..c914d49 100644 --- a/cmd/inventory/main.go +++ b/cmd/inventory/main.go @@ -103,6 +103,7 @@ func main() { amqpUrl, ok := os.LookupEnv("RABBIT_HOST") if ok { + log.Printf("Connecting to rabbitmq") conn, err := amqp.DialConfig(amqpUrl, amqp.Config{ Properties: amqp.NewConnectionProperties(), }) diff --git a/deployment/deployment.yaml b/deployment/deployment.yaml index 16dc40f..bc399c8 100644 --- a/deployment/deployment.yaml +++ b/deployment/deployment.yaml @@ -177,6 +177,10 @@ spec: secretKeyRef: name: klarna-api-credentials key: username + - name: REDIS_ADDRESS + value: "10.10.3.18:6379" + - name: REDIS_PASSWORD + value: "slaskredis" - name: OTEL_RESOURCE_ATTRIBUTES value: "service.name=cart,service.version=0.1.2" - name: OTEL_EXPORTER_OTLP_ENDPOINT @@ -280,6 +284,10 @@ spec: env: - name: TZ value: "Europe/Stockholm" + - name: REDIS_ADDRESS + value: "redis.home:6379" + - name: REDIS_PASSWORD + value: "slaskredis" - name: OTEL_RESOURCE_ATTRIBUTES value: "service.name=cart,service.version=0.1.2" - name: OTEL_EXPORTER_OTLP_ENDPOINT @@ -452,7 +460,9 @@ spec: env: - name: TZ value: "Europe/Stockholm" + - name: RABBIT_HOST + value: amqp://admin:12bananer@rabbitmq.s10n:5672/ - name: REDIS_ADDRESS - value: "10.10.3.18:6379" + value: "redis.home:6379" - name: REDIS_PASSWORD value: "slaskredis" diff --git a/go.mod b/go.mod index b884be2..92f53ef 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module git.tornberg.me/go-cart-actor go 1.25.3 require ( + git.tornberg.me/mats/go-redis-inventory v0.0.0-20251110193851-19d7ad0de6e5 github.com/gogo/protobuf v1.3.2 github.com/google/uuid v1.6.0 github.com/matst80/slask-finder v0.0.0-20251023104024-f788e5a51d68 @@ -29,7 +30,6 @@ require ( ) require ( - git.tornberg.me/mats/go-redis-inventory v0.0.0-20251110193851-19d7ad0de6e5 // indirect github.com/RoaringBitmap/roaring/v2 v2.13.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.24.1 // indirect diff --git a/pkg/cart/cart-grain.go b/pkg/cart/cart-grain.go index 5c5816b..d8a48bc 100644 --- a/pkg/cart/cart-grain.go +++ b/pkg/cart/cart-grain.go @@ -30,13 +30,14 @@ type ItemMeta struct { } type CartItem struct { - Id uint32 `json:"id"` - ItemId uint32 `json:"itemId,omitempty"` - ParentId *uint32 `json:"parentId,omitempty"` - Sku string `json:"sku"` - Price Price `json:"price"` - TotalPrice Price `json:"totalPrice"` - OrgPrice *Price `json:"orgPrice,omitempty"` + Id uint32 `json:"id"` + ItemId uint32 `json:"itemId,omitempty"` + ParentId *uint32 `json:"parentId,omitempty"` + Sku string `json:"sku"` + Price Price `json:"price"` + TotalPrice Price `json:"totalPrice"` + OrgPrice *Price `json:"orgPrice,omitempty"` + Tax int Stock StockStatus `json:"stock"` Quantity int `json:"qty"` Discount *Price `json:"discount,omitempty"` @@ -256,7 +257,7 @@ func (c *CartGrain) UpdateTotals() { diff.Add(*item.OrgPrice) diff.Subtract(item.Price) diff.Multiply(int64(item.Quantity)) - rowTotal.Subtract(*diff) + //rowTotal.Subtract(*diff) item.Discount = diff if diff.IncVat > 0 { c.TotalDiscount.Add(*diff) diff --git a/pkg/cart/cart_grain_totals_test.go b/pkg/cart/cart_grain_totals_test.go index 135a6a7..b05b069 100644 --- a/pkg/cart/cart_grain_totals_test.go +++ b/pkg/cart/cart_grain_totals_test.go @@ -34,8 +34,8 @@ func TestCartGrainUpdateTotalsBasic(t *testing.T) { } // Discount: current implementation computes (OrgPrice - Price) ignoring quantity -> 1500-1250=250 - if c.TotalDiscount.IncVat != 250 { - t.Fatalf("TotalDiscount expected 250 got %d", c.TotalDiscount.IncVat) + if c.TotalDiscount.IncVat != 500 { + t.Fatalf("TotalDiscount expected 500 got %d", c.TotalDiscount.IncVat) } } diff --git a/pkg/cart/mutation_add_item.go b/pkg/cart/mutation_add_item.go index 94d7a43..dd8cfc6 100644 --- a/pkg/cart/mutation_add_item.go +++ b/pkg/cart/mutation_add_item.go @@ -63,6 +63,7 @@ func AddItem(g *CartGrain, m *messages.AddItem) error { ItemId: uint32(m.ItemId), Quantity: int(m.Quantity), Sku: m.Sku, + Tax: int(taxRate * 100), Meta: &ItemMeta{ Name: m.Name, Image: m.Image,