diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index 8697dea..50df960 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -3,6 +3,7 @@ package main import ( "context" "errors" + "fmt" "log" "net/http" "os" @@ -12,6 +13,7 @@ import ( actor "git.tornberg.me/go-cart-actor/pkg/actor" "git.tornberg.me/go-cart-actor/pkg/cart" messages "git.tornberg.me/go-cart-actor/pkg/messages" + "github.com/matst80/slask-finder/pkg/messaging" amqp "github.com/rabbitmq/amqp091-go" ) @@ -52,59 +54,21 @@ func buildRegistry() actor.MutationRegistry { return reg } -func startMutationConsumer(ctx context.Context, amqpURL string, hub *Hub) error { - conn, err := amqp.Dial(amqpURL) - if err != nil { - return err - } +func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error { ch, err := conn.Channel() if err != nil { _ = conn.Close() return err } + msgs, err := messaging.DeclareBindAndConsume(ch, "cart", "mutation") + if err != nil { + _ = ch.Close() + return err + } - // declare exchange (idempotent) - if err := ch.ExchangeDeclare( - "cart", // name - "topic", // type - true, // durable - false, // autoDelete - false, // internal - false, // noWait - nil, // args - ); err != nil { - _ = ch.Close() - _ = conn.Close() - return err - } - // declare an exclusive, auto-deleted queue by default - q, err := ch.QueueDeclare( - "", // name -> let server generate - false, // durable - true, // autoDelete - true, // exclusive - false, // noWait - nil, // args - ) - if err != nil { - _ = ch.Close() - _ = conn.Close() - return err - } - if err := ch.QueueBind(q.Name, "mutation", "cart", false, nil); err != nil { - _ = ch.Close() - _ = conn.Close() - return err - } - msgs, err := ch.Consume(q.Name, "backoffice", true, true, false, false, nil) - if err != nil { - _ = ch.Close() - _ = conn.Close() - return err - } go func() { defer ch.Close() - defer conn.Close() + for { select { case <-ctx.Done(): @@ -185,7 +149,11 @@ func main() { defer cancel() if amqpURL != "" { - if err := startMutationConsumer(ctx, amqpURL, hub); err != nil { + conn, err := amqp.Dial(amqpURL) + if err != nil { + fmt.Errorf("failed to connect to RabbitMQ: %w", err) + } + if err := startMutationConsumer(ctx, conn, hub); err != nil { log.Printf("AMQP listener disabled: %v", err) } else { log.Printf("AMQP listener connected")