feature/pubsub #7

Merged
mats merged 67 commits from feature/pubsub into main 2025-11-28 17:45:22 +01:00
5 changed files with 161 additions and 1 deletions
Showing only changes of commit a7aab5161b - Show all commits

View File

@@ -127,6 +127,13 @@ func main() {
log.Fatalf("Error creating cart pool: %v\n", err) log.Fatalf("Error creating cart pool: %v\n", err)
} }
pool.SetPubSub(actor.NewPubSub(func(id uint64, event actor.Event) {
grain, _ := pool.Get(context.Background(), id)
if sub, ok := any(grain).(actor.Subscribable); ok {
sub.Notify(event)
}
}))
klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))
rdb := redis.NewClient(&redis.Options{ rdb := redis.NewClient(&redis.Options{
Addr: redisAddress, Addr: redisAddress,

View File

@@ -26,6 +26,7 @@ type GrainPool[V any] interface {
AddRemoteHost(host string) AddRemoteHost(host string)
IsHealthy() bool IsHealthy() bool
IsKnown(string) bool IsKnown(string) bool
GetPubSub() *PubSub
Close() Close()
} }

93
pkg/actor/pubsub.go Normal file
View File

@@ -0,0 +1,93 @@
package actor
import (
"sync"
)
// Event represents an event to be published.
type Event struct {
Topic string
Payload interface{}
}
// NotifyFunc is a function to notify a grain of an event.
type NotifyFunc func(grainID uint64, event Event)
// Subscribable is an interface for grains that can receive notifications.
type Subscribable interface {
Notify(event Event)
UpdateSubscriptions(pubsub *PubSub)
}
// PubSub manages subscriptions for grains to topics.
// Topics are strings, e.g., "sku:12345"
// Subscribers are grain IDs (uint64)
type PubSub struct {
subscribers map[string][]uint64
mu sync.RWMutex
notify NotifyFunc
}
// NewPubSub creates a new PubSub instance.
func NewPubSub(notify NotifyFunc) *PubSub {
return &PubSub{
subscribers: make(map[string][]uint64),
notify: notify,
}
}
// Subscribe adds a grain ID to the subscribers of a topic.
func (p *PubSub) Subscribe(topic string, grainID uint64) {
p.mu.Lock()
defer p.mu.Unlock()
p.subscribers[topic] = append(p.subscribers[topic], grainID)
}
// Unsubscribe removes a grain ID from the subscribers of a topic.
func (p *PubSub) Unsubscribe(topic string, grainID uint64) {
p.mu.Lock()
defer p.mu.Unlock()
list := p.subscribers[topic]
for i, id := range list {
if id == grainID {
p.subscribers[topic] = append(list[:i], list[i+1:]...)
break
}
}
// If list is empty, could delete, but not necessary
}
// UnsubscribeAll removes the grain ID from all topics.
func (p *PubSub) UnsubscribeAll(grainID uint64) {
p.mu.Lock()
defer p.mu.Unlock()
for topic, list := range p.subscribers {
newList := make([]uint64, 0, len(list))
for _, id := range list {
if id != grainID {
newList = append(newList, id)
}
}
if len(newList) == 0 {
delete(p.subscribers, topic)
} else {
p.subscribers[topic] = newList
}
}
}
// GetSubscribers returns a copy of the subscriber IDs for a topic.
func (p *PubSub) GetSubscribers(topic string) []uint64 {
p.mu.RLock()
defer p.mu.RUnlock()
list := p.subscribers[topic]
return append([]uint64(nil), list...)
}
// Publish sends an event to all subscribers of the topic.
func (p *PubSub) Publish(event Event) {
subs := p.GetSubscribers(event.Topic)
for _, id := range subs {
p.notify(id, event)
}
}

View File

@@ -22,6 +22,7 @@ type SimpleGrainPool[V any] struct {
storage LogStorage[V] storage LogStorage[V]
ttl time.Duration ttl time.Duration
poolSize int poolSize int
pubsub *PubSub
// Cluster coordination -------------------------------------------------- // Cluster coordination --------------------------------------------------
hostname string hostname string
@@ -88,7 +89,9 @@ func (p *SimpleGrainPool[V]) purge() {
for id, grain := range p.grains { for id, grain := range p.grains {
if grain.GetLastAccess().Before(purgeLimit) { if grain.GetLastAccess().Before(purgeLimit) {
purgedIds = append(purgedIds, id) purgedIds = append(purgedIds, id)
if p.pubsub != nil {
p.pubsub.UnsubscribeAll(id)
}
delete(p.grains, id) delete(p.grains, id)
} }
} }
@@ -414,6 +417,11 @@ func (p *SimpleGrainPool[V]) Apply(ctx context.Context, id uint64, mutation ...p
if err != nil { if err != nil {
return nil, err return nil, err
} }
if p.pubsub != nil {
if sub, ok := any(grain).(Subscribable); ok {
sub.UpdateSubscriptions(p.pubsub)
}
}
return &MutationResult[*V]{ return &MutationResult[*V]{
Result: result, Result: result,
Mutations: mutations, Mutations: mutations,
@@ -442,6 +450,21 @@ func (p *SimpleGrainPool[V]) Hostname() string {
return p.hostname return p.hostname
} }
// GetPubSub returns the pubsub instance.
func (p *SimpleGrainPool[V]) GetPubSub() *PubSub {
return p.pubsub
}
func (p *SimpleGrainPool[V]) SetPubSub(pubsub *PubSub) {
p.pubsub = pubsub
}
func (p *SimpleGrainPool[V]) Publish(event Event) {
if p.pubsub != nil {
p.pubsub.Publish(event)
}
}
// Close notifies remotes that this host is shutting down. // Close notifies remotes that this host is shutting down.
func (p *SimpleGrainPool[V]) Close() { func (p *SimpleGrainPool[V]) Close() {

View File

@@ -3,9 +3,11 @@ package cart
import ( import (
"encoding/json" "encoding/json"
"slices" "slices"
"strings"
"sync" "sync"
"time" "time"
"git.tornberg.me/go-cart-actor/pkg/actor"
messages "git.tornberg.me/go-cart-actor/pkg/messages" messages "git.tornberg.me/go-cart-actor/pkg/messages"
"git.tornberg.me/go-cart-actor/pkg/voucher" "git.tornberg.me/go-cart-actor/pkg/voucher"
) )
@@ -184,6 +186,40 @@ func (c *CartGrain) GetCurrentState() (*CartGrain, error) {
return c, nil return c, nil
} }
// Notify handles incoming events, e.g., inventory changes.
func (c *CartGrain) Notify(event actor.Event) {
c.mu.Lock()
defer c.mu.Unlock()
// Example: if event is inventory change for a SKU in the cart
if strings.HasPrefix(event.Topic, "inventory:") {
sku := strings.TrimPrefix(event.Topic, "inventory:")
for _, item := range c.Items {
if item.Sku == sku {
// Update stock status based on payload, e.g., if payload is bool available
if available, ok := event.Payload.(bool); ok {
if available {
item.Stock = StockStatus(1) // assuming 1 is in stock
} else {
item.Stock = StockStatus(0) // out of stock
}
}
break
}
}
}
}
func (c *CartGrain) UpdateSubscriptions(pubsub *actor.PubSub) {
pubsub.UnsubscribeAll(c.GetId())
skuSet := make(map[string]bool)
for _, item := range c.Items {
skuSet[item.Sku] = true
}
for sku := range skuSet {
pubsub.Subscribe("inventory:"+sku, c.GetId())
}
}
func (c *CartGrain) GetState() ([]byte, error) { func (c *CartGrain) GetState() ([]byte, error) {
return json.Marshal(c) return json.Marshal(c)
} }