From 44d7c1faadca06a6665f0afe40e95786d207a452 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mats=20T=C3=B6rnberg?= Date: Mon, 10 Nov 2025 19:47:15 +0100 Subject: [PATCH] add listener and remove memory inventory service --- cmd/inventory/main.go | 15 ++++++ pkg/inventory/listener.go | 85 +++++++++++++++++++++++++++++++++ pkg/inventory/memory_service.go | 47 ------------------ 3 files changed, 100 insertions(+), 47 deletions(-) create mode 100644 pkg/inventory/listener.go delete mode 100644 pkg/inventory/memory_service.go diff --git a/cmd/inventory/main.go b/cmd/inventory/main.go index c8b4a8e..fa07589 100644 --- a/cmd/inventory/main.go +++ b/cmd/inventory/main.go @@ -3,6 +3,7 @@ package main import ( "context" "log" + "time" "git.tornberg.me/go-cart-actor/pkg/inventory" "github.com/redis/go-redis/v9" @@ -24,6 +25,18 @@ func main() { log.Fatalf("Unable to connect to inventory redis: %v", err) return } + + // Start inventory change listener + listener := inventory.NewInventoryChangeListener(rdb, ctx, func(changes []inventory.InventoryChange) { + for _, change := range changes { + log.Printf("Inventory change detected: SKU %s at location %s now has %d", change.SKU, change.StockLocationID, change.Value) + } + }) + log.Println("Starting inventory change listener...") + go listener.Start() + listener.WaitReady() + log.Println("Listener is ready, proceeding with inventory operations...") + rdb.Pipelined(ctx, func(p redis.Pipeliner) error { s.UpdateInventory(p, "1", "1", 10) s.UpdateInventory(p, "2", "2", 20) @@ -51,4 +64,6 @@ func main() { } log.Printf("Inventory after reservation: %v", v) + // Wait a bit for listener to process messages + time.Sleep(2 * time.Second) } diff --git a/pkg/inventory/listener.go b/pkg/inventory/listener.go new file mode 100644 index 0000000..41b32d2 --- /dev/null +++ b/pkg/inventory/listener.go @@ -0,0 +1,85 @@ +package inventory + +import ( + "context" + "encoding/json" + "fmt" + "log" + "strings" + + "github.com/redis/go-redis/v9" +) + +type InventoryChange struct { + SKU string `json:"sku"` + StockLocationID string `json:"stock_location_id"` + Value int64 `json:"value"` +} + +type InventoryChangeHandler func(changes []InventoryChange) + +type InventoryChangeListener struct { + client *redis.Client + ctx context.Context + handler InventoryChangeHandler + ready chan struct{} +} + +func NewInventoryChangeListener(client *redis.Client, ctx context.Context, handler InventoryChangeHandler) *InventoryChangeListener { + return &InventoryChangeListener{ + client: client, + ctx: ctx, + handler: handler, + ready: make(chan struct{}), + } +} + +func (l *InventoryChangeListener) WaitReady() { + <-l.ready +} + +func (l *InventoryChangeListener) Start() error { + pubsub := l.client.Subscribe(l.ctx, "inventory_changed") + defer pubsub.Close() + + // Signal that we're ready to receive messages + close(l.ready) + + ch := pubsub.Channel() + + for msg := range ch { + var payload map[string]int64 + if err := json.Unmarshal([]byte(msg.Payload), &payload); err != nil { + log.Printf("Failed to unmarshal inventory change message: %v", err) + continue + } + + changes := make([]InventoryChange, 0, len(payload)) + for key, newValue := range payload { + // Parse key: "inventory:sku:location" + parts := strings.Split(key, ":") + if len(parts) != 3 || parts[0] != "inventory" { + log.Printf("Invalid inventory key format: %s", key) + continue + } + sku := parts[1] + locationID := parts[2] + changes = append(changes, InventoryChange{ + SKU: sku, + StockLocationID: locationID, + Value: newValue, + }) + } + + if l.handler != nil { + l.handler(changes) + } else { + // Default logging + for _, change := range changes { + fmt.Printf("Inventory changed: SKU %s at location %s -> %d\n", change.SKU, change.StockLocationID, change.Value) + } + } + } + + return nil +} diff --git a/pkg/inventory/memory_service.go b/pkg/inventory/memory_service.go deleted file mode 100644 index d3f6a70..0000000 --- a/pkg/inventory/memory_service.go +++ /dev/null @@ -1,47 +0,0 @@ -package inventory - -import ( - "errors" - "sync" -) - -type MemoryInventoryService struct { - warehouses map[LocationID]*Warehouse - mu sync.RWMutex -} - -func NewMemoryInventoryService() *MemoryInventoryService { - return &MemoryInventoryService{ - warehouses: make(map[LocationID]*Warehouse), - } -} - -func (s *MemoryInventoryService) AddWarehouse(warehouse *Warehouse) { - s.mu.Lock() - defer s.mu.Unlock() - s.warehouses[warehouse.ID] = warehouse -} - -func (s *MemoryInventoryService) GetInventory(sku SKU, locationID LocationID) (*InventoryItem, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - warehouse, ok := s.warehouses[locationID] - if !ok { - return nil, errors.New("warehouse not found") - } - - for _, item := range warehouse.Inventory { - if item.SKU == sku { - return &item, nil - } - } - return nil, errors.New("sku not found in warehouse") -} - -func (s *MemoryInventoryService) ReserveInventory(req ...ReserveRequest) error { - // We'll implement the reservation logic using Lua script execution here - - // For now, let's just return an error indicating it's not implemented - return errors.New("reservation not implemented yet") -}