initial
This commit is contained in:
135
main.go
Normal file
135
main.go
Normal file
@@ -0,0 +1,135 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"runtime"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var amqpUrl = os.Getenv("AMQP_URL")
|
||||
|
||||
func init() {
|
||||
gob.Register([]Order{})
|
||||
if amqpUrl == "" {
|
||||
log.Fatal("AMQP_URL environment variable is not set")
|
||||
}
|
||||
}
|
||||
|
||||
type PersistingOrderHandler struct {
|
||||
mu sync.RWMutex
|
||||
Orders []Order `json:"orders"`
|
||||
fileName string
|
||||
}
|
||||
|
||||
func (h *PersistingOrderHandler) Load() error {
|
||||
file, err := os.Open(h.fileName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer runtime.GC()
|
||||
defer file.Close()
|
||||
|
||||
zipReader, err := gzip.NewReader(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
enc := json.NewDecoder(zipReader)
|
||||
defer zipReader.Close()
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
tmp := Order{}
|
||||
for err == nil {
|
||||
if err = enc.Decode(&tmp); err == nil {
|
||||
h.Orders = append(h.Orders, tmp)
|
||||
}
|
||||
}
|
||||
enc = nil
|
||||
|
||||
if err.Error() == "EOF" {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (h *PersistingOrderHandler) Save() error {
|
||||
file, err := os.Create(h.fileName + ".tmp")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer runtime.GC()
|
||||
defer file.Close()
|
||||
zipWriter := gzip.NewWriter(file)
|
||||
enc := json.NewEncoder(zipWriter)
|
||||
defer zipWriter.Close()
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
for _, order := range h.Orders {
|
||||
err = enc.Encode(order)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
enc = nil
|
||||
return os.Rename(h.fileName+".tmp", h.fileName)
|
||||
}
|
||||
|
||||
func (h *PersistingOrderHandler) OrderPlaced(order Order) {
|
||||
// Here you would implement the logic to persist the order
|
||||
log.Printf("Order placed: %s", order.ID)
|
||||
h.mu.Lock()
|
||||
h.Orders = append(h.Orders, order)
|
||||
h.mu.Unlock()
|
||||
go func() {
|
||||
err := h.Save()
|
||||
if err != nil {
|
||||
log.Printf("error saving: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
||||
|
||||
func main() {
|
||||
handler := &PersistingOrderHandler{
|
||||
Orders: make([]Order, 0),
|
||||
mu: sync.RWMutex{},
|
||||
fileName: "data/order.dbz",
|
||||
}
|
||||
err := handler.Load()
|
||||
if err != nil {
|
||||
//log.Fatal(err)
|
||||
}
|
||||
client := &RabbitTransportClient{
|
||||
Url: amqpUrl,
|
||||
OrderTopic: "order-placed",
|
||||
ClientName: "OrderClient",
|
||||
}
|
||||
|
||||
err = client.Connect(handler)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("GET /orders", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
handler.mu.RLock()
|
||||
defer handler.mu.RUnlock()
|
||||
json.NewEncoder(w).Encode(handler.Orders)
|
||||
})
|
||||
if err := http.ListenAndServe(":8080", mux); err != nil {
|
||||
log.Fatalf("Failed to start server: %v", err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user