This commit is contained in:
matst80
2025-12-04 22:09:26 +01:00
parent d78685cd8f
commit 5e36af2524
2 changed files with 160 additions and 26 deletions

View File

@@ -41,14 +41,16 @@ type MutationRegistry interface {
Create(typeName string) (proto.Message, bool)
GetTypeName(msg proto.Message) (string, bool)
RegisterProcessor(processor ...MutationProcessor)
//GetStorageEvent(msg proto.Message) StorageEvent
//FromStorageEvent(event StorageEvent) (proto.Message, error)
RegisterTrigger(trigger ...TriggerHandler)
SetEventChannel(ch chan<- ApplyResult)
}
type ProtoMutationRegistry struct {
mutationRegistryMu sync.RWMutex
mutationRegistry map[reflect.Type]MutationHandler
triggers map[reflect.Type][]TriggerHandler
processors []MutationProcessor
eventChannel chan<- ApplyResult
}
var (
@@ -84,6 +86,26 @@ func WithTotals() MutationOption {
}
}
type TriggerHandler interface {
Handle(state any, msg proto.Message) []proto.Message
Name() string
Type() reflect.Type
}
type RegisteredTrigger[V any, I proto.Message] struct {
name string
handler func(state any, msg proto.Message) []proto.Message
msgType reflect.Type
}
func NewTrigger[V any, I proto.Message](name string, handler func(state any, msg proto.Message) []proto.Message) *RegisteredTrigger[V, I] {
return &RegisteredTrigger[V, I]{
name: name,
handler: handler,
msgType: reflect.TypeOf((*I)(nil)).Elem(),
}
}
type MutationHandler interface {
Handle(state any, msg proto.Message) error
Name() string
@@ -145,6 +167,7 @@ func NewMutationRegistry() MutationRegistry {
return &ProtoMutationRegistry{
mutationRegistry: make(map[reflect.Type]MutationHandler),
mutationRegistryMu: sync.RWMutex{},
triggers: make(map[reflect.Type][]TriggerHandler),
processors: make([]MutationProcessor, 0),
}
}
@@ -162,6 +185,24 @@ func (r *ProtoMutationRegistry) RegisterMutations(handlers ...MutationHandler) {
}
}
func (r *ProtoMutationRegistry) RegisterTrigger(triggers ...TriggerHandler) {
r.mutationRegistryMu.Lock()
defer r.mutationRegistryMu.Unlock()
for _, trigger := range triggers {
existingTriggers, ok := r.triggers[trigger.Type()]
if !ok {
r.triggers[trigger.Type()] = []TriggerHandler{trigger}
} else {
r.triggers[trigger.Type()] = append(existingTriggers, trigger)
}
}
}
func (r *ProtoMutationRegistry) SetEventChannel(ch chan<- ApplyResult) {
r.eventChannel = ch
}
func (r *ProtoMutationRegistry) GetTypeName(msg proto.Message) (string, bool) {
r.mutationRegistryMu.RLock()
defer r.mutationRegistryMu.RUnlock()
@@ -244,6 +285,25 @@ func (r *ProtoMutationRegistry) Apply(ctx context.Context, grain any, msg ...pro
if err != nil {
msgSpan.RecordError(err)
}
if r.eventChannel != nil {
go func() {
defer func() {
if r := recover(); r != nil {
// Handle panic from sending to closed channel
log.Printf("event channel closed: %v", r)
}
}()
for _, tr := range r.triggers[rt] {
for _, msg := range tr.Handle(grain, m) {
select {
case r.eventChannel <- msg:
default:
// Channel full or no receiver, skip to avoid blocking
}
}
}
}()
}
results = append(results, ApplyResult{Error: err, Type: rt.Name(), Mutation: m})
}
msgSpan.End()
@@ -266,6 +326,7 @@ func (r *ProtoMutationRegistry) Apply(ctx context.Context, grain any, msg ...pro
return results, res.Error
}
}
return results, nil
}