From 4cc1851626b88ba3889731921cac290f37a29848 Mon Sep 17 00:00:00 2001 From: matst80 Date: Fri, 18 Apr 2025 18:00:40 +0200 Subject: [PATCH] update --- amqp-order-handler.go | 83 ++++++++++++++++++++++++++++++++++++++ deployment/deployment.yaml | 4 ++ go.mod | 1 + go.sum | 4 ++ main.go | 20 +++++++-- 5 files changed, 108 insertions(+), 4 deletions(-) create mode 100644 amqp-order-handler.go diff --git a/amqp-order-handler.go b/amqp-order-handler.go new file mode 100644 index 0000000..dd14360 --- /dev/null +++ b/amqp-order-handler.go @@ -0,0 +1,83 @@ +package main + +import ( + "log" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type RabbitTransportMaster struct { + Url string + connection *amqp.Connection + //channel *amqp.Channel +} + +const ( + topic = "order-placed" +) + +func (t *RabbitTransportMaster) Connect() error { + + conn, err := amqp.DialConfig(t.Url, amqp.Config{ + //Vhost: "/", + Properties: amqp.NewConnectionProperties(), + }) + + if err != nil { + return err + } + t.connection = conn + ch, err := conn.Channel() + if err != nil { + return err + } + defer ch.Close() + if err := ch.ExchangeDeclare( + topic, // name + "topic", // type + true, // durable + false, // auto-delete + false, // internal + false, // noWait + nil, // arguments + ); err != nil { + return err + } + + if _, err = ch.QueueDeclare( + topic, // name of the queue + true, // durable + false, // delete when unused + false, // exclusive + false, // noWait + nil, // arguments + ); err != nil { + return err + } + + return nil +} + +func (t *RabbitTransportMaster) Close() error { + log.Println("Closing master channel") + return t.connection.Close() + //return t.channel.Close() +} + +func (t *RabbitTransportMaster) OrderCompleted(data []byte) error { + ch, err := t.connection.Channel() + if err != nil { + return err + } + defer ch.Close() + return ch.Publish( + topic, + topic, + true, + false, + amqp.Publishing{ + ContentType: "application/json", + Body: data, + }, + ) +} diff --git a/deployment/deployment.yaml b/deployment/deployment.yaml index 77bc6bd..272f61f 100644 --- a/deployment/deployment.yaml +++ b/deployment/deployment.yaml @@ -100,6 +100,8 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP + - name: AMQP_URL + value: "amqp://admin:12bananer@rabbitmq.dev:5672/" - name: POD_NAME valueFrom: fieldRef: @@ -202,6 +204,8 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP + - name: AMQP_URL + value: "amqp://admin:12bananer@rabbitmq.dev:5672/" - name: POD_NAME valueFrom: fieldRef: diff --git a/go.mod b/go.mod index 9b6b46f..953d726 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Flaconi/go-klarna v0.0.0-20230216165926-e2f708c721d9 github.com/matst80/slask-finder v0.0.0-20250418094723-2eb7d6615761 github.com/prometheus/client_golang v1.22.0 + github.com/rabbitmq/amqp091-go v1.10.0 github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 google.golang.org/protobuf v1.36.6 diff --git a/go.sum b/go.sum index 4333e31..8793f42 100644 --- a/go.sum +++ b/go.sum @@ -86,6 +86,8 @@ github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18= github.com/prometheus/procfs v0.16.0 h1:xh6oHhKwnOJKMYiYBDWmkHqQPyiY40sny36Cmx2bbsM= github.com/prometheus/procfs v0.16.0/go.mod h1:8veyXUu3nGP7oaCxhX6yeaM5u4stL2FeMXnCqhDthZg= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= @@ -102,6 +104,8 @@ github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e h1:fAzVSmKQkWfl github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e/go.mod h1:gQsFrHrY6nviQu+VX7zKWDyhtLPNzngtYZ+C+7cywdk= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/main.go b/main.go index 5e81e5d..be14e70 100644 --- a/main.go +++ b/main.go @@ -95,6 +95,7 @@ func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) { var podIp = os.Getenv("POD_IP") var name = os.Getenv("POD_NAME") +var amqpUrl = os.Getenv("AMQP_URL") func GetDiscovery() Discovery { if podIp == "" { @@ -143,7 +144,9 @@ func main() { } } }() - + orderHandler := &RabbitTransportMaster{ + Url: amqpUrl, + } syncedServer := NewPoolServer(syncedPool, fmt.Sprintf("%s, %s", name, podIp)) mux := http.NewServeMux() mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve())) @@ -217,6 +220,18 @@ func main() { w.Write([]byte(err.Error())) return } + orderToSend, err := json.Marshal(klarnaOrderResponse) + if err != nil { + log.Printf("Error marshaling order: %v\n", err) + } else { + err = orderHandler.OrderCompleted(orderToSend) + if err != nil { + log.Printf("Error sending order: %v\n", err) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + } _, err = syncedServer.pool.Process(ToCartId(klarnaOrderResponse.MerchantReference1), Message{ Type: OrderCompletedType, Content: messages.OrderCreated{ @@ -225,9 +240,6 @@ func main() { }) if err != nil { log.Printf("Error processing cart message: %v\n", err) - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - return } w.WriteHeader(http.StatusOK)