package main import ( "compress/gzip" "encoding/gob" "encoding/json" "log" "net/http" "os" "runtime" "sync" ) var amqpUrl = os.Getenv("AMQP_URL") func init() { gob.Register([]Order{}) if amqpUrl == "" { log.Fatal("AMQP_URL environment variable is not set") } } type PersistingOrderHandler struct { mu sync.RWMutex Orders map[string]Order `json:"orders"` fileName string } func (h *PersistingOrderHandler) Load() error { file, err := os.Open(h.fileName) if err != nil { return err } defer runtime.GC() defer file.Close() zipReader, err := gzip.NewReader(file) if err != nil { return err } enc := json.NewDecoder(zipReader) defer zipReader.Close() h.mu.Lock() defer h.mu.Unlock() tmp := Order{} for err == nil { if err = enc.Decode(&tmp); err == nil { h.Orders[tmp.ID] = tmp } } enc = nil if err.Error() == "EOF" { return nil } return err } func (h *PersistingOrderHandler) GetById(id string) (*Order, bool) { h.mu.RLock() defer h.mu.RUnlock() order, ok := h.Orders[id] return &order, ok } func (h *PersistingOrderHandler) Save() error { file, err := os.Create(h.fileName + ".tmp") if err != nil { return err } defer runtime.GC() defer file.Close() zipWriter := gzip.NewWriter(file) enc := json.NewEncoder(zipWriter) defer zipWriter.Close() h.mu.Lock() defer h.mu.Unlock() for _, order := range h.Orders { err = enc.Encode(order) if err != nil { return err } } enc = nil return os.Rename(h.fileName+".tmp", h.fileName) } func (h *PersistingOrderHandler) GetLatest() []Order { h.mu.RLock() defer h.mu.RUnlock() ret := make([]Order, 0, len(h.Orders)) for _, order := range h.Orders { ret = append(ret, order) } return ret } func (h *PersistingOrderHandler) OrderPlaced(order Order) { // Here you would implement the logic to persist the order log.Printf("Order placed: %s", order.ID) h.mu.Lock() h.Orders[order.ID] = order h.mu.Unlock() go func() { err := h.Save() if err != nil { log.Printf("error saving: %v", err) } }() } func main() { handler := &PersistingOrderHandler{ Orders: make(map[string]Order), mu: sync.RWMutex{}, fileName: "data/order.dbz", } err := handler.Load() if err != nil { //log.Fatal(err) } client := &RabbitTransportClient{ Url: amqpUrl, OrderTopic: "order-placed", ClientName: "OrderClient", } err = client.Connect(handler) if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer client.Close() mux := http.NewServeMux() mux.HandleFunc("GET /api/orders", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(handler.GetLatest()) }) mux.HandleFunc("GET /api/orders/{id}", func(w http.ResponseWriter, r *http.Request) { order_id := r.PathValue("id") order, ok := handler.GetById(order_id) if !ok { w.WriteHeader(http.StatusNotFound) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(order) }) mux.HandleFunc("POST /api/orders/{id}/capture", func(w http.ResponseWriter, r *http.Request) { order_id := r.PathValue("id") capture := &CaptureData{} json.NewDecoder(r.Body).Decode(capture) w.WriteHeader(http.StatusNotImplemented) w.Write([]byte(order_id)) }) if err := http.ListenAndServe(":8080", mux); err != nil { log.Fatalf("Failed to start server: %v", err) } }