diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 6f4ee5c..ec6c7c2 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -127,6 +127,13 @@ func main() { log.Fatalf("Error creating cart pool: %v\n", err) } + pool.SetPubSub(actor.NewPubSub(func(id uint64, event actor.Event) { + grain, _ := pool.Get(context.Background(), id) + if sub, ok := any(grain).(actor.Subscribable); ok { + sub.Notify(event) + } + })) + klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) rdb := redis.NewClient(&redis.Options{ Addr: redisAddress, diff --git a/pkg/actor/grain_pool.go b/pkg/actor/grain_pool.go index f51c220..7aa5bbd 100644 --- a/pkg/actor/grain_pool.go +++ b/pkg/actor/grain_pool.go @@ -26,6 +26,7 @@ type GrainPool[V any] interface { AddRemoteHost(host string) IsHealthy() bool IsKnown(string) bool + GetPubSub() *PubSub Close() } diff --git a/pkg/actor/pubsub.go b/pkg/actor/pubsub.go new file mode 100644 index 0000000..4c8cf11 --- /dev/null +++ b/pkg/actor/pubsub.go @@ -0,0 +1,93 @@ +package actor + +import ( + "sync" +) + +// Event represents an event to be published. +type Event struct { + Topic string + Payload interface{} +} + +// NotifyFunc is a function to notify a grain of an event. +type NotifyFunc func(grainID uint64, event Event) + +// Subscribable is an interface for grains that can receive notifications. +type Subscribable interface { + Notify(event Event) + UpdateSubscriptions(pubsub *PubSub) +} + +// PubSub manages subscriptions for grains to topics. +// Topics are strings, e.g., "sku:12345" +// Subscribers are grain IDs (uint64) +type PubSub struct { + subscribers map[string][]uint64 + mu sync.RWMutex + notify NotifyFunc +} + +// NewPubSub creates a new PubSub instance. +func NewPubSub(notify NotifyFunc) *PubSub { + return &PubSub{ + subscribers: make(map[string][]uint64), + notify: notify, + } +} + +// Subscribe adds a grain ID to the subscribers of a topic. +func (p *PubSub) Subscribe(topic string, grainID uint64) { + p.mu.Lock() + defer p.mu.Unlock() + p.subscribers[topic] = append(p.subscribers[topic], grainID) +} + +// Unsubscribe removes a grain ID from the subscribers of a topic. +func (p *PubSub) Unsubscribe(topic string, grainID uint64) { + p.mu.Lock() + defer p.mu.Unlock() + list := p.subscribers[topic] + for i, id := range list { + if id == grainID { + p.subscribers[topic] = append(list[:i], list[i+1:]...) + break + } + } + // If list is empty, could delete, but not necessary +} + +// UnsubscribeAll removes the grain ID from all topics. +func (p *PubSub) UnsubscribeAll(grainID uint64) { + p.mu.Lock() + defer p.mu.Unlock() + for topic, list := range p.subscribers { + newList := make([]uint64, 0, len(list)) + for _, id := range list { + if id != grainID { + newList = append(newList, id) + } + } + if len(newList) == 0 { + delete(p.subscribers, topic) + } else { + p.subscribers[topic] = newList + } + } +} + +// GetSubscribers returns a copy of the subscriber IDs for a topic. +func (p *PubSub) GetSubscribers(topic string) []uint64 { + p.mu.RLock() + defer p.mu.RUnlock() + list := p.subscribers[topic] + return append([]uint64(nil), list...) +} + +// Publish sends an event to all subscribers of the topic. +func (p *PubSub) Publish(event Event) { + subs := p.GetSubscribers(event.Topic) + for _, id := range subs { + p.notify(id, event) + } +} diff --git a/pkg/actor/simple_grain_pool.go b/pkg/actor/simple_grain_pool.go index f08dab4..eb8682f 100644 --- a/pkg/actor/simple_grain_pool.go +++ b/pkg/actor/simple_grain_pool.go @@ -22,6 +22,7 @@ type SimpleGrainPool[V any] struct { storage LogStorage[V] ttl time.Duration poolSize int + pubsub *PubSub // Cluster coordination -------------------------------------------------- hostname string @@ -88,7 +89,9 @@ func (p *SimpleGrainPool[V]) purge() { for id, grain := range p.grains { if grain.GetLastAccess().Before(purgeLimit) { purgedIds = append(purgedIds, id) - + if p.pubsub != nil { + p.pubsub.UnsubscribeAll(id) + } delete(p.grains, id) } } @@ -414,6 +417,11 @@ func (p *SimpleGrainPool[V]) Apply(ctx context.Context, id uint64, mutation ...p if err != nil { return nil, err } + if p.pubsub != nil { + if sub, ok := any(grain).(Subscribable); ok { + sub.UpdateSubscriptions(p.pubsub) + } + } return &MutationResult[*V]{ Result: result, Mutations: mutations, @@ -442,6 +450,21 @@ func (p *SimpleGrainPool[V]) Hostname() string { return p.hostname } +// GetPubSub returns the pubsub instance. +func (p *SimpleGrainPool[V]) GetPubSub() *PubSub { + return p.pubsub +} + +func (p *SimpleGrainPool[V]) SetPubSub(pubsub *PubSub) { + p.pubsub = pubsub +} + +func (p *SimpleGrainPool[V]) Publish(event Event) { + if p.pubsub != nil { + p.pubsub.Publish(event) + } +} + // Close notifies remotes that this host is shutting down. func (p *SimpleGrainPool[V]) Close() { diff --git a/pkg/cart/cart-grain.go b/pkg/cart/cart-grain.go index f780bd4..9ed8775 100644 --- a/pkg/cart/cart-grain.go +++ b/pkg/cart/cart-grain.go @@ -3,9 +3,11 @@ package cart import ( "encoding/json" "slices" + "strings" "sync" "time" + "git.tornberg.me/go-cart-actor/pkg/actor" messages "git.tornberg.me/go-cart-actor/pkg/messages" "git.tornberg.me/go-cart-actor/pkg/voucher" ) @@ -184,6 +186,40 @@ func (c *CartGrain) GetCurrentState() (*CartGrain, error) { return c, nil } +// Notify handles incoming events, e.g., inventory changes. +func (c *CartGrain) Notify(event actor.Event) { + c.mu.Lock() + defer c.mu.Unlock() + // Example: if event is inventory change for a SKU in the cart + if strings.HasPrefix(event.Topic, "inventory:") { + sku := strings.TrimPrefix(event.Topic, "inventory:") + for _, item := range c.Items { + if item.Sku == sku { + // Update stock status based on payload, e.g., if payload is bool available + if available, ok := event.Payload.(bool); ok { + if available { + item.Stock = StockStatus(1) // assuming 1 is in stock + } else { + item.Stock = StockStatus(0) // out of stock + } + } + break + } + } + } +} + +func (c *CartGrain) UpdateSubscriptions(pubsub *actor.PubSub) { + pubsub.UnsubscribeAll(c.GetId()) + skuSet := make(map[string]bool) + for _, item := range c.Items { + skuSet[item.Sku] = true + } + for sku := range skuSet { + pubsub.Subscribe("inventory:"+sku, c.GetId()) + } +} + func (c *CartGrain) GetState() ([]byte, error) { return json.Marshal(c) }