Files
go-order-manager/order_client.go
matst80 4854db055f
Some checks failed
Build and Publish / BuildAndDeploy (push) Has been cancelled
Update order_client.go
2025-11-11 17:28:11 +01:00

99 lines
1.9 KiB
Go

package main
import (
"encoding/json"
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
type UpdateHandler interface {
OrderPlaced(order Order)
}
type RabbitTransportClient struct {
Url string
OrderTopic string
ClientName string
handler UpdateHandler
connection *amqp.Connection
channel *amqp.Channel
quit chan bool
}
func (t *RabbitTransportClient) declareBindAndConsume(topic string) (<-chan amqp.Delivery, error) {
q, err := t.channel.QueueDeclare(
"order-queue", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
err = t.channel.QueueBind(q.Name, topic, topic, false, nil)
if err != nil {
return nil, err
}
return t.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
}
func (t *RabbitTransportClient) Connect(handler UpdateHandler) error {
conn, err := amqp.DialConfig(t.Url, amqp.Config{
Properties: amqp.NewConnectionProperties(),
})
//conn.Config.Vhost = t.VHost
t.quit = make(chan bool)
if err != nil {
return err
}
t.connection = conn
ch, err := conn.Channel()
if err != nil {
return err
}
t.handler = handler
t.channel = ch
toAdd, err := t.declareBindAndConsume(t.OrderTopic)
if err != nil {
return err
}
log.Printf("Connected to rabbit order topic: %s", t.OrderTopic)
go func(msgs <-chan amqp.Delivery) {
for d := range msgs {
var order Order
if err := json.Unmarshal(d.Body, &order); err == nil {
log.Printf("Got order")
t.handler.OrderPlaced(order)
} else {
log.Printf("Failed to unmarshal upset message %v", err)
}
}
}(toAdd)
return nil
}
func (t *RabbitTransportClient) Close() {
if (t.channel != nil) && (!t.channel.IsClosed()) {
t.channel.Close()
}
if (t.connection != nil) && (!t.connection.IsClosed()) {
t.connection.Close()
}
//t.quit <- true
}