feature/backoffice #6

Merged
mats merged 24 commits from feature/backoffice into main 2025-10-16 09:46:07 +02:00
Showing only changes of commit a8a697d113 - Show all commits

View File

@@ -3,7 +3,6 @@ package main
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"log" "log"
"net/http" "net/http"
"os" "os"
@@ -75,11 +74,12 @@ func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub)
return return
case m, ok := <-msgs: case m, ok := <-msgs:
if !ok { if !ok {
log.Print("no message, would have closed") log.Fatalf("connection closed")
continue continue
} }
// Log and broadcast to all websocket clients // Log and broadcast to all websocket clients
log.Printf("mutation event: %s", string(m.Body)) log.Printf("mutation event: %s", string(m.Body))
if hub != nil { if hub != nil {
select { select {
case hub.broadcast <- m.Body: case hub.broadcast <- m.Body:
@@ -87,6 +87,9 @@ func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub)
// if hub queue is full, drop to avoid blocking // if hub queue is full, drop to avoid blocking
} }
} }
if err := m.Ack(false); err != nil {
log.Printf("error acknowledging message: %v", err)
}
} }
} }
}() }()
@@ -152,7 +155,7 @@ func main() {
if amqpURL != "" { if amqpURL != "" {
conn, err := amqp.Dial(amqpURL) conn, err := amqp.Dial(amqpURL)
if err != nil { if err != nil {
fmt.Errorf("failed to connect to RabbitMQ: %w", err) log.Fatalf("failed to connect to RabbitMQ: %w", err)
} }
if err := startMutationConsumer(ctx, conn, hub); err != nil { if err := startMutationConsumer(ctx, conn, hub); err != nil {
log.Printf("AMQP listener disabled: %v", err) log.Printf("AMQP listener disabled: %v", err)