diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 3179b3e..79bfd11 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -99,6 +99,11 @@ type MutationContext struct { VoucherService voucher.Service } +type CartChangeEvent struct { + CartId cart.CartId `json:"cartId"` + Mutations []actor.ApplyResult `json:"mutations"` +} + func main() { controlPlaneConfig := actor.DefaultServerConfig() @@ -173,7 +178,12 @@ func main() { fmt.Errorf("failed to connect to RabbitMQ: %w", err) } - amqpListener := actor.NewAmqpListener(conn) + amqpListener := actor.NewAmqpListener(conn, func(id uint64, msg []actor.ApplyResult) (any, error) { + return &CartChangeEvent{ + CartId: cart.CartId(id), + Mutations: msg, + }, nil + }) amqpListener.DefineTopics() pool.AddListener(amqpListener) diff --git a/pkg/actor/disk_storage.go b/pkg/actor/disk_storage.go index 08daf71..deff401 100644 --- a/pkg/actor/disk_storage.go +++ b/pkg/actor/disk_storage.go @@ -10,8 +10,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - "github.com/matst80/slask-finder/pkg/messaging" - amqp "github.com/rabbitmq/amqp091-go" ) type QueueEvent struct { @@ -31,46 +29,6 @@ type LogStorage[V any] interface { AppendMutations(id uint64, msg ...proto.Message) error } -type LogListener interface { - AppendMutations(id uint64, msg ...ApplyResult) -} - -type AmqpListener struct { - conn *amqp.Connection -} - -func NewAmqpListener(conn *amqp.Connection) *AmqpListener { - return &AmqpListener{ - conn: conn, - } -} - -func (l *AmqpListener) DefineTopics() { - ch, err := l.conn.Channel() - if err != nil { - log.Fatalf("Failed to open a channel: %v", err) - } - defer ch.Close() - if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil { - log.Fatalf("Failed to declare topic mutation: %v", err) - } -} - -type CartEvent struct { - Id uint64 `json:"id"` - Mutations []ApplyResult `json:"mutations"` -} - -func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) { - err := messaging.SendChange(l.conn, "cart", "mutation", &CartEvent{ - Id: id, - Mutations: msg, - }) - if err != nil { - log.Printf("Failed to send mutation event: %v", err) - } -} - func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] { return &DiskStorage[V]{ StateStorage: NewState(registry), diff --git a/pkg/actor/log_listerner.go b/pkg/actor/log_listerner.go new file mode 100644 index 0000000..3990c77 --- /dev/null +++ b/pkg/actor/log_listerner.go @@ -0,0 +1,47 @@ +package actor + +import ( + "log" + + "github.com/matst80/slask-finder/pkg/messaging" + amqp "github.com/rabbitmq/amqp091-go" +) + +type LogListener interface { + AppendMutations(id uint64, msg ...ApplyResult) +} + +type AmqpListener struct { + conn *amqp.Connection + transformer func(id uint64, msg []ApplyResult) (any, error) +} + +func NewAmqpListener(conn *amqp.Connection, transformer func(id uint64, msg []ApplyResult) (any, error)) *AmqpListener { + return &AmqpListener{ + conn: conn, + transformer: transformer, + } +} + +func (l *AmqpListener) DefineTopics() { + ch, err := l.conn.Channel() + if err != nil { + log.Fatalf("Failed to open a channel: %v", err) + } + defer ch.Close() + if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil { + log.Fatalf("Failed to declare topic mutation: %v", err) + } +} + +func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) { + data, err := l.transformer(id, msg) + if err != nil { + log.Printf("Failed to transform mutation event: %v", err) + return + } + err = messaging.SendChange(l.conn, "cart", "mutation", data) + if err != nil { + log.Printf("Failed to send mutation event: %v", err) + } +}