package main import ( "context" "errors" "log" "net/http" "os" "regexp" "time" actor "git.tornberg.me/go-cart-actor/pkg/actor" "git.tornberg.me/go-cart-actor/pkg/cart" messages "git.tornberg.me/go-cart-actor/pkg/messages" amqp "github.com/rabbitmq/amqp091-go" ) type CartFileInfo struct { ID uint64 `json:"id"` Path string `json:"path"` Size int64 `json:"size"` Modified time.Time `json:"modified"` } func envOrDefault(key, def string) string { if v := os.Getenv(key); v != "" { return v } return def } var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`) var globalDisk *actor.DiskStorage[cart.CartGrain] func buildRegistry() actor.MutationRegistry { reg := actor.NewMutationRegistry() reg.RegisterMutations( actor.NewMutation(cart.AddItem, func() *messages.AddItem { return &messages.AddItem{} }), actor.NewMutation(cart.ChangeQuantity, func() *messages.ChangeQuantity { return &messages.ChangeQuantity{} }), actor.NewMutation(cart.RemoveItem, func() *messages.RemoveItem { return &messages.RemoveItem{} }), actor.NewMutation(cart.InitializeCheckout, func() *messages.InitializeCheckout { return &messages.InitializeCheckout{} }), actor.NewMutation(cart.OrderCreated, func() *messages.OrderCreated { return &messages.OrderCreated{} }), actor.NewMutation(cart.RemoveDelivery, func() *messages.RemoveDelivery { return &messages.RemoveDelivery{} }), actor.NewMutation(cart.SetDelivery, func() *messages.SetDelivery { return &messages.SetDelivery{} }), actor.NewMutation(cart.SetPickupPoint, func() *messages.SetPickupPoint { return &messages.SetPickupPoint{} }), actor.NewMutation(cart.ClearCart, func() *messages.ClearCartRequest { return &messages.ClearCartRequest{} }), actor.NewMutation(cart.AddVoucher, func() *messages.AddVoucher { return &messages.AddVoucher{} }), actor.NewMutation(cart.RemoveVoucher, func() *messages.RemoveVoucher { return &messages.RemoveVoucher{} }), ) return reg } func startMutationConsumer(ctx context.Context, amqpURL string, hub *Hub) error { conn, err := amqp.Dial(amqpURL) if err != nil { return err } ch, err := conn.Channel() if err != nil { _ = conn.Close() return err } // declare exchange (idempotent) if err := ch.ExchangeDeclare( "cart", // name "topic", // type true, // durable false, // autoDelete false, // internal false, // noWait nil, // args ); err != nil { _ = ch.Close() _ = conn.Close() return err } // declare an exclusive, auto-deleted queue by default q, err := ch.QueueDeclare( "", // name -> let server generate false, // durable true, // autoDelete true, // exclusive false, // noWait nil, // args ) if err != nil { _ = ch.Close() _ = conn.Close() return err } if err := ch.QueueBind(q.Name, "mutation", "cart", false, nil); err != nil { _ = ch.Close() _ = conn.Close() return err } msgs, err := ch.Consume(q.Name, "backoffice", true, true, false, false, nil) if err != nil { _ = ch.Close() _ = conn.Close() return err } go func() { defer ch.Close() defer conn.Close() for { select { case <-ctx.Done(): return case m, ok := <-msgs: if !ok { return } // Log and broadcast to all websocket clients log.Printf("mutation event: %s", string(m.Body)) if hub != nil { select { case hub.broadcast <- m.Body: default: // if hub queue is full, drop to avoid blocking } } } } }() return nil } func main() { dataDir := envOrDefault("DATA_DIR", "data") addr := envOrDefault("ADDR", ":8080") amqpURL := os.Getenv("AMQP_URL") _ = os.MkdirAll(dataDir, 0755) reg := buildRegistry() globalDisk = actor.NewDiskStorage[cart.CartGrain](dataDir, reg) fs := NewFileServer(dataDir) hub := NewHub() go hub.Run() mux := http.NewServeMux() mux.HandleFunc("GET /carts", fs.CartsHandler) mux.HandleFunc("GET /cart/{id}", fs.CartHandler) mux.HandleFunc("/ws", hub.ServeWS) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) }) mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) }) mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) }) // Global CORS middleware allowing all origins and handling preflight handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, DELETE, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With") w.Header().Set("Access-Control-Expose-Headers", "*") if r.Method == http.MethodOptions { w.WriteHeader(http.StatusNoContent) return } mux.ServeHTTP(w, r) }) srv := &http.Server{ Addr: addr, Handler: handler, ReadTimeout: 10 * time.Second, WriteTimeout: 30 * time.Second, IdleTimeout: 60 * time.Second, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() if amqpURL != "" { if err := startMutationConsumer(ctx, amqpURL, hub); err != nil { log.Printf("AMQP listener disabled: %v", err) } else { log.Printf("AMQP listener connected") } } log.Printf("backoffice HTTP listening on %s (dataDir=%s)", addr, dataDir) if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { log.Fatalf("http server error: %v", err) } // server stopped }