feature/pubsub #7

Merged
mats merged 67 commits from feature/pubsub into main 2025-11-28 17:45:22 +01:00
2 changed files with 7 additions and 9 deletions
Showing only changes of commit e7e572b0f7 - Show all commits

View File

@@ -145,7 +145,7 @@ func main() {
log.Fatalf("Error creating inventory service: %v\n", err) log.Fatalf("Error creating inventory service: %v\n", err)
} }
syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient, inventoryService) syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient, inventoryService, rdb)
app := &App{ app := &App{
pool: pool, pool: pool,

View File

@@ -55,15 +55,13 @@ func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarn
klarnaClient: klarnaClient, klarnaClient: klarnaClient,
inventoryService: inventoryService, inventoryService: inventoryService,
} }
inventory.NewInventoryChangeListener(inventoryRedisClient, func(ctx context.Context, sku inventory.SKU, locationID inventory.LocationID) { inventory.NewInventoryChangeListener(inventoryRedisClient, context.Background(), func(changes []inventory.InventoryChange) {
qty, err := inventoryService.GetInventory(ctx, sku, locationID) for _, change := range changes {
if err != nil {
log.Printf("error fetching inventory for sku %s at location %s: %v", sku, locationID, err)
}
srv.GrainPool.GetPubSub().Publish(actor.Event{ srv.GrainPool.GetPubSub().Publish(actor.Event{
Topic: fmt.Sprintf("inventory:%s:%s", sku, locationID), Topic: fmt.Sprintf("inventory:%s:%s", change.SKU, change.StockLocationID),
Payload: qty, Payload: change.Value,
}) })
}
}) })
return srv return srv
} }