94 lines
2.2 KiB
Go
94 lines
2.2 KiB
Go
package actor
|
|
|
|
import (
|
|
"sync"
|
|
)
|
|
|
|
// Event is a message published on a topic.
type Event struct {
	// Topic names the topic the event is published on, e.g. "sku:12345".
	Topic string
	// Payload carries the event data; its concrete type is not constrained.
	Payload interface{}
}
|
|
|
|
// NotifyFunc is a function to notify a grain of an event.
|
|
type NotifyFunc func(grainID uint64, event Event)
|
|
|
|
// Subscribable is an interface for grains that can receive notifications.
|
|
type Subscribable interface {
|
|
Notify(event Event)
|
|
UpdateSubscriptions(pubsub *PubSub)
|
|
}
|
|
|
|
// PubSub manages subscriptions for grains to topics.
|
|
// Topics are strings, e.g., "sku:12345"
|
|
// Subscribers are grain IDs (uint64)
|
|
type PubSub struct {
|
|
subscribers map[string][]uint64
|
|
mu sync.RWMutex
|
|
notify NotifyFunc
|
|
}
|
|
|
|
// NewPubSub creates a new PubSub instance.
|
|
func NewPubSub(notify NotifyFunc) *PubSub {
|
|
return &PubSub{
|
|
subscribers: make(map[string][]uint64),
|
|
notify: notify,
|
|
}
|
|
}
|
|
|
|
// Subscribe adds a grain ID to the subscribers of a topic.
|
|
func (p *PubSub) Subscribe(topic string, grainID uint64) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
p.subscribers[topic] = append(p.subscribers[topic], grainID)
|
|
}
|
|
|
|
// Unsubscribe removes a grain ID from the subscribers of a topic.
|
|
func (p *PubSub) Unsubscribe(topic string, grainID uint64) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
list := p.subscribers[topic]
|
|
for i, id := range list {
|
|
if id == grainID {
|
|
p.subscribers[topic] = append(list[:i], list[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
// If list is empty, could delete, but not necessary
|
|
}
|
|
|
|
// UnsubscribeAll removes the grain ID from all topics.
|
|
func (p *PubSub) UnsubscribeAll(grainID uint64) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
for topic, list := range p.subscribers {
|
|
newList := make([]uint64, 0, len(list))
|
|
for _, id := range list {
|
|
if id != grainID {
|
|
newList = append(newList, id)
|
|
}
|
|
}
|
|
if len(newList) == 0 {
|
|
delete(p.subscribers, topic)
|
|
} else {
|
|
p.subscribers[topic] = newList
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetSubscribers returns a copy of the subscriber IDs for a topic.
|
|
func (p *PubSub) GetSubscribers(topic string) []uint64 {
|
|
p.mu.RLock()
|
|
defer p.mu.RUnlock()
|
|
list := p.subscribers[topic]
|
|
return append([]uint64(nil), list...)
|
|
}
|
|
|
|
// Publish sends an event to all subscribers of the topic.
|
|
func (p *PubSub) Publish(event Event) {
|
|
subs := p.GetSubscribers(event.Topic)
|
|
for _, id := range subs {
|
|
p.notify(id, event)
|
|
}
|
|
}
|