package main import ( "context" "encoding/json" "fmt" "log" "net/http" "net/http/pprof" "os" "os/signal" "strings" "syscall" "time" "git.tornberg.me/go-cart-actor/pkg/actor" "git.tornberg.me/go-cart-actor/pkg/cart" messages "git.tornberg.me/go-cart-actor/pkg/messages" "git.tornberg.me/go-cart-actor/pkg/promotions" "git.tornberg.me/go-cart-actor/pkg/proxy" "git.tornberg.me/go-cart-actor/pkg/voucher" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" ) var ( grainSpawns = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_grain_spawned_total", Help: "The total number of spawned grains", }) ) func init() { os.Mkdir("data", 0755) } type App struct { pool *actor.SimpleGrainPool[cart.CartGrain] server *PoolServer klarnaClient *KlarnaClient } var podIp = os.Getenv("POD_IP") var name = os.Getenv("POD_NAME") var amqpUrl = os.Getenv("AMQP_URL") var tpl = ` s10r testing - checkout %s ` func getCountryFromHost(host string) string { if strings.Contains(strings.ToLower(host), "-no") { return "no" } if strings.Contains(strings.ToLower(host), "-se") { return "se" } return "" } type MutationContext struct { VoucherService voucher.Service } type CartChangeEvent struct { CartId cart.CartId `json:"cartId"` Mutations []actor.ApplyResult `json:"mutations"` } func main() { controlPlaneConfig := actor.DefaultServerConfig() promotionData, err := promotions.LoadStateFile("data/promotions.json") if err != nil { log.Printf("Error loading promotions: %v\n", err) } log.Printf("loaded %d promotions", len(promotionData.State.Promotions)) promotionService := promotions.NewPromotionService(nil) reg := cart.NewCartMultationRegistry() reg.RegisterProcessor( actor.NewMutationProcessor(func(g *cart.CartGrain) error { g.UpdateTotals() ctx := promotions.NewContextFromCart(g, promotions.WithNow(time.Now()), promotions.WithCustomerSegment("vip")) _, actions := promotionService.EvaluateAll(promotionData.State.Promotions, ctx) for _, action := range actions { log.Printf("apply: %+v", action) g.UpdateTotals() } return nil }), ) diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg) poolConfig := actor.GrainPoolConfig[cart.CartGrain]{ MutationRegistry: reg, Storage: diskStorage, Spawn: func(id uint64) (actor.Grain[cart.CartGrain], error) { grainSpawns.Inc() ret := cart.NewCartGrain(id, time.Now()) // Set baseline lastChange at spawn; replay may update it to last event timestamp. err := diskStorage.LoadEvents(id, ret) return ret, err }, SpawnHost: func(host string) (actor.Host, error) { return proxy.NewRemoteHost(host) }, TTL: 15 * time.Minute, PoolSize: 2 * 65535, Hostname: podIp, } pool, err := actor.NewSimpleGrainPool(poolConfig) if err != nil { log.Fatalf("Error creating cart pool: %v\n", err) } klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient) app := &App{ pool: pool, server: syncedServer, klarnaClient: klarnaClient, } mux := http.NewServeMux() debugMux := http.NewServeMux() if amqpUrl == "" { log.Printf("no connection to amqp defined") } else { app.HandleCheckoutRequests(amqpUrl, mux) } grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool) if err != nil { log.Fatalf("Error starting control plane gRPC server: %v\n", err) } defer grpcSrv.GracefulStop() // go diskStorage.SaveLoop(10 * time.Second) UseDiscovery(pool) sigs := make(chan os.Signal, 1) done := make(chan bool, 1) signal.Notify(sigs, syscall.SIGTERM) otelShutdown, err := setupOTelSDK(context.Background()) if err != nil { log.Fatalf("Unable to start otel %v", err) } mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve())) // only for local mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) { pool.AddRemote(r.PathValue("host")) }) debugMux.HandleFunc("/debug/pprof/", pprof.Index) debugMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) debugMux.HandleFunc("/debug/pprof/profile", pprof.Profile) debugMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) debugMux.HandleFunc("/debug/pprof/trace", pprof.Trace) debugMux.Handle("/metrics", promhttp.Handler()) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { // Grain pool health: simple capacity check (mirrors previous GrainHandler.IsHealthy) grainCount, capacity := app.pool.LocalUsage() if grainCount >= capacity { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("grain pool at capacity")) return } if !pool.IsHealthy() { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("control plane not healthy")) return } w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("1.0.0")) }) mux.HandleFunc("/openapi.json", ServeEmbeddedOpenAPI) go func() { sig := <-sigs fmt.Println("Shutting down due to signal:", sig) otelShutdown(context.Background()) diskStorage.Close() pool.Close() done <- true }() log.Print("Server started at port 8080") go http.ListenAndServe(":8080", mux) go http.ListenAndServe(":8081", debugMux) <-done } func triggerOrderCompleted(syncedServer *PoolServer, order *CheckoutOrder) error { mutation := &messages.OrderCreated{ OrderId: order.ID, Status: order.Status, } cid, ok := cart.ParseCartId(order.MerchantReference1) if !ok { return fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1) } _, applyErr := syncedServer.Apply(uint64(cid), mutation) return applyErr } func confirmOrder(order *CheckoutOrder, orderHandler *AmqpOrderHandler) error { orderToSend, err := json.Marshal(order) if err != nil { return err } err = orderHandler.OrderCompleted(orderToSend) if err != nil { return err } return nil }