package main import ( "context" "fmt" "log" "net" "net/http" "os" "os/signal" "time" "git.k6n.net/go-cart-actor/pkg/actor" "git.k6n.net/go-cart-actor/pkg/checkout" "git.k6n.net/go-cart-actor/pkg/proxy" "github.com/adyen/adyen-go-api-library/v21/src/adyen" "github.com/adyen/adyen-go-api-library/v21/src/common" "github.com/matst80/go-redis-inventory/pkg/inventory" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/redis/go-redis/v9" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) var ( grainSpawns = promauto.NewCounter(prometheus.CounterOpts{ Name: "checkout_grain_spawned_total", Help: "The total number of spawned checkout grains", }) ) func init() { os.Mkdir("data", 0755) } type App struct { pool *actor.SimpleGrainPool[checkout.CheckoutGrain] server *CheckoutPoolServer klarnaClient *KlarnaClient cartClient *CartClient // For internal communication to cart } var podIp = os.Getenv("POD_IP") var name = os.Getenv("POD_NAME") var amqpUrl = os.Getenv("AMQP_URL") var redisAddress = os.Getenv("REDIS_ADDRESS") var redisPassword = os.Getenv("REDIS_PASSWORD") var cartInternalUrl = os.Getenv("CART_INTERNAL_URL") // e.g., http://cart-service:8081 func main() { controlPlaneConfig := actor.DefaultServerConfig() reg := checkout.NewCheckoutMutationRegistry(checkout.NewCheckoutMutationContext()) reg.RegisterProcessor( actor.NewMutationProcessor(func(ctx context.Context, g *checkout.CheckoutGrain) error { g.Version++ return nil }), ) rdb := redis.NewClient(&redis.Options{ Addr: redisAddress, Password: redisPassword, DB: 0, }) inventoryService, err := inventory.NewRedisInventoryService(rdb) if err != nil { log.Fatalf("Error creating inventory service: %v\n", err) } diskStorage := actor.NewDiskStorage[checkout.CheckoutGrain]("data", reg) poolConfig := actor.GrainPoolConfig[checkout.CheckoutGrain]{ MutationRegistry: reg, Storage: diskStorage, Spawn: func(ctx context.Context, id uint64) (actor.Grain[checkout.CheckoutGrain], error) { _, span := tracer.Start(ctx, fmt.Sprintf("Spawn checkout id %d", id)) defer span.End() grainSpawns.Inc() ret := checkout.NewCheckoutGrain(id, 0, 0, time.Now(), nil) // version to be set later // Load persisted events/state for this checkout if present if err := diskStorage.LoadEvents(ctx, id, ret); err != nil { // Return the grain along with error (e.g., not found) so callers can decide return ret, err } return ret, nil }, Destroy: func(grain actor.Grain[checkout.CheckoutGrain]) error { return nil }, SpawnHost: func(host string) (actor.Host[checkout.CheckoutGrain], error) { return proxy.NewRemoteHost[checkout.CheckoutGrain](host) }, TTL: 1 * time.Hour, // Longer TTL for checkout PoolSize: 65535, Hostname: podIp, } pool, err := actor.NewSimpleGrainPool(poolConfig) if err != nil { log.Fatalf("Error creating checkout pool: %v\n", err) } adyenClient := adyen.NewClient(&common.Config{ ApiKey: os.Getenv("ADYEN_API_KEY"), Environment: common.TestEnv, }) klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) cartClient := NewCartClient(cartInternalUrl) syncedServer := NewCheckoutPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient, cartClient, adyenClient) syncedServer.inventoryService = inventoryService mux := http.NewServeMux() debugMux := http.NewServeMux() if amqpUrl == "" { log.Fatalf("no connection to amqp defined") } grpcSrv, err := actor.NewControlServer[checkout.CheckoutGrain](controlPlaneConfig, pool) if err != nil { log.Fatalf("Error starting control plane gRPC server: %v\n", err) } defer grpcSrv.GracefulStop() UseDiscovery(pool) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() otelShutdown, err := setupOTelSDK(ctx) if err != nil { log.Fatalf("Unable to start otel %v", err) } syncedServer.Serve(mux) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { grainCount, capacity := pool.LocalUsage() if grainCount >= capacity { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("grain pool at capacity")) return } if !pool.IsHealthy() { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("control plane not healthy")) return } w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("1.0.0")) }) srv := &http.Server{ Addr: ":8080", BaseContext: func(net.Listener) context.Context { return ctx }, ReadTimeout: 10 * time.Second, WriteTimeout: 20 * time.Second, Handler: otelhttp.NewHandler(mux, "/"), } defer func() { fmt.Println("Shutting down due to signal") otelShutdown(context.Background()) diskStorage.Close() pool.Close() }() srvErr := make(chan error, 1) go func() { srvErr <- srv.ListenAndServe() }() log.Print("Checkout server started at port 8080") go http.ListenAndServe(":8081", debugMux) select { case err = <-srvErr: log.Fatalf("Unable to start server: %v", err) case <-ctx.Done(): stop() } }