package actor import ( "sync" ) // Event represents an event to be published. type Event struct { Topic string Payload interface{} } // NotifyFunc is a function to notify a grain of an event. type NotifyFunc func(grainID uint64, event Event) // Subscribable is an interface for grains that can receive notifications. type Subscribable interface { Notify(event Event) UpdateSubscriptions(pubsub *PubSub) } // PubSub manages subscriptions for grains to topics. // Topics are strings, e.g., "sku:12345" // Subscribers are grain IDs (uint64) type PubSub struct { subscribers map[string][]uint64 mu sync.RWMutex notify NotifyFunc } // NewPubSub creates a new PubSub instance. func NewPubSub(notify NotifyFunc) *PubSub { return &PubSub{ subscribers: make(map[string][]uint64), notify: notify, } } // Subscribe adds a grain ID to the subscribers of a topic. func (p *PubSub) Subscribe(topic string, grainID uint64) { p.mu.Lock() defer p.mu.Unlock() p.subscribers[topic] = append(p.subscribers[topic], grainID) } // Unsubscribe removes a grain ID from the subscribers of a topic. func (p *PubSub) Unsubscribe(topic string, grainID uint64) { p.mu.Lock() defer p.mu.Unlock() list := p.subscribers[topic] for i, id := range list { if id == grainID { p.subscribers[topic] = append(list[:i], list[i+1:]...) break } } // If list is empty, could delete, but not necessary } // UnsubscribeAll removes the grain ID from all topics. func (p *PubSub) UnsubscribeAll(grainID uint64) { p.mu.Lock() defer p.mu.Unlock() for topic, list := range p.subscribers { newList := make([]uint64, 0, len(list)) for _, id := range list { if id != grainID { newList = append(newList, id) } } if len(newList) == 0 { delete(p.subscribers, topic) } else { p.subscribers[topic] = newList } } } // GetSubscribers returns a copy of the subscriber IDs for a topic. func (p *PubSub) GetSubscribers(topic string) []uint64 { p.mu.RLock() defer p.mu.RUnlock() list := p.subscribers[topic] return append([]uint64(nil), list...) } // Publish sends an event to all subscribers of the topic. func (p *PubSub) Publish(event Event) { subs := p.GetSubscribers(event.Topic) for _, id := range subs { p.notify(id, event) } }