package main

import (
	"context"
	"fmt"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

const (
	// ordersExchange is the exchange declared by DefineTopics and used as the
	// publish target in OrderCompleted.
	ordersExchange = "orders"

	// orderCompletedRoutingKey is the routing key OrderCompleted publishes with.
	// NOTE(review): the key is "new" although the method is named
	// OrderCompleted — confirm this matches the consumers' bindings.
	orderCompletedRoutingKey = "new"

	// orderPublishTimeout bounds a single publish call.
	orderPublishTimeout = 5 * time.Second
)

// AmqpOrderHandler publishes order events over an AMQP connection.
type AmqpOrderHandler struct {
	conn *amqp.Connection
}

// NewAmqpOrderHandler returns a handler that uses conn for every operation.
// The connection is stored as-is; it is neither opened nor closed here.
func NewAmqpOrderHandler(conn *amqp.Connection) *AmqpOrderHandler {
	return &AmqpOrderHandler{
		conn: conn,
	}
}

// DefineTopics declares the durable "orders" exchange of type "direct".
//
// A channel is opened for the declaration and closed before returning.
// It returns a wrapped error if the channel cannot be opened or the
// exchange cannot be declared.
func (h *AmqpOrderHandler) DefineTopics() error {
	ch, err := h.conn.Channel()
	if err != nil {
		return fmt.Errorf("failed to open a channel: %w", err)
	}
	// Best-effort close: the channel is only needed for this one call.
	defer ch.Close()

	err = ch.ExchangeDeclare(
		ordersExchange, // name
		"direct",       // type
		true,           // durable
		false,          // auto-deleted
		false,          // internal
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		return fmt.Errorf("failed to declare an exchange: %w", err)
	}

	return nil
}

// OrderCompleted publishes body to the "orders" exchange with routing key
// "new" and content type "application/json".
//
// A channel is opened for the publish and closed before returning. The
// publish call is given a context that expires after five seconds. It
// returns a wrapped error if the channel cannot be opened or the publish
// call fails; the underlying error stays reachable through errors.Is and
// errors.As.
func (h *AmqpOrderHandler) OrderCompleted(body []byte) error {
	ch, err := h.conn.Channel()
	if err != nil {
		return fmt.Errorf("failed to open a channel: %w", err)
	}
	// Best-effort close: the channel is only needed for this one call.
	defer ch.Close()

	ctx, cancel := context.WithTimeout(context.Background(), orderPublishTimeout)
	defer cancel()

	err = ch.PublishWithContext(ctx,
		ordersExchange,           // exchange
		orderCompletedRoutingKey, // routing key
		false,                    // mandatory
		false,                    // immediate
		amqp.Publishing{
			ContentType: "application/json",
			Body:        body,
		})
	if err != nil {
		// Wrap like the other error paths so callers can tell which step failed.
		return fmt.Errorf("failed to publish a message: %w", err)
	}

	return nil
}