195 lines
5.1 KiB
Go
195 lines
5.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
actor "git.k6n.net/go-cart-actor/pkg/actor"
|
|
"git.k6n.net/go-cart-actor/pkg/cart"
|
|
"github.com/matst80/go-redis-inventory/pkg/inventory"
|
|
"github.com/matst80/slask-finder/pkg/messaging"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
type CartFileInfo struct {
|
|
ID string `json:"id"`
|
|
CartId cart.CartId `json:"cartId"`
|
|
Size int64 `json:"size"`
|
|
Modified time.Time `json:"modified"`
|
|
}
|
|
|
|
func envOrDefault(key, def string) string {
|
|
if v := os.Getenv(key); v != "" {
|
|
return v
|
|
}
|
|
return def
|
|
}
|
|
|
|
func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error {
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
_ = conn.Close()
|
|
return err
|
|
}
|
|
msgs, err := messaging.DeclareBindAndConsume(ch, "cart", "mutation")
|
|
if err != nil {
|
|
_ = ch.Close()
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
defer ch.Close()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case m, ok := <-msgs:
|
|
if !ok {
|
|
log.Fatalf("connection closed")
|
|
continue
|
|
}
|
|
// Log and broadcast to all websocket clients
|
|
log.Printf("mutation event: %s", string(m.Body))
|
|
|
|
if hub != nil {
|
|
select {
|
|
case hub.broadcast <- m.Body:
|
|
default:
|
|
// if hub queue is full, drop to avoid blocking
|
|
}
|
|
}
|
|
if err := m.Ack(false); err != nil {
|
|
log.Printf("error acknowledging message: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
var redisAddress = os.Getenv("REDIS_ADDRESS")
|
|
var redisPassword = os.Getenv("REDIS_PASSWORD")
|
|
|
|
func main() {
|
|
dataDir := envOrDefault("DATA_DIR", "data")
|
|
addr := envOrDefault("ADDR", ":8080")
|
|
amqpURL := os.Getenv("AMQP_URL")
|
|
|
|
rdb := redis.NewClient(&redis.Options{
|
|
Addr: redisAddress,
|
|
Password: redisPassword,
|
|
DB: 0,
|
|
})
|
|
inventoryService, err := inventory.NewRedisInventoryService(rdb)
|
|
if err != nil {
|
|
log.Fatalf("Error creating inventory service: %v\n", err)
|
|
}
|
|
|
|
_ = os.MkdirAll(dataDir, 0755)
|
|
|
|
reg := cart.NewCartMultationRegistry()
|
|
diskStorage := actor.NewDiskStorage[cart.CartGrain](dataDir, reg)
|
|
|
|
fs := NewFileServer(dataDir, diskStorage)
|
|
|
|
hub := NewHub()
|
|
go hub.Run()
|
|
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("GET /carts", fs.CartsHandler)
|
|
mux.HandleFunc("GET /cart/{id}", fs.CartHandler)
|
|
mux.HandleFunc("PUT /inventory/{locationId}/{sku}", func(w http.ResponseWriter, r *http.Request) {
|
|
inventoryLocationId := inventory.LocationID(r.PathValue("locationId"))
|
|
inventorySku := inventory.SKU(r.PathValue("sku"))
|
|
pipe := rdb.Pipeline()
|
|
var payload struct {
|
|
Quantity int64 `json:"quantity"`
|
|
}
|
|
err := json.NewDecoder(r.Body).Decode(&payload)
|
|
if err != nil {
|
|
http.Error(w, "invalid payload", http.StatusBadRequest)
|
|
return
|
|
}
|
|
inventoryService.UpdateInventory(r.Context(), pipe, inventorySku, inventoryLocationId, payload.Quantity)
|
|
|
|
_, err = pipe.Exec(r.Context())
|
|
if err != nil {
|
|
http.Error(w, "failed to update inventory", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
err := inventoryService.SendInventoryChanged(r.Context(), inventorySku, inventoryLocationId)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
})
|
|
mux.HandleFunc("/promotions", fs.PromotionsHandler)
|
|
mux.HandleFunc("/vouchers", fs.VoucherHandler)
|
|
mux.HandleFunc("/promotion/{id}", fs.PromotionPartHandler)
|
|
|
|
mux.HandleFunc("/ws", hub.ServeWS)
|
|
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("ok"))
|
|
})
|
|
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("ok"))
|
|
})
|
|
mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("ok"))
|
|
})
|
|
|
|
// Global CORS middleware allowing all origins and handling preflight
|
|
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, DELETE, OPTIONS")
|
|
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With")
|
|
w.Header().Set("Access-Control-Expose-Headers", "*")
|
|
if r.Method == http.MethodOptions {
|
|
w.WriteHeader(http.StatusNoContent)
|
|
return
|
|
}
|
|
mux.ServeHTTP(w, r)
|
|
})
|
|
|
|
srv := &http.Server{
|
|
Addr: addr,
|
|
Handler: handler,
|
|
ReadTimeout: 15 * time.Second,
|
|
WriteTimeout: 30 * time.Second,
|
|
IdleTimeout: 60 * time.Second,
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
if amqpURL != "" {
|
|
conn, err := amqp.Dial(amqpURL)
|
|
if err != nil {
|
|
log.Fatalf("failed to connect to RabbitMQ: %v", err)
|
|
}
|
|
if err := startMutationConsumer(ctx, conn, hub); err != nil {
|
|
log.Printf("AMQP listener disabled: %v", err)
|
|
} else {
|
|
log.Printf("AMQP listener connected")
|
|
}
|
|
}
|
|
|
|
log.Printf("backoffice HTTP listening on %s (dataDir=%s)", addr, dataDir)
|
|
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
log.Fatalf("http server error: %v", err)
|
|
}
|
|
|
|
// server stopped
|
|
}
|