package actor

import (
	"iter"
	"reflect"
	"slices"
	"sync"
)

// ReceiverFunc is a callback invoked with every event published on a PubSub.
type ReceiverFunc[V any] func(event V)

// PubSub is a minimal in-process publish/subscribe hub for events of type V.
//
// All methods are safe for concurrent use. A PubSub must not be copied after
// first use because it embeds a mutex.
type PubSub[V any] struct {
	// subscribers holds the registered receivers in subscription order.
	// It is guarded by mu.
	subscribers []*ReceiverFunc[V]
	mu          sync.RWMutex
}

// NewPubSub creates a new, empty PubSub instance.
func NewPubSub[V any]() *PubSub[V] {
	// A nil slice is a valid empty subscriber list for append and range.
	return &PubSub[V]{}
}

// Subscribe registers receiver so that it is called for every subsequently
// published event. A nil receiver is ignored. Subscribing the same function
// more than once registers it more than once.
func (p *PubSub[V]) Subscribe(receiver ReceiverFunc[V]) {
	if receiver == nil {
		return
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	p.subscribers = append(p.subscribers, &receiver)
}

// Unsubscribe removes the earliest registration of receiver, if there is one.
// A nil or unknown receiver is a no-op.
//
// Go function values cannot be compared with ==, so receivers are matched by
// their underlying code pointer (reflect.Value.Pointer). Distinct closures
// created from the same function literal, or method values of the same method
// on different objects, share a code pointer and are therefore
// indistinguishable here: the earliest such registration is the one removed.
func (p *PubSub[V]) Unsubscribe(receiver ReceiverFunc[V]) {
	if receiver == nil {
		return
	}
	target := reflect.ValueOf(receiver).Pointer()

	p.mu.Lock()
	defer p.mu.Unlock()

	removed := false
	p.subscribers = slices.DeleteFunc(p.subscribers, func(sub *ReceiverFunc[V]) bool {
		if sub == nil {
			// Purge stray nil entries while we are here.
			return true
		}
		if !removed && reflect.ValueOf(*sub).Pointer() == target {
			removed = true
			return true
		}
		return false
	})
}

// GetSubscribers returns an iterator over a snapshot of the receivers that
// were registered at the time of the call, in subscription order.
//
// The snapshot is taken under the read lock, and the lock is released before
// the iterator is returned. Iteration therefore never races with Subscribe or
// Unsubscribe, and receivers may call back into the PubSub without
// deadlocking. Changes made after the call are not reflected in the iterator.
func (p *PubSub[V]) GetSubscribers() iter.Seq[ReceiverFunc[V]] {
	p.mu.RLock()
	snapshot := make([]ReceiverFunc[V], 0, len(p.subscribers))
	for _, sub := range p.subscribers {
		if sub == nil {
			continue
		}
		snapshot = append(snapshot, *sub)
	}
	p.mu.RUnlock()

	return func(yield func(ReceiverFunc[V]) bool) {
		for _, fn := range snapshot {
			if !yield(fn) {
				return
			}
		}
	}
}

// Publish synchronously delivers event to every receiver registered at the
// time of the call, in subscription order, on the caller's goroutine.
func (p *PubSub[V]) Publish(event V) {
	for notify := range p.GetSubscribers() {
		notify(event)
	}
}