add listener and remove memory inventory service
This commit is contained in:
@@ -3,6 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.tornberg.me/go-cart-actor/pkg/inventory"
|
"git.tornberg.me/go-cart-actor/pkg/inventory"
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
@@ -24,6 +25,18 @@ func main() {
|
|||||||
log.Fatalf("Unable to connect to inventory redis: %v", err)
|
log.Fatalf("Unable to connect to inventory redis: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start inventory change listener
|
||||||
|
listener := inventory.NewInventoryChangeListener(rdb, ctx, func(changes []inventory.InventoryChange) {
|
||||||
|
for _, change := range changes {
|
||||||
|
log.Printf("Inventory change detected: SKU %s at location %s now has %d", change.SKU, change.StockLocationID, change.Value)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
log.Println("Starting inventory change listener...")
|
||||||
|
go listener.Start()
|
||||||
|
listener.WaitReady()
|
||||||
|
log.Println("Listener is ready, proceeding with inventory operations...")
|
||||||
|
|
||||||
rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
|
rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
|
||||||
s.UpdateInventory(p, "1", "1", 10)
|
s.UpdateInventory(p, "1", "1", 10)
|
||||||
s.UpdateInventory(p, "2", "2", 20)
|
s.UpdateInventory(p, "2", "2", 20)
|
||||||
@@ -51,4 +64,6 @@ func main() {
|
|||||||
}
|
}
|
||||||
log.Printf("Inventory after reservation: %v", v)
|
log.Printf("Inventory after reservation: %v", v)
|
||||||
|
|
||||||
|
// Wait a bit for listener to process messages
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
}
|
}
|
||||||
|
|||||||
85
pkg/inventory/listener.go
Normal file
85
pkg/inventory/listener.go
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
package inventory
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
type InventoryChange struct {
|
||||||
|
SKU string `json:"sku"`
|
||||||
|
StockLocationID string `json:"stock_location_id"`
|
||||||
|
Value int64 `json:"value"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type InventoryChangeHandler func(changes []InventoryChange)
|
||||||
|
|
||||||
|
type InventoryChangeListener struct {
|
||||||
|
client *redis.Client
|
||||||
|
ctx context.Context
|
||||||
|
handler InventoryChangeHandler
|
||||||
|
ready chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewInventoryChangeListener(client *redis.Client, ctx context.Context, handler InventoryChangeHandler) *InventoryChangeListener {
|
||||||
|
return &InventoryChangeListener{
|
||||||
|
client: client,
|
||||||
|
ctx: ctx,
|
||||||
|
handler: handler,
|
||||||
|
ready: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *InventoryChangeListener) WaitReady() {
|
||||||
|
<-l.ready
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *InventoryChangeListener) Start() error {
|
||||||
|
pubsub := l.client.Subscribe(l.ctx, "inventory_changed")
|
||||||
|
defer pubsub.Close()
|
||||||
|
|
||||||
|
// Signal that we're ready to receive messages
|
||||||
|
close(l.ready)
|
||||||
|
|
||||||
|
ch := pubsub.Channel()
|
||||||
|
|
||||||
|
for msg := range ch {
|
||||||
|
var payload map[string]int64
|
||||||
|
if err := json.Unmarshal([]byte(msg.Payload), &payload); err != nil {
|
||||||
|
log.Printf("Failed to unmarshal inventory change message: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
changes := make([]InventoryChange, 0, len(payload))
|
||||||
|
for key, newValue := range payload {
|
||||||
|
// Parse key: "inventory:sku:location"
|
||||||
|
parts := strings.Split(key, ":")
|
||||||
|
if len(parts) != 3 || parts[0] != "inventory" {
|
||||||
|
log.Printf("Invalid inventory key format: %s", key)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
sku := parts[1]
|
||||||
|
locationID := parts[2]
|
||||||
|
changes = append(changes, InventoryChange{
|
||||||
|
SKU: sku,
|
||||||
|
StockLocationID: locationID,
|
||||||
|
Value: newValue,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if l.handler != nil {
|
||||||
|
l.handler(changes)
|
||||||
|
} else {
|
||||||
|
// Default logging
|
||||||
|
for _, change := range changes {
|
||||||
|
fmt.Printf("Inventory changed: SKU %s at location %s -> %d\n", change.SKU, change.StockLocationID, change.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -1,47 +0,0 @@
|
|||||||
package inventory
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type MemoryInventoryService struct {
|
|
||||||
warehouses map[LocationID]*Warehouse
|
|
||||||
mu sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMemoryInventoryService() *MemoryInventoryService {
|
|
||||||
return &MemoryInventoryService{
|
|
||||||
warehouses: make(map[LocationID]*Warehouse),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *MemoryInventoryService) AddWarehouse(warehouse *Warehouse) {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
s.warehouses[warehouse.ID] = warehouse
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *MemoryInventoryService) GetInventory(sku SKU, locationID LocationID) (*InventoryItem, error) {
|
|
||||||
s.mu.RLock()
|
|
||||||
defer s.mu.RUnlock()
|
|
||||||
|
|
||||||
warehouse, ok := s.warehouses[locationID]
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("warehouse not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, item := range warehouse.Inventory {
|
|
||||||
if item.SKU == sku {
|
|
||||||
return &item, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil, errors.New("sku not found in warehouse")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *MemoryInventoryService) ReserveInventory(req ...ReserveRequest) error {
|
|
||||||
// We'll implement the reservation logic using Lua script execution here
|
|
||||||
|
|
||||||
// For now, let's just return an error indicating it's not implemented
|
|
||||||
return errors.New("reservation not implemented yet")
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user