package main import ( "context" "fmt" "log" "net" "net/http" "net/http/pprof" "os" "os/signal" "strings" "time" "git.k6n.net/go-cart-actor/pkg/actor" "git.k6n.net/go-cart-actor/pkg/cart" "git.k6n.net/go-cart-actor/pkg/promotions" "git.k6n.net/go-cart-actor/pkg/proxy" "git.k6n.net/go-cart-actor/pkg/voucher" "github.com/matst80/go-redis-inventory/pkg/inventory" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/redis/go-redis/v9" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) var ( grainSpawns = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_grain_spawned_total", Help: "The total number of spawned grains", }) ) func init() { os.Mkdir("data", 0755) } type App struct { pool *actor.SimpleGrainPool[cart.CartGrain] server *PoolServer } var podIp = os.Getenv("POD_IP") var name = os.Getenv("POD_NAME") var amqpUrl = os.Getenv("AMQP_URL") var redisAddress = os.Getenv("REDIS_ADDRESS") var redisPassword = os.Getenv("REDIS_PASSWORD") func getCountryFromHost(host string) string { if strings.Contains(strings.ToLower(host), "-no") { return "no" } if strings.Contains(strings.ToLower(host), "-se") { return "se" } return "" } type MutationContext struct { VoucherService voucher.Service } type CartChangeEvent struct { CartId cart.CartId `json:"cartId"` Mutations []actor.ApplyResult `json:"mutations"` } func matchesSkuAndLocation(update inventory.InventoryResult, item cart.CartItem) bool { if string(update.SKU) == item.Sku { if update.LocationID == "se" && item.StoreId == nil { return true } if item.StoreId == nil { return false } if *item.StoreId == string(update.LocationID) { return true } } return false } func main() { controlPlaneConfig := actor.DefaultServerConfig() promotionData, err := promotions.LoadStateFile("data/promotions.json") if err != nil { log.Printf("Error loading promotions: %v\n", err) } log.Printf("loaded %d promotions", len(promotionData.State.Promotions)) inventoryPubSub := actor.NewPubSub[inventory.InventoryChange]() // promotionService := promotions.NewPromotionService(nil) rdb := redis.NewClient(&redis.Options{ Addr: redisAddress, Password: redisPassword, DB: 0, }) inventoryService, err := inventory.NewRedisInventoryService(rdb) if err != nil { log.Fatalf("Error creating inventory service: %v\n", err) } inventoryReservationService, err := inventory.NewRedisCartReservationService(rdb) if err != nil { log.Fatalf("Error creating inventory reservation service: %v\n", err) } reg := cart.NewCartMultationRegistry(cart.NewCartMutationContext(inventoryReservationService)) reg.RegisterProcessor( actor.NewMutationProcessor(func(ctx context.Context, g *cart.CartGrain) error { _, span := tracer.Start(ctx, "Totals and promotions") defer span.End() g.UpdateTotals() g.Version++ // promotionCtx := promotions.NewContextFromCart(g, promotions.WithNow(time.Now()), promotions.WithCustomerSegment("vip")) // _, actions := promotionService.EvaluateAll(promotionData.State.Promotions, promotionCtx) // for _, action := range actions { // log.Printf("apply: %+v", action) // g.UpdateTotals() // } return nil }), ) diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg) poolConfig := actor.GrainPoolConfig[cart.CartGrain]{ MutationRegistry: reg, Storage: diskStorage, Spawn: func(ctx context.Context, id uint64) (actor.Grain[cart.CartGrain], error) { _, span := tracer.Start(ctx, fmt.Sprintf("Spawn cart id %d", id)) defer span.End() grainSpawns.Inc() ret := cart.NewCartGrain(id, time.Now()) // Set baseline lastChange at spawn; replay may update it to last event timestamp. inventoryPubSub.Subscribe(ret.HandleInventoryChange) err := diskStorage.LoadEvents(ctx, id, ret) if err == nil && inventoryService != nil { refs := make([]*inventory.InventoryReference, 0) for _, item := range ret.Items { refs = append(refs, &inventory.InventoryReference{ SKU: inventory.SKU(item.Sku), LocationID: getLocationId(item), }) } _, span := tracer.Start(ctx, "update inventory") defer span.End() res, err := inventoryService.GetInventoryBatch(ctx, refs...) if err != nil { log.Printf("unable to update inventory %v", err) } else { for _, update := range res { for _, item := range ret.Items { if matchesSkuAndLocation(update, *item) && update.Quantity != uint32(item.Stock) { // maybe apply an update to give visibility to the cart item.Stock = uint16(update.Quantity) } } } } } return ret, err }, Destroy: func(grain actor.Grain[cart.CartGrain]) error { cart, err := grain.GetCurrentState() if err != nil { return err } inventoryPubSub.Unsubscribe(cart.HandleInventoryChange) return nil }, SpawnHost: func(host string) (actor.Host[cart.CartGrain], error) { return proxy.NewRemoteHost[cart.CartGrain](host) }, TTL: 5 * time.Minute, PoolSize: 2 * 65535, Hostname: podIp, } pool, err := actor.NewSimpleGrainPool(poolConfig) if err != nil { log.Fatalf("Error creating cart pool: %v\n", err) } syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), inventoryService, inventoryReservationService) app := &App{ pool: pool, server: syncedServer, } mux := http.NewServeMux() debugMux := http.NewServeMux() grpcSrv, err := actor.NewControlServer[cart.CartGrain](controlPlaneConfig, pool) if err != nil { log.Fatalf("Error starting control plane gRPC server: %v\n", err) } defer grpcSrv.GracefulStop() // go diskStorage.SaveLoop(10 * time.Second) UseDiscovery(pool) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() otelShutdown, err := setupOTelSDK(ctx) if err != nil { log.Fatalf("Unable to start otel %v", err) } syncedServer.Serve(mux) // only for local mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) { pool.AddRemote(r.PathValue("host")) }) debugMux.HandleFunc("/debug/pprof/", pprof.Index) debugMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) debugMux.HandleFunc("/debug/pprof/profile", pprof.Profile) debugMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) debugMux.HandleFunc("/debug/pprof/trace", pprof.Trace) debugMux.Handle("/metrics", promhttp.Handler()) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { // Grain pool health: simple capacity check (mirrors previous GrainHandler.IsHealthy) grainCount, capacity := app.pool.LocalUsage() if grainCount >= capacity { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("grain pool at capacity")) return } if !pool.IsHealthy() { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("control plane not healthy")) return } w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("1.0.0")) }) mux.HandleFunc("/openapi.json", ServeEmbeddedOpenAPI) srv := &http.Server{ Addr: ":8080", BaseContext: func(net.Listener) context.Context { return ctx }, ReadTimeout: 10 * time.Second, WriteTimeout: 20 * time.Second, Handler: otelhttp.NewHandler(mux, "/"), } defer func() { fmt.Println("Shutting down due to signal") otelShutdown(context.Background()) diskStorage.Close() pool.Close() }() srvErr := make(chan error, 1) go func() { srvErr <- srv.ListenAndServe() }() listener := inventory.NewInventoryChangeListener(rdb, context.Background(), func(changes []inventory.InventoryChange) { for _, change := range changes { log.Printf("inventory change: %v", change) inventoryPubSub.Publish(change) } }) go func() { err := listener.Start() if err != nil { log.Fatalf("Unable to start inventory listener: %v", err) } }() log.Print("Server started at port 8080") go http.ListenAndServe(":8081", debugMux) select { case err = <-srvErr: // Error when starting HTTP server. log.Fatalf("Unable to start server: %v", err) case <-ctx.Done(): // Wait for first CTRL+C. // Stop receiving signal notifications as soon as possible. stop() } }