diff --git a/cmd/cart/main.go b/cmd/cart/main.go index bf3ad0b..1882c2f 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -80,6 +80,8 @@ func main() { log.Printf("loaded %d promotions", len(promotionData.State.Promotions)) + inventoryPubSub := actor.NewPubSub[inventory.InventoryChange]() + promotionService := promotions.NewPromotionService(nil) reg := cart.NewCartMultationRegistry() @@ -109,11 +111,20 @@ func main() { grainSpawns.Inc() ret := cart.NewCartGrain(id, time.Now()) // Set baseline lastChange at spawn; replay may update it to last event timestamp. - + inventoryPubSub.Subscribe(ret.HandleInventoryChange) err := diskStorage.LoadEvents(ctx, id, ret) return ret, err }, + Destroy: func(grain actor.Grain[cart.CartGrain]) error { + cart, err := grain.GetCurrentState() + if err != nil { + return err + } + inventoryPubSub.Unsubscribe(cart.HandleInventoryChange) + + return nil + }, SpawnHost: func(host string) (actor.Host, error) { return proxy.NewRemoteHost(host) }, @@ -127,13 +138,6 @@ func main() { log.Fatalf("Error creating cart pool: %v\n", err) } - pool.SetPubSub(actor.NewPubSub(func(id uint64, event actor.Event) { - grain, _ := pool.Get(context.Background(), id) - if sub, ok := any(grain).(actor.Subscribable); ok { - sub.Notify(event) - } - })) - klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) rdb := redis.NewClient(&redis.Options{ Addr: redisAddress, @@ -245,6 +249,15 @@ func main() { srvErr <- srv.ListenAndServe() }() + listener := inventory.NewInventoryChangeListener(rdb, context.Background(), func(changes []inventory.InventoryChange) { + for _, change := range changes { + log.Printf("inventory change: %v", change) + inventoryPubSub.Publish(change) + } + }) + + go listener.Start() + log.Print("Server started at port 8080") go http.ListenAndServe(":8081", debugMux) diff --git a/cmd/cart/pool-server.go b/cmd/cart/pool-server.go index 2affdde..5a1b7fa 100644 --- a/cmd/cart/pool-server.go +++ b/cmd/cart/pool-server.go @@ -43,10 +43,9 @@ var ( type PoolServer struct { actor.GrainPool[*cart.CartGrain] - pod_name string - klarnaClient *KlarnaClient - inventoryService inventory.InventoryService - inventoryListener *inventory.InventoryChangeListener + pod_name string + klarnaClient *KlarnaClient + inventoryService inventory.InventoryService } func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService, inventoryRedisClient *redis.Client) *PoolServer { @@ -56,16 +55,7 @@ func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarn klarnaClient: klarnaClient, inventoryService: inventoryService, } - listener := inventory.NewInventoryChangeListener(inventoryRedisClient, context.Background(), func(changes []inventory.InventoryChange) { - for _, change := range changes { - srv.GrainPool.GetPubSub().Publish(actor.Event{ - Topic: fmt.Sprintf("inventory:%s", change.SKU), - Payload: change, - }) - } - }) - srv.inventoryListener = listener - go listener.Start() + return srv } diff --git a/pkg/actor/grain_pool.go b/pkg/actor/grain_pool.go index 7aa5bbd..f51c220 100644 --- a/pkg/actor/grain_pool.go +++ b/pkg/actor/grain_pool.go @@ -26,7 +26,6 @@ type GrainPool[V any] interface { AddRemoteHost(host string) IsHealthy() bool IsKnown(string) bool - GetPubSub() *PubSub Close() } diff --git a/pkg/actor/pubsub.go b/pkg/actor/pubsub.go index 4c8cf11..c066ac7 100644 --- a/pkg/actor/pubsub.go +++ b/pkg/actor/pubsub.go @@ -1,93 +1,79 @@ package actor import ( + "iter" + "log" + "slices" "sync" ) -// Event represents an event to be published. -type Event struct { - Topic string - Payload interface{} -} +type ReceiverFunc[V any] func(event V) -// NotifyFunc is a function to notify a grain of an event. -type NotifyFunc func(grainID uint64, event Event) - -// Subscribable is an interface for grains that can receive notifications. -type Subscribable interface { - Notify(event Event) - UpdateSubscriptions(pubsub *PubSub) -} - -// PubSub manages subscriptions for grains to topics. -// Topics are strings, e.g., "sku:12345" -// Subscribers are grain IDs (uint64) -type PubSub struct { - subscribers map[string][]uint64 +type PubSub[V any] struct { + subscribers []*ReceiverFunc[V] mu sync.RWMutex - notify NotifyFunc } // NewPubSub creates a new PubSub instance. -func NewPubSub(notify NotifyFunc) *PubSub { - return &PubSub{ - subscribers: make(map[string][]uint64), - notify: notify, +func NewPubSub[V any]() *PubSub[V] { + return &PubSub[V]{ + subscribers: make([]*ReceiverFunc[V], 0), } } // Subscribe adds a grain ID to the subscribers of a topic. -func (p *PubSub) Subscribe(topic string, grainID uint64) { +func (p *PubSub[V]) Subscribe(receiver ReceiverFunc[V]) { + if receiver == nil { + return + } p.mu.Lock() defer p.mu.Unlock() - p.subscribers[topic] = append(p.subscribers[topic], grainID) + log.Printf("adding subscriber") + p.subscribers = append(p.subscribers, &receiver) } // Unsubscribe removes a grain ID from the subscribers of a topic. -func (p *PubSub) Unsubscribe(topic string, grainID uint64) { +func (p *PubSub[V]) Unsubscribe(receiver ReceiverFunc[V]) { p.mu.Lock() defer p.mu.Unlock() - list := p.subscribers[topic] - for i, id := range list { - if id == grainID { - p.subscribers[topic] = append(list[:i], list[i+1:]...) + list := p.subscribers + prt := &receiver + for i, sub := range list { + if sub == nil { + continue + } + if sub == prt { + log.Printf("removing subscriber") + p.subscribers = append(list[:i], list[i+1:]...) break } } + p.subscribers = slices.DeleteFunc(p.subscribers, func(fn *ReceiverFunc[V]) bool { + return fn == nil + }) // If list is empty, could delete, but not necessary } -// UnsubscribeAll removes the grain ID from all topics. -func (p *PubSub) UnsubscribeAll(grainID uint64) { - p.mu.Lock() - defer p.mu.Unlock() - for topic, list := range p.subscribers { - newList := make([]uint64, 0, len(list)) - for _, id := range list { - if id != grainID { - newList = append(newList, id) - } - } - if len(newList) == 0 { - delete(p.subscribers, topic) - } else { - p.subscribers[topic] = newList - } - } -} - // GetSubscribers returns a copy of the subscriber IDs for a topic. -func (p *PubSub) GetSubscribers(topic string) []uint64 { +func (p *PubSub[V]) GetSubscribers() iter.Seq[ReceiverFunc[V]] { p.mu.RLock() defer p.mu.RUnlock() - list := p.subscribers[topic] - return append([]uint64(nil), list...) + + return func(yield func(ReceiverFunc[V]) bool) { + for _, sub := range p.subscribers { + if sub == nil { + continue + } + if !yield(*sub) { + return + } + } + } } // Publish sends an event to all subscribers of the topic. -func (p *PubSub) Publish(event Event) { - subs := p.GetSubscribers(event.Topic) - for _, id := range subs { - p.notify(id, event) +func (p *PubSub[V]) Publish(event V) { + for notify := range p.GetSubscribers() { + notify(event) } } diff --git a/pkg/actor/simple_grain_pool.go b/pkg/actor/simple_grain_pool.go index eb8682f..4f673b6 100644 --- a/pkg/actor/simple_grain_pool.go +++ b/pkg/actor/simple_grain_pool.go @@ -17,12 +17,12 @@ type SimpleGrainPool[V any] struct { grains map[uint64]Grain[V] mutationRegistry MutationRegistry spawn func(ctx context.Context, id uint64) (Grain[V], error) + destroy func(grain Grain[V]) error spawnHost func(host string) (Host, error) listeners []LogListener storage LogStorage[V] ttl time.Duration poolSize int - pubsub *PubSub // Cluster coordination -------------------------------------------------- hostname string @@ -39,6 +39,7 @@ type GrainPoolConfig[V any] struct { Hostname string Spawn func(ctx context.Context, id uint64) (Grain[V], error) SpawnHost func(host string) (Host, error) + Destroy func(grain Grain[V]) error TTL time.Duration PoolSize int MutationRegistry MutationRegistry @@ -52,6 +53,7 @@ func NewSimpleGrainPool[V any](config GrainPoolConfig[V]) (*SimpleGrainPool[V], storage: config.Storage, spawn: config.Spawn, spawnHost: config.SpawnHost, + destroy: config.Destroy, ttl: config.TTL, poolSize: config.PoolSize, hostname: config.Hostname, @@ -89,9 +91,10 @@ func (p *SimpleGrainPool[V]) purge() { for id, grain := range p.grains { if grain.GetLastAccess().Before(purgeLimit) { purgedIds = append(purgedIds, id) - if p.pubsub != nil { - p.pubsub.UnsubscribeAll(id) + if err := p.destroy(grain); err != nil { + log.Printf("failed to destroy grain %d: %v", id, err) } + delete(p.grains, id) } } @@ -417,11 +420,7 @@ func (p *SimpleGrainPool[V]) Apply(ctx context.Context, id uint64, mutation ...p if err != nil { return nil, err } - if p.pubsub != nil { - if sub, ok := any(grain).(Subscribable); ok { - sub.UpdateSubscriptions(p.pubsub) - } - } + return &MutationResult[*V]{ Result: result, Mutations: mutations, @@ -450,21 +449,6 @@ func (p *SimpleGrainPool[V]) Hostname() string { return p.hostname } -// GetPubSub returns the pubsub instance. -func (p *SimpleGrainPool[V]) GetPubSub() *PubSub { - return p.pubsub -} - -func (p *SimpleGrainPool[V]) SetPubSub(pubsub *PubSub) { - p.pubsub = pubsub -} - -func (p *SimpleGrainPool[V]) Publish(event Event) { - if p.pubsub != nil { - p.pubsub.Publish(event) - } -} - // Close notifies remotes that this host is shutting down. func (p *SimpleGrainPool[V]) Close() { diff --git a/pkg/cart/cart-grain.go b/pkg/cart/cart-grain.go index eb671fe..a7481f1 100644 --- a/pkg/cart/cart-grain.go +++ b/pkg/cart/cart-grain.go @@ -2,13 +2,10 @@ package cart import ( "encoding/json" - "log" "slices" - "strings" "sync" "time" - "git.k6n.net/go-cart-actor/pkg/actor" messages "git.k6n.net/go-cart-actor/pkg/messages" "git.k6n.net/go-cart-actor/pkg/voucher" "github.com/matst80/go-redis-inventory/pkg/inventory" @@ -230,43 +227,12 @@ func (c *CartGrain) GetCurrentState() (*CartGrain, error) { return c, nil } -// Notify handles incoming events, e.g., inventory changes. -func (c *CartGrain) Notify(event actor.Event) { - c.mu.Lock() - defer c.mu.Unlock() - // Example: if event is inventory change for a SKU in the cart - if strings.HasPrefix(event.Topic, "inventory:") { - sku := strings.TrimPrefix(event.Topic, "inventory:") - - log.Printf("cart grain got inventory update: %s", event.Topic) - update, ok := event.Payload.(inventory.InventoryChange) - if !ok { - log.Printf("cart grain inventory update has invalid payload") - return - } - for _, item := range c.Items { - - if item.Sku == sku && update.StockLocationID == *item.StoreId { - // Update stock status based on payload, e.g., if payload is bool available - - log.Printf("cart grain got item stock update %d", update.Value) - - item.Stock = uint16(update.Value) - - break - } - } - } -} - -func (c *CartGrain) UpdateSubscriptions(pubsub *actor.PubSub) { - pubsub.UnsubscribeAll(c.GetId()) - skuSet := make(map[string]bool) +func (c *CartGrain) HandleInventoryChange(change inventory.InventoryChange) { for _, item := range c.Items { - skuSet[item.Sku] = true - } - for sku := range skuSet { - pubsub.Subscribe("inventory:"+sku, c.GetId()) + if item.Sku == change.SKU && change.StockLocationID == *item.StoreId { + item.Stock = uint16(change.Value) + break + } } }