diff --git a/cmd/cart/amqp-order-handler.go b/cmd/cart/amqp-order-handler.go index 5003732..dddaac9 100644 --- a/cmd/cart/amqp-order-handler.go +++ b/cmd/cart/amqp-order-handler.go @@ -9,42 +9,48 @@ import ( ) type AmqpOrderHandler struct { - Url string - Connection *amqp.Connection - Channel *amqp.Channel + conn *amqp.Connection } -func (h *AmqpOrderHandler) Connect() error { - conn, err := amqp.Dial(h.Url) - if err != nil { - return fmt.Errorf("failed to connect to RabbitMQ: %w", err) +func NewAmqpOrderHandler(conn *amqp.Connection) *AmqpOrderHandler { + return &AmqpOrderHandler{ + conn: conn, } - h.Connection = conn +} - ch, err := conn.Channel() +func (h *AmqpOrderHandler) DefineTopics() error { + ch, err := h.conn.Channel() if err != nil { return fmt.Errorf("failed to open a channel: %w", err) } - h.Channel = ch + defer ch.Close() - return nil -} + err = ch.ExchangeDeclare( + "orders", // name + "direct", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return fmt.Errorf("failed to declare an exchange: %w", err) + } -func (h *AmqpOrderHandler) Close() error { - if h.Channel != nil { - h.Channel.Close() - } - if h.Connection != nil { - return h.Connection.Close() - } return nil } func (h *AmqpOrderHandler) OrderCompleted(body []byte) error { + ch, err := h.conn.Channel() + if err != nil { + return fmt.Errorf("failed to open a channel: %w", err) + } + defer ch.Close() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - err := h.Channel.PublishWithContext(ctx, + return ch.PublishWithContext(ctx, "orders", // exchange "new", // routing key false, // mandatory @@ -53,9 +59,5 @@ func (h *AmqpOrderHandler) OrderCompleted(body []byte) error { ContentType: "application/json", Body: body, }) - if err != nil { - return fmt.Errorf("failed to publish a message: %w", err) - } - return nil } diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 41ce388..88813db 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" + amqp "github.com/rabbitmq/amqp091-go" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -175,6 +176,15 @@ func main() { pool: pool, } + conn, err := amqp.Dial(amqpUrl) + if err != nil { + fmt.Errorf("failed to connect to RabbitMQ: %w", err) + } + + amqpListener := actor.NewAmqpListener(conn) + amqpListener.DefineTopics() + pool.AddListener(amqpListener) + grpcSrv, err := actor.NewControlServer[*CartGrain](controlPlaneConfig, pool) if err != nil { log.Fatalf("Error starting control plane gRPC server: %v\n", err) @@ -211,12 +221,12 @@ func main() { } }(GetDiscovery()) - orderHandler := &AmqpOrderHandler{ - Url: amqpUrl, - } + orderHandler := NewAmqpOrderHandler(conn) + orderHandler.DefineTopics() klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient) + mux := http.NewServeMux() mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve())) // only for local @@ -447,11 +457,7 @@ func confirmOrder(order *CheckoutOrder, orderHandler *AmqpOrderHandler) error { if err != nil { return err } - err = orderHandler.Connect() - if err != nil { - return err - } - defer orderHandler.Close() + err = orderHandler.OrderCompleted(orderToSend) if err != nil { return err diff --git a/pkg/actor/disk_storage.go b/pkg/actor/disk_storage.go index 2b71093..08daf71 100644 --- a/pkg/actor/disk_storage.go +++ b/pkg/actor/disk_storage.go @@ -10,6 +10,8 @@ import ( "time" "github.com/gogo/protobuf/proto" + "github.com/matst80/slask-finder/pkg/messaging" + amqp "github.com/rabbitmq/amqp091-go" ) type QueueEvent struct { @@ -26,7 +28,47 @@ type DiskStorage[V any] struct { type LogStorage[V any] interface { LoadEvents(id uint64, grain Grain[V]) error - AppendEvent(id uint64, msg ...proto.Message) error + AppendMutations(id uint64, msg ...proto.Message) error +} + +type LogListener interface { + AppendMutations(id uint64, msg ...ApplyResult) +} + +type AmqpListener struct { + conn *amqp.Connection +} + +func NewAmqpListener(conn *amqp.Connection) *AmqpListener { + return &AmqpListener{ + conn: conn, + } +} + +func (l *AmqpListener) DefineTopics() { + ch, err := l.conn.Channel() + if err != nil { + log.Fatalf("Failed to open a channel: %v", err) + } + defer ch.Close() + if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil { + log.Fatalf("Failed to declare topic mutation: %v", err) + } +} + +type CartEvent struct { + Id uint64 `json:"id"` + Mutations []ApplyResult `json:"mutations"` +} + +func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) { + err := messaging.SendChange(l.conn, "cart", "mutation", &CartEvent{ + Id: id, + Mutations: msg, + }) + if err != nil { + log.Printf("Failed to send mutation event: %v", err) + } } func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] { @@ -108,7 +150,7 @@ func (s *DiskStorage[V]) Close() { close(s.done) } -func (s *DiskStorage[V]) AppendEvent(id uint64, msg ...proto.Message) error { +func (s *DiskStorage[V]) AppendMutations(id uint64, msg ...proto.Message) error { if s.queue != nil { queue := make([]QueueEvent, 0) data, found := s.queue.Load(id) diff --git a/pkg/actor/grpc_server.go b/pkg/actor/grpc_server.go index 76b24da..8db40e8 100644 --- a/pkg/actor/grpc_server.go +++ b/pkg/actor/grpc_server.go @@ -16,7 +16,6 @@ import ( // It delegates to a grain pool and cluster operations to a synced pool. type ControlServer[V any] struct { messages.UnimplementedControlPlaneServer - pool GrainPool[V] } diff --git a/pkg/actor/simple_grain_pool.go b/pkg/actor/simple_grain_pool.go index 77b53c5..c49111e 100644 --- a/pkg/actor/simple_grain_pool.go +++ b/pkg/actor/simple_grain_pool.go @@ -17,6 +17,7 @@ type SimpleGrainPool[V any] struct { mutationRegistry MutationRegistry spawn func(id uint64) (Grain[V], error) spawnHost func(host string) (Host, error) + listeners []LogListener storage LogStorage[V] ttl time.Duration poolSize int @@ -66,6 +67,19 @@ func NewSimpleGrainPool[V any](config GrainPoolConfig[V]) (*SimpleGrainPool[V], return p, nil } +func (p *SimpleGrainPool[V]) AddListener(listener LogListener) { + p.listeners = append(p.listeners, listener) +} + +func (p *SimpleGrainPool[V]) RemoveListener(listener LogListener) { + for i, l := range p.listeners { + if l == listener { + p.listeners = append(p.listeners[:i], p.listeners[i+1:]...) + break + } + } +} + func (p *SimpleGrainPool[V]) purge() { purgeLimit := time.Now().Add(-p.ttl) purgedIds := make([]uint64, 0, len(p.grains)) @@ -383,11 +397,14 @@ func (p *SimpleGrainPool[V]) Apply(id uint64, mutation ...proto.Message) (*Mutat } if p.storage != nil { go func() { - if err := p.storage.AppendEvent(id, mutation...); err != nil { + if err := p.storage.AppendMutations(id, mutation...); err != nil { log.Printf("failed to store mutation for grain %d: %v", id, err) } }() } + for _, listener := range p.listeners { + go listener.AppendMutations(id, mutations...) + } result, err := grain.GetCurrentState() if err != nil { return nil, err