Files
go-cart-actor/pkg/actor/pubsub.go
matst80 59e01e763f
All checks were successful
Build and Publish / BuildAndDeployAmd64 (push) Successful in 35s
Build and Publish / BuildAndDeployArm64 (push) Successful in 3m49s
remove promotions for now
2025-11-25 23:16:54 +01:00

79 lines
1.6 KiB
Go

package actor
import (
	"iter"
	"reflect"
	"slices"
	"sync"
)
// ReceiverFunc is the callback signature for subscribers: a function that
// takes a single event of type V and returns nothing.
type ReceiverFunc[V any] func(event V)
// PubSub keeps a list of ReceiverFunc callbacks for events of type V.
// Because it contains a sync.RWMutex, a PubSub value must not be copied after
// first use; share it by pointer.
type PubSub[V any] struct {
// subscribers holds a pointer to each registered callback, in the order
// they were appended.
subscribers []*ReceiverFunc[V]
// mu is locked by Subscribe and Unsubscribe while they modify subscribers.
mu sync.RWMutex
}
// NewPubSub allocates a PubSub for events of type V and returns a pointer to
// it. The subscriber list starts out as an empty, non-nil slice.
func NewPubSub[V any]() *PubSub[V] {
	ps := new(PubSub[V])
	ps.subscribers = []*ReceiverFunc[V]{}
	return ps
}
// Subscribe registers receiver so that it is included in the subscriber list.
// A nil receiver is ignored. The same function may be registered more than
// once; every call appends a new entry.
func (p *PubSub[V]) Subscribe(receiver ReceiverFunc[V]) {
	if receiver != nil {
		// Each entry is a pointer to this call's own copy of the callback.
		entry := &receiver

		p.mu.Lock()
		p.subscribers = append(p.subscribers, entry)
		p.mu.Unlock()
	}
}
// Unsubscribe removes the first registered receiver that refers to the same
// function as receiver, and drops any nil entries from the subscriber list.
// A nil receiver, or one that was never registered, removes no subscriber.
//
// Go function values are not comparable, so identity is decided by the
// function's code pointer (reflect.Value.Pointer). The previous version
// compared against the address of its own parameter, which can never equal a
// stored pointer, so nothing was ever removed.
//
// Caveat: closures created from the same function literal, and method values
// of the same method on different objects, share a code pointer and cannot be
// told apart; in that case the earliest matching registration is removed.
func (p *PubSub[V]) Unsubscribe(receiver ReceiverFunc[V]) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Purge nil entries first so every remaining pointer is safe to dereference.
	p.subscribers = slices.DeleteFunc(p.subscribers, func(fn *ReceiverFunc[V]) bool {
		return fn == nil
	})

	if receiver == nil {
		return
	}

	target := reflect.ValueOf(receiver).Pointer()
	idx := slices.IndexFunc(p.subscribers, func(fn *ReceiverFunc[V]) bool {
		return reflect.ValueOf(*fn).Pointer() == target
	})
	if idx >= 0 {
		p.subscribers = slices.Delete(p.subscribers, idx, idx+1)
	}
}
// GetSubscribers returns an iterator over the receivers that are registered
// at the moment of the call, in registration order. Nil entries are skipped.
//
// The subscriber slice is copied while the read lock is held and the iterator
// walks that copy. Previously the deferred RUnlock ran as soon as this method
// returned, so the iterator read p.subscribers without any lock and raced
// with Subscribe and Unsubscribe. Iterating a snapshot also means no lock is
// held while the caller's loop body runs, so a receiver can call Subscribe or
// Unsubscribe without deadlocking; such changes are not reflected in an
// iterator that was already obtained.
func (p *PubSub[V]) GetSubscribers() iter.Seq[ReceiverFunc[V]] {
	p.mu.RLock()
	snapshot := slices.Clone(p.subscribers)
	p.mu.RUnlock()

	return func(yield func(ReceiverFunc[V]) bool) {
		for _, sub := range snapshot {
			if sub == nil {
				continue
			}
			if !yield(*sub) {
				return
			}
		}
	}
}
// Publish invokes every receiver produced by GetSubscribers with event, one
// after another, on the calling goroutine.
func (p *PubSub[V]) Publish(event V) {
	handlers := p.GetSubscribers()
	for handler := range handlers {
		handler(event)
	}
}