package main

import (
	"encoding/json"
	"errors"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

// UpdateHandler is the callback invoked for every order successfully decoded
// from an incoming message.
type UpdateHandler interface {
	OrderPlaced(order Order)
}

// RabbitTransportClient consumes JSON-encoded orders from RabbitMQ and hands
// them to an UpdateHandler.
//
// Url is passed to amqp.DialConfig; OrderTopic is used as both the exchange
// name and the routing key when binding the queue. ClientName is not read by
// any method in this file.
type RabbitTransportClient struct {
	Url        string
	OrderTopic string
	ClientName string

	handler    UpdateHandler
	connection *amqp.Connection
	channel    *amqp.Channel
	quit       chan bool // allocated by Connect; nothing in this file sends on or closes it
}

// declareBindAndConsume declares the "orders" queue, binds it using topic as
// both the routing key and the exchange name, and starts an auto-acknowledging
// consumer on it. It returns the delivery channel, or the first error
// encountered.
//
// NOTE(review): the queue name is fixed and the queue is declared exclusive;
// ClientName may have been intended to make it unique per client - confirm.
func (t *RabbitTransportClient) declareBindAndConsume(topic string) (<-chan amqp.Delivery, error) {
	q, err := t.channel.QueueDeclare(
		"orders", // name
		false,    // durable
		false,    // delete when unused
		true,     // exclusive
		false,    // no-wait
		nil,      // arguments
	)
	if err != nil {
		return nil, err
	}

	if err := t.channel.QueueBind(q.Name, topic, topic, false, nil); err != nil {
		return nil, err
	}

	return t.channel.Consume(
		q.Name, // queue
		"",     // consumer tag
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // arguments
	)
}

// Connect dials the broker at t.Url, opens a channel, subscribes to
// t.OrderTopic and starts a goroutine that decodes each delivery body as an
// Order and passes it to handler.OrderPlaced. Bodies that fail to decode are
// logged and skipped.
//
// It returns an error if handler is nil or if any setup step fails. On a
// failure after the dial succeeded, the connection and channel opened so far
// are closed before returning, so the caller does not have to call Close.
func (t *RabbitTransportClient) Connect(handler UpdateHandler) error {
	// A nil handler would panic inside the consumer goroutine on the first
	// delivery, so reject it before touching the network.
	if handler == nil {
		return errors.New("rabbit transport: nil update handler")
	}

	conn, err := amqp.DialConfig(t.Url, amqp.Config{
		Properties: amqp.NewConnectionProperties(),
	})
	if err != nil {
		return err
	}
	//conn.Config.Vhost = t.VHost
	t.quit = make(chan bool)
	t.connection = conn

	ch, err := conn.Channel()
	if err != nil {
		t.Close() // release the connection we just opened
		return err
	}
	t.handler = handler
	t.channel = ch

	msgs, err := t.declareBindAndConsume(t.OrderTopic)
	if err != nil {
		t.Close() // release both channel and connection
		return err
	}
	log.Printf("Connected to rabbit upsert topic: %s", t.OrderTopic)

	// The goroutine runs until the delivery channel is closed.
	go func(msgs <-chan amqp.Delivery) {
		for d := range msgs {
			var order Order
			if err := json.Unmarshal(d.Body, &order); err != nil {
				log.Printf("Failed to unmarshal upset message %v", err)
				continue
			}
			log.Printf("Got order")
			t.handler.OrderPlaced(order)
		}
	}(msgs)

	return nil
}

// Close closes the channel and then the connection if they exist and are not
// already closed. Errors reported while closing are logged rather than
// returned. It is safe to call on a client that never connected.
func (t *RabbitTransportClient) Close() {
	if t.channel != nil && !t.channel.IsClosed() {
		if err := t.channel.Close(); err != nil {
			log.Printf("Failed to close rabbit channel: %v", err)
		}
	}
	if t.connection != nil && !t.connection.IsClosed() {
		if err := t.connection.Close(); err != nil {
			log.Printf("Failed to close rabbit connection: %v", err)
		}
	}
	//t.quit <- true
}