79 lines
1.6 KiB
Go
79 lines
1.6 KiB
Go
package actor
|
|
|
|
import (
|
|
"iter"
|
|
"slices"
|
|
"sync"
|
|
)
|
|
|
|
type ReceiverFunc[V any] func(event V)
|
|
|
|
type PubSub[V any] struct {
|
|
subscribers []*ReceiverFunc[V]
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// NewPubSub creates a new PubSub instance.
|
|
func NewPubSub[V any]() *PubSub[V] {
|
|
return &PubSub[V]{
|
|
subscribers: make([]*ReceiverFunc[V], 0),
|
|
}
|
|
}
|
|
|
|
// Subscribe adds a grain ID to the subscribers of a topic.
|
|
func (p *PubSub[V]) Subscribe(receiver ReceiverFunc[V]) {
|
|
if receiver == nil {
|
|
return
|
|
}
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
// log.Printf("adding subscriber")
|
|
p.subscribers = append(p.subscribers, &receiver)
|
|
}
|
|
|
|
// Unsubscribe removes a grain ID from the subscribers of a topic.
|
|
func (p *PubSub[V]) Unsubscribe(receiver ReceiverFunc[V]) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
list := p.subscribers
|
|
prt := &receiver
|
|
for i, sub := range list {
|
|
if sub == nil {
|
|
continue
|
|
}
|
|
if sub == prt {
|
|
// log.Printf("removing subscriber")
|
|
p.subscribers = append(list[:i], list[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
p.subscribers = slices.DeleteFunc(p.subscribers, func(fn *ReceiverFunc[V]) bool {
|
|
return fn == nil
|
|
})
|
|
// If list is empty, could delete, but not necessary
|
|
}
|
|
|
|
// GetSubscribers returns a copy of the subscriber IDs for a topic.
|
|
func (p *PubSub[V]) GetSubscribers() iter.Seq[ReceiverFunc[V]] {
|
|
p.mu.RLock()
|
|
defer p.mu.RUnlock()
|
|
|
|
return func(yield func(ReceiverFunc[V]) bool) {
|
|
for _, sub := range p.subscribers {
|
|
if sub == nil {
|
|
continue
|
|
}
|
|
if !yield(*sub) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Publish sends an event to all subscribers of the topic.
|
|
func (p *PubSub[V]) Publish(event V) {
|
|
for notify := range p.GetSubscribers() {
|
|
notify(event)
|
|
}
|
|
}
|