package actor import ( "fmt" "log" "reflect" "sync" "github.com/gogo/protobuf/proto" ) type ApplyResult struct { Type string `json:"type"` Mutation proto.Message `json:"mutation"` Error error `json:"error,omitempty"` } type MutationProcessor interface { Process(grain any) error } type BasicMutationProcessor[V any] struct { processor func(any) error } func NewMutationProcessor[V any](process func(V) error) MutationProcessor { return &BasicMutationProcessor[V]{ processor: func(v any) error { return process(v.(V)) }, } } func (p *BasicMutationProcessor[V]) Process(grain any) error { return p.processor(grain) } type MutationRegistry interface { Apply(grain any, msg ...proto.Message) ([]ApplyResult, error) RegisterMutations(handlers ...MutationHandler) Create(typeName string) (proto.Message, bool) GetTypeName(msg proto.Message) (string, bool) RegisterProcessor(processor ...MutationProcessor) //GetStorageEvent(msg proto.Message) StorageEvent //FromStorageEvent(event StorageEvent) (proto.Message, error) } type ProtoMutationRegistry struct { mutationRegistryMu sync.RWMutex mutationRegistry map[reflect.Type]MutationHandler processors []MutationProcessor } var ( ErrMutationNotRegistered = &MutationError{ Message: "mutation not registered", Code: 255, StatusCode: 500, } ) type MutationError struct { Message string `json:"message"` Code uint32 `json:"code"` StatusCode uint32 `json:"status_code"` } func (m MutationError) Error() string { return m.Message } // MutationOption configures additional behavior for a registered mutation. type MutationOption func(*mutationOptions) // mutationOptions holds flags adjustable per registration. type mutationOptions struct { updateTotals bool } // WithTotals ensures CartGrain.UpdateTotals() is called after a successful handler. func WithTotals() MutationOption { return func(o *mutationOptions) { o.updateTotals = true } } type MutationHandler interface { Handle(state any, msg proto.Message) error Name() string Type() reflect.Type Create() proto.Message } // RegisteredMutation stores metadata + the execution closure. type RegisteredMutation[V any, T proto.Message] struct { name string handler func(*V, T) error create func() T msgType reflect.Type } func NewMutation[V any, T proto.Message](handler func(*V, T) error, create func() T) *RegisteredMutation[V, T] { // Derive the name and message type from a concrete instance produced by create(). // This avoids relying on reflect.TypeFor (which can yield unexpected results in some toolchains) // and ensures we always peel off the pointer layer for proto messages. instance := create() rt := reflect.TypeOf(instance) if rt.Kind() == reflect.Ptr { rt = rt.Elem() } return &RegisteredMutation[V, T]{ name: rt.Name(), handler: handler, create: create, msgType: rt, } } func (m *RegisteredMutation[V, T]) Handle(state any, msg proto.Message) error { return m.handler(state.(*V), msg.(T)) } func (m *RegisteredMutation[V, T]) Name() string { return m.name } func (m *RegisteredMutation[V, T]) Create() proto.Message { return m.create() } func (m *RegisteredMutation[V, T]) Type() reflect.Type { return m.msgType } func NewMutationRegistry() MutationRegistry { return &ProtoMutationRegistry{ mutationRegistry: make(map[reflect.Type]MutationHandler), mutationRegistryMu: sync.RWMutex{}, processors: make([]MutationProcessor, 0), } } func (r *ProtoMutationRegistry) RegisterProcessor(processors ...MutationProcessor) { r.processors = append(r.processors, processors...) } func (r *ProtoMutationRegistry) RegisterMutations(handlers ...MutationHandler) { r.mutationRegistryMu.Lock() defer r.mutationRegistryMu.Unlock() for _, handler := range handlers { r.mutationRegistry[handler.Type()] = handler } } func (r *ProtoMutationRegistry) GetTypeName(msg proto.Message) (string, bool) { r.mutationRegistryMu.RLock() defer r.mutationRegistryMu.RUnlock() rt := indirectType(reflect.TypeOf(msg)) if handler, ok := r.mutationRegistry[rt]; ok { return handler.Name(), true } return "", false } func (r *ProtoMutationRegistry) getHandler(typeName string) MutationHandler { r.mutationRegistryMu.Lock() defer r.mutationRegistryMu.Unlock() for _, handler := range r.mutationRegistry { if handler.Name() == typeName { return handler } } return nil } func (r *ProtoMutationRegistry) Create(typeName string) (proto.Message, bool) { handler := r.getHandler(typeName) if handler == nil { log.Printf("missing handler for %s", typeName) return nil, false } return handler.Create(), true } // ApplyRegistered attempts to apply a registered mutation. // Returns updated grain if successful. // // If the mutation is not registered, returns (nil, ErrMutationNotRegistered). func (r *ProtoMutationRegistry) Apply(grain any, msg ...proto.Message) ([]ApplyResult, error) { results := make([]ApplyResult, 0, len(msg)) if grain == nil { return results, fmt.Errorf("nil grain") } // Nil slice of mutations still treated as an error (call contract violation). if msg == nil { return results, fmt.Errorf("nil mutation message") } for _, m := range msg { // Ignore nil mutation elements (untyped or typed nil pointers) silently; they carry no data. if m == nil { continue } // Typed nil: interface holds concrete proto message type whose pointer value is nil. rv := reflect.ValueOf(m) if rv.Kind() == reflect.Ptr && rv.IsNil() { continue } rt := indirectType(reflect.TypeOf(m)) r.mutationRegistryMu.RLock() entry, ok := r.mutationRegistry[rt] r.mutationRegistryMu.RUnlock() if !ok { results = append(results, ApplyResult{Error: ErrMutationNotRegistered, Type: rt.Name(), Mutation: m}) continue } err := entry.Handle(grain, m) results = append(results, ApplyResult{Error: err, Type: rt.Name(), Mutation: m}) } for _, processor := range r.processors { err := processor.Process(grain) if err != nil { return results, err } } return results, nil } // RegisteredMutations returns metadata for all registered mutations (snapshot). func (r *ProtoMutationRegistry) RegisteredMutations() []string { r.mutationRegistryMu.RLock() defer r.mutationRegistryMu.RUnlock() out := make([]string, 0, len(r.mutationRegistry)) for _, entry := range r.mutationRegistry { out = append(out, entry.Name()) } return out } // RegisteredMutationTypes returns the reflect.Type list of all registered messages. // Useful for coverage tests ensuring expected set matches actual set. func (r *ProtoMutationRegistry) RegisteredMutationTypes() []reflect.Type { r.mutationRegistryMu.RLock() defer r.mutationRegistryMu.RUnlock() out := make([]reflect.Type, 0, len(r.mutationRegistry)) for _, entry := range r.mutationRegistry { out = append(out, entry.Type()) } return out } func indirectType(t reflect.Type) reflect.Type { for t.Kind() == reflect.Ptr { t = t.Elem() } return t }