more otel
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package actor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
@@ -25,7 +26,7 @@ type DiskStorage[V any] struct {
|
||||
}
|
||||
|
||||
type LogStorage[V any] interface {
|
||||
LoadEvents(id uint64, grain Grain[V]) error
|
||||
LoadEvents(ctx context.Context, id uint64, grain Grain[V]) error
|
||||
AppendMutations(id uint64, msg ...proto.Message) error
|
||||
}
|
||||
|
||||
@@ -86,7 +87,7 @@ func (s *DiskStorage[V]) logPath(id uint64) string {
|
||||
return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id))
|
||||
}
|
||||
|
||||
func (s *DiskStorage[V]) LoadEvents(id uint64, grain Grain[V]) error {
|
||||
func (s *DiskStorage[V]) LoadEvents(ctx context.Context, id uint64, grain Grain[V]) error {
|
||||
path := s.logPath(id)
|
||||
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
|
||||
// No log -> nothing to replay
|
||||
@@ -99,7 +100,7 @@ func (s *DiskStorage[V]) LoadEvents(id uint64, grain Grain[V]) error {
|
||||
}
|
||||
defer fh.Close()
|
||||
return s.Load(fh, func(msg proto.Message) {
|
||||
s.registry.Apply(grain, msg)
|
||||
s.registry.Apply(ctx, grain, msg)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package actor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
@@ -12,8 +13,8 @@ type MutationResult[V any] struct {
|
||||
}
|
||||
|
||||
type GrainPool[V any] interface {
|
||||
Apply(id uint64, mutation ...proto.Message) (*MutationResult[V], error)
|
||||
Get(id uint64) (V, error)
|
||||
Apply(ctx context.Context, id uint64, mutation ...proto.Message) (*MutationResult[V], error)
|
||||
Get(ctx context.Context, id uint64) (V, error)
|
||||
OwnerHost(id uint64) (Host, bool)
|
||||
Hostname() string
|
||||
TakeOwnership(id uint64)
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
package actor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
)
|
||||
|
||||
type ApplyResult struct {
|
||||
@@ -16,27 +18,25 @@ type ApplyResult struct {
|
||||
}
|
||||
|
||||
type MutationProcessor interface {
|
||||
Process(grain any) error
|
||||
Process(ctx context.Context, grain any) error
|
||||
}
|
||||
|
||||
type BasicMutationProcessor[V any] struct {
|
||||
processor func(any) error
|
||||
processor func(ctx context.Context, grain V) error
|
||||
}
|
||||
|
||||
func NewMutationProcessor[V any](process func(V) error) MutationProcessor {
|
||||
func NewMutationProcessor[V any](process func(ctx context.Context, grain V) error) MutationProcessor {
|
||||
return &BasicMutationProcessor[V]{
|
||||
processor: func(v any) error {
|
||||
return process(v.(V))
|
||||
},
|
||||
processor: process,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *BasicMutationProcessor[V]) Process(grain any) error {
|
||||
return p.processor(grain)
|
||||
func (p *BasicMutationProcessor[V]) Process(ctx context.Context, grain any) error {
|
||||
return p.processor(ctx, grain.(V))
|
||||
}
|
||||
|
||||
type MutationRegistry interface {
|
||||
Apply(grain any, msg ...proto.Message) ([]ApplyResult, error)
|
||||
Apply(ctx context.Context, grain any, msg ...proto.Message) ([]ApplyResult, error)
|
||||
RegisterMutations(handlers ...MutationHandler)
|
||||
Create(typeName string) (proto.Message, bool)
|
||||
GetTypeName(msg proto.Message) (string, bool)
|
||||
@@ -192,7 +192,15 @@ func (r *ProtoMutationRegistry) Create(typeName string) (proto.Message, bool) {
|
||||
// Returns updated grain if successful.
|
||||
//
|
||||
// If the mutation is not registered, returns (nil, ErrMutationNotRegistered).
|
||||
func (r *ProtoMutationRegistry) Apply(grain any, msg ...proto.Message) ([]ApplyResult, error) {
|
||||
func (r *ProtoMutationRegistry) Apply(ctx context.Context, grain any, msg ...proto.Message) ([]ApplyResult, error) {
|
||||
|
||||
parentCtx, span := tracer.Start(ctx, "apply mutations")
|
||||
defer span.End()
|
||||
span.SetAttributes(
|
||||
attribute.String("component", "registry"),
|
||||
attribute.Int("mutations", len(msg)),
|
||||
)
|
||||
|
||||
results := make([]ApplyResult, 0, len(msg))
|
||||
|
||||
if grain == nil {
|
||||
@@ -214,20 +222,30 @@ func (r *ProtoMutationRegistry) Apply(grain any, msg ...proto.Message) ([]ApplyR
|
||||
continue
|
||||
}
|
||||
rt := indirectType(reflect.TypeOf(m))
|
||||
_, msgSpan := tracer.Start(parentCtx, rt.Name())
|
||||
|
||||
r.mutationRegistryMu.RLock()
|
||||
entry, ok := r.mutationRegistry[rt]
|
||||
r.mutationRegistryMu.RUnlock()
|
||||
if !ok {
|
||||
results = append(results, ApplyResult{Error: ErrMutationNotRegistered, Type: rt.Name(), Mutation: m})
|
||||
continue
|
||||
} else {
|
||||
err := entry.Handle(grain, m)
|
||||
if err != nil {
|
||||
msgSpan.RecordError(err)
|
||||
}
|
||||
results = append(results, ApplyResult{Error: err, Type: rt.Name(), Mutation: m})
|
||||
}
|
||||
err := entry.Handle(grain, m)
|
||||
results = append(results, ApplyResult{Error: err, Type: rt.Name(), Mutation: m})
|
||||
msgSpan.End()
|
||||
}
|
||||
|
||||
if len(results) > 0 {
|
||||
processCtx, processSpan := tracer.Start(ctx, "after mutation processors")
|
||||
defer processSpan.End()
|
||||
for _, processor := range r.processors {
|
||||
err := processor.Process(grain)
|
||||
|
||||
err := processor.Process(processCtx, grain)
|
||||
if err != nil {
|
||||
return results, err
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package actor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
"slices"
|
||||
@@ -78,7 +79,7 @@ func TestRegisteredMutationBasics(t *testing.T) {
|
||||
// Apply happy path
|
||||
state := &cartState{}
|
||||
add := &messages.AddItem{ItemId: 42, Quantity: 3, Sku: "ABC"}
|
||||
if _, err := reg.Apply(state, add); err != nil {
|
||||
if _, err := reg.Apply(context.Background(), state, add); err != nil {
|
||||
t.Fatalf("Apply returned error: %v", err)
|
||||
}
|
||||
if state.calls != 1 {
|
||||
@@ -94,12 +95,12 @@ func TestRegisteredMutationBasics(t *testing.T) {
|
||||
}
|
||||
|
||||
// Apply nil message
|
||||
if _, err := reg.Apply(state, nil); err == nil {
|
||||
if _, err := reg.Apply(context.Background(), state, nil); err == nil {
|
||||
t.Fatalf("expected error for nil mutation message")
|
||||
}
|
||||
|
||||
// Apply unregistered message
|
||||
if _, err := reg.Apply(state, &messages.Noop{}); !errors.Is(err, ErrMutationNotRegistered) {
|
||||
if _, err := reg.Apply(context.Background(), state, &messages.Noop{}); !errors.Is(err, ErrMutationNotRegistered) {
|
||||
t.Fatalf("expected ErrMutationNotRegistered, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package actor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"maps"
|
||||
@@ -15,7 +16,7 @@ type SimpleGrainPool[V any] struct {
|
||||
localMu sync.RWMutex
|
||||
grains map[uint64]Grain[V]
|
||||
mutationRegistry MutationRegistry
|
||||
spawn func(id uint64) (Grain[V], error)
|
||||
spawn func(ctx context.Context, id uint64) (Grain[V], error)
|
||||
spawnHost func(host string) (Host, error)
|
||||
listeners []LogListener
|
||||
storage LogStorage[V]
|
||||
@@ -35,7 +36,7 @@ type SimpleGrainPool[V any] struct {
|
||||
|
||||
type GrainPoolConfig[V any] struct {
|
||||
Hostname string
|
||||
Spawn func(id uint64) (Grain[V], error)
|
||||
Spawn func(ctx context.Context, id uint64) (Grain[V], error)
|
||||
SpawnHost func(host string) (Host, error)
|
||||
TTL time.Duration
|
||||
PoolSize int
|
||||
@@ -366,7 +367,7 @@ func (p *SimpleGrainPool[V]) broadcastOwnership(ids []uint64) {
|
||||
// go p.statsUpdate()
|
||||
}
|
||||
|
||||
func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) {
|
||||
func (p *SimpleGrainPool[V]) getOrClaimGrain(ctx context.Context, id uint64) (Grain[V], error) {
|
||||
p.localMu.RLock()
|
||||
grain, exists := p.grains[id]
|
||||
p.localMu.RUnlock()
|
||||
@@ -374,7 +375,7 @@ func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) {
|
||||
return grain, nil
|
||||
}
|
||||
|
||||
grain, err := p.spawn(id)
|
||||
grain, err := p.spawn(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -389,13 +390,13 @@ func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) {
|
||||
// var ErrNotOwner = fmt.Errorf("not owner")
|
||||
|
||||
// Apply applies a mutation to a grain.
|
||||
func (p *SimpleGrainPool[V]) Apply(id uint64, mutation ...proto.Message) (*MutationResult[*V], error) {
|
||||
grain, err := p.getOrClaimGrain(id)
|
||||
func (p *SimpleGrainPool[V]) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (*MutationResult[*V], error) {
|
||||
grain, err := p.getOrClaimGrain(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mutations, err := p.mutationRegistry.Apply(grain, mutation...)
|
||||
mutations, err := p.mutationRegistry.Apply(ctx, grain, mutation...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -420,8 +421,8 @@ func (p *SimpleGrainPool[V]) Apply(id uint64, mutation ...proto.Message) (*Mutat
|
||||
}
|
||||
|
||||
// Get returns the current state of a grain.
|
||||
func (p *SimpleGrainPool[V]) Get(id uint64) (*V, error) {
|
||||
grain, err := p.getOrClaimGrain(id)
|
||||
func (p *SimpleGrainPool[V]) Get(ctx context.Context, id uint64) (*V, error) {
|
||||
grain, err := p.getOrClaimGrain(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package cart
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"slices"
|
||||
"strings"
|
||||
@@ -90,7 +91,7 @@ func ptr[T any](v T) *T { return &v }
|
||||
|
||||
func applyOne(t *testing.T, reg actor.MutationRegistry, g *CartGrain, msg proto.Message) actor.ApplyResult {
|
||||
t.Helper()
|
||||
results, err := reg.Apply(g, msg)
|
||||
results, err := reg.Apply(context.Background(), g, msg)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected registry-level error applying %T: %v", msg, err)
|
||||
}
|
||||
@@ -178,7 +179,7 @@ func TestMutationRegistryCoverage(t *testing.T) {
|
||||
}
|
||||
|
||||
// Apply unregistered message -> result should contain ErrMutationNotRegistered, no top-level error
|
||||
results, err := reg.Apply(newTestGrain(), &messages.Noop{})
|
||||
results, err := reg.Apply(context.Background(), newTestGrain(), &messages.Noop{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected top-level error applying unregistered mutation: %v", err)
|
||||
}
|
||||
@@ -426,7 +427,7 @@ func TestSubscriptionDetailsMutation(t *testing.T) {
|
||||
applyErrorContains(t, reg, g, &messages.UpsertSubscriptionDetails{Id: &badId}, "not found")
|
||||
|
||||
// Nil mutation should be ignored and produce zero results.
|
||||
resultsNil, errNil := reg.Apply(g, (*messages.UpsertSubscriptionDetails)(nil))
|
||||
resultsNil, errNil := reg.Apply(context.Background(), g, (*messages.UpsertSubscriptionDetails)(nil))
|
||||
if errNil != nil {
|
||||
t.Fatalf("unexpected error for nil mutation element: %v", errNil)
|
||||
}
|
||||
@@ -450,7 +451,7 @@ func TestRegistryDefensiveErrors(t *testing.T) {
|
||||
}
|
||||
|
||||
// Nil message slice
|
||||
results, _ = reg.Apply(g, nil)
|
||||
results, _ = reg.Apply(context.Background(), g, nil)
|
||||
|
||||
if len(results) != 0 {
|
||||
t.Fatalf("expected no results when message slice nil")
|
||||
|
||||
Reference in New Issue
Block a user