update
This commit is contained in:
83
amqp-order-handler.go
Normal file
83
amqp-order-handler.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
type RabbitTransportMaster struct {
|
||||
Url string
|
||||
connection *amqp.Connection
|
||||
//channel *amqp.Channel
|
||||
}
|
||||
|
||||
const (
|
||||
topic = "order-placed"
|
||||
)
|
||||
|
||||
func (t *RabbitTransportMaster) Connect() error {
|
||||
|
||||
conn, err := amqp.DialConfig(t.Url, amqp.Config{
|
||||
//Vhost: "/",
|
||||
Properties: amqp.NewConnectionProperties(),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.connection = conn
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer ch.Close()
|
||||
if err := ch.ExchangeDeclare(
|
||||
topic, // name
|
||||
"topic", // type
|
||||
true, // durable
|
||||
false, // auto-delete
|
||||
false, // internal
|
||||
false, // noWait
|
||||
nil, // arguments
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err = ch.QueueDeclare(
|
||||
topic, // name of the queue
|
||||
true, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // noWait
|
||||
nil, // arguments
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *RabbitTransportMaster) Close() error {
|
||||
log.Println("Closing master channel")
|
||||
return t.connection.Close()
|
||||
//return t.channel.Close()
|
||||
}
|
||||
|
||||
func (t *RabbitTransportMaster) OrderCompleted(data []byte) error {
|
||||
ch, err := t.connection.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer ch.Close()
|
||||
return ch.Publish(
|
||||
topic,
|
||||
topic,
|
||||
true,
|
||||
false,
|
||||
amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: data,
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -100,6 +100,8 @@ spec:
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: status.podIP
|
||||
- name: AMQP_URL
|
||||
value: "amqp://admin:12bananer@rabbitmq.dev:5672/"
|
||||
- name: POD_NAME
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
@@ -202,6 +204,8 @@ spec:
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: status.podIP
|
||||
- name: AMQP_URL
|
||||
value: "amqp://admin:12bananer@rabbitmq.dev:5672/"
|
||||
- name: POD_NAME
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
|
||||
1
go.mod
1
go.mod
@@ -6,6 +6,7 @@ require (
|
||||
github.com/Flaconi/go-klarna v0.0.0-20230216165926-e2f708c721d9
|
||||
github.com/matst80/slask-finder v0.0.0-20250418094723-2eb7d6615761
|
||||
github.com/prometheus/client_golang v1.22.0
|
||||
github.com/rabbitmq/amqp091-go v1.10.0
|
||||
github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e
|
||||
golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0
|
||||
google.golang.org/protobuf v1.36.6
|
||||
|
||||
4
go.sum
4
go.sum
@@ -86,6 +86,8 @@ github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA
|
||||
github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18=
|
||||
github.com/prometheus/procfs v0.16.0 h1:xh6oHhKwnOJKMYiYBDWmkHqQPyiY40sny36Cmx2bbsM=
|
||||
github.com/prometheus/procfs v0.16.0/go.mod h1:8veyXUu3nGP7oaCxhX6yeaM5u4stL2FeMXnCqhDthZg=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
|
||||
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
|
||||
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
|
||||
@@ -102,6 +104,8 @@ github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e h1:fAzVSmKQkWfl
|
||||
github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e/go.mod h1:gQsFrHrY6nviQu+VX7zKWDyhtLPNzngtYZ+C+7cywdk=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
|
||||
20
main.go
20
main.go
@@ -95,6 +95,7 @@ func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
var podIp = os.Getenv("POD_IP")
|
||||
var name = os.Getenv("POD_NAME")
|
||||
var amqpUrl = os.Getenv("AMQP_URL")
|
||||
|
||||
func GetDiscovery() Discovery {
|
||||
if podIp == "" {
|
||||
@@ -143,7 +144,9 @@ func main() {
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
orderHandler := &RabbitTransportMaster{
|
||||
Url: amqpUrl,
|
||||
}
|
||||
syncedServer := NewPoolServer(syncedPool, fmt.Sprintf("%s, %s", name, podIp))
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve()))
|
||||
@@ -217,6 +220,18 @@ func main() {
|
||||
w.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
orderToSend, err := json.Marshal(klarnaOrderResponse)
|
||||
if err != nil {
|
||||
log.Printf("Error marshaling order: %v\n", err)
|
||||
} else {
|
||||
err = orderHandler.OrderCompleted(orderToSend)
|
||||
if err != nil {
|
||||
log.Printf("Error sending order: %v\n", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
}
|
||||
_, err = syncedServer.pool.Process(ToCartId(klarnaOrderResponse.MerchantReference1), Message{
|
||||
Type: OrderCompletedType,
|
||||
Content: messages.OrderCreated{
|
||||
@@ -225,9 +240,6 @@ func main() {
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("Error processing cart message: %v\n", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
Reference in New Issue
Block a user