This commit is contained in:
98
order_client.go
Normal file
98
order_client.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
type UpdateHandler interface {
|
||||
OrderPlaced(order Order)
|
||||
}
|
||||
|
||||
type RabbitTransportClient struct {
|
||||
Url string
|
||||
|
||||
OrderTopic string
|
||||
ClientName string
|
||||
handler UpdateHandler
|
||||
connection *amqp.Connection
|
||||
channel *amqp.Channel
|
||||
quit chan bool
|
||||
}
|
||||
|
||||
func (t *RabbitTransportClient) declareBindAndConsume(topic string) (<-chan amqp.Delivery, error) {
|
||||
q, err := t.channel.QueueDeclare(
|
||||
"", // name
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
true, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = t.channel.QueueBind(q.Name, topic, topic, false, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return t.channel.Consume(
|
||||
q.Name,
|
||||
"",
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
}
|
||||
|
||||
func (t *RabbitTransportClient) Connect(handler UpdateHandler) error {
|
||||
conn, err := amqp.DialConfig(t.Url, amqp.Config{
|
||||
Properties: amqp.NewConnectionProperties(),
|
||||
})
|
||||
//conn.Config.Vhost = t.VHost
|
||||
t.quit = make(chan bool)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.connection = conn
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.handler = handler
|
||||
t.channel = ch
|
||||
toAdd, err := t.declareBindAndConsume(t.OrderTopic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("Connected to rabbit upsert topic: %s", t.OrderTopic)
|
||||
go func(msgs <-chan amqp.Delivery) {
|
||||
for d := range msgs {
|
||||
|
||||
var order Order
|
||||
if err := json.Unmarshal(d.Body, &order); err == nil {
|
||||
log.Printf("Got order")
|
||||
t.handler.OrderPlaced(order)
|
||||
} else {
|
||||
log.Printf("Failed to unmarshal upset message %v", err)
|
||||
}
|
||||
}
|
||||
}(toAdd)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *RabbitTransportClient) Close() {
|
||||
if (t.channel != nil) && (!t.channel.IsClosed()) {
|
||||
t.channel.Close()
|
||||
}
|
||||
if (t.connection != nil) && (!t.connection.IsClosed()) {
|
||||
t.connection.Close()
|
||||
}
|
||||
//t.quit <- true
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user