94 lines
1.8 KiB
Go
94 lines
1.8 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"log"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
type UpdateHandler interface {
|
|
OrderPlaced(order Order)
|
|
}
|
|
|
|
type RabbitTransportClient struct {
|
|
Url string
|
|
ClientName string
|
|
handler UpdateHandler
|
|
connection *amqp.Connection
|
|
channel *amqp.Channel
|
|
quit chan bool
|
|
}
|
|
|
|
func (t *RabbitTransportClient) declareBindAndConsume() (<-chan amqp.Delivery, error) {
|
|
q, err := t.channel.QueueDeclare(
|
|
"order-queue", // name
|
|
false, // durable
|
|
false, // delete when unused
|
|
true, // exclusive
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return t.channel.Consume(
|
|
q.Name,
|
|
"",
|
|
true,
|
|
false,
|
|
false,
|
|
false,
|
|
nil,
|
|
)
|
|
}
|
|
|
|
func (t *RabbitTransportClient) Connect(handler UpdateHandler) error {
|
|
conn, err := amqp.DialConfig(t.Url, amqp.Config{
|
|
Properties: amqp.NewConnectionProperties(),
|
|
})
|
|
//conn.Config.Vhost = t.VHost
|
|
t.quit = make(chan bool)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
t.connection = conn
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
t.handler = handler
|
|
t.channel = ch
|
|
toAdd, err := t.declareBindAndConsume()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Printf("Connected to rabbit order topic")
|
|
go func(msgs <-chan amqp.Delivery) {
|
|
for d := range msgs {
|
|
|
|
var order Order
|
|
if err := json.Unmarshal(d.Body, &order); err == nil {
|
|
log.Printf("Got order")
|
|
t.handler.OrderPlaced(order)
|
|
} else {
|
|
log.Printf("Failed to unmarshal upset message %v", err)
|
|
}
|
|
}
|
|
}(toAdd)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *RabbitTransportClient) Close() {
|
|
if (t.channel != nil) && (!t.channel.IsClosed()) {
|
|
t.channel.Close()
|
|
}
|
|
if (t.connection != nil) && (!t.connection.IsClosed()) {
|
|
t.connection.Close()
|
|
}
|
|
//t.quit <- true
|
|
|
|
}
|