package main import ( "context" "encoding/json" "fmt" "log" "net" "net/http" "net/http/pprof" "os" "os/signal" "strings" "time" "git.k6n.net/go-cart-actor/pkg/actor" "git.k6n.net/go-cart-actor/pkg/cart" messages "git.k6n.net/go-cart-actor/pkg/messages" "git.k6n.net/go-cart-actor/pkg/promotions" "git.k6n.net/go-cart-actor/pkg/proxy" "git.k6n.net/go-cart-actor/pkg/voucher" "github.com/adyen/adyen-go-api-library/v21/src/adyen" "github.com/adyen/adyen-go-api-library/v21/src/common" "github.com/matst80/go-redis-inventory/pkg/inventory" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/redis/go-redis/v9" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) var ( grainSpawns = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_grain_spawned_total", Help: "The total number of spawned grains", }) ) func init() { os.Mkdir("data", 0755) } type App struct { pool *actor.SimpleGrainPool[cart.CartGrain] server *PoolServer klarnaClient *KlarnaClient } var podIp = os.Getenv("POD_IP") var name = os.Getenv("POD_NAME") var amqpUrl = os.Getenv("AMQP_URL") var redisAddress = os.Getenv("REDIS_ADDRESS") var redisPassword = os.Getenv("REDIS_PASSWORD") func getCountryFromHost(host string) string { if strings.Contains(strings.ToLower(host), "-no") { return "no" } if strings.Contains(strings.ToLower(host), "-se") { return "se" } return "" } type MutationContext struct { VoucherService voucher.Service } type CartChangeEvent struct { CartId cart.CartId `json:"cartId"` Mutations []actor.ApplyResult `json:"mutations"` } func main() { controlPlaneConfig := actor.DefaultServerConfig() promotionData, err := promotions.LoadStateFile("data/promotions.json") if err != nil { log.Printf("Error loading promotions: %v\n", err) } log.Printf("loaded %d promotions", len(promotionData.State.Promotions)) inventoryPubSub := actor.NewPubSub[inventory.InventoryChange]() // promotionService := promotions.NewPromotionService(nil) rdb := redis.NewClient(&redis.Options{ Addr: redisAddress, Password: redisPassword, DB: 0, }) inventoryService, err := inventory.NewRedisInventoryService(rdb) if err != nil { log.Fatalf("Error creating inventory service: %v\n", err) } inventoryReservationService, err := inventory.NewRedisCartReservationService(rdb) if err != nil { log.Fatalf("Error creating inventory reservation service: %v\n", err) } reg := cart.NewCartMultationRegistry(cart.NewCartMutationContext(inventoryReservationService)) reg.RegisterProcessor( actor.NewMutationProcessor(func(ctx context.Context, g *cart.CartGrain) error { _, span := tracer.Start(ctx, "Totals and promotions") defer span.End() g.UpdateTotals() g.Version++ // promotionCtx := promotions.NewContextFromCart(g, promotions.WithNow(time.Now()), promotions.WithCustomerSegment("vip")) // _, actions := promotionService.EvaluateAll(promotionData.State.Promotions, promotionCtx) // for _, action := range actions { // log.Printf("apply: %+v", action) // g.UpdateTotals() // } return nil }), ) diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg) poolConfig := actor.GrainPoolConfig[cart.CartGrain]{ MutationRegistry: reg, Storage: diskStorage, Spawn: func(ctx context.Context, id uint64) (actor.Grain[cart.CartGrain], error) { _, span := tracer.Start(ctx, fmt.Sprintf("Spawn cart id %d", id)) defer span.End() grainSpawns.Inc() ret := cart.NewCartGrain(id, time.Now()) // Set baseline lastChange at spawn; replay may update it to last event timestamp. inventoryPubSub.Subscribe(ret.HandleInventoryChange) err := diskStorage.LoadEvents(ctx, id, ret) return ret, err }, Destroy: func(grain actor.Grain[cart.CartGrain]) error { cart, err := grain.GetCurrentState() if err != nil { return err } inventoryPubSub.Unsubscribe(cart.HandleInventoryChange) return nil }, SpawnHost: func(host string) (actor.Host, error) { return proxy.NewRemoteHost(host) }, TTL: 5 * time.Minute, PoolSize: 2 * 65535, Hostname: podIp, } pool, err := actor.NewSimpleGrainPool(poolConfig) if err != nil { log.Fatalf("Error creating cart pool: %v\n", err) } adyenClient := adyen.NewClient(&common.Config{ ApiKey: os.Getenv("ADYEN_API_KEY"), Environment: common.TestEnv, }) klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient, inventoryService, inventoryReservationService, adyenClient) app := &App{ pool: pool, server: syncedServer, klarnaClient: klarnaClient, } mux := http.NewServeMux() debugMux := http.NewServeMux() if amqpUrl == "" { log.Printf("no connection to amqp defined") } else { app.HandleCheckoutRequests(amqpUrl, mux, inventoryService) } grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool) if err != nil { log.Fatalf("Error starting control plane gRPC server: %v\n", err) } defer grpcSrv.GracefulStop() // go diskStorage.SaveLoop(10 * time.Second) UseDiscovery(pool) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() otelShutdown, err := setupOTelSDK(ctx) if err != nil { log.Fatalf("Unable to start otel %v", err) } syncedServer.Serve(mux) // only for local mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) { pool.AddRemote(r.PathValue("host")) }) debugMux.HandleFunc("/debug/pprof/", pprof.Index) debugMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) debugMux.HandleFunc("/debug/pprof/profile", pprof.Profile) debugMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) debugMux.HandleFunc("/debug/pprof/trace", pprof.Trace) debugMux.Handle("/metrics", promhttp.Handler()) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { // Grain pool health: simple capacity check (mirrors previous GrainHandler.IsHealthy) grainCount, capacity := app.pool.LocalUsage() if grainCount >= capacity { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("grain pool at capacity")) return } if !pool.IsHealthy() { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("control plane not healthy")) return } w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("1.0.0")) }) mux.HandleFunc("/openapi.json", ServeEmbeddedOpenAPI) srv := &http.Server{ Addr: ":8080", BaseContext: func(net.Listener) context.Context { return ctx }, ReadTimeout: 10 * time.Second, WriteTimeout: 20 * time.Second, Handler: otelhttp.NewHandler(mux, "/"), } defer func() { fmt.Println("Shutting down due to signal") otelShutdown(context.Background()) diskStorage.Close() pool.Close() }() srvErr := make(chan error, 1) go func() { srvErr <- srv.ListenAndServe() }() listener := inventory.NewInventoryChangeListener(rdb, context.Background(), func(changes []inventory.InventoryChange) { for _, change := range changes { log.Printf("inventory change: %v", change) inventoryPubSub.Publish(change) } }) go func() { err := listener.Start() if err != nil { log.Fatalf("Unable to start inventory listener: %v", err) } }() log.Print("Server started at port 8080") go http.ListenAndServe(":8081", debugMux) select { case err = <-srvErr: // Error when starting HTTP server. log.Fatalf("Unable to start server: %v", err) case <-ctx.Done(): // Wait for first CTRL+C. // Stop receiving signal notifications as soon as possible. stop() } } func triggerOrderCompleted(ctx context.Context, syncedServer *PoolServer, order *CheckoutOrder) error { mutation := &messages.OrderCreated{ OrderId: order.ID, Status: order.Status, } cid, ok := cart.ParseCartId(order.MerchantReference1) if !ok { return fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1) } _, applyErr := syncedServer.Apply(ctx, uint64(cid), mutation) return applyErr } func confirmOrder(ctx context.Context, order *CheckoutOrder, orderHandler *AmqpOrderHandler) error { orderToSend, err := json.Marshal(order) if err != nil { return err } err = orderHandler.OrderCompleted(orderToSend) if err != nil { return err } return nil }