diff --git a/cmd/cart/cart-grain.go b/cmd/cart/cart-grain.go index a4ad103..ca4faca 100644 --- a/cmd/cart/cart-grain.go +++ b/cmd/cart/cart-grain.go @@ -100,7 +100,7 @@ func getInt(data float64, ok bool) (int, error) { return int(data), nil } -func getItemData(sku string, qty int, country string) (*messages.AddItem, error) { +func GetItemAddMessage(sku string, qty int, country string, storeId *string) (*messages.AddItem, error) { item, err := FetchItem(sku, country) if err != nil { return nil, err @@ -162,17 +162,18 @@ func getItemData(sku string, qty int, country string) (*messages.AddItem, error) Disclaimer: item.Disclaimer, Country: country, Outlet: outlet, + StoreId: storeId, }, nil } -func (c *CartGrain) AddItem(sku string, qty int, country string, storeId *string) (*CartGrain, error) { - cartItem, err := getItemData(sku, qty, country) - if err != nil { - return nil, err - } - cartItem.StoreId = storeId - return c.Apply(cartItem, false) -} +// func (c *CartGrain) AddItem(sku string, qty int, country string, storeId *string) (*CartGrain, error) { +// cartItem, err := getItemData(sku, qty, country) +// if err != nil { +// return nil, err +// } +// cartItem.StoreId = storeId +// return c.Apply(cartItem, false) +// } func (c *CartGrain) GetState() ([]byte, error) { return json.Marshal(c) @@ -221,25 +222,25 @@ func GetTaxAmount(total int64, tax int) int64 { return int64(float64(total) / float64((1 + taxD))) } -func (c *CartGrain) Apply(content interface{}, isReplay bool) (*CartGrain, error) { +// func (c *CartGrain) Apply(content proto.Message, isReplay bool) (*CartGrain, error) { - updated, err := ApplyRegistered(c, content) - if err != nil { - if err == ErrMutationNotRegistered { - return nil, fmt.Errorf("unsupported mutation type %T (not registered)", content) - } - return nil, err - } +// updated, err := ApplyRegistered(c, content) +// if err != nil { +// if err == ErrMutationNotRegistered { +// return nil, fmt.Errorf("unsupported mutation type %T (not registered)", content) +// } +// return nil, err +// } - // Sliding TTL: update lastChange only for non-replay successful mutations. - if updated != nil && !isReplay { - c.lastChange = time.Now() - c.lastAccess = time.Now() - go AppendCartEvent(c.Id, content) - } +// // Sliding TTL: update lastChange only for non-replay successful mutations. +// if updated != nil && !isReplay { +// c.lastChange = time.Now() +// c.lastAccess = time.Now() +// go AppendCartEvent(c.Id, content) +// } - return updated, nil -} +// return updated, nil +// } func (c *CartGrain) UpdateTotals() { c.TotalPrice = 0 diff --git a/cmd/cart/disk-storage.go b/cmd/cart/disk-storage.go deleted file mode 100644 index b525a91..0000000 --- a/cmd/cart/disk-storage.go +++ /dev/null @@ -1,73 +0,0 @@ -package main - -import ( - "encoding/gob" - "time" -) - -func init() { - - gob.Register(map[uint64]int64{}) -} - -type DiskStorage struct { - stateFile string - lastSave time.Time - LastSaves map[uint64]time.Time -} - -func NewDiskStorage(stateFile string) (*DiskStorage, error) { - ret := &DiskStorage{ - stateFile: stateFile, - LastSaves: make(map[uint64]time.Time), - } - //err := ret.loadState() - return ret, nil -} - -// func saveMessages(_ interface{}, _ CartId) error { -// // No-op: legacy event log persistence removed in oneof refactor. -// return nil -// } - -// func getCartPath(id string) string { -// return fmt.Sprintf("data/%s.prot", id) -// } - -// func loadMessages(_ Grain, _ CartId) error { -// // No-op: legacy replay removed in oneof refactor. -// return nil -// } - -// func (s *DiskStorage) saveState() error { -// tmpFile := s.stateFile + "_tmp" -// file, err := os.Create(tmpFile) -// if err != nil { -// return err -// } -// defer file.Close() -// err = gob.NewEncoder(file).Encode(s.LastSaves) -// if err != nil { -// return err -// } -// os.Remove(s.stateFile + ".bak") -// os.Rename(s.stateFile, s.stateFile+".bak") -// return os.Rename(tmpFile, s.stateFile) -// } - -// func (s *DiskStorage) loadState() error { -// file, err := os.Open(s.stateFile) -// if err != nil { -// return err -// } -// defer file.Close() -// return gob.NewDecoder(file).Decode(&s.LastSaves) -// } - -func (s *DiskStorage) Store(id CartId, _ *CartGrain) error { - // With the removal of the legacy message log, we only update the timestamp. - ts := time.Now() - s.LastSaves[uint64(id)] = ts - s.lastSave = ts - return nil -} diff --git a/cmd/cart/event_log.go b/cmd/cart/event_log.go index 356b3f9..a62123f 100644 --- a/cmd/cart/event_log.go +++ b/cmd/cart/event_log.go @@ -5,12 +5,14 @@ import ( "encoding/json" "errors" "fmt" + "log" "os" "path/filepath" "reflect" "sync" "time" + "git.tornberg.me/go-cart-actor/pkg/actor" messages "git.tornberg.me/go-cart-actor/pkg/messages" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -205,7 +207,7 @@ func AppendCartEvent(id CartId, mutation interface{}) error { // ReplayCartEvents replays an existing cart's event log into the provided grain. // It applies mutation payloads in order, skipping unknown types. -func ReplayCartEvents(grain *CartGrain, id CartId) error { +func ReplayCartEvents(grain *CartGrain, id CartId, registry actor.MutationRegistry) error { start := time.Now() path := EventLogPath(id) if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { @@ -237,18 +239,19 @@ func ReplayCartEvents(grain *CartGrain, id CartId) error { eventReplayFailuresTotal.Inc() continue // skip malformed line } - factory, ok := eventTypeFactories[raw.Type] + + instance, ok := registry.Create(raw.Type) if !ok { - eventUnknownTypesTotal.Inc() - continue // skip unknown mutation type + log.Printf("loading failed for unknown mutation type: %s", raw.Type) + eventReplayFailuresTotal.Inc() + continue // skip unknown type } - instance := factory() if err := json.Unmarshal(raw.Payload, instance); err != nil { eventMutationErrorsTotal.Inc() continue } // Apply mutation directly using internal registration (bypass AppendCartEvent recursion). - if _, applyErr := ApplyRegistered(grain, instance); applyErr != nil { + if applyErr := registry.Apply(grain, instance); applyErr != nil { eventMutationErrorsTotal.Inc() continue } else { diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 3ec7c8f..5d67636 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -39,34 +39,12 @@ var ( }) ) -func spawn(id uint64) (actor.Grain[CartGrain], error) { - grainSpawns.Inc() - ret := &CartGrain{ - lastItemId: 0, - lastDeliveryId: 0, - Deliveries: []*CartDelivery{}, - Id: CartId(id), - Items: []*CartItem{}, - TotalPrice: 0, - } - // Set baseline lastChange at spawn; replay may update it to last event timestamp. - ret.lastChange = time.Now() - ret.lastAccess = time.Now() - - // Legacy loadMessages (no-op) retained; then replay append-only event log - //_ = loadMessages(ret, id) - err := ReplayCartEvents(ret, CartId(id)) - - return ret, err -} - func init() { os.Mkdir("data", 0755) } type App struct { - pool *actor.SimpleGrainPool[CartGrain] - storage *DiskStorage + pool *actor.SimpleGrainPool[CartGrain] } var podIp = os.Getenv("POD_IP") @@ -91,7 +69,10 @@ func getCountryFromHost(host string) string { if strings.Contains(strings.ToLower(host), "-no") { return "no" } - return "se" + if strings.Contains(strings.ToLower(host), "-se") { + return "se" + } + return "" } func GetDiscovery() discovery.Discovery { @@ -112,21 +93,71 @@ func GetDiscovery() discovery.Discovery { } func main() { + controlPlaneConfig := actor.DefaultServerConfig() - storage, err := NewDiskStorage(fmt.Sprintf("data/s_%s.gob", name)) - if err != nil { - log.Printf("Error loading state: %v\n", err) + + reg := actor.NewMutationRegistry() + reg.RegisterMutations( + actor.NewMutation(AddItem, func() *messages.AddItem { + return &messages.AddItem{} + }), + actor.NewMutation(ChangeQuantity, func() *messages.ChangeQuantity { + return &messages.ChangeQuantity{} + }), + actor.NewMutation(RemoveItem, func() *messages.RemoveItem { + return &messages.RemoveItem{} + }), + actor.NewMutation(InitializeCheckout, func() *messages.InitializeCheckout { + return &messages.InitializeCheckout{} + }), + actor.NewMutation(RemoveDelivery, func() *messages.RemoveDelivery { + return &messages.RemoveDelivery{} + }), + actor.NewMutation(SetDelivery, func() *messages.SetDelivery { + return &messages.SetDelivery{} + }), + actor.NewMutation(SetPickupPoint, func() *messages.SetPickupPoint { + return &messages.SetPickupPoint{} + }), + ) + diskStorage := actor.NewDiskStorage[CartGrain]("data", reg) + poolConfig := actor.GrainPoolConfig[CartGrain]{ + MutationRegistry: reg, + Storage: diskStorage, + Spawn: func(id uint64) (actor.Grain[CartGrain], error) { + grainSpawns.Inc() + ret := &CartGrain{ + lastItemId: 0, + lastDeliveryId: 0, + Deliveries: []*CartDelivery{}, + Id: CartId(id), + Items: []*CartItem{}, + TotalPrice: 0, + } + // Set baseline lastChange at spawn; replay may update it to last event timestamp. + ret.lastChange = time.Now() + ret.lastAccess = time.Now() + + // Legacy loadMessages (no-op) retained; then replay append-only event log + //_ = loadMessages(ret, id) + err := diskStorage.LoadEvents(id, ret) + + return ret, err + }, + SpawnHost: func(host string) (actor.Host, error) { + return proxy.NewRemoteHost(host) + }, + TTL: 15 * time.Minute, + PoolSize: 2 * 65535, + Hostname: podIp, } - pool, err := actor.NewSimpleGrainPool(2*65535, 15*time.Minute, podIp, spawn, func(host string) (actor.Host, error) { - return proxy.NewRemoteHost(host) - }) + pool, err := actor.NewSimpleGrainPool(poolConfig) if err != nil { log.Fatalf("Error creating cart pool: %v\n", err) } app := &App{ - pool: pool, - storage: storage, + pool: pool, } grpcSrv, err := actor.NewControlServer[*CartGrain](controlPlaneConfig, pool) diff --git a/cmd/cart/mutation_add_item.go b/cmd/cart/mutation_add_item.go index 031367e..1bae75a 100644 --- a/cmd/cart/mutation_add_item.go +++ b/cmd/cart/mutation_add_item.go @@ -20,63 +20,57 @@ import ( // NOTE: Any future field additions in messages.AddItem that affect pricing / tax // must keep this handler in sync. -func init() { - RegisterMutation[messages.AddItem]( - "AddItem", - func(g *CartGrain, m *messages.AddItem) error { - if m == nil { - return fmt.Errorf("AddItem: nil payload") - } - if m.Quantity < 1 { - return fmt.Errorf("AddItem: invalid quantity %d", m.Quantity) - } +func AddItem(g *CartGrain, m *messages.AddItem) error { + if m == nil { + return fmt.Errorf("AddItem: nil payload") + } + if m.Quantity < 1 { + return fmt.Errorf("AddItem: invalid quantity %d", m.Quantity) + } - // Fast path: merge with existing item having same SKU - if existing, found := g.FindItemWithSku(m.Sku); found { - existing.Quantity += int(m.Quantity) - return nil - } + // Fast path: merge with existing item having same SKU + if existing, found := g.FindItemWithSku(m.Sku); found { + existing.Quantity += int(m.Quantity) + return nil + } - g.mu.Lock() - defer g.mu.Unlock() + g.mu.Lock() + defer g.mu.Unlock() - g.lastItemId++ - taxRate := 2500 - if m.Tax > 0 { - taxRate = int(m.Tax) - } - taxAmountPerUnit := GetTaxAmount(m.Price, taxRate) + g.lastItemId++ + taxRate := 2500 + if m.Tax > 0 { + taxRate = int(m.Tax) + } + taxAmountPerUnit := GetTaxAmount(m.Price, taxRate) - g.Items = append(g.Items, &CartItem{ - Id: g.lastItemId, - ItemId: int(m.ItemId), - Quantity: int(m.Quantity), - Sku: m.Sku, - Name: m.Name, - Price: m.Price, - TotalPrice: m.Price * int64(m.Quantity), - TotalTax: int64(taxAmountPerUnit * int64(m.Quantity)), - Image: m.Image, - Stock: StockStatus(m.Stock), - Disclaimer: m.Disclaimer, - Brand: m.Brand, - Category: m.Category, - Category2: m.Category2, - Category3: m.Category3, - Category4: m.Category4, - Category5: m.Category5, - OrgPrice: m.OrgPrice, - ArticleType: m.ArticleType, - Outlet: m.Outlet, - SellerId: m.SellerId, - SellerName: m.SellerName, - Tax: int(taxAmountPerUnit), - TaxRate: taxRate, - StoreId: m.StoreId, - }) + g.Items = append(g.Items, &CartItem{ + Id: g.lastItemId, + ItemId: int(m.ItemId), + Quantity: int(m.Quantity), + Sku: m.Sku, + Name: m.Name, + Price: m.Price, + TotalPrice: m.Price * int64(m.Quantity), + TotalTax: int64(taxAmountPerUnit * int64(m.Quantity)), + Image: m.Image, + Stock: StockStatus(m.Stock), + Disclaimer: m.Disclaimer, + Brand: m.Brand, + Category: m.Category, + Category2: m.Category2, + Category3: m.Category3, + Category4: m.Category4, + Category5: m.Category5, + OrgPrice: m.OrgPrice, + ArticleType: m.ArticleType, + Outlet: m.Outlet, + SellerId: m.SellerId, + SellerName: m.SellerName, + Tax: int(taxAmountPerUnit), + TaxRate: taxRate, + StoreId: m.StoreId, + }) - return nil - }, - WithTotals(), // Recalculate totals after successful mutation - ) + return nil } diff --git a/cmd/cart/mutation_add_request.go b/cmd/cart/mutation_add_request.go index 0b04f39..554546f 100644 --- a/cmd/cart/mutation_add_request.go +++ b/cmd/cart/mutation_add_request.go @@ -1,11 +1,5 @@ package main -import ( - "fmt" - - messages "git.tornberg.me/go-cart-actor/pkg/messages" -) - // mutation_add_request.go // // Registers the AddRequest mutation. This mutation is a higher-level intent @@ -30,32 +24,28 @@ import ( // - Stock validation before increasing quantity // - Reservation logic or concurrency guards around stock updates // - Coupon / pricing rules applied conditionally during add-by-sku -func init() { - RegisterMutation[messages.AddRequest]( - "AddRequest", - func(g *CartGrain, m *messages.AddRequest) error { - if m == nil { - return fmt.Errorf("AddRequest: nil payload") - } - if m.Sku == "" { - return fmt.Errorf("AddRequest: sku is empty") - } - if m.Quantity < 1 { - return fmt.Errorf("AddRequest: invalid quantity %d", m.Quantity) - } - // Existing line: accumulate quantity only. - if existing, found := g.FindItemWithSku(m.Sku); found { - existing.Quantity += int(m.Quantity) - return nil - } +// func HandleAddRequest(g *CartGrain, m *messages.AddRequest) error { +// if m == nil { +// return fmt.Errorf("AddRequest: nil payload") +// } +// if m.Sku == "" { +// return fmt.Errorf("AddRequest: sku is empty") +// } +// if m.Quantity < 1 { +// return fmt.Errorf("AddRequest: invalid quantity %d", m.Quantity) +// } - // New line: delegate to higher-level AddItem flow (product lookup). - // We intentionally ignore the returned *CartGrain; registry will - // do totals again after this handler returns (harmless). - _, err := g.AddItem(m.Sku, int(m.Quantity), m.Country, m.StoreId) - return err - }, - WithTotals(), - ) -} +// // Existing line: accumulate quantity only. +// if existing, found := g.FindItemWithSku(m.Sku); found { +// existing.Quantity += int(m.Quantity) +// return nil +// } +// data, err := GetItemAddMessage(m.Sku, int(m.Quantity), m.Country, m.StoreId) +// if err != nil { +// return err +// } +// return AddItem(g, data) + +// return err +// } diff --git a/cmd/cart/mutation_change_quantity.go b/cmd/cart/mutation_change_quantity.go index cc275e6..56efc1e 100644 --- a/cmd/cart/mutation_change_quantity.go +++ b/cmd/cart/mutation_change_quantity.go @@ -25,34 +25,29 @@ import ( // the grain's implicit expectation that higher layers control access. // (If strict locking is required around every mutation, wrap logic in // an explicit g.mu.Lock()/Unlock(), but current model mirrors prior code.) -func init() { - RegisterMutation[messages.ChangeQuantity]( - "ChangeQuantity", - func(g *CartGrain, m *messages.ChangeQuantity) error { - if m == nil { - return fmt.Errorf("ChangeQuantity: nil payload") - } - foundIndex := -1 - for i, it := range g.Items { - if it.Id == int(m.Id) { - foundIndex = i - break - } - } - if foundIndex == -1 { - return fmt.Errorf("ChangeQuantity: item id %d not found", m.Id) - } +func ChangeQuantity(g *CartGrain, m *messages.ChangeQuantity) error { + if m == nil { + return fmt.Errorf("ChangeQuantity: nil payload") + } - if m.Quantity <= 0 { - // Remove the item - g.Items = append(g.Items[:foundIndex], g.Items[foundIndex+1:]...) - return nil - } + foundIndex := -1 + for i, it := range g.Items { + if it.Id == int(m.Id) { + foundIndex = i + break + } + } + if foundIndex == -1 { + return fmt.Errorf("ChangeQuantity: item id %d not found", m.Id) + } - g.Items[foundIndex].Quantity = int(m.Quantity) - return nil - }, - WithTotals(), - ) + if m.Quantity <= 0 { + // Remove the item + g.Items = append(g.Items[:foundIndex], g.Items[foundIndex+1:]...) + return nil + } + + g.Items[foundIndex].Quantity = int(m.Quantity) + return nil } diff --git a/cmd/cart/mutation_initialize_checkout.go b/cmd/cart/mutation_initialize_checkout.go index 0d64988..dcc2d50 100644 --- a/cmd/cart/mutation_initialize_checkout.go +++ b/cmd/cart/mutation_initialize_checkout.go @@ -28,22 +28,17 @@ import ( // parallel checkout attempts are possible, add higher-level guards // (e.g. reject if PaymentInProgress already true unless reusing // the same OrderReference). -func init() { - RegisterMutation[messages.InitializeCheckout]( - "InitializeCheckout", - func(g *CartGrain, m *messages.InitializeCheckout) error { - if m == nil { - return fmt.Errorf("InitializeCheckout: nil payload") - } - if m.OrderId == "" { - return fmt.Errorf("InitializeCheckout: missing orderId") - } - g.OrderReference = m.OrderId - g.PaymentStatus = m.Status - g.PaymentInProgress = m.PaymentInProgress - return nil - }, - // No WithTotals(): monetary aggregates are unaffected. - ) +func InitializeCheckout(g *CartGrain, m *messages.InitializeCheckout) error { + if m == nil { + return fmt.Errorf("InitializeCheckout: nil payload") + } + if m.OrderId == "" { + return fmt.Errorf("InitializeCheckout: missing orderId") + } + + g.OrderReference = m.OrderId + g.PaymentStatus = m.Status + g.PaymentInProgress = m.PaymentInProgress + return nil } diff --git a/cmd/cart/mutation_order_created.go b/cmd/cart/mutation_order_created.go index aa978ae..a197929 100644 --- a/cmd/cart/mutation_order_created.go +++ b/cmd/cart/mutation_order_created.go @@ -32,22 +32,17 @@ import ( // - Relies on the higher-level guarantee that Apply() calls are serialized // per grain. If out-of-order events are possible, embed versioning or // timestamps in the mutation and compare before applying changes. -func init() { - RegisterMutation[messages.OrderCreated]( - "OrderCreated", - func(g *CartGrain, m *messages.OrderCreated) error { - if m == nil { - return fmt.Errorf("OrderCreated: nil payload") - } - if m.OrderId == "" { - return fmt.Errorf("OrderCreated: missing orderId") - } - g.OrderReference = m.OrderId - g.PaymentStatus = m.Status - g.PaymentInProgress = false - return nil - }, - // No WithTotals(): order completion does not modify pricing or taxes. - ) +func OrderCreated(g *CartGrain, m *messages.OrderCreated) error { + if m == nil { + return fmt.Errorf("OrderCreated: nil payload") + } + if m.OrderId == "" { + return fmt.Errorf("OrderCreated: missing orderId") + } + + g.OrderReference = m.OrderId + g.PaymentStatus = m.Status + g.PaymentInProgress = false + return nil } diff --git a/cmd/cart/mutation_registry.go b/cmd/cart/mutation_registry.go deleted file mode 100644 index 975ec22..0000000 --- a/cmd/cart/mutation_registry.go +++ /dev/null @@ -1,301 +0,0 @@ -package main - -import ( - "fmt" - "reflect" - "sync" -) - -// mutation_registry.go -// -// Mutation Registry Infrastructure -// -------------------------------- -// This file introduces a generic registry for cart mutations that: -// -// 1. Decouples mutation logic from the large type-switch inside CartGrain.Apply. -// 2. Enforces (at registration time) that every mutation handler has the correct -// signature: func(*CartGrain, *T) error -// 3. Optionally auto-updates cart totals after a mutation if flagged. -// 4. Provides a single authoritative list of registered mutations for -// introspection / coverage testing. -// 5. Allows incremental migration: you can first register new mutations here, -// and later prune the legacy switch cases. -// -// Usage Pattern -// ------------- -// // Define your mutation proto message (e.g. messages.ApplyCoupon in messages.proto) -// // Regenerate protobufs. -// -// // In an init() (ideally in a small file like mutations_apply_coupon.go) -// func init() { -// RegisterMutation[*messages.ApplyCoupon]( -// "ApplyCoupon", -// func(g *CartGrain, m *messages.ApplyCoupon) error { -// // domain logic ... -// discount := int64(5000) -// if g.TotalPrice < discount { -// discount = g.TotalPrice -// } -// g.TotalDiscount += discount -// g.TotalPrice -= discount -// return nil -// }, -// WithTotals(), // we changed price-related fields; recalc totals -// ) -// } -// -// // To invoke dynamically (alternative to the current switch): -// if updated, err := ApplyRegistered(grain, incomingMessage); err == nil { -// grain = updated -// } else if errors.Is(err, ErrMutationNotRegistered) { -// // fallback to legacy switch logic -// } -// -// Migration Strategy -// ------------------ -// 1. For each existing mutation handled in CartGrain.Apply, add a registry -// registration with equivalent logic. -// 2. Add a test that enumerates all *expected* mutation proto types and asserts -// they are present in RegisteredMutationTypes(). -// 3. Once coverage is 100%, replace the switch in CartGrain.Apply with a call -// to ApplyRegistered (and optionally keep a minimal default to produce an -// "unsupported mutation" error). -// -// Thread Safety -// ------------- -// Registration is typically done at init() time; a RWMutex provides safety -// should late dynamic registration ever be introduced. -// -// Auto Totals -// ----------- -// Many mutations require recomputing totals. To avoid forgetting this, pass -// WithTotals() when registering. This will invoke grain.UpdateTotals() after -// the handler returns successfully. -// -// Error Semantics -// --------------- -// - If a handler returns an error, totals are NOT recalculated (even if -// WithTotals() was specified). -// - ApplyRegistered returns (nil, ErrMutationNotRegistered) if the message type -// is absent. -// -// Extensibility -// ------------- -// It is straightforward to add options like audit hooks, metrics wrappers, -// or optimistic concurrency guards by extending MutationOption. -// -// NOTE: Generics require Go 1.18+. If constrained to earlier Go versions, -// replace the generic registration with a non-generic RegisterMutationType -// that accepts reflect.Type and an adapter function. -// -// --------------------------------------------------------------------------- - -var ( - mutationRegistryMu sync.RWMutex - mutationRegistry = make(map[reflect.Type]*registeredMutation) - - // ErrMutationNotRegistered is returned when no handler exists for a given mutation type. - ErrMutationNotRegistered = fmt.Errorf("mutation not registered") -) - -// MutationOption configures additional behavior for a registered mutation. -type MutationOption func(*mutationOptions) - -// mutationOptions holds flags adjustable per registration. -type mutationOptions struct { - updateTotals bool -} - -// WithTotals ensures CartGrain.UpdateTotals() is called after a successful handler. -func WithTotals() MutationOption { - return func(o *mutationOptions) { - o.updateTotals = true - } -} - -// registeredMutation stores metadata + the execution closure. -type registeredMutation struct { - name string - handler func(*CartGrain, interface{}) error - updateTotals bool - msgType reflect.Type -} - -// RegisterMutation registers a mutation handler for a specific message type T. -// -// Parameters: -// -// name - a human-readable identifier (used for diagnostics / coverage tests). -// handler - business logic operating on the cart grain & strongly typed message. -// options - optional behavior flags (e.g., WithTotals()). -// -// Panics if: -// - name is empty -// - handler is nil -// - duplicate registration for the same message type T -// -// Typical call is placed in an init() function. -func RegisterMutation[T any](name string, handler func(*CartGrain, *T) error, options ...MutationOption) { - if name == "" { - panic("RegisterMutation: name is required") - } - if handler == nil { - panic("RegisterMutation: handler is nil") - } - - // Derive the reflect.Type for *T then its Elem (T) for mapping. - var zero *T - rtPtr := reflect.TypeOf(zero) - if rtPtr.Kind() != reflect.Ptr { - panic("RegisterMutation: expected pointer type for generic parameter") - } - rt := rtPtr.Elem() - - opts := mutationOptions{} - for _, opt := range options { - opt(&opts) - } - - wrapped := func(g *CartGrain, m interface{}) error { - typed, ok := m.(*T) - if !ok { - return fmt.Errorf("mutation type mismatch: have %T want *%s", m, rt.Name()) - } - return handler(g, typed) - } - - mutationRegistryMu.Lock() - defer mutationRegistryMu.Unlock() - - if _, exists := mutationRegistry[rt]; exists { - panic(fmt.Sprintf("RegisterMutation: duplicate registration for type %s", rt.String())) - } - - mutationRegistry[rt] = ®isteredMutation{ - name: name, - handler: wrapped, - updateTotals: opts.updateTotals, - msgType: rt, - } -} - -// ApplyRegistered attempts to apply a registered mutation. -// Returns updated grain if successful. -// -// If the mutation is not registered, returns (nil, ErrMutationNotRegistered). -func ApplyRegistered(grain *CartGrain, msg interface{}) (*CartGrain, error) { - if grain == nil { - return nil, fmt.Errorf("nil grain") - } - if msg == nil { - return nil, fmt.Errorf("nil mutation message") - } - - rt := indirectType(reflect.TypeOf(msg)) - mutationRegistryMu.RLock() - entry, ok := mutationRegistry[rt] - mutationRegistryMu.RUnlock() - - if !ok { - return nil, ErrMutationNotRegistered - } - - if err := entry.handler(grain, msg); err != nil { - return nil, err - } - - if entry.updateTotals { - grain.UpdateTotals() - } - - return grain, nil -} - -// RegisteredMutations returns metadata for all registered mutations (snapshot). -func RegisteredMutations() []string { - mutationRegistryMu.RLock() - defer mutationRegistryMu.RUnlock() - out := make([]string, 0, len(mutationRegistry)) - for _, entry := range mutationRegistry { - out = append(out, entry.name) - } - return out -} - -// RegisteredMutationTypes returns the reflect.Type list of all registered messages. -// Useful for coverage tests ensuring expected set matches actual set. -func RegisteredMutationTypes() []reflect.Type { - mutationRegistryMu.RLock() - defer mutationRegistryMu.RUnlock() - out := make([]reflect.Type, 0, len(mutationRegistry)) - for t := range mutationRegistry { - out = append(out, t) - } - return out -} - -// MustAssertMutationCoverage can be called at startup to ensure every expected -// mutation type has been registered. It panics with a descriptive message if any -// are missing. Provide a slice of prototype pointers (e.g. []*messages.AddItem{nil} ...) -func MustAssertMutationCoverage(expected []interface{}) { - mutationRegistryMu.RLock() - defer mutationRegistryMu.RUnlock() - - missing := make([]string, 0) - for _, ex := range expected { - if ex == nil { - continue - } - t := indirectType(reflect.TypeOf(ex)) - if _, ok := mutationRegistry[t]; !ok { - missing = append(missing, t.String()) - } - } - if len(missing) > 0 { - panic(fmt.Sprintf("mutation registry missing handlers for: %v", missing)) - } -} - -// indirectType returns the element type if given a pointer; otherwise the type itself. -func indirectType(t reflect.Type) reflect.Type { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - return t -} - -/* -Integration Guide ------------------ - -1. Register all existing mutations: - - func init() { - RegisterMutation[*messages.AddItem]("AddItem", - func(g *CartGrain, m *messages.AddItem) error { - // (port logic from existing switch branch) - // ... - return nil - }, - WithTotals(), - ) - // ... repeat for others - } - -2. In CartGrain.Apply (early in the method) add: - - if updated, err := ApplyRegistered(c, content); err == nil { - return updated, nil - } else if err != ErrMutationNotRegistered { - return nil, err - } - - // existing switch fallback below - -3. Once all mutations are registered, remove the legacy switch cases - and leave a single ErrMutationNotRegistered path for unknown types. - -4. Add a coverage test (see docs for example; removed from source for clarity). -5. (Optional) Add metrics / tracing wrappers for handlers. - -*/ diff --git a/cmd/cart/mutation_remove_delivery.go b/cmd/cart/mutation_remove_delivery.go index 5f4f562..afea6b6 100644 --- a/cmd/cart/mutation_remove_delivery.go +++ b/cmd/cart/mutation_remove_delivery.go @@ -25,29 +25,24 @@ import ( // Future considerations: // - If delivery pricing logic changes (e.g., dynamic taxes per delivery), // UpdateTotals() may need enhancement to incorporate delivery tax properly. -func init() { - RegisterMutation[messages.RemoveDelivery]( - "RemoveDelivery", - func(g *CartGrain, m *messages.RemoveDelivery) error { - if m == nil { - return fmt.Errorf("RemoveDelivery: nil payload") - } - targetID := int(m.Id) - index := -1 - for i, d := range g.Deliveries { - if d.Id == targetID { - index = i - break - } - } - if index == -1 { - return fmt.Errorf("RemoveDelivery: delivery id %d not found", m.Id) - } - // Remove delivery (order not preserved beyond necessity) - g.Deliveries = append(g.Deliveries[:index], g.Deliveries[index+1:]...) - return nil - }, - WithTotals(), - ) +func RemoveDelivery(g *CartGrain, m *messages.RemoveDelivery) error { + if m == nil { + return fmt.Errorf("RemoveDelivery: nil payload") + } + targetID := int(m.Id) + index := -1 + for i, d := range g.Deliveries { + if d.Id == targetID { + index = i + break + } + } + if index == -1 { + return fmt.Errorf("RemoveDelivery: delivery id %d not found", m.Id) + } + + // Remove delivery (order not preserved beyond necessity) + g.Deliveries = append(g.Deliveries[:index], g.Deliveries[index+1:]...) + return nil } diff --git a/cmd/cart/mutation_remove_item.go b/cmd/cart/mutation_remove_item.go index 0e9913d..ebd77fe 100644 --- a/cmd/cart/mutation_remove_item.go +++ b/cmd/cart/mutation_remove_item.go @@ -21,29 +21,24 @@ import ( // semantics require pruning delivery.item_ids you can extend this handler. // - If multiple lines somehow shared the same Id (should not happen), only // the first match would be removed—data integrity relies on unique line Ids. -func init() { - RegisterMutation[messages.RemoveItem]( - "RemoveItem", - func(g *CartGrain, m *messages.RemoveItem) error { - if m == nil { - return fmt.Errorf("RemoveItem: nil payload") - } - targetID := int(m.Id) - index := -1 - for i, it := range g.Items { - if it.Id == targetID { - index = i - break - } - } - if index == -1 { - return fmt.Errorf("RemoveItem: item id %d not found", m.Id) - } +func RemoveItem(g *CartGrain, m *messages.RemoveItem) error { + if m == nil { + return fmt.Errorf("RemoveItem: nil payload") + } + targetID := int(m.Id) - g.Items = append(g.Items[:index], g.Items[index+1:]...) - return nil - }, - WithTotals(), - ) + index := -1 + for i, it := range g.Items { + if it.Id == targetID { + index = i + break + } + } + if index == -1 { + return fmt.Errorf("RemoveItem: item id %d not found", m.Id) + } + + g.Items = append(g.Items[:index], g.Items[index+1:]...) + return nil } diff --git a/cmd/cart/mutation_set_cart_items.go b/cmd/cart/mutation_set_cart_items.go index edaf352..edba61d 100644 --- a/cmd/cart/mutation_set_cart_items.go +++ b/cmd/cart/mutation_set_cart_items.go @@ -1,11 +1,5 @@ package main -import ( - "fmt" - - messages "git.tornberg.me/go-cart-actor/pkg/messages" -) - // mutation_set_cart_items.go // // Registers the SetCartRequest mutation. This mutation replaces the entire list @@ -25,33 +19,28 @@ import ( // - Deliveries might reference item IDs that are now invalid—original logic // also left deliveries untouched. If that becomes an issue, add a cleanup // pass to remove deliveries whose item IDs no longer exist. -func init() { - RegisterMutation[messages.SetCartRequest]( - "SetCartRequest", - func(g *CartGrain, m *messages.SetCartRequest) error { - if m == nil { - return fmt.Errorf("SetCartRequest: nil payload") - } - // Clear current items (keep deliveries) - g.mu.Lock() - g.Items = make([]*CartItem, 0, len(m.Items)) - g.mu.Unlock() +// func HandleSetCartRequest(g *CartGrain, m *messages.SetCartRequest) error { +// if m == nil { +// return fmt.Errorf("SetCartRequest: nil payload") +// } - for _, it := range m.Items { - if it == nil { - continue - } - if it.Sku == "" || it.Quantity < 1 { - return fmt.Errorf("SetCartRequest: invalid item (sku='%s' qty=%d)", it.Sku, it.Quantity) - } - _, err := g.AddItem(it.Sku, int(it.Quantity), it.Country, it.StoreId) - if err != nil { - return fmt.Errorf("SetCartRequest: add sku '%s' failed: %w", it.Sku, err) - } - } - return nil - }, - WithTotals(), - ) -} +// // Clear current items (keep deliveries) +// g.mu.Lock() +// g.Items = make([]*CartItem, 0, len(m.Items)) +// g.mu.Unlock() + +// for _, it := range m.Items { +// if it == nil { +// continue +// } +// if it.Sku == "" || it.Quantity < 1 { +// return fmt.Errorf("SetCartRequest: invalid item (sku='%s' qty=%d)", it.Sku, it.Quantity) +// } +// _, err := g.AddItem(it.Sku, int(it.Quantity), it.Country, it.StoreId) +// if err != nil { +// return fmt.Errorf("SetCartRequest: add sku '%s' failed: %w", it.Sku, err) +// } +// } +// return nil +// } diff --git a/cmd/cart/mutation_set_delivery.go b/cmd/cart/mutation_set_delivery.go index 72b5293..462e869 100644 --- a/cmd/cart/mutation_set_delivery.go +++ b/cmd/cart/mutation_set_delivery.go @@ -39,63 +39,58 @@ import ( // - Variable delivery pricing (based on weight, distance, provider, etc.) // - Validation of provider codes // - Multi-currency delivery pricing -func init() { - RegisterMutation[messages.SetDelivery]( - "SetDelivery", - func(g *CartGrain, m *messages.SetDelivery) error { - if m == nil { - return fmt.Errorf("SetDelivery: nil payload") - } - if m.Provider == "" { - return fmt.Errorf("SetDelivery: provider is empty") - } - withDelivery := g.ItemsWithDelivery() - targetItems := make([]int, 0) +func SetDelivery(g *CartGrain, m *messages.SetDelivery) error { + if m == nil { + return fmt.Errorf("SetDelivery: nil payload") + } + if m.Provider == "" { + return fmt.Errorf("SetDelivery: provider is empty") + } - if len(m.Items) == 0 { - // Use every item currently without a delivery - targetItems = append(targetItems, g.ItemsWithoutDelivery()...) - } else { - // Validate explicit list - for _, id64 := range m.Items { - id := int(id64) - found := false - for _, it := range g.Items { - if it.Id == id { - found = true - break - } - } - if !found { - return fmt.Errorf("SetDelivery: item id %d not found", id) - } - if slices.Contains(withDelivery, id) { - return fmt.Errorf("SetDelivery: item id %d already has a delivery", id) - } - targetItems = append(targetItems, id) + withDelivery := g.ItemsWithDelivery() + targetItems := make([]int, 0) + + if len(m.Items) == 0 { + // Use every item currently without a delivery + targetItems = append(targetItems, g.ItemsWithoutDelivery()...) + } else { + // Validate explicit list + for _, id64 := range m.Items { + id := int(id64) + found := false + for _, it := range g.Items { + if it.Id == id { + found = true + break } } - - if len(targetItems) == 0 { - return fmt.Errorf("SetDelivery: no eligible items to attach") + if !found { + return fmt.Errorf("SetDelivery: item id %d not found", id) } + if slices.Contains(withDelivery, id) { + return fmt.Errorf("SetDelivery: item id %d already has a delivery", id) + } + targetItems = append(targetItems, id) + } + } - // Append new delivery - g.mu.Lock() - g.lastDeliveryId++ - newId := g.lastDeliveryId - g.Deliveries = append(g.Deliveries, &CartDelivery{ - Id: newId, - Provider: m.Provider, - PickupPoint: m.PickupPoint, - Price: 4900, // TODO: externalize pricing - Items: targetItems, - }) - g.mu.Unlock() + if len(targetItems) == 0 { + return fmt.Errorf("SetDelivery: no eligible items to attach") + } - return nil - }, - WithTotals(), - ) + // Append new delivery + g.mu.Lock() + g.lastDeliveryId++ + newId := g.lastDeliveryId + g.Deliveries = append(g.Deliveries, &CartDelivery{ + Id: newId, + Provider: m.Provider, + PickupPoint: m.PickupPoint, + Price: 4900, // TODO: externalize pricing + Items: targetItems, + }) + g.mu.Unlock() + + return nil } diff --git a/cmd/cart/mutation_set_pickup_point.go b/cmd/cart/mutation_set_pickup_point.go index 5bbe8b9..71c5be8 100644 --- a/cmd/cart/mutation_set_pickup_point.go +++ b/cmd/cart/mutation_set_pickup_point.go @@ -28,29 +28,24 @@ import ( // - Validate pickup point fields (country code, zip format, etc.) // - Track history / audit of pickup point changes // - Trigger delivery price adjustments (which would then require WithTotals()). -func init() { - RegisterMutation[messages.SetPickupPoint]( - "SetPickupPoint", - func(g *CartGrain, m *messages.SetPickupPoint) error { - if m == nil { - return fmt.Errorf("SetPickupPoint: nil payload") - } - for _, d := range g.Deliveries { - if d.Id == int(m.DeliveryId) { - d.PickupPoint = &messages.PickupPoint{ - Id: m.Id, - Name: m.Name, - Address: m.Address, - City: m.City, - Zip: m.Zip, - Country: m.Country, - } - return nil - } +func SetPickupPoint(g *CartGrain, m *messages.SetPickupPoint) error { + if m == nil { + return fmt.Errorf("SetPickupPoint: nil payload") + } + + for _, d := range g.Deliveries { + if d.Id == int(m.DeliveryId) { + d.PickupPoint = &messages.PickupPoint{ + Id: m.Id, + Name: m.Name, + Address: m.Address, + City: m.City, + Zip: m.Zip, + Country: m.Country, } - return fmt.Errorf("SetPickupPoint: delivery id %d not found", m.DeliveryId) - }, - // No WithTotals(): pickup point does not change pricing / tax. - ) + return nil + } + } + return fmt.Errorf("SetPickupPoint: delivery id %d not found", m.DeliveryId) } diff --git a/cmd/cart/pool-server.go b/cmd/cart/pool-server.go index a344f40..5dc825a 100644 --- a/cmd/cart/pool-server.go +++ b/cmd/cart/pool-server.go @@ -11,6 +11,7 @@ import ( "git.tornberg.me/go-cart-actor/pkg/actor" messages "git.tornberg.me/go-cart-actor/pkg/messages" + "github.com/gogo/protobuf/proto" ) type PoolServer struct { @@ -27,7 +28,7 @@ func NewPoolServer(pool actor.GrainPool[*CartGrain], pod_name string, klarnaClie } } -func (s *PoolServer) ApplyLocal(id CartId, mutation interface{}) (*CartGrain, error) { +func (s *PoolServer) ApplyLocal(id CartId, mutation proto.Message) (*CartGrain, error) { return s.pool.Apply(uint64(id), mutation) } @@ -42,7 +43,11 @@ func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request, id CartId func (s *PoolServer) HandleAddSku(w http.ResponseWriter, r *http.Request, id CartId) error { sku := r.PathValue("sku") - data, err := s.ApplyLocal(id, &messages.AddRequest{Sku: sku, Quantity: 1}) + msg, err := GetItemAddMessage(sku, 1, getCountryFromHost(r.Host), nil) + if err != nil { + return err + } + data, err := s.ApplyLocal(id, msg) if err != nil { return err } @@ -80,7 +85,7 @@ func (s *PoolServer) HandleDeleteItem(w http.ResponseWriter, r *http.Request, id return s.WriteResult(w, data) } -type SetDelivery struct { +type SetDeliveryRequest struct { Provider string `json:"provider"` Items []int64 `json:"items"` PickupPoint *messages.PickupPoint `json:"pickupPoint,omitempty"` @@ -88,7 +93,7 @@ type SetDelivery struct { func (s *PoolServer) HandleSetDelivery(w http.ResponseWriter, r *http.Request, id CartId) error { - delivery := SetDelivery{} + delivery := SetDeliveryRequest{} err := json.NewDecoder(r.Body).Decode(&delivery) if err != nil { return err @@ -269,8 +274,7 @@ func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id CartId) // w.Header().Set("Content-Type", "application/json") // return json.NewEncoder(w).Encode(klarnaOrder) // } -// - +// func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { diff --git a/data/14958337247011543113.events.log b/data/14958337247011543113.events.log new file mode 100644 index 0000000..dd07e7b --- /dev/null +++ b/data/14958337247011543113.events.log @@ -0,0 +1 @@ +{"type":"AddItem","timestamp":"2025-10-13T15:25:09.772277+02:00","mutation":{"item_id":789396,"quantity":1,"price":18600,"sku":"789396","name":"Samsung Galaxy Z Fold6 Slim S-Pen fodral (grått)","image":"/image/dv_web_D18000128131832/789396/samsung-galaxy-z-fold6-slim-s-pen-fodral-gratt.jpg","tax":2500,"brand":"Samsung","category":"Mobiler, Tablets \u0026 Smartklockor","category2":"Mobiltillbehör","category3":"Mobilskal \u0026 Mobilfodral","articleType":"ZHAW","sellerId":"152","sellerName":"Elgiganten"}} diff --git a/pkg/actor/disk_storage.go b/pkg/actor/disk_storage.go new file mode 100644 index 0000000..7ee9207 --- /dev/null +++ b/pkg/actor/disk_storage.go @@ -0,0 +1,62 @@ +package actor + +import ( + "errors" + "fmt" + "log" + "os" + "path/filepath" + + "github.com/gogo/protobuf/proto" +) + +type DiskStorage[V any] struct { + *StateStorage + path string +} + +type LogStorage[V any] interface { + LoadEvents(id uint64, grain Grain[V]) error + AppendEvent(id uint64, msg proto.Message) error +} + +func NewDiskStorage[V any](path string, registry MutationRegistry) LogStorage[V] { + return &DiskStorage[V]{ + StateStorage: NewState(registry), + path: path, + } +} + +func (s *DiskStorage[V]) logPath(id uint64) string { + return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id)) +} + +func (s *DiskStorage[V]) LoadEvents(id uint64, grain Grain[V]) error { + path := s.logPath(id) + if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { + // No log -> nothing to replay + return nil + } + + fh, err := os.Open(path) + if err != nil { + return fmt.Errorf("open replay file: %w", err) + } + defer fh.Close() + return s.Load(fh, func(msg proto.Message) { + s.registry.Apply(grain, msg) + }) +} + +func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error { + path := s.logPath(id) + fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Printf("failed to open event log file: %v", err) + return err + } + defer fh.Close() + + return s.Append(fh, msg) + +} diff --git a/pkg/actor/grain.go b/pkg/actor/grain.go index c5d7967..0d9c611 100644 --- a/pkg/actor/grain.go +++ b/pkg/actor/grain.go @@ -1,10 +1,13 @@ package actor -import "time" +import ( + "time" + //"github.com/gogo/protobuf/proto" +) type Grain[V any] interface { GetId() uint64 - Apply(content any, isReplay bool) (*V, error) + //Apply(content proto.Message, isReplay bool) (*V, error) GetLastAccess() time.Time GetLastChange() time.Time GetCurrentState() (*V, error) diff --git a/pkg/actor/grain_pool.go b/pkg/actor/grain_pool.go index 3a40c95..98366f7 100644 --- a/pkg/actor/grain_pool.go +++ b/pkg/actor/grain_pool.go @@ -2,10 +2,12 @@ package actor import ( "net/http" + + "github.com/gogo/protobuf/proto" ) type GrainPool[V any] interface { - Apply(id uint64, mutation any) (V, error) + Apply(id uint64, mutation proto.Message) (V, error) Get(id uint64) (V, error) OwnerHost(id uint64) (Host, bool) Hostname() string diff --git a/pkg/actor/mutation_registry.go b/pkg/actor/mutation_registry.go new file mode 100644 index 0000000..e4f1172 --- /dev/null +++ b/pkg/actor/mutation_registry.go @@ -0,0 +1,204 @@ +package actor + +import ( + "fmt" + "log" + "reflect" + "sync" + + "github.com/gogo/protobuf/proto" +) + +type MutationRegistry interface { + Apply(grain any, msg proto.Message) error + RegisterMutations(handlers ...MutationHandler) + Create(typeName string) (proto.Message, bool) + GetTypeName(msg proto.Message) (string, bool) + //GetStorageEvent(msg proto.Message) StorageEvent + //FromStorageEvent(event StorageEvent) (proto.Message, error) +} + +type ProtoMutationRegistry struct { + mutationRegistryMu sync.RWMutex + mutationRegistry map[reflect.Type]MutationHandler +} + +var ( + ErrMutationNotRegistered = fmt.Errorf("mutation not registered") +) + +// MutationOption configures additional behavior for a registered mutation. +type MutationOption func(*mutationOptions) + +// mutationOptions holds flags adjustable per registration. +type mutationOptions struct { + updateTotals bool +} + +// WithTotals ensures CartGrain.UpdateTotals() is called after a successful handler. +func WithTotals() MutationOption { + return func(o *mutationOptions) { + o.updateTotals = true + } +} + +type MutationHandler interface { + Handle(state any, msg proto.Message) error + Name() string + Type() reflect.Type + Create() proto.Message +} + +// RegisteredMutation stores metadata + the execution closure. +type RegisteredMutation[V any, T proto.Message] struct { + name string + handler func(*V, T) error + create func() T + msgType reflect.Type +} + +func NewMutation[V any, T proto.Message](handler func(*V, T) error, create func() T) *RegisteredMutation[V, T] { + // Derive the name and message type from a concrete instance produced by create(). + // This avoids relying on reflect.TypeFor (which can yield unexpected results in some toolchains) + // and ensures we always peel off the pointer layer for proto messages. + instance := create() + rt := reflect.TypeOf(instance) + if rt.Kind() == reflect.Ptr { + rt = rt.Elem() + } + return &RegisteredMutation[V, T]{ + name: rt.Name(), + handler: handler, + create: create, + msgType: rt, + } +} + +func (m *RegisteredMutation[V, T]) Handle(state any, msg proto.Message) error { + return m.handler(state.(*V), msg.(T)) +} + +func (m *RegisteredMutation[V, T]) Name() string { + return m.name +} + +func (m *RegisteredMutation[V, T]) Create() proto.Message { + return m.create() +} + +func (m *RegisteredMutation[V, T]) Type() reflect.Type { + return m.msgType +} + +func NewMutationRegistry() MutationRegistry { + return &ProtoMutationRegistry{ + mutationRegistry: make(map[reflect.Type]MutationHandler), + mutationRegistryMu: sync.RWMutex{}, + } +} + +func (r *ProtoMutationRegistry) RegisterMutations(handlers ...MutationHandler) { + r.mutationRegistryMu.Lock() + defer r.mutationRegistryMu.Unlock() + + for _, handler := range handlers { + r.mutationRegistry[handler.Type()] = handler + } +} + +func (r *ProtoMutationRegistry) GetTypeName(msg proto.Message) (string, bool) { + r.mutationRegistryMu.RLock() + defer r.mutationRegistryMu.RUnlock() + + rt := indirectType(reflect.TypeOf(msg)) + if handler, ok := r.mutationRegistry[rt]; ok { + return handler.Name(), true + } + return "", false +} + +func (r *ProtoMutationRegistry) getHandler(typeName string) MutationHandler { + r.mutationRegistryMu.Lock() + defer r.mutationRegistryMu.Unlock() + + for _, handler := range r.mutationRegistry { + if handler.Name() == typeName { + return handler + } + } + + return nil +} + +func (r *ProtoMutationRegistry) Create(typeName string) (proto.Message, bool) { + + handler := r.getHandler(typeName) + if handler == nil { + log.Printf("missing handler for %s", typeName) + return nil, false + } + + return handler.Create(), true +} + +// ApplyRegistered attempts to apply a registered mutation. +// Returns updated grain if successful. +// +// If the mutation is not registered, returns (nil, ErrMutationNotRegistered). +func (r *ProtoMutationRegistry) Apply(grain any, msg proto.Message) error { + if grain == nil { + return fmt.Errorf("nil grain") + } + if msg == nil { + return fmt.Errorf("nil mutation message") + } + + rt := indirectType(reflect.TypeOf(msg)) + r.mutationRegistryMu.RLock() + entry, ok := r.mutationRegistry[rt] + r.mutationRegistryMu.RUnlock() + + if !ok { + return ErrMutationNotRegistered + } + + if err := entry.Handle(grain, msg); err != nil { + return err + } + + // if entry.updateTotals { + // grain.UpdateTotals() + // } + + return nil +} + +// RegisteredMutations returns metadata for all registered mutations (snapshot). +func (r *ProtoMutationRegistry) RegisteredMutations() []string { + r.mutationRegistryMu.RLock() + defer r.mutationRegistryMu.RUnlock() + out := make([]string, 0, len(r.mutationRegistry)) + for _, entry := range r.mutationRegistry { + out = append(out, entry.Name()) + } + return out +} + +// RegisteredMutationTypes returns the reflect.Type list of all registered messages. +// Useful for coverage tests ensuring expected set matches actual set. +func (r *ProtoMutationRegistry) RegisteredMutationTypes() []reflect.Type { + r.mutationRegistryMu.RLock() + defer r.mutationRegistryMu.RUnlock() + out := make([]reflect.Type, 0, len(r.mutationRegistry)) + for _, entry := range r.mutationRegistry { + out = append(out, entry.Type()) + } + return out +} + +func indirectType(t reflect.Type) reflect.Type { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t +} diff --git a/pkg/actor/mutation_registry_test.go b/pkg/actor/mutation_registry_test.go new file mode 100644 index 0000000..99720b8 --- /dev/null +++ b/pkg/actor/mutation_registry_test.go @@ -0,0 +1,151 @@ +package actor + +import ( + "errors" + "reflect" + "testing" + + "git.tornberg.me/go-cart-actor/pkg/messages" +) + +type cartState struct { + calls int + lastAdded *messages.AddItem +} + +func TestRegisteredMutationBasics(t *testing.T) { + reg := NewMutationRegistry().(*ProtoMutationRegistry) + + addItemMutation := NewMutation[cartState, *messages.AddItem]( + func(state *cartState, msg *messages.AddItem) error { + state.calls++ + // copy to avoid external mutation side-effects (not strictly necessary for the test) + cp := *msg + state.lastAdded = &cp + return nil + }, + func() *messages.AddItem { return &messages.AddItem{} }, + ) + + // Sanity check on mutation metadata + if addItemMutation.Name() != "AddItem" { + t.Fatalf("expected mutation Name() == AddItem, got %s", addItemMutation.Name()) + } + if got, want := addItemMutation.Type(), reflect.TypeOf(messages.AddItem{}); got != want { + t.Fatalf("expected Type() == %v, got %v", want, got) + } + + reg.RegisterMutations(addItemMutation) + + // RegisteredMutations: membership (order not guaranteed) + names := reg.RegisteredMutations() + if !stringSliceContains(names, "AddItem") { + t.Fatalf("RegisteredMutations missing AddItem, got %v", names) + } + + // RegisteredMutationTypes: membership (order not guaranteed) + types := reg.RegisteredMutationTypes() + if !typeSliceContains(types, reflect.TypeOf(messages.AddItem{})) { + t.Fatalf("RegisteredMutationTypes missing AddItem type, got %v", types) + } + + // GetTypeName should resolve for a pointer instance + name, ok := reg.GetTypeName(&messages.AddItem{}) + if !ok || name != "AddItem" { + t.Fatalf("GetTypeName returned (%q,%v), expected (AddItem,true)", name, ok) + } + + // GetTypeName should fail for unregistered type + if name, ok := reg.GetTypeName(&messages.Noop{}); ok || name != "" { + t.Fatalf("expected GetTypeName to fail for unregistered message, got (%q,%v)", name, ok) + } + + // Create by name + msg, ok := reg.Create("AddItem") + if !ok { + t.Fatalf("Create failed for registered mutation") + } + if _, isAddItem := msg.(*messages.AddItem); !isAddItem { + t.Fatalf("Create returned wrong concrete type: %T", msg) + } + + // Create unknown + if m2, ok := reg.Create("Unknown"); ok || m2 != nil { + t.Fatalf("Create should fail for unknown mutation, got (%T,%v)", m2, ok) + } + + // Apply happy path + state := &cartState{} + add := &messages.AddItem{ItemId: 42, Quantity: 3, Sku: "ABC"} + if err := reg.Apply(state, add); err != nil { + t.Fatalf("Apply returned error: %v", err) + } + if state.calls != 1 { + t.Fatalf("handler not invoked expected calls=1 got=%d", state.calls) + } + if state.lastAdded == nil || state.lastAdded.ItemId != 42 || state.lastAdded.Quantity != 3 { + t.Fatalf("state not updated correctly: %+v", state.lastAdded) + } + + // Apply nil grain + if err := reg.Apply(nil, add); err == nil { + t.Fatalf("expected error for nil grain") + } + + // Apply nil message + if err := reg.Apply(state, nil); err == nil { + t.Fatalf("expected error for nil mutation message") + } + + // Apply unregistered message + if err := reg.Apply(state, &messages.Noop{}); !errors.Is(err, ErrMutationNotRegistered) { + t.Fatalf("expected ErrMutationNotRegistered, got %v", err) + } +} + +// func TestConcurrentSafeRegistrationLookup(t *testing.T) { +// // This test is light-weight; it ensures locks don't deadlock under simple concurrent access. +// reg := NewMutationRegistry().(*ProtoMutationRegistry) +// mut := NewMutation[cartState, *messages.Noop]( +// func(state *cartState, msg *messages.Noop) error { state.calls++; return nil }, +// func() *messages.Noop { return &messages.Noop{} }, +// ) +// reg.RegisterMutations(mut) + +// done := make(chan struct{}) +// const workers = 25 +// for i := 0; i < workers; i++ { +// go func() { +// for j := 0; j < 100; j++ { +// _, _ = reg.Create("Noop") +// _, _ = reg.GetTypeName(&messages.Noop{}) +// _ = reg.Apply(&cartState{}, &messages.Noop{}) +// } +// done <- struct{}{} +// }() +// } + +// for i := 0; i < workers; i++ { +// <-done +// } +// } + +// Helpers + +func stringSliceContains(list []string, target string) bool { + for _, s := range list { + if s == target { + return true + } + } + return false +} + +func typeSliceContains(list []reflect.Type, target reflect.Type) bool { + for _, t := range list { + if t == target { + return true + } + } + return false +} diff --git a/pkg/actor/simple_grain_pool.go b/pkg/actor/simple_grain_pool.go index 68a8a2b..5b0a21c 100644 --- a/pkg/actor/simple_grain_pool.go +++ b/pkg/actor/simple_grain_pool.go @@ -6,17 +6,20 @@ import ( "maps" "sync" "time" + + "github.com/gogo/protobuf/proto" ) type SimpleGrainPool[V any] struct { // fields and methods - localMu sync.RWMutex - grains map[uint64]Grain[V] - - spawn func(id uint64) (Grain[V], error) - spawnHost func(host string) (Host, error) - ttl time.Duration - poolSize int + localMu sync.RWMutex + grains map[uint64]Grain[V] + mutationRegistry MutationRegistry + spawn func(id uint64) (Grain[V], error) + spawnHost func(host string) (Host, error) + storage LogStorage[V] + ttl time.Duration + poolSize int // Cluster coordination -------------------------------------------------- hostname string @@ -29,17 +32,28 @@ type SimpleGrainPool[V any] struct { purgeTicker *time.Ticker } -func NewSimpleGrainPool[V any](size int, ttl time.Duration, hostname string, spawn func(id uint64) (Grain[V], error), spawnHost func(host string) (Host, error)) (*SimpleGrainPool[V], error) { - p := &SimpleGrainPool[V]{ - grains: make(map[uint64]Grain[V]), +type GrainPoolConfig[V any] struct { + Hostname string + Spawn func(id uint64) (Grain[V], error) + SpawnHost func(host string) (Host, error) + TTL time.Duration + PoolSize int + MutationRegistry MutationRegistry + Storage LogStorage[V] +} - spawn: spawn, - spawnHost: spawnHost, - ttl: ttl, - poolSize: size, - hostname: hostname, - remoteOwners: make(map[uint64]Host), - remoteHosts: make(map[string]Host), +func NewSimpleGrainPool[V any](config GrainPoolConfig[V]) (*SimpleGrainPool[V], error) { + p := &SimpleGrainPool[V]{ + grains: make(map[uint64]Grain[V]), + mutationRegistry: config.MutationRegistry, + storage: config.Storage, + spawn: config.Spawn, + spawnHost: config.SpawnHost, + ttl: config.TTL, + poolSize: config.PoolSize, + hostname: config.Hostname, + remoteOwners: make(map[uint64]Host), + remoteHosts: make(map[string]Host), } p.purgeTicker = time.NewTicker(time.Minute) @@ -344,38 +358,22 @@ func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) { return grain, nil } -// ErrNotOwner is returned when a cart belongs to another host. -var ErrNotOwner = fmt.Errorf("not owner") +// // ErrNotOwner is returned when a cart belongs to another host. +// var ErrNotOwner = fmt.Errorf("not owner") // Apply applies a mutation to a grain. -func (p *SimpleGrainPool[V]) Apply(id uint64, mutation any) (*V, error) { +func (p *SimpleGrainPool[V]) Apply(id uint64, mutation proto.Message) (*V, error) { grain, err := p.getOrClaimGrain(id) if err != nil { return nil, err } - //start := time.Now() - result, applyErr := grain.Apply(mutation, false) - //mutationType := "unknown" - // if mutation != nil { - // if t := reflect.TypeOf(mutation); t != nil { - // if t.Kind() == reflect.Pointer { - // t = t.Elem() - // } - // if t.Name() != "" { - // mutationType = t.Name() - // } - // } - // } - // cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds()) - - // if applyErr == nil && result != nil { - // cartMutationsTotal.Inc() - - // } else if applyErr != nil { - // cartMutationFailuresTotal.Inc() - // } - - return result, applyErr + if applyErr := p.mutationRegistry.Apply(grain, mutation); applyErr != nil { + return nil, applyErr + } + if err := p.storage.AppendEvent(id, mutation); err != nil { + log.Printf("failed to store mutation for grain %d: %v", id, err) + } + return grain.GetCurrentState() } // Get returns the current state of a grain. diff --git a/pkg/actor/state.go b/pkg/actor/state.go new file mode 100644 index 0000000..6303b30 --- /dev/null +++ b/pkg/actor/state.go @@ -0,0 +1,86 @@ +package actor + +import ( + "encoding/json" + "errors" + "io" + "time" + + "github.com/gogo/protobuf/proto" +) + +type StateStorage struct { + registry MutationRegistry +} + +type StorageEvent struct { + Type string `json:"type"` + TimeStamp time.Time `json:"timestamp"` + Mutation proto.Message `json:"mutation"` +} + +type rawEvent struct { + Type string `json:"type"` + TimeStamp time.Time `json:"timestamp"` + Mutation json.RawMessage `json:"mutation"` +} + +func NewState(registry MutationRegistry) *StateStorage { + return &StateStorage{ + registry: registry, + } +} + +var ErrUnknownType = errors.New("unknown type") + +func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message)) error { + var err error + for err == nil { + evt, err := s.Read(r) + if err == nil { + onMessage(evt.Mutation) + } + + } + return err +} + +func (s *StateStorage) Append(io io.Writer, mutation proto.Message) error { + typeName, ok := s.registry.GetTypeName(mutation) + if !ok { + return ErrUnknownType + } + event := &StorageEvent{ + Type: typeName, + TimeStamp: time.Now(), + Mutation: mutation, + } + jsonBytes, err := json.Marshal(event) + if err != nil { + return err + } + if _, err := io.Write(jsonBytes); err != nil { + return err + } + return nil +} + +func (s *StateStorage) Read(r io.Reader) (*StorageEvent, error) { + var event rawEvent + if err := json.NewDecoder(r).Decode(&event); err != nil { + return nil, err + } + typeName := event.Type + mutation, ok := s.registry.Create(typeName) + if !ok { + return nil, ErrUnknownType + } + if err := json.Unmarshal(event.Mutation, mutation); err != nil { + return nil, err + } + return &StorageEvent{ + Type: typeName, + TimeStamp: event.TimeStamp, + Mutation: mutation, + }, nil +} diff --git a/pkg/proxy/remotehost.go b/pkg/proxy/remotehost.go index 8f41f99..84cc095 100644 --- a/pkg/proxy/remotehost.go +++ b/pkg/proxy/remotehost.go @@ -17,13 +17,13 @@ import ( // RemoteHost mirrors the lightweight controller used for remote node // interaction. type RemoteHost struct { - Host string + host string httpBase string conn *grpc.ClientConn transport *http.Transport client *http.Client controlClient messages.ControlPlaneClient - MissedPings int + missedPings int } func NewRemoteHost(host string) (*RemoteHost, error) { @@ -38,22 +38,6 @@ func NewRemoteHost(host string) (*RemoteHost, error) { } controlClient := messages.NewControlPlaneClient(conn) - // go func() { - // for retries := range 3 { - // ctx, pingCancel := context.WithTimeout(context.Background(), time.Second) - // _, pingErr := controlClient.Ping(ctx, &messages.Empty{}) - // pingCancel() - // if pingErr == nil { - // break - // } - // if retries == 2 { - // log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr) - // conn.Close() - // p - // } - // time.Sleep(500 * time.Millisecond) - // } - // }() transport := &http.Transport{ MaxIdleConns: 100, @@ -64,18 +48,18 @@ func NewRemoteHost(host string) (*RemoteHost, error) { client := &http.Client{Transport: transport, Timeout: 10 * time.Second} return &RemoteHost{ - Host: host, + host: host, httpBase: fmt.Sprintf("http://%s:8080/cart", host), conn: conn, transport: transport, client: client, controlClient: controlClient, - MissedPings: 0, + missedPings: 0, }, nil } func (h *RemoteHost) Name() string { - return h.Host + return h.host } func (h *RemoteHost) Close() error { @@ -92,8 +76,8 @@ func (h *RemoteHost) Ping() bool { _, err = h.controlClient.Ping(ctx, &messages.Empty{}) cancel() if err != nil { - h.MissedPings++ - log.Printf("Ping %s failed (%d) %v", h.Host, h.MissedPings, err) + h.missedPings++ + log.Printf("Ping %s failed (%d) %v", h.host, h.missedPings, err) } if !h.IsHealthy() { return false @@ -101,7 +85,7 @@ func (h *RemoteHost) Ping() bool { time.Sleep(time.Millisecond * 200) } - h.MissedPings = 0 + h.missedPings = 0 return true } @@ -113,11 +97,11 @@ func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) { KnownHosts: knownHosts, }) if err != nil { - h.MissedPings++ - log.Printf("Negotiate %s failed: %v", h.Host, err) + h.missedPings++ + log.Printf("Negotiate %s failed: %v", h.host, err) return nil, err } - h.MissedPings = 0 + h.missedPings = 0 return resp.Hosts, nil } @@ -126,8 +110,8 @@ func (h *RemoteHost) GetActorIds() []uint64 { defer cancel() reply, err := h.controlClient.GetLocalActorIds(ctx, &messages.Empty{}) if err != nil { - log.Printf("Init remote %s: GetCartIds error: %v", h.Host, err) - h.MissedPings++ + log.Printf("Init remote %s: GetCartIds error: %v", h.host, err) + h.missedPings++ return []uint64{} } return reply.GetIds() @@ -135,48 +119,33 @@ func (h *RemoteHost) GetActorIds() []uint64 { func (h *RemoteHost) AnnounceOwnership(uids []uint64) { _, err := h.controlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{ - Host: h.Host, + Host: h.host, Ids: uids, }) if err != nil { - log.Printf("ownership announce to %s failed: %v", h.Host, err) - h.MissedPings++ + log.Printf("ownership announce to %s failed: %v", h.host, err) + h.missedPings++ return } - h.MissedPings = 0 + h.missedPings = 0 } func (h *RemoteHost) AnnounceExpiry(uids []uint64) { _, err := h.controlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{ - Host: h.Host, + Host: h.host, Ids: uids, }) if err != nil { - log.Printf("expiry announce to %s failed: %v", h.Host, err) - h.MissedPings++ + log.Printf("expiry announce to %s failed: %v", h.host, err) + h.missedPings++ return } - h.MissedPings = 0 + h.missedPings = 0 } func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error) { target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI()) - // var bodyCopy []byte - // if r.Body != nil && r.Body != http.NoBody { - // var err error - // bodyCopy, err = io.ReadAll(r.Body) - // if err != nil { - // http.Error(w, "proxy read error", http.StatusBadGateway) - // return false, err - // } - // } - // if r.Body != nil { - // r.Body.Close() - // } - // var reqBody io.Reader - // if len(bodyCopy) > 0 { - // reqBody = bytes.NewReader(bodyCopy) - // } + req, err := http.NewRequestWithContext(r.Context(), r.Method, target, r.Body) if err != nil { http.Error(w, "proxy build error", http.StatusBadGateway) @@ -214,5 +183,5 @@ func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (b } func (r *RemoteHost) IsHealthy() bool { - return r.MissedPings < 3 + return r.missedPings < 3 } diff --git a/proto/control_plane.proto b/proto/control_plane.proto index 2649d94..69570fd 100644 --- a/proto/control_plane.proto +++ b/proto/control_plane.proto @@ -12,7 +12,7 @@ option go_package = "git.tornberg.me/go-cart-actor/proto;messages"; // - Liveness (Ping) // - Membership negotiation (Negotiate) // - Deterministic ring-based ownership (ConfirmOwner RPC removed) -// - Cart ID listing for remote grain spawning (GetCartIds) +// - Actor ID listing for remote grain spawning (GetActorIds) // - Graceful shutdown notifications (Closing) // No authentication / TLS is defined initially (can be added later). // -----------------------------------------------------------------------------