Files
go-cart-actor/cmd/backoffice/main.go
matst80 e1302a8ffa
All checks were successful
Build and Publish / Metadata (push) Successful in 10s
Build and Publish / BuildAndDeployAmd64 (push) Successful in 1m30s
Build and Publish / BuildAndDeployArm64 (push) Successful in 4m22s
update
2025-10-15 09:22:00 +02:00

202 lines
5.6 KiB
Go

package main
import (
"context"
"errors"
"log"
"net/http"
"os"
"regexp"
"time"
actor "git.tornberg.me/go-cart-actor/pkg/actor"
"git.tornberg.me/go-cart-actor/pkg/cart"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
amqp "github.com/rabbitmq/amqp091-go"
)
type CartFileInfo struct {
ID uint64 `json:"id"`
CartId cart.CartId `json:"cart_id"`
Path string `json:"path"`
Size int64 `json:"size"`
Modified time.Time `json:"modified"`
}
func envOrDefault(key, def string) string {
if v := os.Getenv(key); v != "" {
return v
}
return def
}
var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`)
var globalDisk *actor.DiskStorage[cart.CartGrain]
func buildRegistry() actor.MutationRegistry {
reg := actor.NewMutationRegistry()
reg.RegisterMutations(
actor.NewMutation(cart.AddItem, func() *messages.AddItem { return &messages.AddItem{} }),
actor.NewMutation(cart.ChangeQuantity, func() *messages.ChangeQuantity { return &messages.ChangeQuantity{} }),
actor.NewMutation(cart.RemoveItem, func() *messages.RemoveItem { return &messages.RemoveItem{} }),
actor.NewMutation(cart.InitializeCheckout, func() *messages.InitializeCheckout { return &messages.InitializeCheckout{} }),
actor.NewMutation(cart.OrderCreated, func() *messages.OrderCreated { return &messages.OrderCreated{} }),
actor.NewMutation(cart.RemoveDelivery, func() *messages.RemoveDelivery { return &messages.RemoveDelivery{} }),
actor.NewMutation(cart.SetDelivery, func() *messages.SetDelivery { return &messages.SetDelivery{} }),
actor.NewMutation(cart.SetPickupPoint, func() *messages.SetPickupPoint { return &messages.SetPickupPoint{} }),
actor.NewMutation(cart.ClearCart, func() *messages.ClearCartRequest { return &messages.ClearCartRequest{} }),
actor.NewMutation(cart.AddVoucher, func() *messages.AddVoucher { return &messages.AddVoucher{} }),
actor.NewMutation(cart.RemoveVoucher, func() *messages.RemoveVoucher { return &messages.RemoveVoucher{} }),
)
return reg
}
func startMutationConsumer(ctx context.Context, amqpURL string, hub *Hub) error {
conn, err := amqp.Dial(amqpURL)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
_ = conn.Close()
return err
}
// declare exchange (idempotent)
if err := ch.ExchangeDeclare(
"cart", // name
"topic", // type
true, // durable
false, // autoDelete
false, // internal
false, // noWait
nil, // args
); err != nil {
_ = ch.Close()
_ = conn.Close()
return err
}
// declare an exclusive, auto-deleted queue by default
q, err := ch.QueueDeclare(
"", // name -> let server generate
false, // durable
true, // autoDelete
true, // exclusive
false, // noWait
nil, // args
)
if err != nil {
_ = ch.Close()
_ = conn.Close()
return err
}
if err := ch.QueueBind(q.Name, "mutation", "cart", false, nil); err != nil {
_ = ch.Close()
_ = conn.Close()
return err
}
msgs, err := ch.Consume(q.Name, "backoffice", true, true, false, false, nil)
if err != nil {
_ = ch.Close()
_ = conn.Close()
return err
}
go func() {
defer ch.Close()
defer conn.Close()
for {
select {
case <-ctx.Done():
return
case m, ok := <-msgs:
if !ok {
return
}
// Log and broadcast to all websocket clients
log.Printf("mutation event: %s", string(m.Body))
if hub != nil {
select {
case hub.broadcast <- m.Body:
default:
// if hub queue is full, drop to avoid blocking
}
}
}
}
}()
return nil
}
func main() {
dataDir := envOrDefault("DATA_DIR", "data")
addr := envOrDefault("ADDR", ":8080")
amqpURL := os.Getenv("AMQP_URL")
_ = os.MkdirAll(dataDir, 0755)
reg := buildRegistry()
globalDisk = actor.NewDiskStorage[cart.CartGrain](dataDir, reg)
fs := NewFileServer(dataDir)
hub := NewHub()
go hub.Run()
mux := http.NewServeMux()
mux.HandleFunc("GET /carts", fs.CartsHandler)
mux.HandleFunc("GET /cart/{id}", fs.CartHandler)
mux.HandleFunc("/ws", hub.ServeWS)
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
})
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
})
mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
})
// Global CORS middleware allowing all origins and handling preflight
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, DELETE, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With")
w.Header().Set("Access-Control-Expose-Headers", "*")
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
mux.ServeHTTP(w, r)
})
srv := &http.Server{
Addr: addr,
Handler: handler,
ReadTimeout: 10 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 60 * time.Second,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if amqpURL != "" {
if err := startMutationConsumer(ctx, amqpURL, hub); err != nil {
log.Printf("AMQP listener disabled: %v", err)
} else {
log.Printf("AMQP listener connected")
}
}
log.Printf("backoffice HTTP listening on %s (dataDir=%s)", addr, dataDir)
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("http server error: %v", err)
}
// server stopped
}