feature/pubsub #7

Merged
mats merged 67 commits from feature/pubsub into main 2025-11-28 17:45:22 +01:00
6 changed files with 80 additions and 142 deletions
Showing only changes of commit d5d2b3e711 - Show all commits

View File

@@ -80,6 +80,8 @@ func main() {
log.Printf("loaded %d promotions", len(promotionData.State.Promotions)) log.Printf("loaded %d promotions", len(promotionData.State.Promotions))
inventoryPubSub := actor.NewPubSub[inventory.InventoryChange]()
promotionService := promotions.NewPromotionService(nil) promotionService := promotions.NewPromotionService(nil)
reg := cart.NewCartMultationRegistry() reg := cart.NewCartMultationRegistry()
@@ -109,11 +111,20 @@ func main() {
grainSpawns.Inc() grainSpawns.Inc()
ret := cart.NewCartGrain(id, time.Now()) ret := cart.NewCartGrain(id, time.Now())
// Set baseline lastChange at spawn; replay may update it to last event timestamp. // Set baseline lastChange at spawn; replay may update it to last event timestamp.
inventoryPubSub.Subscribe(ret.HandleInventoryChange)
err := diskStorage.LoadEvents(ctx, id, ret) err := diskStorage.LoadEvents(ctx, id, ret)
return ret, err return ret, err
}, },
Destroy: func(grain actor.Grain[cart.CartGrain]) error {
cart, err := grain.GetCurrentState()
if err != nil {
return err
}
inventoryPubSub.Unsubscribe(cart.HandleInventoryChange)
return nil
},
SpawnHost: func(host string) (actor.Host, error) { SpawnHost: func(host string) (actor.Host, error) {
return proxy.NewRemoteHost(host) return proxy.NewRemoteHost(host)
}, },
@@ -127,13 +138,6 @@ func main() {
log.Fatalf("Error creating cart pool: %v\n", err) log.Fatalf("Error creating cart pool: %v\n", err)
} }
pool.SetPubSub(actor.NewPubSub(func(id uint64, event actor.Event) {
grain, _ := pool.Get(context.Background(), id)
if sub, ok := any(grain).(actor.Subscribable); ok {
sub.Notify(event)
}
}))
klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))
rdb := redis.NewClient(&redis.Options{ rdb := redis.NewClient(&redis.Options{
Addr: redisAddress, Addr: redisAddress,
@@ -245,6 +249,15 @@ func main() {
srvErr <- srv.ListenAndServe() srvErr <- srv.ListenAndServe()
}() }()
listener := inventory.NewInventoryChangeListener(rdb, context.Background(), func(changes []inventory.InventoryChange) {
for _, change := range changes {
log.Printf("inventory change: %v", change)
inventoryPubSub.Publish(change)
}
})
go listener.Start()
log.Print("Server started at port 8080") log.Print("Server started at port 8080")
go http.ListenAndServe(":8081", debugMux) go http.ListenAndServe(":8081", debugMux)

View File

@@ -43,10 +43,9 @@ var (
type PoolServer struct { type PoolServer struct {
actor.GrainPool[*cart.CartGrain] actor.GrainPool[*cart.CartGrain]
pod_name string pod_name string
klarnaClient *KlarnaClient klarnaClient *KlarnaClient
inventoryService inventory.InventoryService inventoryService inventory.InventoryService
inventoryListener *inventory.InventoryChangeListener
} }
func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService, inventoryRedisClient *redis.Client) *PoolServer { func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService, inventoryRedisClient *redis.Client) *PoolServer {
@@ -56,16 +55,7 @@ func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarn
klarnaClient: klarnaClient, klarnaClient: klarnaClient,
inventoryService: inventoryService, inventoryService: inventoryService,
} }
listener := inventory.NewInventoryChangeListener(inventoryRedisClient, context.Background(), func(changes []inventory.InventoryChange) {
for _, change := range changes {
srv.GrainPool.GetPubSub().Publish(actor.Event{
Topic: fmt.Sprintf("inventory:%s", change.SKU),
Payload: change,
})
}
})
srv.inventoryListener = listener
go listener.Start()
return srv return srv
} }

View File

@@ -26,7 +26,6 @@ type GrainPool[V any] interface {
AddRemoteHost(host string) AddRemoteHost(host string)
IsHealthy() bool IsHealthy() bool
IsKnown(string) bool IsKnown(string) bool
GetPubSub() *PubSub
Close() Close()
} }

View File

@@ -1,93 +1,79 @@
package actor package actor
import ( import (
"iter"
"log"
"slices"
"sync" "sync"
) )
// Event represents an event to be published. type ReceiverFunc[V any] func(event V)
type Event struct {
Topic string
Payload interface{}
}
// NotifyFunc is a function to notify a grain of an event. type PubSub[V any] struct {
type NotifyFunc func(grainID uint64, event Event) subscribers []*ReceiverFunc[V]
// Subscribable is an interface for grains that can receive notifications.
type Subscribable interface {
Notify(event Event)
UpdateSubscriptions(pubsub *PubSub)
}
// PubSub manages subscriptions for grains to topics.
// Topics are strings, e.g., "sku:12345"
// Subscribers are grain IDs (uint64)
type PubSub struct {
subscribers map[string][]uint64
mu sync.RWMutex mu sync.RWMutex
notify NotifyFunc
} }
// NewPubSub creates a new PubSub instance. // NewPubSub creates a new PubSub instance.
func NewPubSub(notify NotifyFunc) *PubSub { func NewPubSub[V any]() *PubSub[V] {
return &PubSub{ return &PubSub[V]{
subscribers: make(map[string][]uint64), subscribers: make([]*ReceiverFunc[V], 0),
notify: notify,
} }
} }
// Subscribe adds a grain ID to the subscribers of a topic. // Subscribe adds a grain ID to the subscribers of a topic.
func (p *PubSub) Subscribe(topic string, grainID uint64) { func (p *PubSub[V]) Subscribe(receiver ReceiverFunc[V]) {
if receiver == nil {
return
}
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
p.subscribers[topic] = append(p.subscribers[topic], grainID) log.Printf("adding subscriber")
p.subscribers = append(p.subscribers, &receiver)
} }
// Unsubscribe removes a grain ID from the subscribers of a topic. // Unsubscribe removes a grain ID from the subscribers of a topic.
func (p *PubSub) Unsubscribe(topic string, grainID uint64) { func (p *PubSub[V]) Unsubscribe(receiver ReceiverFunc[V]) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
list := p.subscribers[topic] list := p.subscribers
for i, id := range list { prt := &receiver
if id == grainID { for i, sub := range list {
p.subscribers[topic] = append(list[:i], list[i+1:]...) if sub == nil {
continue
}
if sub == prt {
log.Printf("removing subscriber")
p.subscribers = append(list[:i], list[i+1:]...)
break break
} }
} }
p.subscribers = slices.DeleteFunc(p.subscribers, func(fn *ReceiverFunc[V]) bool {
return fn == nil
})
// If list is empty, could delete, but not necessary // If list is empty, could delete, but not necessary
} }
// UnsubscribeAll removes the grain ID from all topics.
func (p *PubSub) UnsubscribeAll(grainID uint64) {
p.mu.Lock()
defer p.mu.Unlock()
for topic, list := range p.subscribers {
newList := make([]uint64, 0, len(list))
for _, id := range list {
if id != grainID {
newList = append(newList, id)
}
}
if len(newList) == 0 {
delete(p.subscribers, topic)
} else {
p.subscribers[topic] = newList
}
}
}
// GetSubscribers returns a copy of the subscriber IDs for a topic. // GetSubscribers returns a copy of the subscriber IDs for a topic.
func (p *PubSub) GetSubscribers(topic string) []uint64 { func (p *PubSub[V]) GetSubscribers() iter.Seq[ReceiverFunc[V]] {
p.mu.RLock() p.mu.RLock()
defer p.mu.RUnlock() defer p.mu.RUnlock()
list := p.subscribers[topic]
return append([]uint64(nil), list...) return func(yield func(ReceiverFunc[V]) bool) {
for _, sub := range p.subscribers {
if sub == nil {
continue
}
if !yield(*sub) {
return
}
}
}
} }
// Publish sends an event to all subscribers of the topic. // Publish sends an event to all subscribers of the topic.
func (p *PubSub) Publish(event Event) { func (p *PubSub[V]) Publish(event V) {
subs := p.GetSubscribers(event.Topic) for notify := range p.GetSubscribers() {
for _, id := range subs { notify(event)
p.notify(id, event)
} }
} }

View File

@@ -17,12 +17,12 @@ type SimpleGrainPool[V any] struct {
grains map[uint64]Grain[V] grains map[uint64]Grain[V]
mutationRegistry MutationRegistry mutationRegistry MutationRegistry
spawn func(ctx context.Context, id uint64) (Grain[V], error) spawn func(ctx context.Context, id uint64) (Grain[V], error)
destroy func(grain Grain[V]) error
spawnHost func(host string) (Host, error) spawnHost func(host string) (Host, error)
listeners []LogListener listeners []LogListener
storage LogStorage[V] storage LogStorage[V]
ttl time.Duration ttl time.Duration
poolSize int poolSize int
pubsub *PubSub
// Cluster coordination -------------------------------------------------- // Cluster coordination --------------------------------------------------
hostname string hostname string
@@ -39,6 +39,7 @@ type GrainPoolConfig[V any] struct {
Hostname string Hostname string
Spawn func(ctx context.Context, id uint64) (Grain[V], error) Spawn func(ctx context.Context, id uint64) (Grain[V], error)
SpawnHost func(host string) (Host, error) SpawnHost func(host string) (Host, error)
Destroy func(grain Grain[V]) error
TTL time.Duration TTL time.Duration
PoolSize int PoolSize int
MutationRegistry MutationRegistry MutationRegistry MutationRegistry
@@ -52,6 +53,7 @@ func NewSimpleGrainPool[V any](config GrainPoolConfig[V]) (*SimpleGrainPool[V],
storage: config.Storage, storage: config.Storage,
spawn: config.Spawn, spawn: config.Spawn,
spawnHost: config.SpawnHost, spawnHost: config.SpawnHost,
destroy: config.Destroy,
ttl: config.TTL, ttl: config.TTL,
poolSize: config.PoolSize, poolSize: config.PoolSize,
hostname: config.Hostname, hostname: config.Hostname,
@@ -89,9 +91,10 @@ func (p *SimpleGrainPool[V]) purge() {
for id, grain := range p.grains { for id, grain := range p.grains {
if grain.GetLastAccess().Before(purgeLimit) { if grain.GetLastAccess().Before(purgeLimit) {
purgedIds = append(purgedIds, id) purgedIds = append(purgedIds, id)
if p.pubsub != nil { if err := p.destroy(grain); err != nil {
p.pubsub.UnsubscribeAll(id) log.Printf("failed to destroy grain %d: %v", id, err)
} }
delete(p.grains, id) delete(p.grains, id)
} }
} }
@@ -417,11 +420,7 @@ func (p *SimpleGrainPool[V]) Apply(ctx context.Context, id uint64, mutation ...p
if err != nil { if err != nil {
return nil, err return nil, err
} }
if p.pubsub != nil {
if sub, ok := any(grain).(Subscribable); ok {
sub.UpdateSubscriptions(p.pubsub)
}
}
return &MutationResult[*V]{ return &MutationResult[*V]{
Result: result, Result: result,
Mutations: mutations, Mutations: mutations,
@@ -450,21 +449,6 @@ func (p *SimpleGrainPool[V]) Hostname() string {
return p.hostname return p.hostname
} }
// GetPubSub returns the pubsub instance.
func (p *SimpleGrainPool[V]) GetPubSub() *PubSub {
return p.pubsub
}
func (p *SimpleGrainPool[V]) SetPubSub(pubsub *PubSub) {
p.pubsub = pubsub
}
func (p *SimpleGrainPool[V]) Publish(event Event) {
if p.pubsub != nil {
p.pubsub.Publish(event)
}
}
// Close notifies remotes that this host is shutting down. // Close notifies remotes that this host is shutting down.
func (p *SimpleGrainPool[V]) Close() { func (p *SimpleGrainPool[V]) Close() {

View File

@@ -2,13 +2,10 @@ package cart
import ( import (
"encoding/json" "encoding/json"
"log"
"slices" "slices"
"strings"
"sync" "sync"
"time" "time"
"git.k6n.net/go-cart-actor/pkg/actor"
messages "git.k6n.net/go-cart-actor/pkg/messages" messages "git.k6n.net/go-cart-actor/pkg/messages"
"git.k6n.net/go-cart-actor/pkg/voucher" "git.k6n.net/go-cart-actor/pkg/voucher"
"github.com/matst80/go-redis-inventory/pkg/inventory" "github.com/matst80/go-redis-inventory/pkg/inventory"
@@ -230,43 +227,12 @@ func (c *CartGrain) GetCurrentState() (*CartGrain, error) {
return c, nil return c, nil
} }
// Notify handles incoming events, e.g., inventory changes. func (c *CartGrain) HandleInventoryChange(change inventory.InventoryChange) {
func (c *CartGrain) Notify(event actor.Event) {
c.mu.Lock()
defer c.mu.Unlock()
// Example: if event is inventory change for a SKU in the cart
if strings.HasPrefix(event.Topic, "inventory:") {
sku := strings.TrimPrefix(event.Topic, "inventory:")
log.Printf("cart grain got inventory update: %s", event.Topic)
update, ok := event.Payload.(inventory.InventoryChange)
if !ok {
log.Printf("cart grain inventory update has invalid payload")
return
}
for _, item := range c.Items {
if item.Sku == sku && update.StockLocationID == *item.StoreId {
// Update stock status based on payload, e.g., if payload is bool available
log.Printf("cart grain got item stock update %d", update.Value)
item.Stock = uint16(update.Value)
break
}
}
}
}
func (c *CartGrain) UpdateSubscriptions(pubsub *actor.PubSub) {
pubsub.UnsubscribeAll(c.GetId())
skuSet := make(map[string]bool)
for _, item := range c.Items { for _, item := range c.Items {
skuSet[item.Sku] = true if item.Sku == change.SKU && change.StockLocationID == *item.StoreId {
} item.Stock = uint16(change.Value)
for sku := range skuSet { break
pubsub.Subscribe("inventory:"+sku, c.GetId()) }
} }
} }