listen to correct topic
All checks were successful
Build and Publish / Metadata (push) Successful in 9s
Build and Publish / BuildAndDeployAmd64 (push) Successful in 1m26s
Build and Publish / BuildAndDeployArm64 (push) Successful in 4m32s

This commit is contained in:
matst80
2025-10-15 09:52:43 +02:00
parent a5d61ce56f
commit b918f406df

View File

@@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"log" "log"
"net/http" "net/http"
"os" "os"
@@ -12,6 +13,7 @@ import (
actor "git.tornberg.me/go-cart-actor/pkg/actor" actor "git.tornberg.me/go-cart-actor/pkg/actor"
"git.tornberg.me/go-cart-actor/pkg/cart" "git.tornberg.me/go-cart-actor/pkg/cart"
messages "git.tornberg.me/go-cart-actor/pkg/messages" messages "git.tornberg.me/go-cart-actor/pkg/messages"
"github.com/matst80/slask-finder/pkg/messaging"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
) )
@@ -52,59 +54,21 @@ func buildRegistry() actor.MutationRegistry {
return reg return reg
} }
func startMutationConsumer(ctx context.Context, amqpURL string, hub *Hub) error { func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error {
conn, err := amqp.Dial(amqpURL)
if err != nil {
return err
}
ch, err := conn.Channel() ch, err := conn.Channel()
if err != nil { if err != nil {
_ = conn.Close() _ = conn.Close()
return err return err
} }
msgs, err := messaging.DeclareBindAndConsume(ch, "cart", "mutation")
if err != nil {
_ = ch.Close()
return err
}
// declare exchange (idempotent)
if err := ch.ExchangeDeclare(
"cart", // name
"topic", // type
true, // durable
false, // autoDelete
false, // internal
false, // noWait
nil, // args
); err != nil {
_ = ch.Close()
_ = conn.Close()
return err
}
// declare an exclusive, auto-deleted queue by default
q, err := ch.QueueDeclare(
"", // name -> let server generate
false, // durable
true, // autoDelete
true, // exclusive
false, // noWait
nil, // args
)
if err != nil {
_ = ch.Close()
_ = conn.Close()
return err
}
if err := ch.QueueBind(q.Name, "mutation", "cart", false, nil); err != nil {
_ = ch.Close()
_ = conn.Close()
return err
}
msgs, err := ch.Consume(q.Name, "backoffice", true, true, false, false, nil)
if err != nil {
_ = ch.Close()
_ = conn.Close()
return err
}
go func() { go func() {
defer ch.Close() defer ch.Close()
defer conn.Close()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -185,7 +149,11 @@ func main() {
defer cancel() defer cancel()
if amqpURL != "" { if amqpURL != "" {
if err := startMutationConsumer(ctx, amqpURL, hub); err != nil { conn, err := amqp.Dial(amqpURL)
if err != nil {
fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}
if err := startMutationConsumer(ctx, conn, hub); err != nil {
log.Printf("AMQP listener disabled: %v", err) log.Printf("AMQP listener disabled: %v", err)
} else { } else {
log.Printf("AMQP listener connected") log.Printf("AMQP listener connected")