From a8a697d113b694d3f6209f079283817feddddeb1 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 14:40:03 +0200 Subject: [PATCH] ack messages --- cmd/backoffice/main.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index f28ed8a..03fbdd1 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -3,7 +3,6 @@ package main import ( "context" "errors" - "fmt" "log" "net/http" "os" @@ -75,11 +74,12 @@ func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) return case m, ok := <-msgs: if !ok { - log.Print("no message, would have closed") + log.Fatalf("connection closed") continue } // Log and broadcast to all websocket clients log.Printf("mutation event: %s", string(m.Body)) + if hub != nil { select { case hub.broadcast <- m.Body: @@ -87,6 +87,9 @@ func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) // if hub queue is full, drop to avoid blocking } } + if err := m.Ack(false); err != nil { + log.Printf("error acknowledging message: %v", err) + } } } }() @@ -152,7 +155,7 @@ func main() { if amqpURL != "" { conn, err := amqp.Dial(amqpURL) if err != nil { - fmt.Errorf("failed to connect to RabbitMQ: %w", err) + log.Fatalf("failed to connect to RabbitMQ: %w", err) } if err := startMutationConsumer(ctx, conn, hub); err != nil { log.Printf("AMQP listener disabled: %v", err)