From b0907aee41c822217e4f705dc8e8f250da10011c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mats=20T=C3=B6rnberg?= Date: Tue, 25 Nov 2025 20:27:04 +0100 Subject: [PATCH] update --- cmd/cart/pool-server.go | 16 ++++++++++++++-- pkg/cart/cart-grain.go | 18 ++++++++++++++---- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/cmd/cart/pool-server.go b/cmd/cart/pool-server.go index 8f9afb3..36bc93a 100644 --- a/cmd/cart/pool-server.go +++ b/cmd/cart/pool-server.go @@ -19,6 +19,7 @@ import ( "github.com/matst80/go-redis-inventory/pkg/inventory" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/redis/go-redis/v9" "go.opentelemetry.io/contrib/bridges/otelslog" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "google.golang.org/protobuf/types/known/anypb" @@ -47,13 +48,24 @@ type PoolServer struct { inventoryService inventory.InventoryService } -func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService) *PoolServer { - return &PoolServer{ +func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService, inventoryRedisClient *redis.Client) *PoolServer { + srv := &PoolServer{ GrainPool: pool, pod_name: pod_name, klarnaClient: klarnaClient, inventoryService: inventoryService, } + inventory.NewInventoryChangeListener(inventoryRedisClient, func(ctx context.Context, sku inventory.SKU, locationID inventory.LocationID) { + qty, err := inventoryService.GetInventory(ctx, sku, locationID) + if err != nil { + log.Printf("error fetching inventory for sku %s at location %s: %v", sku, locationID, err) + } + srv.GrainPool.GetPubSub().Publish(actor.Event{ + Topic: fmt.Sprintf("inventory:%s:%s", sku, locationID), + Payload: qty, + }) + }) + return srv } func (s *PoolServer) ApplyLocal(ctx context.Context, id cart.CartId, mutation ...proto.Message) (*actor.MutationResult[*cart.CartGrain], error) { diff --git a/pkg/cart/cart-grain.go b/pkg/cart/cart-grain.go index 050cf57..7f3d96e 100644 --- a/pkg/cart/cart-grain.go +++ b/pkg/cart/cart-grain.go @@ -2,6 +2,7 @@ package cart import ( "encoding/json" + "log" "slices" "strings" "sync" @@ -234,12 +235,21 @@ func (c *CartGrain) Notify(event actor.Event) { defer c.mu.Unlock() // Example: if event is inventory change for a SKU in the cart if strings.HasPrefix(event.Topic, "inventory:") { - sku := strings.TrimPrefix(event.Topic, "inventory:") + skuAndLocationId := strings.TrimPrefix(event.Topic, "inventory:") + parts := strings.Split(skuAndLocationId, ":") + if len(parts) != 2 { + return + } + sku := parts[0] + locationId := parts[1] + log.Printf("cart grain got inventory update: %s", event.Topic) for _, item := range c.Items { - if item.Sku == sku { + if item.Sku == sku && item.StoreId != nil && *item.StoreId == locationId { // Update stock status based on payload, e.g., if payload is bool available - if available, ok := event.Payload.(bool); ok { - if available { + + if quantity, ok := event.Payload.(int64); ok { + log.Printf("cart grain got item stock update %d", quantity) + if quantity > 0 { item.Stock = 1 // assuming 1 is in stock } else { item.Stock = 0 // out of stock