136 lines
2.5 KiB
Go
136 lines
2.5 KiB
Go
package main
|
|
|
|
import (
|
|
"compress/gzip"
|
|
"encoding/gob"
|
|
"encoding/json"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"runtime"
|
|
"sync"
|
|
)
|
|
|
|
var amqpUrl = os.Getenv("AMQP_URL")
|
|
|
|
func init() {
|
|
gob.Register([]Order{})
|
|
if amqpUrl == "" {
|
|
log.Fatal("AMQP_URL environment variable is not set")
|
|
}
|
|
}
|
|
|
|
type PersistingOrderHandler struct {
|
|
mu sync.RWMutex
|
|
Orders []Order `json:"orders"`
|
|
fileName string
|
|
}
|
|
|
|
func (h *PersistingOrderHandler) Load() error {
|
|
file, err := os.Open(h.fileName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer runtime.GC()
|
|
defer file.Close()
|
|
|
|
zipReader, err := gzip.NewReader(file)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
enc := json.NewDecoder(zipReader)
|
|
defer zipReader.Close()
|
|
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
tmp := Order{}
|
|
for err == nil {
|
|
if err = enc.Decode(&tmp); err == nil {
|
|
h.Orders = append(h.Orders, tmp)
|
|
}
|
|
}
|
|
enc = nil
|
|
|
|
if err.Error() == "EOF" {
|
|
return nil
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (h *PersistingOrderHandler) Save() error {
|
|
file, err := os.Create(h.fileName + ".tmp")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer runtime.GC()
|
|
defer file.Close()
|
|
zipWriter := gzip.NewWriter(file)
|
|
enc := json.NewEncoder(zipWriter)
|
|
defer zipWriter.Close()
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
for _, order := range h.Orders {
|
|
err = enc.Encode(order)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
enc = nil
|
|
return os.Rename(h.fileName+".tmp", h.fileName)
|
|
}
|
|
|
|
func (h *PersistingOrderHandler) OrderPlaced(order Order) {
|
|
// Here you would implement the logic to persist the order
|
|
log.Printf("Order placed: %s", order.ID)
|
|
h.mu.Lock()
|
|
h.Orders = append(h.Orders, order)
|
|
h.mu.Unlock()
|
|
go func() {
|
|
err := h.Save()
|
|
if err != nil {
|
|
log.Printf("error saving: %v", err)
|
|
}
|
|
}()
|
|
|
|
}
|
|
|
|
func main() {
|
|
handler := &PersistingOrderHandler{
|
|
Orders: make([]Order, 0),
|
|
mu: sync.RWMutex{},
|
|
fileName: "data/order.dbz",
|
|
}
|
|
err := handler.Load()
|
|
if err != nil {
|
|
//log.Fatal(err)
|
|
}
|
|
client := &RabbitTransportClient{
|
|
Url: amqpUrl,
|
|
OrderTopic: "order-placed",
|
|
ClientName: "OrderClient",
|
|
}
|
|
|
|
err = client.Connect(handler)
|
|
if err != nil {
|
|
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("GET /orders", func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
handler.mu.RLock()
|
|
defer handler.mu.RUnlock()
|
|
json.NewEncoder(w).Encode(handler.Orders)
|
|
})
|
|
if err := http.ListenAndServe(":8080", mux); err != nil {
|
|
log.Fatalf("Failed to start server: %v", err)
|
|
}
|
|
}
|