Compare commits
2 Commits
82650bd3dd
...
d5d2b3e711
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d5d2b3e711 | ||
|
|
f640cd7d2c |
@@ -80,6 +80,8 @@ func main() {
|
||||
|
||||
log.Printf("loaded %d promotions", len(promotionData.State.Promotions))
|
||||
|
||||
inventoryPubSub := actor.NewPubSub[inventory.InventoryChange]()
|
||||
|
||||
promotionService := promotions.NewPromotionService(nil)
|
||||
|
||||
reg := cart.NewCartMultationRegistry()
|
||||
@@ -109,11 +111,20 @@ func main() {
|
||||
grainSpawns.Inc()
|
||||
ret := cart.NewCartGrain(id, time.Now())
|
||||
// Set baseline lastChange at spawn; replay may update it to last event timestamp.
|
||||
|
||||
inventoryPubSub.Subscribe(ret.HandleInventoryChange)
|
||||
err := diskStorage.LoadEvents(ctx, id, ret)
|
||||
|
||||
return ret, err
|
||||
},
|
||||
Destroy: func(grain actor.Grain[cart.CartGrain]) error {
|
||||
cart, err := grain.GetCurrentState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
inventoryPubSub.Unsubscribe(cart.HandleInventoryChange)
|
||||
|
||||
return nil
|
||||
},
|
||||
SpawnHost: func(host string) (actor.Host, error) {
|
||||
return proxy.NewRemoteHost(host)
|
||||
},
|
||||
@@ -127,13 +138,6 @@ func main() {
|
||||
log.Fatalf("Error creating cart pool: %v\n", err)
|
||||
}
|
||||
|
||||
pool.SetPubSub(actor.NewPubSub(func(id uint64, event actor.Event) {
|
||||
grain, _ := pool.Get(context.Background(), id)
|
||||
if sub, ok := any(grain).(actor.Subscribable); ok {
|
||||
sub.Notify(event)
|
||||
}
|
||||
}))
|
||||
|
||||
klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: redisAddress,
|
||||
@@ -245,6 +249,15 @@ func main() {
|
||||
srvErr <- srv.ListenAndServe()
|
||||
}()
|
||||
|
||||
listener := inventory.NewInventoryChangeListener(rdb, context.Background(), func(changes []inventory.InventoryChange) {
|
||||
for _, change := range changes {
|
||||
log.Printf("inventory change: %v", change)
|
||||
inventoryPubSub.Publish(change)
|
||||
}
|
||||
})
|
||||
|
||||
go listener.Start()
|
||||
|
||||
log.Print("Server started at port 8080")
|
||||
|
||||
go http.ListenAndServe(":8081", debugMux)
|
||||
|
||||
@@ -43,10 +43,9 @@ var (
|
||||
|
||||
type PoolServer struct {
|
||||
actor.GrainPool[*cart.CartGrain]
|
||||
pod_name string
|
||||
klarnaClient *KlarnaClient
|
||||
inventoryService inventory.InventoryService
|
||||
inventoryListener *inventory.InventoryChangeListener
|
||||
pod_name string
|
||||
klarnaClient *KlarnaClient
|
||||
inventoryService inventory.InventoryService
|
||||
}
|
||||
|
||||
func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService, inventoryRedisClient *redis.Client) *PoolServer {
|
||||
@@ -56,16 +55,7 @@ func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarn
|
||||
klarnaClient: klarnaClient,
|
||||
inventoryService: inventoryService,
|
||||
}
|
||||
listener := inventory.NewInventoryChangeListener(inventoryRedisClient, context.Background(), func(changes []inventory.InventoryChange) {
|
||||
for _, change := range changes {
|
||||
srv.GrainPool.GetPubSub().Publish(actor.Event{
|
||||
Topic: fmt.Sprintf("inventory:%s", change.SKU),
|
||||
Payload: change,
|
||||
})
|
||||
}
|
||||
})
|
||||
srv.inventoryListener = listener
|
||||
go listener.Start()
|
||||
|
||||
return srv
|
||||
}
|
||||
|
||||
|
||||
@@ -26,7 +26,6 @@ type GrainPool[V any] interface {
|
||||
AddRemoteHost(host string)
|
||||
IsHealthy() bool
|
||||
IsKnown(string) bool
|
||||
GetPubSub() *PubSub
|
||||
Close()
|
||||
}
|
||||
|
||||
|
||||
@@ -1,93 +1,79 @@
|
||||
package actor
|
||||
|
||||
import (
|
||||
"iter"
|
||||
"log"
|
||||
"slices"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Event represents an event to be published.
|
||||
type Event struct {
|
||||
Topic string
|
||||
Payload interface{}
|
||||
}
|
||||
type ReceiverFunc[V any] func(event V)
|
||||
|
||||
// NotifyFunc is a function to notify a grain of an event.
|
||||
type NotifyFunc func(grainID uint64, event Event)
|
||||
|
||||
// Subscribable is an interface for grains that can receive notifications.
|
||||
type Subscribable interface {
|
||||
Notify(event Event)
|
||||
UpdateSubscriptions(pubsub *PubSub)
|
||||
}
|
||||
|
||||
// PubSub manages subscriptions for grains to topics.
|
||||
// Topics are strings, e.g., "sku:12345"
|
||||
// Subscribers are grain IDs (uint64)
|
||||
type PubSub struct {
|
||||
subscribers map[string][]uint64
|
||||
type PubSub[V any] struct {
|
||||
subscribers []*ReceiverFunc[V]
|
||||
mu sync.RWMutex
|
||||
notify NotifyFunc
|
||||
}
|
||||
|
||||
// NewPubSub creates a new PubSub instance.
|
||||
func NewPubSub(notify NotifyFunc) *PubSub {
|
||||
return &PubSub{
|
||||
subscribers: make(map[string][]uint64),
|
||||
notify: notify,
|
||||
func NewPubSub[V any]() *PubSub[V] {
|
||||
return &PubSub[V]{
|
||||
subscribers: make([]*ReceiverFunc[V], 0),
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe adds a grain ID to the subscribers of a topic.
|
||||
func (p *PubSub) Subscribe(topic string, grainID uint64) {
|
||||
func (p *PubSub[V]) Subscribe(receiver ReceiverFunc[V]) {
|
||||
if receiver == nil {
|
||||
return
|
||||
}
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.subscribers[topic] = append(p.subscribers[topic], grainID)
|
||||
log.Printf("adding subscriber")
|
||||
p.subscribers = append(p.subscribers, &receiver)
|
||||
}
|
||||
|
||||
// Unsubscribe removes a grain ID from the subscribers of a topic.
|
||||
func (p *PubSub) Unsubscribe(topic string, grainID uint64) {
|
||||
func (p *PubSub[V]) Unsubscribe(receiver ReceiverFunc[V]) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
list := p.subscribers[topic]
|
||||
for i, id := range list {
|
||||
if id == grainID {
|
||||
p.subscribers[topic] = append(list[:i], list[i+1:]...)
|
||||
list := p.subscribers
|
||||
prt := &receiver
|
||||
for i, sub := range list {
|
||||
if sub == nil {
|
||||
continue
|
||||
}
|
||||
if sub == prt {
|
||||
log.Printf("removing subscriber")
|
||||
p.subscribers = append(list[:i], list[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
p.subscribers = slices.DeleteFunc(p.subscribers, func(fn *ReceiverFunc[V]) bool {
|
||||
return fn == nil
|
||||
})
|
||||
// If list is empty, could delete, but not necessary
|
||||
}
|
||||
|
||||
// UnsubscribeAll removes the grain ID from all topics.
|
||||
func (p *PubSub) UnsubscribeAll(grainID uint64) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
for topic, list := range p.subscribers {
|
||||
newList := make([]uint64, 0, len(list))
|
||||
for _, id := range list {
|
||||
if id != grainID {
|
||||
newList = append(newList, id)
|
||||
}
|
||||
}
|
||||
if len(newList) == 0 {
|
||||
delete(p.subscribers, topic)
|
||||
} else {
|
||||
p.subscribers[topic] = newList
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetSubscribers returns a copy of the subscriber IDs for a topic.
|
||||
func (p *PubSub) GetSubscribers(topic string) []uint64 {
|
||||
func (p *PubSub[V]) GetSubscribers() iter.Seq[ReceiverFunc[V]] {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
list := p.subscribers[topic]
|
||||
return append([]uint64(nil), list...)
|
||||
|
||||
return func(yield func(ReceiverFunc[V]) bool) {
|
||||
for _, sub := range p.subscribers {
|
||||
if sub == nil {
|
||||
continue
|
||||
}
|
||||
if !yield(*sub) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Publish sends an event to all subscribers of the topic.
|
||||
func (p *PubSub) Publish(event Event) {
|
||||
subs := p.GetSubscribers(event.Topic)
|
||||
for _, id := range subs {
|
||||
p.notify(id, event)
|
||||
func (p *PubSub[V]) Publish(event V) {
|
||||
for notify := range p.GetSubscribers() {
|
||||
notify(event)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,12 +17,12 @@ type SimpleGrainPool[V any] struct {
|
||||
grains map[uint64]Grain[V]
|
||||
mutationRegistry MutationRegistry
|
||||
spawn func(ctx context.Context, id uint64) (Grain[V], error)
|
||||
destroy func(grain Grain[V]) error
|
||||
spawnHost func(host string) (Host, error)
|
||||
listeners []LogListener
|
||||
storage LogStorage[V]
|
||||
ttl time.Duration
|
||||
poolSize int
|
||||
pubsub *PubSub
|
||||
|
||||
// Cluster coordination --------------------------------------------------
|
||||
hostname string
|
||||
@@ -39,6 +39,7 @@ type GrainPoolConfig[V any] struct {
|
||||
Hostname string
|
||||
Spawn func(ctx context.Context, id uint64) (Grain[V], error)
|
||||
SpawnHost func(host string) (Host, error)
|
||||
Destroy func(grain Grain[V]) error
|
||||
TTL time.Duration
|
||||
PoolSize int
|
||||
MutationRegistry MutationRegistry
|
||||
@@ -52,6 +53,7 @@ func NewSimpleGrainPool[V any](config GrainPoolConfig[V]) (*SimpleGrainPool[V],
|
||||
storage: config.Storage,
|
||||
spawn: config.Spawn,
|
||||
spawnHost: config.SpawnHost,
|
||||
destroy: config.Destroy,
|
||||
ttl: config.TTL,
|
||||
poolSize: config.PoolSize,
|
||||
hostname: config.Hostname,
|
||||
@@ -89,9 +91,10 @@ func (p *SimpleGrainPool[V]) purge() {
|
||||
for id, grain := range p.grains {
|
||||
if grain.GetLastAccess().Before(purgeLimit) {
|
||||
purgedIds = append(purgedIds, id)
|
||||
if p.pubsub != nil {
|
||||
p.pubsub.UnsubscribeAll(id)
|
||||
if err := p.destroy(grain); err != nil {
|
||||
log.Printf("failed to destroy grain %d: %v", id, err)
|
||||
}
|
||||
|
||||
delete(p.grains, id)
|
||||
}
|
||||
}
|
||||
@@ -417,11 +420,7 @@ func (p *SimpleGrainPool[V]) Apply(ctx context.Context, id uint64, mutation ...p
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if p.pubsub != nil {
|
||||
if sub, ok := any(grain).(Subscribable); ok {
|
||||
sub.UpdateSubscriptions(p.pubsub)
|
||||
}
|
||||
}
|
||||
|
||||
return &MutationResult[*V]{
|
||||
Result: result,
|
||||
Mutations: mutations,
|
||||
@@ -450,21 +449,6 @@ func (p *SimpleGrainPool[V]) Hostname() string {
|
||||
return p.hostname
|
||||
}
|
||||
|
||||
// GetPubSub returns the pubsub instance.
|
||||
func (p *SimpleGrainPool[V]) GetPubSub() *PubSub {
|
||||
return p.pubsub
|
||||
}
|
||||
|
||||
func (p *SimpleGrainPool[V]) SetPubSub(pubsub *PubSub) {
|
||||
p.pubsub = pubsub
|
||||
}
|
||||
|
||||
func (p *SimpleGrainPool[V]) Publish(event Event) {
|
||||
if p.pubsub != nil {
|
||||
p.pubsub.Publish(event)
|
||||
}
|
||||
}
|
||||
|
||||
// Close notifies remotes that this host is shutting down.
|
||||
func (p *SimpleGrainPool[V]) Close() {
|
||||
|
||||
|
||||
@@ -2,13 +2,10 @@ package cart
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.k6n.net/go-cart-actor/pkg/actor"
|
||||
messages "git.k6n.net/go-cart-actor/pkg/messages"
|
||||
"git.k6n.net/go-cart-actor/pkg/voucher"
|
||||
"github.com/matst80/go-redis-inventory/pkg/inventory"
|
||||
@@ -230,43 +227,12 @@ func (c *CartGrain) GetCurrentState() (*CartGrain, error) {
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Notify handles incoming events, e.g., inventory changes.
|
||||
func (c *CartGrain) Notify(event actor.Event) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
// Example: if event is inventory change for a SKU in the cart
|
||||
if strings.HasPrefix(event.Topic, "inventory:") {
|
||||
sku := strings.TrimPrefix(event.Topic, "inventory:")
|
||||
|
||||
log.Printf("cart grain got inventory update: %s", event.Topic)
|
||||
update, ok := event.Payload.(inventory.InventoryChange)
|
||||
if !ok {
|
||||
log.Printf("cart grain inventory update has invalid payload")
|
||||
return
|
||||
}
|
||||
for _, item := range c.Items {
|
||||
|
||||
if item.Sku == sku && update.StockLocationID == *item.StoreId {
|
||||
// Update stock status based on payload, e.g., if payload is bool available
|
||||
|
||||
log.Printf("cart grain got item stock update %d", update.Value)
|
||||
|
||||
item.Stock = uint16(update.Value)
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CartGrain) UpdateSubscriptions(pubsub *actor.PubSub) {
|
||||
pubsub.UnsubscribeAll(c.GetId())
|
||||
skuSet := make(map[string]bool)
|
||||
func (c *CartGrain) HandleInventoryChange(change inventory.InventoryChange) {
|
||||
for _, item := range c.Items {
|
||||
skuSet[item.Sku] = true
|
||||
}
|
||||
for sku := range skuSet {
|
||||
pubsub.Subscribe("inventory:"+sku, c.GetId())
|
||||
if item.Sku == change.SKU && change.StockLocationID == *item.StoreId {
|
||||
item.Stock = uint16(change.Value)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -507,10 +507,10 @@ var file_control_plane_proto_rawDesc = string([]byte{
|
||||
0x69, 0x6e, 0x67, 0x12, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x43,
|
||||
0x6c, 0x6f, 0x73, 0x69, 0x6e, 0x67, 0x4e, 0x6f, 0x74, 0x69, 0x63, 0x65, 0x1a, 0x18, 0x2e, 0x6d,
|
||||
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61,
|
||||
0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x2e, 0x74, 0x6f,
|
||||
0x72, 0x6e, 0x62, 0x65, 0x72, 0x67, 0x2e, 0x6d, 0x65, 0x2f, 0x67, 0x6f, 0x2d, 0x63, 0x61, 0x72,
|
||||
0x74, 0x2d, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x6d, 0x65,
|
||||
0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x2e, 0x6b, 0x36,
|
||||
0x6e, 0x2e, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x6f, 0x2d, 0x63, 0x61, 0x72, 0x74, 0x2d, 0x61, 0x63,
|
||||
0x74, 0x6f, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
|
||||
0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
})
|
||||
|
||||
var (
|
||||
|
||||
@@ -1911,11 +1911,10 @@ var file_messages_proto_rawDesc = string([]byte{
|
||||
0x2e, 0x47, 0x69, 0x66, 0x74, 0x63, 0x61, 0x72, 0x64, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x08, 0x67,
|
||||
0x69, 0x66, 0x74, 0x63, 0x61, 0x72, 0x64, 0x22, 0x20, 0x0a, 0x0e, 0x52, 0x65, 0x6d, 0x6f, 0x76,
|
||||
0x65, 0x47, 0x69, 0x66, 0x74, 0x63, 0x61, 0x72, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18,
|
||||
0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74,
|
||||
0x2e, 0x74, 0x6f, 0x72, 0x6e, 0x62, 0x65, 0x72, 0x67, 0x2e, 0x6d, 0x65, 0x2f, 0x67, 0x6f, 0x2d,
|
||||
0x63, 0x61, 0x72, 0x74, 0x2d, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x3b, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x33,
|
||||
0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74,
|
||||
0x2e, 0x6b, 0x36, 0x6e, 0x2e, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x6f, 0x2d, 0x63, 0x61, 0x72, 0x74,
|
||||
0x2d, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x6d, 0x65, 0x73,
|
||||
0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
})
|
||||
|
||||
var (
|
||||
|
||||
Reference in New Issue
Block a user