This commit is contained in:
matst80
2025-10-13 15:39:41 +02:00
parent 6094da99f3
commit 9fc3871e84
26 changed files with 927 additions and 848 deletions

View File

@@ -100,7 +100,7 @@ func getInt(data float64, ok bool) (int, error) {
return int(data), nil return int(data), nil
} }
func getItemData(sku string, qty int, country string) (*messages.AddItem, error) { func GetItemAddMessage(sku string, qty int, country string, storeId *string) (*messages.AddItem, error) {
item, err := FetchItem(sku, country) item, err := FetchItem(sku, country)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -162,17 +162,18 @@ func getItemData(sku string, qty int, country string) (*messages.AddItem, error)
Disclaimer: item.Disclaimer, Disclaimer: item.Disclaimer,
Country: country, Country: country,
Outlet: outlet, Outlet: outlet,
StoreId: storeId,
}, nil }, nil
} }
func (c *CartGrain) AddItem(sku string, qty int, country string, storeId *string) (*CartGrain, error) { // func (c *CartGrain) AddItem(sku string, qty int, country string, storeId *string) (*CartGrain, error) {
cartItem, err := getItemData(sku, qty, country) // cartItem, err := getItemData(sku, qty, country)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
cartItem.StoreId = storeId // cartItem.StoreId = storeId
return c.Apply(cartItem, false) // return c.Apply(cartItem, false)
} // }
func (c *CartGrain) GetState() ([]byte, error) { func (c *CartGrain) GetState() ([]byte, error) {
return json.Marshal(c) return json.Marshal(c)
@@ -221,25 +222,25 @@ func GetTaxAmount(total int64, tax int) int64 {
return int64(float64(total) / float64((1 + taxD))) return int64(float64(total) / float64((1 + taxD)))
} }
func (c *CartGrain) Apply(content interface{}, isReplay bool) (*CartGrain, error) { // func (c *CartGrain) Apply(content proto.Message, isReplay bool) (*CartGrain, error) {
updated, err := ApplyRegistered(c, content) // updated, err := ApplyRegistered(c, content)
if err != nil { // if err != nil {
if err == ErrMutationNotRegistered { // if err == ErrMutationNotRegistered {
return nil, fmt.Errorf("unsupported mutation type %T (not registered)", content) // return nil, fmt.Errorf("unsupported mutation type %T (not registered)", content)
} // }
return nil, err // return nil, err
} // }
// Sliding TTL: update lastChange only for non-replay successful mutations. // // Sliding TTL: update lastChange only for non-replay successful mutations.
if updated != nil && !isReplay { // if updated != nil && !isReplay {
c.lastChange = time.Now() // c.lastChange = time.Now()
c.lastAccess = time.Now() // c.lastAccess = time.Now()
go AppendCartEvent(c.Id, content) // go AppendCartEvent(c.Id, content)
} // }
return updated, nil // return updated, nil
} // }
func (c *CartGrain) UpdateTotals() { func (c *CartGrain) UpdateTotals() {
c.TotalPrice = 0 c.TotalPrice = 0

View File

@@ -1,73 +0,0 @@
package main
import (
"encoding/gob"
"time"
)
func init() {
gob.Register(map[uint64]int64{})
}
type DiskStorage struct {
stateFile string
lastSave time.Time
LastSaves map[uint64]time.Time
}
func NewDiskStorage(stateFile string) (*DiskStorage, error) {
ret := &DiskStorage{
stateFile: stateFile,
LastSaves: make(map[uint64]time.Time),
}
//err := ret.loadState()
return ret, nil
}
// func saveMessages(_ interface{}, _ CartId) error {
// // No-op: legacy event log persistence removed in oneof refactor.
// return nil
// }
// func getCartPath(id string) string {
// return fmt.Sprintf("data/%s.prot", id)
// }
// func loadMessages(_ Grain, _ CartId) error {
// // No-op: legacy replay removed in oneof refactor.
// return nil
// }
// func (s *DiskStorage) saveState() error {
// tmpFile := s.stateFile + "_tmp"
// file, err := os.Create(tmpFile)
// if err != nil {
// return err
// }
// defer file.Close()
// err = gob.NewEncoder(file).Encode(s.LastSaves)
// if err != nil {
// return err
// }
// os.Remove(s.stateFile + ".bak")
// os.Rename(s.stateFile, s.stateFile+".bak")
// return os.Rename(tmpFile, s.stateFile)
// }
// func (s *DiskStorage) loadState() error {
// file, err := os.Open(s.stateFile)
// if err != nil {
// return err
// }
// defer file.Close()
// return gob.NewDecoder(file).Decode(&s.LastSaves)
// }
func (s *DiskStorage) Store(id CartId, _ *CartGrain) error {
// With the removal of the legacy message log, we only update the timestamp.
ts := time.Now()
s.LastSaves[uint64(id)] = ts
s.lastSave = ts
return nil
}

View File

@@ -5,12 +5,14 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"log"
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
"sync" "sync"
"time" "time"
"git.tornberg.me/go-cart-actor/pkg/actor"
messages "git.tornberg.me/go-cart-actor/pkg/messages" messages "git.tornberg.me/go-cart-actor/pkg/messages"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
@@ -205,7 +207,7 @@ func AppendCartEvent(id CartId, mutation interface{}) error {
// ReplayCartEvents replays an existing cart's event log into the provided grain. // ReplayCartEvents replays an existing cart's event log into the provided grain.
// It applies mutation payloads in order, skipping unknown types. // It applies mutation payloads in order, skipping unknown types.
func ReplayCartEvents(grain *CartGrain, id CartId) error { func ReplayCartEvents(grain *CartGrain, id CartId, registry actor.MutationRegistry) error {
start := time.Now() start := time.Now()
path := EventLogPath(id) path := EventLogPath(id)
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
@@ -237,18 +239,19 @@ func ReplayCartEvents(grain *CartGrain, id CartId) error {
eventReplayFailuresTotal.Inc() eventReplayFailuresTotal.Inc()
continue // skip malformed line continue // skip malformed line
} }
factory, ok := eventTypeFactories[raw.Type]
instance, ok := registry.Create(raw.Type)
if !ok { if !ok {
eventUnknownTypesTotal.Inc() log.Printf("loading failed for unknown mutation type: %s", raw.Type)
continue // skip unknown mutation type eventReplayFailuresTotal.Inc()
continue // skip unknown type
} }
instance := factory()
if err := json.Unmarshal(raw.Payload, instance); err != nil { if err := json.Unmarshal(raw.Payload, instance); err != nil {
eventMutationErrorsTotal.Inc() eventMutationErrorsTotal.Inc()
continue continue
} }
// Apply mutation directly using internal registration (bypass AppendCartEvent recursion). // Apply mutation directly using internal registration (bypass AppendCartEvent recursion).
if _, applyErr := ApplyRegistered(grain, instance); applyErr != nil { if applyErr := registry.Apply(grain, instance); applyErr != nil {
eventMutationErrorsTotal.Inc() eventMutationErrorsTotal.Inc()
continue continue
} else { } else {

View File

@@ -39,34 +39,12 @@ var (
}) })
) )
func spawn(id uint64) (actor.Grain[CartGrain], error) {
grainSpawns.Inc()
ret := &CartGrain{
lastItemId: 0,
lastDeliveryId: 0,
Deliveries: []*CartDelivery{},
Id: CartId(id),
Items: []*CartItem{},
TotalPrice: 0,
}
// Set baseline lastChange at spawn; replay may update it to last event timestamp.
ret.lastChange = time.Now()
ret.lastAccess = time.Now()
// Legacy loadMessages (no-op) retained; then replay append-only event log
//_ = loadMessages(ret, id)
err := ReplayCartEvents(ret, CartId(id))
return ret, err
}
func init() { func init() {
os.Mkdir("data", 0755) os.Mkdir("data", 0755)
} }
type App struct { type App struct {
pool *actor.SimpleGrainPool[CartGrain] pool *actor.SimpleGrainPool[CartGrain]
storage *DiskStorage
} }
var podIp = os.Getenv("POD_IP") var podIp = os.Getenv("POD_IP")
@@ -91,7 +69,10 @@ func getCountryFromHost(host string) string {
if strings.Contains(strings.ToLower(host), "-no") { if strings.Contains(strings.ToLower(host), "-no") {
return "no" return "no"
} }
return "se" if strings.Contains(strings.ToLower(host), "-se") {
return "se"
}
return ""
} }
func GetDiscovery() discovery.Discovery { func GetDiscovery() discovery.Discovery {
@@ -112,21 +93,71 @@ func GetDiscovery() discovery.Discovery {
} }
func main() { func main() {
controlPlaneConfig := actor.DefaultServerConfig() controlPlaneConfig := actor.DefaultServerConfig()
storage, err := NewDiskStorage(fmt.Sprintf("data/s_%s.gob", name))
if err != nil { reg := actor.NewMutationRegistry()
log.Printf("Error loading state: %v\n", err) reg.RegisterMutations(
actor.NewMutation(AddItem, func() *messages.AddItem {
return &messages.AddItem{}
}),
actor.NewMutation(ChangeQuantity, func() *messages.ChangeQuantity {
return &messages.ChangeQuantity{}
}),
actor.NewMutation(RemoveItem, func() *messages.RemoveItem {
return &messages.RemoveItem{}
}),
actor.NewMutation(InitializeCheckout, func() *messages.InitializeCheckout {
return &messages.InitializeCheckout{}
}),
actor.NewMutation(RemoveDelivery, func() *messages.RemoveDelivery {
return &messages.RemoveDelivery{}
}),
actor.NewMutation(SetDelivery, func() *messages.SetDelivery {
return &messages.SetDelivery{}
}),
actor.NewMutation(SetPickupPoint, func() *messages.SetPickupPoint {
return &messages.SetPickupPoint{}
}),
)
diskStorage := actor.NewDiskStorage[CartGrain]("data", reg)
poolConfig := actor.GrainPoolConfig[CartGrain]{
MutationRegistry: reg,
Storage: diskStorage,
Spawn: func(id uint64) (actor.Grain[CartGrain], error) {
grainSpawns.Inc()
ret := &CartGrain{
lastItemId: 0,
lastDeliveryId: 0,
Deliveries: []*CartDelivery{},
Id: CartId(id),
Items: []*CartItem{},
TotalPrice: 0,
}
// Set baseline lastChange at spawn; replay may update it to last event timestamp.
ret.lastChange = time.Now()
ret.lastAccess = time.Now()
// Legacy loadMessages (no-op) retained; then replay append-only event log
//_ = loadMessages(ret, id)
err := diskStorage.LoadEvents(id, ret)
return ret, err
},
SpawnHost: func(host string) (actor.Host, error) {
return proxy.NewRemoteHost(host)
},
TTL: 15 * time.Minute,
PoolSize: 2 * 65535,
Hostname: podIp,
} }
pool, err := actor.NewSimpleGrainPool(2*65535, 15*time.Minute, podIp, spawn, func(host string) (actor.Host, error) { pool, err := actor.NewSimpleGrainPool(poolConfig)
return proxy.NewRemoteHost(host)
})
if err != nil { if err != nil {
log.Fatalf("Error creating cart pool: %v\n", err) log.Fatalf("Error creating cart pool: %v\n", err)
} }
app := &App{ app := &App{
pool: pool, pool: pool,
storage: storage,
} }
grpcSrv, err := actor.NewControlServer[*CartGrain](controlPlaneConfig, pool) grpcSrv, err := actor.NewControlServer[*CartGrain](controlPlaneConfig, pool)

View File

@@ -20,63 +20,57 @@ import (
// NOTE: Any future field additions in messages.AddItem that affect pricing / tax // NOTE: Any future field additions in messages.AddItem that affect pricing / tax
// must keep this handler in sync. // must keep this handler in sync.
func init() { func AddItem(g *CartGrain, m *messages.AddItem) error {
RegisterMutation[messages.AddItem]( if m == nil {
"AddItem", return fmt.Errorf("AddItem: nil payload")
func(g *CartGrain, m *messages.AddItem) error { }
if m == nil { if m.Quantity < 1 {
return fmt.Errorf("AddItem: nil payload") return fmt.Errorf("AddItem: invalid quantity %d", m.Quantity)
} }
if m.Quantity < 1 {
return fmt.Errorf("AddItem: invalid quantity %d", m.Quantity)
}
// Fast path: merge with existing item having same SKU // Fast path: merge with existing item having same SKU
if existing, found := g.FindItemWithSku(m.Sku); found { if existing, found := g.FindItemWithSku(m.Sku); found {
existing.Quantity += int(m.Quantity) existing.Quantity += int(m.Quantity)
return nil return nil
} }
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
g.lastItemId++ g.lastItemId++
taxRate := 2500 taxRate := 2500
if m.Tax > 0 { if m.Tax > 0 {
taxRate = int(m.Tax) taxRate = int(m.Tax)
} }
taxAmountPerUnit := GetTaxAmount(m.Price, taxRate) taxAmountPerUnit := GetTaxAmount(m.Price, taxRate)
g.Items = append(g.Items, &CartItem{ g.Items = append(g.Items, &CartItem{
Id: g.lastItemId, Id: g.lastItemId,
ItemId: int(m.ItemId), ItemId: int(m.ItemId),
Quantity: int(m.Quantity), Quantity: int(m.Quantity),
Sku: m.Sku, Sku: m.Sku,
Name: m.Name, Name: m.Name,
Price: m.Price, Price: m.Price,
TotalPrice: m.Price * int64(m.Quantity), TotalPrice: m.Price * int64(m.Quantity),
TotalTax: int64(taxAmountPerUnit * int64(m.Quantity)), TotalTax: int64(taxAmountPerUnit * int64(m.Quantity)),
Image: m.Image, Image: m.Image,
Stock: StockStatus(m.Stock), Stock: StockStatus(m.Stock),
Disclaimer: m.Disclaimer, Disclaimer: m.Disclaimer,
Brand: m.Brand, Brand: m.Brand,
Category: m.Category, Category: m.Category,
Category2: m.Category2, Category2: m.Category2,
Category3: m.Category3, Category3: m.Category3,
Category4: m.Category4, Category4: m.Category4,
Category5: m.Category5, Category5: m.Category5,
OrgPrice: m.OrgPrice, OrgPrice: m.OrgPrice,
ArticleType: m.ArticleType, ArticleType: m.ArticleType,
Outlet: m.Outlet, Outlet: m.Outlet,
SellerId: m.SellerId, SellerId: m.SellerId,
SellerName: m.SellerName, SellerName: m.SellerName,
Tax: int(taxAmountPerUnit), Tax: int(taxAmountPerUnit),
TaxRate: taxRate, TaxRate: taxRate,
StoreId: m.StoreId, StoreId: m.StoreId,
}) })
return nil return nil
},
WithTotals(), // Recalculate totals after successful mutation
)
} }

View File

@@ -1,11 +1,5 @@
package main package main
import (
"fmt"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
)
// mutation_add_request.go // mutation_add_request.go
// //
// Registers the AddRequest mutation. This mutation is a higher-level intent // Registers the AddRequest mutation. This mutation is a higher-level intent
@@ -30,32 +24,28 @@ import (
// - Stock validation before increasing quantity // - Stock validation before increasing quantity
// - Reservation logic or concurrency guards around stock updates // - Reservation logic or concurrency guards around stock updates
// - Coupon / pricing rules applied conditionally during add-by-sku // - Coupon / pricing rules applied conditionally during add-by-sku
func init() {
RegisterMutation[messages.AddRequest](
"AddRequest",
func(g *CartGrain, m *messages.AddRequest) error {
if m == nil {
return fmt.Errorf("AddRequest: nil payload")
}
if m.Sku == "" {
return fmt.Errorf("AddRequest: sku is empty")
}
if m.Quantity < 1 {
return fmt.Errorf("AddRequest: invalid quantity %d", m.Quantity)
}
// Existing line: accumulate quantity only. // func HandleAddRequest(g *CartGrain, m *messages.AddRequest) error {
if existing, found := g.FindItemWithSku(m.Sku); found { // if m == nil {
existing.Quantity += int(m.Quantity) // return fmt.Errorf("AddRequest: nil payload")
return nil // }
} // if m.Sku == "" {
// return fmt.Errorf("AddRequest: sku is empty")
// }
// if m.Quantity < 1 {
// return fmt.Errorf("AddRequest: invalid quantity %d", m.Quantity)
// }
// New line: delegate to higher-level AddItem flow (product lookup). // // Existing line: accumulate quantity only.
// We intentionally ignore the returned *CartGrain; registry will // if existing, found := g.FindItemWithSku(m.Sku); found {
// do totals again after this handler returns (harmless). // existing.Quantity += int(m.Quantity)
_, err := g.AddItem(m.Sku, int(m.Quantity), m.Country, m.StoreId) // return nil
return err // }
}, // data, err := GetItemAddMessage(m.Sku, int(m.Quantity), m.Country, m.StoreId)
WithTotals(), // if err != nil {
) // return err
} // }
// return AddItem(g, data)
// return err
// }

View File

@@ -25,34 +25,29 @@ import (
// the grain's implicit expectation that higher layers control access. // the grain's implicit expectation that higher layers control access.
// (If strict locking is required around every mutation, wrap logic in // (If strict locking is required around every mutation, wrap logic in
// an explicit g.mu.Lock()/Unlock(), but current model mirrors prior code.) // an explicit g.mu.Lock()/Unlock(), but current model mirrors prior code.)
func init() {
RegisterMutation[messages.ChangeQuantity](
"ChangeQuantity",
func(g *CartGrain, m *messages.ChangeQuantity) error {
if m == nil {
return fmt.Errorf("ChangeQuantity: nil payload")
}
foundIndex := -1 func ChangeQuantity(g *CartGrain, m *messages.ChangeQuantity) error {
for i, it := range g.Items { if m == nil {
if it.Id == int(m.Id) { return fmt.Errorf("ChangeQuantity: nil payload")
foundIndex = i }
break
}
}
if foundIndex == -1 {
return fmt.Errorf("ChangeQuantity: item id %d not found", m.Id)
}
if m.Quantity <= 0 { foundIndex := -1
// Remove the item for i, it := range g.Items {
g.Items = append(g.Items[:foundIndex], g.Items[foundIndex+1:]...) if it.Id == int(m.Id) {
return nil foundIndex = i
} break
}
}
if foundIndex == -1 {
return fmt.Errorf("ChangeQuantity: item id %d not found", m.Id)
}
g.Items[foundIndex].Quantity = int(m.Quantity) if m.Quantity <= 0 {
return nil // Remove the item
}, g.Items = append(g.Items[:foundIndex], g.Items[foundIndex+1:]...)
WithTotals(), return nil
) }
g.Items[foundIndex].Quantity = int(m.Quantity)
return nil
} }

View File

@@ -28,22 +28,17 @@ import (
// parallel checkout attempts are possible, add higher-level guards // parallel checkout attempts are possible, add higher-level guards
// (e.g. reject if PaymentInProgress already true unless reusing // (e.g. reject if PaymentInProgress already true unless reusing
// the same OrderReference). // the same OrderReference).
func init() {
RegisterMutation[messages.InitializeCheckout](
"InitializeCheckout",
func(g *CartGrain, m *messages.InitializeCheckout) error {
if m == nil {
return fmt.Errorf("InitializeCheckout: nil payload")
}
if m.OrderId == "" {
return fmt.Errorf("InitializeCheckout: missing orderId")
}
g.OrderReference = m.OrderId func InitializeCheckout(g *CartGrain, m *messages.InitializeCheckout) error {
g.PaymentStatus = m.Status if m == nil {
g.PaymentInProgress = m.PaymentInProgress return fmt.Errorf("InitializeCheckout: nil payload")
return nil }
}, if m.OrderId == "" {
// No WithTotals(): monetary aggregates are unaffected. return fmt.Errorf("InitializeCheckout: missing orderId")
) }
g.OrderReference = m.OrderId
g.PaymentStatus = m.Status
g.PaymentInProgress = m.PaymentInProgress
return nil
} }

View File

@@ -32,22 +32,17 @@ import (
// - Relies on the higher-level guarantee that Apply() calls are serialized // - Relies on the higher-level guarantee that Apply() calls are serialized
// per grain. If out-of-order events are possible, embed versioning or // per grain. If out-of-order events are possible, embed versioning or
// timestamps in the mutation and compare before applying changes. // timestamps in the mutation and compare before applying changes.
func init() {
RegisterMutation[messages.OrderCreated](
"OrderCreated",
func(g *CartGrain, m *messages.OrderCreated) error {
if m == nil {
return fmt.Errorf("OrderCreated: nil payload")
}
if m.OrderId == "" {
return fmt.Errorf("OrderCreated: missing orderId")
}
g.OrderReference = m.OrderId func OrderCreated(g *CartGrain, m *messages.OrderCreated) error {
g.PaymentStatus = m.Status if m == nil {
g.PaymentInProgress = false return fmt.Errorf("OrderCreated: nil payload")
return nil }
}, if m.OrderId == "" {
// No WithTotals(): order completion does not modify pricing or taxes. return fmt.Errorf("OrderCreated: missing orderId")
) }
g.OrderReference = m.OrderId
g.PaymentStatus = m.Status
g.PaymentInProgress = false
return nil
} }

View File

@@ -1,301 +0,0 @@
package main
import (
"fmt"
"reflect"
"sync"
)
// mutation_registry.go
//
// Mutation Registry Infrastructure
// --------------------------------
// This file introduces a generic registry for cart mutations that:
//
// 1. Decouples mutation logic from the large type-switch inside CartGrain.Apply.
// 2. Enforces (at registration time) that every mutation handler has the correct
// signature: func(*CartGrain, *T) error
// 3. Optionally auto-updates cart totals after a mutation if flagged.
// 4. Provides a single authoritative list of registered mutations for
// introspection / coverage testing.
// 5. Allows incremental migration: you can first register new mutations here,
// and later prune the legacy switch cases.
//
// Usage Pattern
// -------------
// // Define your mutation proto message (e.g. messages.ApplyCoupon in messages.proto)
// // Regenerate protobufs.
//
// // In an init() (ideally in a small file like mutations_apply_coupon.go)
// func init() {
// RegisterMutation[*messages.ApplyCoupon](
// "ApplyCoupon",
// func(g *CartGrain, m *messages.ApplyCoupon) error {
// // domain logic ...
// discount := int64(5000)
// if g.TotalPrice < discount {
// discount = g.TotalPrice
// }
// g.TotalDiscount += discount
// g.TotalPrice -= discount
// return nil
// },
// WithTotals(), // we changed price-related fields; recalc totals
// )
// }
//
// // To invoke dynamically (alternative to the current switch):
// if updated, err := ApplyRegistered(grain, incomingMessage); err == nil {
// grain = updated
// } else if errors.Is(err, ErrMutationNotRegistered) {
// // fallback to legacy switch logic
// }
//
// Migration Strategy
// ------------------
// 1. For each existing mutation handled in CartGrain.Apply, add a registry
// registration with equivalent logic.
// 2. Add a test that enumerates all *expected* mutation proto types and asserts
// they are present in RegisteredMutationTypes().
// 3. Once coverage is 100%, replace the switch in CartGrain.Apply with a call
// to ApplyRegistered (and optionally keep a minimal default to produce an
// "unsupported mutation" error).
//
// Thread Safety
// -------------
// Registration is typically done at init() time; a RWMutex provides safety
// should late dynamic registration ever be introduced.
//
// Auto Totals
// -----------
// Many mutations require recomputing totals. To avoid forgetting this, pass
// WithTotals() when registering. This will invoke grain.UpdateTotals() after
// the handler returns successfully.
//
// Error Semantics
// ---------------
// - If a handler returns an error, totals are NOT recalculated (even if
// WithTotals() was specified).
// - ApplyRegistered returns (nil, ErrMutationNotRegistered) if the message type
// is absent.
//
// Extensibility
// -------------
// It is straightforward to add options like audit hooks, metrics wrappers,
// or optimistic concurrency guards by extending MutationOption.
//
// NOTE: Generics require Go 1.18+. If constrained to earlier Go versions,
// replace the generic registration with a non-generic RegisterMutationType
// that accepts reflect.Type and an adapter function.
//
// ---------------------------------------------------------------------------
var (
mutationRegistryMu sync.RWMutex
mutationRegistry = make(map[reflect.Type]*registeredMutation)
// ErrMutationNotRegistered is returned when no handler exists for a given mutation type.
ErrMutationNotRegistered = fmt.Errorf("mutation not registered")
)
// MutationOption configures additional behavior for a registered mutation.
type MutationOption func(*mutationOptions)
// mutationOptions holds flags adjustable per registration.
type mutationOptions struct {
updateTotals bool
}
// WithTotals ensures CartGrain.UpdateTotals() is called after a successful handler.
func WithTotals() MutationOption {
return func(o *mutationOptions) {
o.updateTotals = true
}
}
// registeredMutation stores metadata + the execution closure.
type registeredMutation struct {
name string
handler func(*CartGrain, interface{}) error
updateTotals bool
msgType reflect.Type
}
// RegisterMutation registers a mutation handler for a specific message type T.
//
// Parameters:
//
// name - a human-readable identifier (used for diagnostics / coverage tests).
// handler - business logic operating on the cart grain & strongly typed message.
// options - optional behavior flags (e.g., WithTotals()).
//
// Panics if:
// - name is empty
// - handler is nil
// - duplicate registration for the same message type T
//
// Typical call is placed in an init() function.
func RegisterMutation[T any](name string, handler func(*CartGrain, *T) error, options ...MutationOption) {
if name == "" {
panic("RegisterMutation: name is required")
}
if handler == nil {
panic("RegisterMutation: handler is nil")
}
// Derive the reflect.Type for *T then its Elem (T) for mapping.
var zero *T
rtPtr := reflect.TypeOf(zero)
if rtPtr.Kind() != reflect.Ptr {
panic("RegisterMutation: expected pointer type for generic parameter")
}
rt := rtPtr.Elem()
opts := mutationOptions{}
for _, opt := range options {
opt(&opts)
}
wrapped := func(g *CartGrain, m interface{}) error {
typed, ok := m.(*T)
if !ok {
return fmt.Errorf("mutation type mismatch: have %T want *%s", m, rt.Name())
}
return handler(g, typed)
}
mutationRegistryMu.Lock()
defer mutationRegistryMu.Unlock()
if _, exists := mutationRegistry[rt]; exists {
panic(fmt.Sprintf("RegisterMutation: duplicate registration for type %s", rt.String()))
}
mutationRegistry[rt] = &registeredMutation{
name: name,
handler: wrapped,
updateTotals: opts.updateTotals,
msgType: rt,
}
}
// ApplyRegistered attempts to apply a registered mutation.
// Returns updated grain if successful.
//
// If the mutation is not registered, returns (nil, ErrMutationNotRegistered).
func ApplyRegistered(grain *CartGrain, msg interface{}) (*CartGrain, error) {
if grain == nil {
return nil, fmt.Errorf("nil grain")
}
if msg == nil {
return nil, fmt.Errorf("nil mutation message")
}
rt := indirectType(reflect.TypeOf(msg))
mutationRegistryMu.RLock()
entry, ok := mutationRegistry[rt]
mutationRegistryMu.RUnlock()
if !ok {
return nil, ErrMutationNotRegistered
}
if err := entry.handler(grain, msg); err != nil {
return nil, err
}
if entry.updateTotals {
grain.UpdateTotals()
}
return grain, nil
}
// RegisteredMutations returns metadata for all registered mutations (snapshot).
func RegisteredMutations() []string {
mutationRegistryMu.RLock()
defer mutationRegistryMu.RUnlock()
out := make([]string, 0, len(mutationRegistry))
for _, entry := range mutationRegistry {
out = append(out, entry.name)
}
return out
}
// RegisteredMutationTypes returns the reflect.Type list of all registered messages.
// Useful for coverage tests ensuring expected set matches actual set.
func RegisteredMutationTypes() []reflect.Type {
mutationRegistryMu.RLock()
defer mutationRegistryMu.RUnlock()
out := make([]reflect.Type, 0, len(mutationRegistry))
for t := range mutationRegistry {
out = append(out, t)
}
return out
}
// MustAssertMutationCoverage can be called at startup to ensure every expected
// mutation type has been registered. It panics with a descriptive message if any
// are missing. Provide a slice of prototype pointers (e.g. []*messages.AddItem{nil} ...)
func MustAssertMutationCoverage(expected []interface{}) {
mutationRegistryMu.RLock()
defer mutationRegistryMu.RUnlock()
missing := make([]string, 0)
for _, ex := range expected {
if ex == nil {
continue
}
t := indirectType(reflect.TypeOf(ex))
if _, ok := mutationRegistry[t]; !ok {
missing = append(missing, t.String())
}
}
if len(missing) > 0 {
panic(fmt.Sprintf("mutation registry missing handlers for: %v", missing))
}
}
// indirectType returns the element type if given a pointer; otherwise the type itself.
func indirectType(t reflect.Type) reflect.Type {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t
}
/*
Integration Guide
-----------------
1. Register all existing mutations:
func init() {
RegisterMutation[*messages.AddItem]("AddItem",
func(g *CartGrain, m *messages.AddItem) error {
// (port logic from existing switch branch)
// ...
return nil
},
WithTotals(),
)
// ... repeat for others
}
2. In CartGrain.Apply (early in the method) add:
if updated, err := ApplyRegistered(c, content); err == nil {
return updated, nil
} else if err != ErrMutationNotRegistered {
return nil, err
}
// existing switch fallback below
3. Once all mutations are registered, remove the legacy switch cases
and leave a single ErrMutationNotRegistered path for unknown types.
4. Add a coverage test (see docs for example; removed from source for clarity).
5. (Optional) Add metrics / tracing wrappers for handlers.
*/

View File

@@ -25,29 +25,24 @@ import (
// Future considerations: // Future considerations:
// - If delivery pricing logic changes (e.g., dynamic taxes per delivery), // - If delivery pricing logic changes (e.g., dynamic taxes per delivery),
// UpdateTotals() may need enhancement to incorporate delivery tax properly. // UpdateTotals() may need enhancement to incorporate delivery tax properly.
func init() {
RegisterMutation[messages.RemoveDelivery](
"RemoveDelivery",
func(g *CartGrain, m *messages.RemoveDelivery) error {
if m == nil {
return fmt.Errorf("RemoveDelivery: nil payload")
}
targetID := int(m.Id)
index := -1
for i, d := range g.Deliveries {
if d.Id == targetID {
index = i
break
}
}
if index == -1 {
return fmt.Errorf("RemoveDelivery: delivery id %d not found", m.Id)
}
// Remove delivery (order not preserved beyond necessity) func RemoveDelivery(g *CartGrain, m *messages.RemoveDelivery) error {
g.Deliveries = append(g.Deliveries[:index], g.Deliveries[index+1:]...) if m == nil {
return nil return fmt.Errorf("RemoveDelivery: nil payload")
}, }
WithTotals(), targetID := int(m.Id)
) index := -1
for i, d := range g.Deliveries {
if d.Id == targetID {
index = i
break
}
}
if index == -1 {
return fmt.Errorf("RemoveDelivery: delivery id %d not found", m.Id)
}
// Remove delivery (order not preserved beyond necessity)
g.Deliveries = append(g.Deliveries[:index], g.Deliveries[index+1:]...)
return nil
} }

View File

@@ -21,29 +21,24 @@ import (
// semantics require pruning delivery.item_ids you can extend this handler. // semantics require pruning delivery.item_ids you can extend this handler.
// - If multiple lines somehow shared the same Id (should not happen), only // - If multiple lines somehow shared the same Id (should not happen), only
// the first match would be removed—data integrity relies on unique line Ids. // the first match would be removed—data integrity relies on unique line Ids.
func init() {
RegisterMutation[messages.RemoveItem](
"RemoveItem",
func(g *CartGrain, m *messages.RemoveItem) error {
if m == nil {
return fmt.Errorf("RemoveItem: nil payload")
}
targetID := int(m.Id)
index := -1 func RemoveItem(g *CartGrain, m *messages.RemoveItem) error {
for i, it := range g.Items { if m == nil {
if it.Id == targetID { return fmt.Errorf("RemoveItem: nil payload")
index = i }
break targetID := int(m.Id)
}
}
if index == -1 {
return fmt.Errorf("RemoveItem: item id %d not found", m.Id)
}
g.Items = append(g.Items[:index], g.Items[index+1:]...) index := -1
return nil for i, it := range g.Items {
}, if it.Id == targetID {
WithTotals(), index = i
) break
}
}
if index == -1 {
return fmt.Errorf("RemoveItem: item id %d not found", m.Id)
}
g.Items = append(g.Items[:index], g.Items[index+1:]...)
return nil
} }

View File

@@ -1,11 +1,5 @@
package main package main
import (
"fmt"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
)
// mutation_set_cart_items.go // mutation_set_cart_items.go
// //
// Registers the SetCartRequest mutation. This mutation replaces the entire list // Registers the SetCartRequest mutation. This mutation replaces the entire list
@@ -25,33 +19,28 @@ import (
// - Deliveries might reference item IDs that are now invalid—original logic // - Deliveries might reference item IDs that are now invalid—original logic
// also left deliveries untouched. If that becomes an issue, add a cleanup // also left deliveries untouched. If that becomes an issue, add a cleanup
// pass to remove deliveries whose item IDs no longer exist. // pass to remove deliveries whose item IDs no longer exist.
func init() {
RegisterMutation[messages.SetCartRequest](
"SetCartRequest",
func(g *CartGrain, m *messages.SetCartRequest) error {
if m == nil {
return fmt.Errorf("SetCartRequest: nil payload")
}
// Clear current items (keep deliveries) // func HandleSetCartRequest(g *CartGrain, m *messages.SetCartRequest) error {
g.mu.Lock() // if m == nil {
g.Items = make([]*CartItem, 0, len(m.Items)) // return fmt.Errorf("SetCartRequest: nil payload")
g.mu.Unlock() // }
for _, it := range m.Items { // // Clear current items (keep deliveries)
if it == nil { // g.mu.Lock()
continue // g.Items = make([]*CartItem, 0, len(m.Items))
} // g.mu.Unlock()
if it.Sku == "" || it.Quantity < 1 {
return fmt.Errorf("SetCartRequest: invalid item (sku='%s' qty=%d)", it.Sku, it.Quantity) // for _, it := range m.Items {
} // if it == nil {
_, err := g.AddItem(it.Sku, int(it.Quantity), it.Country, it.StoreId) // continue
if err != nil { // }
return fmt.Errorf("SetCartRequest: add sku '%s' failed: %w", it.Sku, err) // if it.Sku == "" || it.Quantity < 1 {
} // return fmt.Errorf("SetCartRequest: invalid item (sku='%s' qty=%d)", it.Sku, it.Quantity)
} // }
return nil // _, err := g.AddItem(it.Sku, int(it.Quantity), it.Country, it.StoreId)
}, // if err != nil {
WithTotals(), // return fmt.Errorf("SetCartRequest: add sku '%s' failed: %w", it.Sku, err)
) // }
} // }
// return nil
// }

View File

@@ -39,63 +39,58 @@ import (
// - Variable delivery pricing (based on weight, distance, provider, etc.) // - Variable delivery pricing (based on weight, distance, provider, etc.)
// - Validation of provider codes // - Validation of provider codes
// - Multi-currency delivery pricing // - Multi-currency delivery pricing
func init() {
RegisterMutation[messages.SetDelivery](
"SetDelivery",
func(g *CartGrain, m *messages.SetDelivery) error {
if m == nil {
return fmt.Errorf("SetDelivery: nil payload")
}
if m.Provider == "" {
return fmt.Errorf("SetDelivery: provider is empty")
}
withDelivery := g.ItemsWithDelivery() func SetDelivery(g *CartGrain, m *messages.SetDelivery) error {
targetItems := make([]int, 0) if m == nil {
return fmt.Errorf("SetDelivery: nil payload")
}
if m.Provider == "" {
return fmt.Errorf("SetDelivery: provider is empty")
}
if len(m.Items) == 0 { withDelivery := g.ItemsWithDelivery()
// Use every item currently without a delivery targetItems := make([]int, 0)
targetItems = append(targetItems, g.ItemsWithoutDelivery()...)
} else { if len(m.Items) == 0 {
// Validate explicit list // Use every item currently without a delivery
for _, id64 := range m.Items { targetItems = append(targetItems, g.ItemsWithoutDelivery()...)
id := int(id64) } else {
found := false // Validate explicit list
for _, it := range g.Items { for _, id64 := range m.Items {
if it.Id == id { id := int(id64)
found = true found := false
break for _, it := range g.Items {
} if it.Id == id {
} found = true
if !found { break
return fmt.Errorf("SetDelivery: item id %d not found", id)
}
if slices.Contains(withDelivery, id) {
return fmt.Errorf("SetDelivery: item id %d already has a delivery", id)
}
targetItems = append(targetItems, id)
} }
} }
if !found {
if len(targetItems) == 0 { return fmt.Errorf("SetDelivery: item id %d not found", id)
return fmt.Errorf("SetDelivery: no eligible items to attach")
} }
if slices.Contains(withDelivery, id) {
return fmt.Errorf("SetDelivery: item id %d already has a delivery", id)
}
targetItems = append(targetItems, id)
}
}
// Append new delivery if len(targetItems) == 0 {
g.mu.Lock() return fmt.Errorf("SetDelivery: no eligible items to attach")
g.lastDeliveryId++ }
newId := g.lastDeliveryId
g.Deliveries = append(g.Deliveries, &CartDelivery{
Id: newId,
Provider: m.Provider,
PickupPoint: m.PickupPoint,
Price: 4900, // TODO: externalize pricing
Items: targetItems,
})
g.mu.Unlock()
return nil // Append new delivery
}, g.mu.Lock()
WithTotals(), g.lastDeliveryId++
) newId := g.lastDeliveryId
g.Deliveries = append(g.Deliveries, &CartDelivery{
Id: newId,
Provider: m.Provider,
PickupPoint: m.PickupPoint,
Price: 4900, // TODO: externalize pricing
Items: targetItems,
})
g.mu.Unlock()
return nil
} }

View File

@@ -28,29 +28,24 @@ import (
// - Validate pickup point fields (country code, zip format, etc.) // - Validate pickup point fields (country code, zip format, etc.)
// - Track history / audit of pickup point changes // - Track history / audit of pickup point changes
// - Trigger delivery price adjustments (which would then require WithTotals()). // - Trigger delivery price adjustments (which would then require WithTotals()).
func init() {
RegisterMutation[messages.SetPickupPoint](
"SetPickupPoint",
func(g *CartGrain, m *messages.SetPickupPoint) error {
if m == nil {
return fmt.Errorf("SetPickupPoint: nil payload")
}
for _, d := range g.Deliveries { func SetPickupPoint(g *CartGrain, m *messages.SetPickupPoint) error {
if d.Id == int(m.DeliveryId) { if m == nil {
d.PickupPoint = &messages.PickupPoint{ return fmt.Errorf("SetPickupPoint: nil payload")
Id: m.Id, }
Name: m.Name,
Address: m.Address, for _, d := range g.Deliveries {
City: m.City, if d.Id == int(m.DeliveryId) {
Zip: m.Zip, d.PickupPoint = &messages.PickupPoint{
Country: m.Country, Id: m.Id,
} Name: m.Name,
return nil Address: m.Address,
} City: m.City,
Zip: m.Zip,
Country: m.Country,
} }
return fmt.Errorf("SetPickupPoint: delivery id %d not found", m.DeliveryId) return nil
}, }
// No WithTotals(): pickup point does not change pricing / tax. }
) return fmt.Errorf("SetPickupPoint: delivery id %d not found", m.DeliveryId)
} }

View File

@@ -11,6 +11,7 @@ import (
"git.tornberg.me/go-cart-actor/pkg/actor" "git.tornberg.me/go-cart-actor/pkg/actor"
messages "git.tornberg.me/go-cart-actor/pkg/messages" messages "git.tornberg.me/go-cart-actor/pkg/messages"
"github.com/gogo/protobuf/proto"
) )
type PoolServer struct { type PoolServer struct {
@@ -27,7 +28,7 @@ func NewPoolServer(pool actor.GrainPool[*CartGrain], pod_name string, klarnaClie
} }
} }
func (s *PoolServer) ApplyLocal(id CartId, mutation interface{}) (*CartGrain, error) { func (s *PoolServer) ApplyLocal(id CartId, mutation proto.Message) (*CartGrain, error) {
return s.pool.Apply(uint64(id), mutation) return s.pool.Apply(uint64(id), mutation)
} }
@@ -42,7 +43,11 @@ func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request, id CartId
func (s *PoolServer) HandleAddSku(w http.ResponseWriter, r *http.Request, id CartId) error { func (s *PoolServer) HandleAddSku(w http.ResponseWriter, r *http.Request, id CartId) error {
sku := r.PathValue("sku") sku := r.PathValue("sku")
data, err := s.ApplyLocal(id, &messages.AddRequest{Sku: sku, Quantity: 1}) msg, err := GetItemAddMessage(sku, 1, getCountryFromHost(r.Host), nil)
if err != nil {
return err
}
data, err := s.ApplyLocal(id, msg)
if err != nil { if err != nil {
return err return err
} }
@@ -80,7 +85,7 @@ func (s *PoolServer) HandleDeleteItem(w http.ResponseWriter, r *http.Request, id
return s.WriteResult(w, data) return s.WriteResult(w, data)
} }
type SetDelivery struct { type SetDeliveryRequest struct {
Provider string `json:"provider"` Provider string `json:"provider"`
Items []int64 `json:"items"` Items []int64 `json:"items"`
PickupPoint *messages.PickupPoint `json:"pickupPoint,omitempty"` PickupPoint *messages.PickupPoint `json:"pickupPoint,omitempty"`
@@ -88,7 +93,7 @@ type SetDelivery struct {
func (s *PoolServer) HandleSetDelivery(w http.ResponseWriter, r *http.Request, id CartId) error { func (s *PoolServer) HandleSetDelivery(w http.ResponseWriter, r *http.Request, id CartId) error {
delivery := SetDelivery{} delivery := SetDeliveryRequest{}
err := json.NewDecoder(r.Body).Decode(&delivery) err := json.NewDecoder(r.Body).Decode(&delivery)
if err != nil { if err != nil {
return err return err
@@ -271,7 +276,6 @@ func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id CartId)
// } // }
// //
func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) { func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
var id CartId var id CartId

View File

@@ -0,0 +1 @@
{"type":"AddItem","timestamp":"2025-10-13T15:25:09.772277+02:00","mutation":{"item_id":789396,"quantity":1,"price":18600,"sku":"789396","name":"Samsung Galaxy Z Fold6 Slim S-Pen fodral (grått)","image":"/image/dv_web_D18000128131832/789396/samsung-galaxy-z-fold6-slim-s-pen-fodral-gratt.jpg","tax":2500,"brand":"Samsung","category":"Mobiler, Tablets \u0026 Smartklockor","category2":"Mobiltillbehör","category3":"Mobilskal \u0026 Mobilfodral","articleType":"ZHAW","sellerId":"152","sellerName":"Elgiganten"}}

62
pkg/actor/disk_storage.go Normal file
View File

@@ -0,0 +1,62 @@
package actor
import (
"errors"
"fmt"
"log"
"os"
"path/filepath"
"github.com/gogo/protobuf/proto"
)
type DiskStorage[V any] struct {
*StateStorage
path string
}
type LogStorage[V any] interface {
LoadEvents(id uint64, grain Grain[V]) error
AppendEvent(id uint64, msg proto.Message) error
}
func NewDiskStorage[V any](path string, registry MutationRegistry) LogStorage[V] {
return &DiskStorage[V]{
StateStorage: NewState(registry),
path: path,
}
}
func (s *DiskStorage[V]) logPath(id uint64) string {
return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id))
}
func (s *DiskStorage[V]) LoadEvents(id uint64, grain Grain[V]) error {
path := s.logPath(id)
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
// No log -> nothing to replay
return nil
}
fh, err := os.Open(path)
if err != nil {
return fmt.Errorf("open replay file: %w", err)
}
defer fh.Close()
return s.Load(fh, func(msg proto.Message) {
s.registry.Apply(grain, msg)
})
}
func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error {
path := s.logPath(id)
fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Printf("failed to open event log file: %v", err)
return err
}
defer fh.Close()
return s.Append(fh, msg)
}

View File

@@ -1,10 +1,13 @@
package actor package actor
import "time" import (
"time"
//"github.com/gogo/protobuf/proto"
)
type Grain[V any] interface { type Grain[V any] interface {
GetId() uint64 GetId() uint64
Apply(content any, isReplay bool) (*V, error) //Apply(content proto.Message, isReplay bool) (*V, error)
GetLastAccess() time.Time GetLastAccess() time.Time
GetLastChange() time.Time GetLastChange() time.Time
GetCurrentState() (*V, error) GetCurrentState() (*V, error)

View File

@@ -2,10 +2,12 @@ package actor
import ( import (
"net/http" "net/http"
"github.com/gogo/protobuf/proto"
) )
type GrainPool[V any] interface { type GrainPool[V any] interface {
Apply(id uint64, mutation any) (V, error) Apply(id uint64, mutation proto.Message) (V, error)
Get(id uint64) (V, error) Get(id uint64) (V, error)
OwnerHost(id uint64) (Host, bool) OwnerHost(id uint64) (Host, bool)
Hostname() string Hostname() string

View File

@@ -0,0 +1,204 @@
package actor
import (
"fmt"
"log"
"reflect"
"sync"
"github.com/gogo/protobuf/proto"
)
type MutationRegistry interface {
Apply(grain any, msg proto.Message) error
RegisterMutations(handlers ...MutationHandler)
Create(typeName string) (proto.Message, bool)
GetTypeName(msg proto.Message) (string, bool)
//GetStorageEvent(msg proto.Message) StorageEvent
//FromStorageEvent(event StorageEvent) (proto.Message, error)
}
type ProtoMutationRegistry struct {
mutationRegistryMu sync.RWMutex
mutationRegistry map[reflect.Type]MutationHandler
}
var (
ErrMutationNotRegistered = fmt.Errorf("mutation not registered")
)
// MutationOption configures additional behavior for a registered mutation.
type MutationOption func(*mutationOptions)
// mutationOptions holds flags adjustable per registration.
type mutationOptions struct {
updateTotals bool
}
// WithTotals ensures CartGrain.UpdateTotals() is called after a successful handler.
func WithTotals() MutationOption {
return func(o *mutationOptions) {
o.updateTotals = true
}
}
type MutationHandler interface {
Handle(state any, msg proto.Message) error
Name() string
Type() reflect.Type
Create() proto.Message
}
// RegisteredMutation stores metadata + the execution closure.
type RegisteredMutation[V any, T proto.Message] struct {
name string
handler func(*V, T) error
create func() T
msgType reflect.Type
}
func NewMutation[V any, T proto.Message](handler func(*V, T) error, create func() T) *RegisteredMutation[V, T] {
// Derive the name and message type from a concrete instance produced by create().
// This avoids relying on reflect.TypeFor (which can yield unexpected results in some toolchains)
// and ensures we always peel off the pointer layer for proto messages.
instance := create()
rt := reflect.TypeOf(instance)
if rt.Kind() == reflect.Ptr {
rt = rt.Elem()
}
return &RegisteredMutation[V, T]{
name: rt.Name(),
handler: handler,
create: create,
msgType: rt,
}
}
func (m *RegisteredMutation[V, T]) Handle(state any, msg proto.Message) error {
return m.handler(state.(*V), msg.(T))
}
func (m *RegisteredMutation[V, T]) Name() string {
return m.name
}
func (m *RegisteredMutation[V, T]) Create() proto.Message {
return m.create()
}
func (m *RegisteredMutation[V, T]) Type() reflect.Type {
return m.msgType
}
func NewMutationRegistry() MutationRegistry {
return &ProtoMutationRegistry{
mutationRegistry: make(map[reflect.Type]MutationHandler),
mutationRegistryMu: sync.RWMutex{},
}
}
func (r *ProtoMutationRegistry) RegisterMutations(handlers ...MutationHandler) {
r.mutationRegistryMu.Lock()
defer r.mutationRegistryMu.Unlock()
for _, handler := range handlers {
r.mutationRegistry[handler.Type()] = handler
}
}
func (r *ProtoMutationRegistry) GetTypeName(msg proto.Message) (string, bool) {
r.mutationRegistryMu.RLock()
defer r.mutationRegistryMu.RUnlock()
rt := indirectType(reflect.TypeOf(msg))
if handler, ok := r.mutationRegistry[rt]; ok {
return handler.Name(), true
}
return "", false
}
func (r *ProtoMutationRegistry) getHandler(typeName string) MutationHandler {
r.mutationRegistryMu.Lock()
defer r.mutationRegistryMu.Unlock()
for _, handler := range r.mutationRegistry {
if handler.Name() == typeName {
return handler
}
}
return nil
}
func (r *ProtoMutationRegistry) Create(typeName string) (proto.Message, bool) {
handler := r.getHandler(typeName)
if handler == nil {
log.Printf("missing handler for %s", typeName)
return nil, false
}
return handler.Create(), true
}
// ApplyRegistered attempts to apply a registered mutation.
// Returns updated grain if successful.
//
// If the mutation is not registered, returns (nil, ErrMutationNotRegistered).
func (r *ProtoMutationRegistry) Apply(grain any, msg proto.Message) error {
if grain == nil {
return fmt.Errorf("nil grain")
}
if msg == nil {
return fmt.Errorf("nil mutation message")
}
rt := indirectType(reflect.TypeOf(msg))
r.mutationRegistryMu.RLock()
entry, ok := r.mutationRegistry[rt]
r.mutationRegistryMu.RUnlock()
if !ok {
return ErrMutationNotRegistered
}
if err := entry.Handle(grain, msg); err != nil {
return err
}
// if entry.updateTotals {
// grain.UpdateTotals()
// }
return nil
}
// RegisteredMutations returns metadata for all registered mutations (snapshot).
func (r *ProtoMutationRegistry) RegisteredMutations() []string {
r.mutationRegistryMu.RLock()
defer r.mutationRegistryMu.RUnlock()
out := make([]string, 0, len(r.mutationRegistry))
for _, entry := range r.mutationRegistry {
out = append(out, entry.Name())
}
return out
}
// RegisteredMutationTypes returns the reflect.Type list of all registered messages.
// Useful for coverage tests ensuring expected set matches actual set.
func (r *ProtoMutationRegistry) RegisteredMutationTypes() []reflect.Type {
r.mutationRegistryMu.RLock()
defer r.mutationRegistryMu.RUnlock()
out := make([]reflect.Type, 0, len(r.mutationRegistry))
for _, entry := range r.mutationRegistry {
out = append(out, entry.Type())
}
return out
}
func indirectType(t reflect.Type) reflect.Type {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t
}

View File

@@ -0,0 +1,151 @@
package actor
import (
"errors"
"reflect"
"testing"
"git.tornberg.me/go-cart-actor/pkg/messages"
)
type cartState struct {
calls int
lastAdded *messages.AddItem
}
func TestRegisteredMutationBasics(t *testing.T) {
reg := NewMutationRegistry().(*ProtoMutationRegistry)
addItemMutation := NewMutation[cartState, *messages.AddItem](
func(state *cartState, msg *messages.AddItem) error {
state.calls++
// copy to avoid external mutation side-effects (not strictly necessary for the test)
cp := *msg
state.lastAdded = &cp
return nil
},
func() *messages.AddItem { return &messages.AddItem{} },
)
// Sanity check on mutation metadata
if addItemMutation.Name() != "AddItem" {
t.Fatalf("expected mutation Name() == AddItem, got %s", addItemMutation.Name())
}
if got, want := addItemMutation.Type(), reflect.TypeOf(messages.AddItem{}); got != want {
t.Fatalf("expected Type() == %v, got %v", want, got)
}
reg.RegisterMutations(addItemMutation)
// RegisteredMutations: membership (order not guaranteed)
names := reg.RegisteredMutations()
if !stringSliceContains(names, "AddItem") {
t.Fatalf("RegisteredMutations missing AddItem, got %v", names)
}
// RegisteredMutationTypes: membership (order not guaranteed)
types := reg.RegisteredMutationTypes()
if !typeSliceContains(types, reflect.TypeOf(messages.AddItem{})) {
t.Fatalf("RegisteredMutationTypes missing AddItem type, got %v", types)
}
// GetTypeName should resolve for a pointer instance
name, ok := reg.GetTypeName(&messages.AddItem{})
if !ok || name != "AddItem" {
t.Fatalf("GetTypeName returned (%q,%v), expected (AddItem,true)", name, ok)
}
// GetTypeName should fail for unregistered type
if name, ok := reg.GetTypeName(&messages.Noop{}); ok || name != "" {
t.Fatalf("expected GetTypeName to fail for unregistered message, got (%q,%v)", name, ok)
}
// Create by name
msg, ok := reg.Create("AddItem")
if !ok {
t.Fatalf("Create failed for registered mutation")
}
if _, isAddItem := msg.(*messages.AddItem); !isAddItem {
t.Fatalf("Create returned wrong concrete type: %T", msg)
}
// Create unknown
if m2, ok := reg.Create("Unknown"); ok || m2 != nil {
t.Fatalf("Create should fail for unknown mutation, got (%T,%v)", m2, ok)
}
// Apply happy path
state := &cartState{}
add := &messages.AddItem{ItemId: 42, Quantity: 3, Sku: "ABC"}
if err := reg.Apply(state, add); err != nil {
t.Fatalf("Apply returned error: %v", err)
}
if state.calls != 1 {
t.Fatalf("handler not invoked expected calls=1 got=%d", state.calls)
}
if state.lastAdded == nil || state.lastAdded.ItemId != 42 || state.lastAdded.Quantity != 3 {
t.Fatalf("state not updated correctly: %+v", state.lastAdded)
}
// Apply nil grain
if err := reg.Apply(nil, add); err == nil {
t.Fatalf("expected error for nil grain")
}
// Apply nil message
if err := reg.Apply(state, nil); err == nil {
t.Fatalf("expected error for nil mutation message")
}
// Apply unregistered message
if err := reg.Apply(state, &messages.Noop{}); !errors.Is(err, ErrMutationNotRegistered) {
t.Fatalf("expected ErrMutationNotRegistered, got %v", err)
}
}
// func TestConcurrentSafeRegistrationLookup(t *testing.T) {
// // This test is light-weight; it ensures locks don't deadlock under simple concurrent access.
// reg := NewMutationRegistry().(*ProtoMutationRegistry)
// mut := NewMutation[cartState, *messages.Noop](
// func(state *cartState, msg *messages.Noop) error { state.calls++; return nil },
// func() *messages.Noop { return &messages.Noop{} },
// )
// reg.RegisterMutations(mut)
// done := make(chan struct{})
// const workers = 25
// for i := 0; i < workers; i++ {
// go func() {
// for j := 0; j < 100; j++ {
// _, _ = reg.Create("Noop")
// _, _ = reg.GetTypeName(&messages.Noop{})
// _ = reg.Apply(&cartState{}, &messages.Noop{})
// }
// done <- struct{}{}
// }()
// }
// for i := 0; i < workers; i++ {
// <-done
// }
// }
// Helpers
func stringSliceContains(list []string, target string) bool {
for _, s := range list {
if s == target {
return true
}
}
return false
}
func typeSliceContains(list []reflect.Type, target reflect.Type) bool {
for _, t := range list {
if t == target {
return true
}
}
return false
}

View File

@@ -6,17 +6,20 @@ import (
"maps" "maps"
"sync" "sync"
"time" "time"
"github.com/gogo/protobuf/proto"
) )
type SimpleGrainPool[V any] struct { type SimpleGrainPool[V any] struct {
// fields and methods // fields and methods
localMu sync.RWMutex localMu sync.RWMutex
grains map[uint64]Grain[V] grains map[uint64]Grain[V]
mutationRegistry MutationRegistry
spawn func(id uint64) (Grain[V], error) spawn func(id uint64) (Grain[V], error)
spawnHost func(host string) (Host, error) spawnHost func(host string) (Host, error)
ttl time.Duration storage LogStorage[V]
poolSize int ttl time.Duration
poolSize int
// Cluster coordination -------------------------------------------------- // Cluster coordination --------------------------------------------------
hostname string hostname string
@@ -29,17 +32,28 @@ type SimpleGrainPool[V any] struct {
purgeTicker *time.Ticker purgeTicker *time.Ticker
} }
func NewSimpleGrainPool[V any](size int, ttl time.Duration, hostname string, spawn func(id uint64) (Grain[V], error), spawnHost func(host string) (Host, error)) (*SimpleGrainPool[V], error) { type GrainPoolConfig[V any] struct {
p := &SimpleGrainPool[V]{ Hostname string
grains: make(map[uint64]Grain[V]), Spawn func(id uint64) (Grain[V], error)
SpawnHost func(host string) (Host, error)
TTL time.Duration
PoolSize int
MutationRegistry MutationRegistry
Storage LogStorage[V]
}
spawn: spawn, func NewSimpleGrainPool[V any](config GrainPoolConfig[V]) (*SimpleGrainPool[V], error) {
spawnHost: spawnHost, p := &SimpleGrainPool[V]{
ttl: ttl, grains: make(map[uint64]Grain[V]),
poolSize: size, mutationRegistry: config.MutationRegistry,
hostname: hostname, storage: config.Storage,
remoteOwners: make(map[uint64]Host), spawn: config.Spawn,
remoteHosts: make(map[string]Host), spawnHost: config.SpawnHost,
ttl: config.TTL,
poolSize: config.PoolSize,
hostname: config.Hostname,
remoteOwners: make(map[uint64]Host),
remoteHosts: make(map[string]Host),
} }
p.purgeTicker = time.NewTicker(time.Minute) p.purgeTicker = time.NewTicker(time.Minute)
@@ -344,38 +358,22 @@ func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) {
return grain, nil return grain, nil
} }
// ErrNotOwner is returned when a cart belongs to another host. // // ErrNotOwner is returned when a cart belongs to another host.
var ErrNotOwner = fmt.Errorf("not owner") // var ErrNotOwner = fmt.Errorf("not owner")
// Apply applies a mutation to a grain. // Apply applies a mutation to a grain.
func (p *SimpleGrainPool[V]) Apply(id uint64, mutation any) (*V, error) { func (p *SimpleGrainPool[V]) Apply(id uint64, mutation proto.Message) (*V, error) {
grain, err := p.getOrClaimGrain(id) grain, err := p.getOrClaimGrain(id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
//start := time.Now() if applyErr := p.mutationRegistry.Apply(grain, mutation); applyErr != nil {
result, applyErr := grain.Apply(mutation, false) return nil, applyErr
//mutationType := "unknown" }
// if mutation != nil { if err := p.storage.AppendEvent(id, mutation); err != nil {
// if t := reflect.TypeOf(mutation); t != nil { log.Printf("failed to store mutation for grain %d: %v", id, err)
// if t.Kind() == reflect.Pointer { }
// t = t.Elem() return grain.GetCurrentState()
// }
// if t.Name() != "" {
// mutationType = t.Name()
// }
// }
// }
// cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds())
// if applyErr == nil && result != nil {
// cartMutationsTotal.Inc()
// } else if applyErr != nil {
// cartMutationFailuresTotal.Inc()
// }
return result, applyErr
} }
// Get returns the current state of a grain. // Get returns the current state of a grain.

86
pkg/actor/state.go Normal file
View File

@@ -0,0 +1,86 @@
package actor
import (
"encoding/json"
"errors"
"io"
"time"
"github.com/gogo/protobuf/proto"
)
type StateStorage struct {
registry MutationRegistry
}
type StorageEvent struct {
Type string `json:"type"`
TimeStamp time.Time `json:"timestamp"`
Mutation proto.Message `json:"mutation"`
}
type rawEvent struct {
Type string `json:"type"`
TimeStamp time.Time `json:"timestamp"`
Mutation json.RawMessage `json:"mutation"`
}
func NewState(registry MutationRegistry) *StateStorage {
return &StateStorage{
registry: registry,
}
}
var ErrUnknownType = errors.New("unknown type")
func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message)) error {
var err error
for err == nil {
evt, err := s.Read(r)
if err == nil {
onMessage(evt.Mutation)
}
}
return err
}
func (s *StateStorage) Append(io io.Writer, mutation proto.Message) error {
typeName, ok := s.registry.GetTypeName(mutation)
if !ok {
return ErrUnknownType
}
event := &StorageEvent{
Type: typeName,
TimeStamp: time.Now(),
Mutation: mutation,
}
jsonBytes, err := json.Marshal(event)
if err != nil {
return err
}
if _, err := io.Write(jsonBytes); err != nil {
return err
}
return nil
}
func (s *StateStorage) Read(r io.Reader) (*StorageEvent, error) {
var event rawEvent
if err := json.NewDecoder(r).Decode(&event); err != nil {
return nil, err
}
typeName := event.Type
mutation, ok := s.registry.Create(typeName)
if !ok {
return nil, ErrUnknownType
}
if err := json.Unmarshal(event.Mutation, mutation); err != nil {
return nil, err
}
return &StorageEvent{
Type: typeName,
TimeStamp: event.TimeStamp,
Mutation: mutation,
}, nil
}

View File

@@ -17,13 +17,13 @@ import (
// RemoteHost mirrors the lightweight controller used for remote node // RemoteHost mirrors the lightweight controller used for remote node
// interaction. // interaction.
type RemoteHost struct { type RemoteHost struct {
Host string host string
httpBase string httpBase string
conn *grpc.ClientConn conn *grpc.ClientConn
transport *http.Transport transport *http.Transport
client *http.Client client *http.Client
controlClient messages.ControlPlaneClient controlClient messages.ControlPlaneClient
MissedPings int missedPings int
} }
func NewRemoteHost(host string) (*RemoteHost, error) { func NewRemoteHost(host string) (*RemoteHost, error) {
@@ -38,22 +38,6 @@ func NewRemoteHost(host string) (*RemoteHost, error) {
} }
controlClient := messages.NewControlPlaneClient(conn) controlClient := messages.NewControlPlaneClient(conn)
// go func() {
// for retries := range 3 {
// ctx, pingCancel := context.WithTimeout(context.Background(), time.Second)
// _, pingErr := controlClient.Ping(ctx, &messages.Empty{})
// pingCancel()
// if pingErr == nil {
// break
// }
// if retries == 2 {
// log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr)
// conn.Close()
// p
// }
// time.Sleep(500 * time.Millisecond)
// }
// }()
transport := &http.Transport{ transport := &http.Transport{
MaxIdleConns: 100, MaxIdleConns: 100,
@@ -64,18 +48,18 @@ func NewRemoteHost(host string) (*RemoteHost, error) {
client := &http.Client{Transport: transport, Timeout: 10 * time.Second} client := &http.Client{Transport: transport, Timeout: 10 * time.Second}
return &RemoteHost{ return &RemoteHost{
Host: host, host: host,
httpBase: fmt.Sprintf("http://%s:8080/cart", host), httpBase: fmt.Sprintf("http://%s:8080/cart", host),
conn: conn, conn: conn,
transport: transport, transport: transport,
client: client, client: client,
controlClient: controlClient, controlClient: controlClient,
MissedPings: 0, missedPings: 0,
}, nil }, nil
} }
func (h *RemoteHost) Name() string { func (h *RemoteHost) Name() string {
return h.Host return h.host
} }
func (h *RemoteHost) Close() error { func (h *RemoteHost) Close() error {
@@ -92,8 +76,8 @@ func (h *RemoteHost) Ping() bool {
_, err = h.controlClient.Ping(ctx, &messages.Empty{}) _, err = h.controlClient.Ping(ctx, &messages.Empty{})
cancel() cancel()
if err != nil { if err != nil {
h.MissedPings++ h.missedPings++
log.Printf("Ping %s failed (%d) %v", h.Host, h.MissedPings, err) log.Printf("Ping %s failed (%d) %v", h.host, h.missedPings, err)
} }
if !h.IsHealthy() { if !h.IsHealthy() {
return false return false
@@ -101,7 +85,7 @@ func (h *RemoteHost) Ping() bool {
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
} }
h.MissedPings = 0 h.missedPings = 0
return true return true
} }
@@ -113,11 +97,11 @@ func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) {
KnownHosts: knownHosts, KnownHosts: knownHosts,
}) })
if err != nil { if err != nil {
h.MissedPings++ h.missedPings++
log.Printf("Negotiate %s failed: %v", h.Host, err) log.Printf("Negotiate %s failed: %v", h.host, err)
return nil, err return nil, err
} }
h.MissedPings = 0 h.missedPings = 0
return resp.Hosts, nil return resp.Hosts, nil
} }
@@ -126,8 +110,8 @@ func (h *RemoteHost) GetActorIds() []uint64 {
defer cancel() defer cancel()
reply, err := h.controlClient.GetLocalActorIds(ctx, &messages.Empty{}) reply, err := h.controlClient.GetLocalActorIds(ctx, &messages.Empty{})
if err != nil { if err != nil {
log.Printf("Init remote %s: GetCartIds error: %v", h.Host, err) log.Printf("Init remote %s: GetCartIds error: %v", h.host, err)
h.MissedPings++ h.missedPings++
return []uint64{} return []uint64{}
} }
return reply.GetIds() return reply.GetIds()
@@ -135,48 +119,33 @@ func (h *RemoteHost) GetActorIds() []uint64 {
func (h *RemoteHost) AnnounceOwnership(uids []uint64) { func (h *RemoteHost) AnnounceOwnership(uids []uint64) {
_, err := h.controlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{ _, err := h.controlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{
Host: h.Host, Host: h.host,
Ids: uids, Ids: uids,
}) })
if err != nil { if err != nil {
log.Printf("ownership announce to %s failed: %v", h.Host, err) log.Printf("ownership announce to %s failed: %v", h.host, err)
h.MissedPings++ h.missedPings++
return return
} }
h.MissedPings = 0 h.missedPings = 0
} }
func (h *RemoteHost) AnnounceExpiry(uids []uint64) { func (h *RemoteHost) AnnounceExpiry(uids []uint64) {
_, err := h.controlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{ _, err := h.controlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{
Host: h.Host, Host: h.host,
Ids: uids, Ids: uids,
}) })
if err != nil { if err != nil {
log.Printf("expiry announce to %s failed: %v", h.Host, err) log.Printf("expiry announce to %s failed: %v", h.host, err)
h.MissedPings++ h.missedPings++
return return
} }
h.MissedPings = 0 h.missedPings = 0
} }
func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error) { func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error) {
target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI()) target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI())
// var bodyCopy []byte
// if r.Body != nil && r.Body != http.NoBody {
// var err error
// bodyCopy, err = io.ReadAll(r.Body)
// if err != nil {
// http.Error(w, "proxy read error", http.StatusBadGateway)
// return false, err
// }
// }
// if r.Body != nil {
// r.Body.Close()
// }
// var reqBody io.Reader
// if len(bodyCopy) > 0 {
// reqBody = bytes.NewReader(bodyCopy)
// }
req, err := http.NewRequestWithContext(r.Context(), r.Method, target, r.Body) req, err := http.NewRequestWithContext(r.Context(), r.Method, target, r.Body)
if err != nil { if err != nil {
http.Error(w, "proxy build error", http.StatusBadGateway) http.Error(w, "proxy build error", http.StatusBadGateway)
@@ -214,5 +183,5 @@ func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (b
} }
func (r *RemoteHost) IsHealthy() bool { func (r *RemoteHost) IsHealthy() bool {
return r.MissedPings < 3 return r.missedPings < 3
} }

View File

@@ -12,7 +12,7 @@ option go_package = "git.tornberg.me/go-cart-actor/proto;messages";
// - Liveness (Ping) // - Liveness (Ping)
// - Membership negotiation (Negotiate) // - Membership negotiation (Negotiate)
// - Deterministic ring-based ownership (ConfirmOwner RPC removed) // - Deterministic ring-based ownership (ConfirmOwner RPC removed)
// - Cart ID listing for remote grain spawning (GetCartIds) // - Actor ID listing for remote grain spawning (GetActorIds)
// - Graceful shutdown notifications (Closing) // - Graceful shutdown notifications (Closing)
// No authentication / TLS is defined initially (can be added later). // No authentication / TLS is defined initially (can be added later).
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------