diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index 69da05c..966506f 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -246,7 +246,7 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { // reconstruct state from event log if present grain := cart.NewCartGrain(id, time.Now()) - err := fs.storage.LoadEvents(id, grain) + err := fs.storage.LoadEvents(r.Context(), id, grain) if err != nil { writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()}) return diff --git a/cmd/cart/checkout_server.go b/cmd/cart/checkout_server.go index 44d7633..a247be1 100644 --- a/cmd/cart/checkout_server.go +++ b/cmd/cart/checkout_server.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "fmt" "log" @@ -27,12 +28,12 @@ var tpl = ` ` -func (a *App) getGrainFromOrder(order *CheckoutOrder) (*cart.CartGrain, error) { +func (a *App) getGrainFromOrder(ctx context.Context, order *CheckoutOrder) (*cart.CartGrain, error) { cartId, ok := cart.ParseCartId(order.MerchantReference1) if !ok { return nil, fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1) } - grain, err := a.pool.Get(uint64(cartId)) + grain, err := a.pool.Get(ctx, uint64(cartId)) if err != nil { return nil, fmt.Errorf("failed to get cart grain: %w", err) } @@ -73,14 +74,14 @@ func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux, invento return } - err = confirmOrder(order, orderHandler) + err = confirmOrder(r.Context(), order, orderHandler) if err != nil { log.Printf("Error confirming order: %v\n", err) w.WriteHeader(http.StatusInternalServerError) return } - err = triggerOrderCompleted(a.server, order) + err = triggerOrderCompleted(r.Context(), a.server, order) if err != nil { log.Printf("Error processing cart message: %v\n", err) w.WriteHeader(http.StatusInternalServerError) @@ -143,7 +144,7 @@ func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux, invento } log.Printf("Klarna order notification: %s", order.ID) logger.InfoContext(r.Context(), "Klarna order notification received", "order_id", order.ID) - grain, err := a.getGrainFromOrder(order) + grain, err := a.getGrainFromOrder(r.Context(), order) if err != nil { logger.ErrorContext(r.Context(), "Unable to get grain from klarna order", "error", err.Error()) w.WriteHeader(http.StatusInternalServerError) @@ -173,7 +174,7 @@ func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux, invento w.WriteHeader(http.StatusBadRequest) } logger.InfoContext(r.Context(), "Klarna order validation received", "order_id", order.ID, "cart_id", order.MerchantReference1) - grain, err := a.getGrainFromOrder(order) + grain, err := a.getGrainFromOrder(r.Context(), order) if err != nil { logger.ErrorContext(r.Context(), "Unable to get grain from klarna order", "error", err.Error()) w.WriteHeader(http.StatusInternalServerError) diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 66da5b7..c141f9b 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -84,10 +84,13 @@ func main() { reg := cart.NewCartMultationRegistry() reg.RegisterProcessor( - actor.NewMutationProcessor(func(g *cart.CartGrain) error { + actor.NewMutationProcessor(func(ctx context.Context, g *cart.CartGrain) error { + _, span := tracer.Start(ctx, "Totals and promotions") + defer span.End() g.UpdateTotals() - ctx := promotions.NewContextFromCart(g, promotions.WithNow(time.Now()), promotions.WithCustomerSegment("vip")) - _, actions := promotionService.EvaluateAll(promotionData.State.Promotions, ctx) + + promotionCtx := promotions.NewContextFromCart(g, promotions.WithNow(time.Now()), promotions.WithCustomerSegment("vip")) + _, actions := promotionService.EvaluateAll(promotionData.State.Promotions, promotionCtx) for _, action := range actions { log.Printf("apply: %+v", action) g.UpdateTotals() @@ -100,12 +103,14 @@ func main() { poolConfig := actor.GrainPoolConfig[cart.CartGrain]{ MutationRegistry: reg, Storage: diskStorage, - Spawn: func(id uint64) (actor.Grain[cart.CartGrain], error) { + Spawn: func(ctx context.Context, id uint64) (actor.Grain[cart.CartGrain], error) { + _, span := tracer.Start(ctx, fmt.Sprintf("Spawn cart id %d", id)) + defer span.End() grainSpawns.Inc() ret := cart.NewCartGrain(id, time.Now()) // Set baseline lastChange at spawn; replay may update it to last event timestamp. - err := diskStorage.LoadEvents(id, ret) + err := diskStorage.LoadEvents(ctx, id, ret) return ret, err }, @@ -249,7 +254,7 @@ func main() { } -func triggerOrderCompleted(syncedServer *PoolServer, order *CheckoutOrder) error { +func triggerOrderCompleted(ctx context.Context, syncedServer *PoolServer, order *CheckoutOrder) error { mutation := &messages.OrderCreated{ OrderId: order.ID, Status: order.Status, @@ -258,12 +263,12 @@ func triggerOrderCompleted(syncedServer *PoolServer, order *CheckoutOrder) error if !ok { return fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1) } - _, applyErr := syncedServer.Apply(uint64(cid), mutation) + _, applyErr := syncedServer.Apply(ctx, uint64(cid), mutation) return applyErr } -func confirmOrder(order *CheckoutOrder, orderHandler *AmqpOrderHandler) error { +func confirmOrder(ctx context.Context, order *CheckoutOrder, orderHandler *AmqpOrderHandler) error { orderToSend, err := json.Marshal(order) if err != nil { return err diff --git a/cmd/cart/pool-server.go b/cmd/cart/pool-server.go index 2705ea0..11855d7 100644 --- a/cmd/cart/pool-server.go +++ b/cmd/cart/pool-server.go @@ -55,12 +55,12 @@ func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarn } } -func (s *PoolServer) ApplyLocal(id cart.CartId, mutation ...proto.Message) (*actor.MutationResult[*cart.CartGrain], error) { - return s.Apply(uint64(id), mutation...) +func (s *PoolServer) ApplyLocal(ctx context.Context, id cart.CartId, mutation ...proto.Message) (*actor.MutationResult[*cart.CartGrain], error) { + return s.Apply(ctx, uint64(id), mutation...) } func (s *PoolServer) GetCartHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error { - grain, err := s.Get(uint64(id)) + grain, err := s.Get(r.Context(), uint64(id)) if err != nil { return err } @@ -74,7 +74,7 @@ func (s *PoolServer) AddSkuToCartHandler(w http.ResponseWriter, r *http.Request, if err != nil { return err } - data, err := s.ApplyLocal(id, msg) + data, err := s.ApplyLocal(r.Context(), id, msg) if err != nil { return err } @@ -106,7 +106,7 @@ func (s *PoolServer) DeleteItemHandler(w http.ResponseWriter, r *http.Request, i if err != nil { return err } - data, err := s.ApplyLocal(id, &messages.RemoveItem{Id: uint32(itemId)}) + data, err := s.ApplyLocal(r.Context(), id, &messages.RemoveItem{Id: uint32(itemId)}) if err != nil { return err } @@ -126,7 +126,7 @@ func (s *PoolServer) SetDeliveryHandler(w http.ResponseWriter, r *http.Request, if err != nil { return err } - data, err := s.ApplyLocal(id, &messages.SetDelivery{ + data, err := s.ApplyLocal(r.Context(), id, &messages.SetDelivery{ Provider: delivery.Provider, Items: delivery.Items, PickupPoint: delivery.PickupPoint, @@ -149,7 +149,7 @@ func (s *PoolServer) SetPickupPointHandler(w http.ResponseWriter, r *http.Reques if err != nil { return err } - reply, err := s.ApplyLocal(id, &messages.SetPickupPoint{ + reply, err := s.ApplyLocal(r.Context(), id, &messages.SetPickupPoint{ DeliveryId: uint32(deliveryId), Id: pickupPoint.Id, Name: pickupPoint.Name, @@ -171,7 +171,7 @@ func (s *PoolServer) RemoveDeliveryHandler(w http.ResponseWriter, r *http.Reques if err != nil { return err } - reply, err := s.ApplyLocal(id, &messages.RemoveDelivery{Id: uint32(deliveryId)}) + reply, err := s.ApplyLocal(r.Context(), id, &messages.RemoveDelivery{Id: uint32(deliveryId)}) if err != nil { return err } @@ -184,7 +184,7 @@ func (s *PoolServer) QuantityChangeHandler(w http.ResponseWriter, r *http.Reques if err != nil { return err } - reply, err := s.ApplyLocal(id, &changeQuantity) + reply, err := s.ApplyLocal(r.Context(), id, &changeQuantity) if err != nil { return err } @@ -234,7 +234,7 @@ func (s *PoolServer) SetCartItemsHandler(w http.ResponseWriter, r *http.Request, msgs = append(msgs, &messages.ClearCartRequest{}) msgs = append(msgs, getMultipleAddMessages(r.Context(), setCartItems.Items, setCartItems.Country)...) - reply, err := s.ApplyLocal(id, msgs...) + reply, err := s.ApplyLocal(r.Context(), id, msgs...) if err != nil { return err } @@ -248,7 +248,7 @@ func (s *PoolServer) AddMultipleItemHandler(w http.ResponseWriter, r *http.Reque return err } - reply, err := s.ApplyLocal(id, getMultipleAddMessages(r.Context(), setCartItems.Items, setCartItems.Country)...) + reply, err := s.ApplyLocal(r.Context(), id, getMultipleAddMessages(r.Context(), setCartItems.Items, setCartItems.Country)...) if err != nil { return err } @@ -272,7 +272,7 @@ func (s *PoolServer) AddSkuRequestHandler(w http.ResponseWriter, r *http.Request if err != nil { return err } - reply, err := s.ApplyLocal(id, msg) + reply, err := s.ApplyLocal(r.Context(), id, msg) if err != nil { return err } @@ -350,7 +350,7 @@ func (s *PoolServer) CreateOrUpdateCheckout(ctx context.Context, host string, id } // Get current grain state (may be local or remote) - grain, err := s.Get(uint64(id)) + grain, err := s.Get(ctx, uint64(id)) if err != nil { return nil, err } @@ -376,9 +376,9 @@ func (s *PoolServer) CreateOrUpdateCheckout(ctx context.Context, host string, id } } -func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id cart.CartId) (*actor.MutationResult[*cart.CartGrain], error) { +func (s *PoolServer) ApplyCheckoutStarted(ctx context.Context, klarnaOrder *CheckoutOrder, id cart.CartId) (*actor.MutationResult[*cart.CartGrain], error) { // Persist initialization state via mutation (best-effort) - return s.ApplyLocal(id, &messages.InitializeCheckout{ + return s.ApplyLocal(ctx, id, &messages.InitializeCheckout{ OrderId: klarnaOrder.ID, Status: klarnaOrder.Status, PaymentInProgress: true, @@ -544,7 +544,7 @@ func (s *PoolServer) AddVoucherHandler(w http.ResponseWriter, r *http.Request, c v := voucher.Service{} msg, err := v.GetVoucher(data.VoucherCode) if err != nil { - s.ApplyLocal(cartId, &messages.PreConditionFailed{ + s.ApplyLocal(r.Context(), cartId, &messages.PreConditionFailed{ Operation: "AddVoucher", Error: err.Error(), }) @@ -552,7 +552,7 @@ func (s *PoolServer) AddVoucherHandler(w http.ResponseWriter, r *http.Request, c w.Write([]byte(err.Error())) return err } - reply, err := s.ApplyLocal(cartId, msg) + reply, err := s.ApplyLocal(r.Context(), cartId, msg) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) @@ -570,7 +570,7 @@ func (s *PoolServer) SubscriptionDetailsHandler(w http.ResponseWriter, r *http.R w.Write([]byte(err.Error())) return err } - reply, err := s.ApplyLocal(cartId, data) + reply, err := s.ApplyLocal(r.Context(), cartId, data) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) @@ -589,7 +589,7 @@ func (s *PoolServer) CheckoutHandler(fn func(order *CheckoutOrder, w http.Respon logger.Error("unable to create klarna session", "error", err) return err } - s.ApplyCheckoutStarted(order, cartId) + s.ApplyCheckoutStarted(r.Context(), order, cartId) return fn(order, w) } order, err := s.klarnaClient.GetOrder(r.Context(), orderId) @@ -609,7 +609,7 @@ func (s *PoolServer) RemoveVoucherHandler(w http.ResponseWriter, r *http.Request w.Write([]byte(err.Error())) return err } - reply, err := s.ApplyLocal(cartId, &messages.RemoveVoucher{Id: uint32(id)}) + reply, err := s.ApplyLocal(r.Context(), cartId, &messages.RemoveVoucher{Id: uint32(id)}) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) diff --git a/pkg/actor/disk_storage.go b/pkg/actor/disk_storage.go index 5dba330..e65a601 100644 --- a/pkg/actor/disk_storage.go +++ b/pkg/actor/disk_storage.go @@ -1,6 +1,7 @@ package actor import ( + "context" "errors" "fmt" "log" @@ -25,7 +26,7 @@ type DiskStorage[V any] struct { } type LogStorage[V any] interface { - LoadEvents(id uint64, grain Grain[V]) error + LoadEvents(ctx context.Context, id uint64, grain Grain[V]) error AppendMutations(id uint64, msg ...proto.Message) error } @@ -86,7 +87,7 @@ func (s *DiskStorage[V]) logPath(id uint64) string { return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id)) } -func (s *DiskStorage[V]) LoadEvents(id uint64, grain Grain[V]) error { +func (s *DiskStorage[V]) LoadEvents(ctx context.Context, id uint64, grain Grain[V]) error { path := s.logPath(id) if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { // No log -> nothing to replay @@ -99,7 +100,7 @@ func (s *DiskStorage[V]) LoadEvents(id uint64, grain Grain[V]) error { } defer fh.Close() return s.Load(fh, func(msg proto.Message) { - s.registry.Apply(grain, msg) + s.registry.Apply(ctx, grain, msg) }) } diff --git a/pkg/actor/grain_pool.go b/pkg/actor/grain_pool.go index 2878493..f51c220 100644 --- a/pkg/actor/grain_pool.go +++ b/pkg/actor/grain_pool.go @@ -1,6 +1,7 @@ package actor import ( + "context" "net/http" "github.com/gogo/protobuf/proto" @@ -12,8 +13,8 @@ type MutationResult[V any] struct { } type GrainPool[V any] interface { - Apply(id uint64, mutation ...proto.Message) (*MutationResult[V], error) - Get(id uint64) (V, error) + Apply(ctx context.Context, id uint64, mutation ...proto.Message) (*MutationResult[V], error) + Get(ctx context.Context, id uint64) (V, error) OwnerHost(id uint64) (Host, bool) Hostname() string TakeOwnership(id uint64) diff --git a/pkg/actor/mutation_registry.go b/pkg/actor/mutation_registry.go index b76e693..41f7798 100644 --- a/pkg/actor/mutation_registry.go +++ b/pkg/actor/mutation_registry.go @@ -1,12 +1,14 @@ package actor import ( + "context" "fmt" "log" "reflect" "sync" "github.com/gogo/protobuf/proto" + "go.opentelemetry.io/otel/attribute" ) type ApplyResult struct { @@ -16,27 +18,25 @@ type ApplyResult struct { } type MutationProcessor interface { - Process(grain any) error + Process(ctx context.Context, grain any) error } type BasicMutationProcessor[V any] struct { - processor func(any) error + processor func(ctx context.Context, grain V) error } -func NewMutationProcessor[V any](process func(V) error) MutationProcessor { +func NewMutationProcessor[V any](process func(ctx context.Context, grain V) error) MutationProcessor { return &BasicMutationProcessor[V]{ - processor: func(v any) error { - return process(v.(V)) - }, + processor: process, } } -func (p *BasicMutationProcessor[V]) Process(grain any) error { - return p.processor(grain) +func (p *BasicMutationProcessor[V]) Process(ctx context.Context, grain any) error { + return p.processor(ctx, grain.(V)) } type MutationRegistry interface { - Apply(grain any, msg ...proto.Message) ([]ApplyResult, error) + Apply(ctx context.Context, grain any, msg ...proto.Message) ([]ApplyResult, error) RegisterMutations(handlers ...MutationHandler) Create(typeName string) (proto.Message, bool) GetTypeName(msg proto.Message) (string, bool) @@ -192,7 +192,15 @@ func (r *ProtoMutationRegistry) Create(typeName string) (proto.Message, bool) { // Returns updated grain if successful. // // If the mutation is not registered, returns (nil, ErrMutationNotRegistered). -func (r *ProtoMutationRegistry) Apply(grain any, msg ...proto.Message) ([]ApplyResult, error) { +func (r *ProtoMutationRegistry) Apply(ctx context.Context, grain any, msg ...proto.Message) ([]ApplyResult, error) { + + parentCtx, span := tracer.Start(ctx, "apply mutations") + defer span.End() + span.SetAttributes( + attribute.String("component", "registry"), + attribute.Int("mutations", len(msg)), + ) + results := make([]ApplyResult, 0, len(msg)) if grain == nil { @@ -214,20 +222,30 @@ func (r *ProtoMutationRegistry) Apply(grain any, msg ...proto.Message) ([]ApplyR continue } rt := indirectType(reflect.TypeOf(m)) + _, msgSpan := tracer.Start(parentCtx, rt.Name()) + r.mutationRegistryMu.RLock() entry, ok := r.mutationRegistry[rt] r.mutationRegistryMu.RUnlock() if !ok { results = append(results, ApplyResult{Error: ErrMutationNotRegistered, Type: rt.Name(), Mutation: m}) continue + } else { + err := entry.Handle(grain, m) + if err != nil { + msgSpan.RecordError(err) + } + results = append(results, ApplyResult{Error: err, Type: rt.Name(), Mutation: m}) } - err := entry.Handle(grain, m) - results = append(results, ApplyResult{Error: err, Type: rt.Name(), Mutation: m}) + msgSpan.End() } if len(results) > 0 { + processCtx, processSpan := tracer.Start(ctx, "after mutation processors") + defer processSpan.End() for _, processor := range r.processors { - err := processor.Process(grain) + + err := processor.Process(processCtx, grain) if err != nil { return results, err } diff --git a/pkg/actor/mutation_registry_test.go b/pkg/actor/mutation_registry_test.go index 36602f1..838baaa 100644 --- a/pkg/actor/mutation_registry_test.go +++ b/pkg/actor/mutation_registry_test.go @@ -1,6 +1,7 @@ package actor import ( + "context" "errors" "reflect" "slices" @@ -78,7 +79,7 @@ func TestRegisteredMutationBasics(t *testing.T) { // Apply happy path state := &cartState{} add := &messages.AddItem{ItemId: 42, Quantity: 3, Sku: "ABC"} - if _, err := reg.Apply(state, add); err != nil { + if _, err := reg.Apply(context.Background(), state, add); err != nil { t.Fatalf("Apply returned error: %v", err) } if state.calls != 1 { @@ -94,12 +95,12 @@ func TestRegisteredMutationBasics(t *testing.T) { } // Apply nil message - if _, err := reg.Apply(state, nil); err == nil { + if _, err := reg.Apply(context.Background(), state, nil); err == nil { t.Fatalf("expected error for nil mutation message") } // Apply unregistered message - if _, err := reg.Apply(state, &messages.Noop{}); !errors.Is(err, ErrMutationNotRegistered) { + if _, err := reg.Apply(context.Background(), state, &messages.Noop{}); !errors.Is(err, ErrMutationNotRegistered) { t.Fatalf("expected ErrMutationNotRegistered, got %v", err) } } diff --git a/pkg/actor/simple_grain_pool.go b/pkg/actor/simple_grain_pool.go index 38a25e7..f08dab4 100644 --- a/pkg/actor/simple_grain_pool.go +++ b/pkg/actor/simple_grain_pool.go @@ -1,6 +1,7 @@ package actor import ( + "context" "fmt" "log" "maps" @@ -15,7 +16,7 @@ type SimpleGrainPool[V any] struct { localMu sync.RWMutex grains map[uint64]Grain[V] mutationRegistry MutationRegistry - spawn func(id uint64) (Grain[V], error) + spawn func(ctx context.Context, id uint64) (Grain[V], error) spawnHost func(host string) (Host, error) listeners []LogListener storage LogStorage[V] @@ -35,7 +36,7 @@ type SimpleGrainPool[V any] struct { type GrainPoolConfig[V any] struct { Hostname string - Spawn func(id uint64) (Grain[V], error) + Spawn func(ctx context.Context, id uint64) (Grain[V], error) SpawnHost func(host string) (Host, error) TTL time.Duration PoolSize int @@ -366,7 +367,7 @@ func (p *SimpleGrainPool[V]) broadcastOwnership(ids []uint64) { // go p.statsUpdate() } -func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) { +func (p *SimpleGrainPool[V]) getOrClaimGrain(ctx context.Context, id uint64) (Grain[V], error) { p.localMu.RLock() grain, exists := p.grains[id] p.localMu.RUnlock() @@ -374,7 +375,7 @@ func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) { return grain, nil } - grain, err := p.spawn(id) + grain, err := p.spawn(ctx, id) if err != nil { return nil, err } @@ -389,13 +390,13 @@ func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) { // var ErrNotOwner = fmt.Errorf("not owner") // Apply applies a mutation to a grain. -func (p *SimpleGrainPool[V]) Apply(id uint64, mutation ...proto.Message) (*MutationResult[*V], error) { - grain, err := p.getOrClaimGrain(id) +func (p *SimpleGrainPool[V]) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (*MutationResult[*V], error) { + grain, err := p.getOrClaimGrain(ctx, id) if err != nil { return nil, err } - mutations, err := p.mutationRegistry.Apply(grain, mutation...) + mutations, err := p.mutationRegistry.Apply(ctx, grain, mutation...) if err != nil { return nil, err } @@ -420,8 +421,8 @@ func (p *SimpleGrainPool[V]) Apply(id uint64, mutation ...proto.Message) (*Mutat } // Get returns the current state of a grain. -func (p *SimpleGrainPool[V]) Get(id uint64) (*V, error) { - grain, err := p.getOrClaimGrain(id) +func (p *SimpleGrainPool[V]) Get(ctx context.Context, id uint64) (*V, error) { + grain, err := p.getOrClaimGrain(ctx, id) if err != nil { return nil, err } diff --git a/pkg/cart/mutation_test.go b/pkg/cart/mutation_test.go index 50fff32..2aa6519 100644 --- a/pkg/cart/mutation_test.go +++ b/pkg/cart/mutation_test.go @@ -1,6 +1,7 @@ package cart import ( + "context" "reflect" "slices" "strings" @@ -90,7 +91,7 @@ func ptr[T any](v T) *T { return &v } func applyOne(t *testing.T, reg actor.MutationRegistry, g *CartGrain, msg proto.Message) actor.ApplyResult { t.Helper() - results, err := reg.Apply(g, msg) + results, err := reg.Apply(context.Background(), g, msg) if err != nil { t.Fatalf("unexpected registry-level error applying %T: %v", msg, err) } @@ -178,7 +179,7 @@ func TestMutationRegistryCoverage(t *testing.T) { } // Apply unregistered message -> result should contain ErrMutationNotRegistered, no top-level error - results, err := reg.Apply(newTestGrain(), &messages.Noop{}) + results, err := reg.Apply(context.Background(), newTestGrain(), &messages.Noop{}) if err != nil { t.Fatalf("unexpected top-level error applying unregistered mutation: %v", err) } @@ -426,7 +427,7 @@ func TestSubscriptionDetailsMutation(t *testing.T) { applyErrorContains(t, reg, g, &messages.UpsertSubscriptionDetails{Id: &badId}, "not found") // Nil mutation should be ignored and produce zero results. - resultsNil, errNil := reg.Apply(g, (*messages.UpsertSubscriptionDetails)(nil)) + resultsNil, errNil := reg.Apply(context.Background(), g, (*messages.UpsertSubscriptionDetails)(nil)) if errNil != nil { t.Fatalf("unexpected error for nil mutation element: %v", errNil) } @@ -450,7 +451,7 @@ func TestRegistryDefensiveErrors(t *testing.T) { } // Nil message slice - results, _ = reg.Apply(g, nil) + results, _ = reg.Apply(context.Background(), g, nil) if len(results) != 0 { t.Fatalf("expected no results when message slice nil")