package main import ( "context" "encoding/json" "errors" "log" "net/http" "os" "time" actor "git.k6n.net/go-cart-actor/pkg/actor" "git.k6n.net/go-cart-actor/pkg/cart" "git.k6n.net/go-cart-actor/pkg/checkout" "github.com/matst80/go-redis-inventory/pkg/inventory" "github.com/matst80/slask-finder/pkg/messaging" amqp "github.com/rabbitmq/amqp091-go" "github.com/redis/go-redis/v9" ) type CartFileInfo struct { ID string `json:"id"` CartId cart.CartId `json:"cartId"` Size int64 `json:"size"` Modified time.Time `json:"modified"` } type CheckoutFileInfo struct { ID string `json:"id"` CheckoutId checkout.CheckoutId `json:"checkoutId"` Size int64 `json:"size"` Modified time.Time `json:"modified"` } func envOrDefault(key, def string) string { if v := os.Getenv(key); v != "" { return v } return def } func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error { ch, err := conn.Channel() if err != nil { _ = conn.Close() return err } msgs, err := messaging.DeclareBindAndConsume(ch, "cart", "mutation") if err != nil { _ = ch.Close() return err } go func() { defer ch.Close() for { select { case <-ctx.Done(): return case m, ok := <-msgs: if !ok { log.Fatalf("connection closed") continue } // Log and broadcast to all websocket clients log.Printf("mutation event: %s", string(m.Body)) if hub != nil { select { case hub.broadcast <- m.Body: default: // if hub queue is full, drop to avoid blocking } } if err := m.Ack(false); err != nil { log.Printf("error acknowledging message: %v", err) } } } }() return nil } var redisAddress = os.Getenv("REDIS_ADDRESS") var redisPassword = os.Getenv("REDIS_PASSWORD") func main() { dataDir := envOrDefault("DATA_DIR", "data") addr := envOrDefault("ADDR", ":8080") amqpURL := os.Getenv("AMQP_URL") rdb := redis.NewClient(&redis.Options{ Addr: redisAddress, Password: redisPassword, DB: 0, }) inventoryService, err := inventory.NewRedisInventoryService(rdb) if err != nil { log.Fatalf("Error creating inventory service: %v\n", err) } _ = os.MkdirAll(dataDir, 0755) reg := cart.NewCartMultationRegistry(cart.NewCartMutationContext(nil)) diskStorage := actor.NewDiskStorage[cart.CartGrain](dataDir, reg) checkoutDataDir := envOrDefault("CHECKOUT_DATA_DIR", "checkout-data") _ = os.MkdirAll(checkoutDataDir, 0755) regCheckout := checkout.NewCheckoutMutationRegistry(checkout.NewCheckoutMutationContext()) diskStorageCheckout := actor.NewDiskStorage[checkout.CheckoutGrain](checkoutDataDir, regCheckout) fs := NewFileServer(dataDir, checkoutDataDir, diskStorage, diskStorageCheckout) hub := NewHub() go hub.Run() mux := http.NewServeMux() mux.HandleFunc("GET /carts", fs.CartsHandler) mux.HandleFunc("GET /cart/{id}", fs.CartHandler) mux.HandleFunc("GET /checkouts", fs.CheckoutsHandler) mux.HandleFunc("GET /checkout/{id}", fs.CheckoutHandler) mux.HandleFunc("PUT /inventory/{locationId}/{sku}", func(w http.ResponseWriter, r *http.Request) { inventoryLocationId := inventory.LocationID(r.PathValue("locationId")) inventorySku := inventory.SKU(r.PathValue("sku")) pipe := rdb.Pipeline() var payload struct { Quantity int64 `json:"quantity"` } err := json.NewDecoder(r.Body).Decode(&payload) if err != nil { http.Error(w, "invalid payload", http.StatusBadRequest) return } inventoryService.UpdateInventory(r.Context(), pipe, inventorySku, inventoryLocationId, payload.Quantity) _, err = pipe.Exec(r.Context()) if err != nil { http.Error(w, "failed to update inventory", http.StatusInternalServerError) return } err = inventoryService.SendInventoryChanged(r.Context(), inventorySku, inventoryLocationId) if err != nil { w.WriteHeader(http.StatusBadRequest) return } w.WriteHeader(http.StatusOK) }) mux.HandleFunc("/promotions", fs.PromotionsHandler) mux.HandleFunc("/vouchers", fs.VoucherHandler) mux.HandleFunc("/promotion/{id}", fs.PromotionPartHandler) mux.HandleFunc("/ws", hub.ServeWS) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) }) mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) }) mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) }) // Global CORS middleware allowing all origins and handling preflight handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, DELETE, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With") w.Header().Set("Access-Control-Expose-Headers", "*") if r.Method == http.MethodOptions { w.WriteHeader(http.StatusNoContent) return } mux.ServeHTTP(w, r) }) srv := &http.Server{ Addr: addr, Handler: handler, ReadTimeout: 15 * time.Second, WriteTimeout: 30 * time.Second, IdleTimeout: 60 * time.Second, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() if amqpURL != "" { conn, err := amqp.Dial(amqpURL) if err != nil { log.Fatalf("failed to connect to RabbitMQ: %v", err) } if err := startMutationConsumer(ctx, conn, hub); err != nil { log.Printf("AMQP listener disabled: %v", err) } else { log.Printf("AMQP listener connected") } } log.Printf("backoffice HTTP listening on %s (dataDir=%s)", addr, dataDir) if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { log.Fatalf("http server error: %v", err) } // server stopped }