update
This commit is contained in:
@@ -99,6 +99,11 @@ type MutationContext struct {
|
|||||||
VoucherService voucher.Service
|
VoucherService voucher.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CartChangeEvent struct {
|
||||||
|
CartId cart.CartId `json:"cartId"`
|
||||||
|
Mutations []actor.ApplyResult `json:"mutations"`
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
controlPlaneConfig := actor.DefaultServerConfig()
|
controlPlaneConfig := actor.DefaultServerConfig()
|
||||||
@@ -173,7 +178,12 @@ func main() {
|
|||||||
fmt.Errorf("failed to connect to RabbitMQ: %w", err)
|
fmt.Errorf("failed to connect to RabbitMQ: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
amqpListener := actor.NewAmqpListener(conn)
|
amqpListener := actor.NewAmqpListener(conn, func(id uint64, msg []actor.ApplyResult) (any, error) {
|
||||||
|
return &CartChangeEvent{
|
||||||
|
CartId: cart.CartId(id),
|
||||||
|
Mutations: msg,
|
||||||
|
}, nil
|
||||||
|
})
|
||||||
amqpListener.DefineTopics()
|
amqpListener.DefineTopics()
|
||||||
pool.AddListener(amqpListener)
|
pool.AddListener(amqpListener)
|
||||||
|
|
||||||
|
|||||||
@@ -10,8 +10,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
"github.com/matst80/slask-finder/pkg/messaging"
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type QueueEvent struct {
|
type QueueEvent struct {
|
||||||
@@ -31,46 +29,6 @@ type LogStorage[V any] interface {
|
|||||||
AppendMutations(id uint64, msg ...proto.Message) error
|
AppendMutations(id uint64, msg ...proto.Message) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type LogListener interface {
|
|
||||||
AppendMutations(id uint64, msg ...ApplyResult)
|
|
||||||
}
|
|
||||||
|
|
||||||
type AmqpListener struct {
|
|
||||||
conn *amqp.Connection
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewAmqpListener(conn *amqp.Connection) *AmqpListener {
|
|
||||||
return &AmqpListener{
|
|
||||||
conn: conn,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *AmqpListener) DefineTopics() {
|
|
||||||
ch, err := l.conn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to open a channel: %v", err)
|
|
||||||
}
|
|
||||||
defer ch.Close()
|
|
||||||
if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil {
|
|
||||||
log.Fatalf("Failed to declare topic mutation: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type CartEvent struct {
|
|
||||||
Id uint64 `json:"id"`
|
|
||||||
Mutations []ApplyResult `json:"mutations"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) {
|
|
||||||
err := messaging.SendChange(l.conn, "cart", "mutation", &CartEvent{
|
|
||||||
Id: id,
|
|
||||||
Mutations: msg,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to send mutation event: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] {
|
func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] {
|
||||||
return &DiskStorage[V]{
|
return &DiskStorage[V]{
|
||||||
StateStorage: NewState(registry),
|
StateStorage: NewState(registry),
|
||||||
|
|||||||
47
pkg/actor/log_listerner.go
Normal file
47
pkg/actor/log_listerner.go
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
package actor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/matst80/slask-finder/pkg/messaging"
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type LogListener interface {
|
||||||
|
AppendMutations(id uint64, msg ...ApplyResult)
|
||||||
|
}
|
||||||
|
|
||||||
|
type AmqpListener struct {
|
||||||
|
conn *amqp.Connection
|
||||||
|
transformer func(id uint64, msg []ApplyResult) (any, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAmqpListener(conn *amqp.Connection, transformer func(id uint64, msg []ApplyResult) (any, error)) *AmqpListener {
|
||||||
|
return &AmqpListener{
|
||||||
|
conn: conn,
|
||||||
|
transformer: transformer,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *AmqpListener) DefineTopics() {
|
||||||
|
ch, err := l.conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to open a channel: %v", err)
|
||||||
|
}
|
||||||
|
defer ch.Close()
|
||||||
|
if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil {
|
||||||
|
log.Fatalf("Failed to declare topic mutation: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) {
|
||||||
|
data, err := l.transformer(id, msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to transform mutation event: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = messaging.SendChange(l.conn, "cart", "mutation", data)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to send mutation event: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user