package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/http/pprof"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"git.tornberg.me/go-cart-actor/pkg/actor"
	"git.tornberg.me/go-cart-actor/pkg/discovery"
	messages "git.tornberg.me/go-cart-actor/pkg/messages"
	"git.tornberg.me/go-cart-actor/pkg/proxy"
	"git.tornberg.me/go-cart-actor/pkg/voucher"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// Prometheus counters registered with the default registry via promauto.
var (
	grainSpawns = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_spawned_total",
		Help: "The total number of spawned grains",
	})
	grainMutations = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_mutations_total",
		Help: "The total number of mutations",
	})
	grainLookups = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_lookups_total",
		Help: "The total number of lookups",
	})
)

// init makes sure the "data" directory handed to the disk storage in main
// exists. MkdirAll returns nil when the directory is already present, so only
// genuine failures (permissions, read-only file system, ...) are logged.
func init() {
	if err := os.MkdirAll("data", 0755); err != nil {
		log.Printf("Error creating data directory: %v", err)
	}
}

// App bundles the grain pool used by the HTTP handlers.
type App struct {
	pool *actor.SimpleGrainPool[CartGrain]
}

// Process configuration read once from the environment at start-up.
var podIp = os.Getenv("POD_IP")
var name = os.Getenv("POD_NAME")
var amqpUrl = os.Getenv("AMQP_URL")

// tpl is the page template written by the checkout and confirmation handlers;
// its single %s verb receives the order's HTML snippet.
var tpl = ` s10r testing - checkout %s `

// getCountryFromHost returns "no" or "se" when the lower-cased host contains
// "-no" or "-se" respectively ("-no" is checked first), and "" otherwise.
func getCountryFromHost(host string) string {
	lowered := strings.ToLower(host)
	switch {
	case strings.Contains(lowered, "-no"):
		return "no"
	case strings.Contains(lowered, "-se"):
		return "se"
	default:
		return ""
	}
}

// GetDiscovery returns a Kubernetes-backed discovery service built from the
// in-cluster configuration, or nil when POD_IP is not set. Failing to build the
// in-cluster config or the client terminates the process.
func GetDiscovery() discovery.Discovery {
	if podIp == "" {
		return nil
	}

	config, kerr := rest.InClusterConfig()
	if kerr != nil {
		log.Fatalf("Error creating kubernetes client: %v\n", kerr)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error creating client: %v\n", err)
	}
	return discovery.NewK8sDiscovery(client)
}

// MutationContext carries the services that mutation handlers may depend on.
type MutationContext struct {
	VoucherService voucher.Service
}

// main wires up the mutation registry, disk storage, grain pool, control-plane
// gRPC server, host discovery and the HTTP API on :8080, then blocks until
// SIGTERM has been received and the storage and pool have been closed.
func main() {
	// Permissions-Policy header value sent with the checkout page.
	const checkoutPermissionsPolicy = "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")"

	controlPlaneConfig := actor.DefaultServerConfig()
	ctx := MutationContext{}

	reg := actor.NewMutationRegistry()
	reg.RegisterMutations(
		actor.NewMutation(AddItem, func() *messages.AddItem {
			return &messages.AddItem{}
		}),
		actor.NewMutation(ChangeQuantity, func() *messages.ChangeQuantity {
			return &messages.ChangeQuantity{}
		}),
		actor.NewMutation(RemoveItem, func() *messages.RemoveItem {
			return &messages.RemoveItem{}
		}),
		actor.NewMutation(InitializeCheckout, func() *messages.InitializeCheckout {
			return &messages.InitializeCheckout{}
		}),
		actor.NewMutation(OrderCreated, func() *messages.OrderCreated {
			return &messages.OrderCreated{}
		}),
		actor.NewMutation(RemoveDelivery, func() *messages.RemoveDelivery {
			return &messages.RemoveDelivery{}
		}),
		actor.NewMutation(SetDelivery, func() *messages.SetDelivery {
			return &messages.SetDelivery{}
		}),
		actor.NewMutation(SetPickupPoint, func() *messages.SetPickupPoint {
			return &messages.SetPickupPoint{}
		}),
		actor.NewMutation(ClearCart, func() *messages.ClearCartRequest {
			return &messages.ClearCartRequest{}
		}),
		actor.NewMutation(ctx.AddVoucher, func() *messages.AddVoucher {
			return &messages.AddVoucher{}
		}),
	)

	diskStorage := actor.NewDiskStorage[CartGrain]("data", reg)
	poolConfig := actor.GrainPoolConfig[CartGrain]{
		MutationRegistry: reg,
		Storage:          diskStorage,
		Spawn: func(id uint64) (actor.Grain[CartGrain], error) {
			grainSpawns.Inc()
			ret := &CartGrain{
				lastItemId:     0,
				lastDeliveryId: 0,
				Deliveries:     []*CartDelivery{},
				Id:             CartId(id),
				Items:          []*CartItem{},
				TotalPrice:     0,
			}
			// Set baseline lastChange at spawn; replay may update it to last event timestamp.
			ret.lastChange = time.Now()
			ret.lastAccess = time.Now()
			err := diskStorage.LoadEvents(id, ret)
			return ret, err
		},
		SpawnHost: func(host string) (actor.Host, error) {
			return proxy.NewRemoteHost(host)
		},
		TTL:      15 * time.Minute,
		PoolSize: 2 * 65535,
		Hostname: podIp,
	}

	pool, err := actor.NewSimpleGrainPool(poolConfig)
	if err != nil {
		log.Fatalf("Error creating cart pool: %v\n", err)
	}
	app := &App{
		pool: pool,
	}

	grpcSrv, err := actor.NewControlServer[*CartGrain](controlPlaneConfig, pool)
	if err != nil {
		log.Fatalf("Error starting control plane gRPC server: %v\n", err)
	}
	defer grpcSrv.GracefulStop()

	go diskStorage.SaveLoop(10 * time.Second)

	// Keep the pool's set of remote hosts in sync with discovery events.
	go func(hw discovery.Discovery) {
		if hw == nil {
			log.Print("No discovery service available")
			return
		}
		ch, err := hw.Watch()
		if err != nil {
			log.Printf("Discovery error: %v", err)
			return
		}
		for evt := range ch {
			if evt.Host == "" {
				continue
			}
			switch evt.Type {
			case watch.Deleted:
				if pool.IsKnown(evt.Host) {
					pool.RemoveHost(evt.Host)
				}
			default:
				if !pool.IsKnown(evt.Host) {
					log.Printf("Discovered host %s", evt.Host)
					pool.AddRemote(evt.Host)
				}
			}
		}
	}(GetDiscovery())

	orderHandler := &AmqpOrderHandler{
		Url: amqpUrl,
	}

	klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))

	syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient)
	mux := http.NewServeMux()
	mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve()))
	// only for local
	mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) {
		pool.AddRemote(r.PathValue("host"))
	})
	// mux.HandleFunc("GET /save", app.HandleSave)
	//mux.HandleFunc("/", app.RewritePath)
	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	mux.Handle("/metrics", promhttp.Handler())
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		// Grain pool health: simple capacity check (mirrors previous GrainHandler.IsHealthy)
		grainCount, capacity := app.pool.LocalUsage()
		if grainCount >= capacity {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte("grain pool at capacity"))
			return
		}
		if !pool.IsHealthy() {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte("control plane not healthy"))
			return
		}
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})
	mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})
	mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})

	// /checkout renders the checkout page either for an existing order
	// (?order_id=...) or for the cart identified by the "cartid" cookie.
	mux.HandleFunc("/checkout", func(w http.ResponseWriter, r *http.Request) {
		orderId := r.URL.Query().Get("order_id")
		if orderId != "" {
			// Handler-local variables: assigning to main's err from here would
			// be a data race between concurrently served requests.
			order, err := klarnaClient.GetOrder(orderId)
			if err != nil {
				w.WriteHeader(http.StatusBadRequest)
				w.Write([]byte(err.Error()))
				return
			}
			w.Header().Set("Content-Type", "text/html; charset=utf-8")
			w.Header().Set("Permissions-Policy", checkoutPermissionsPolicy)
			w.WriteHeader(http.StatusOK)
			fmt.Fprintf(w, tpl, order.HTMLSnippet)
			return
		}

		cookie, err := r.Cookie("cartid")
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			w.Write([]byte(err.Error()))
			return
		}
		if cookie.Value == "" {
			w.WriteHeader(http.StatusBadRequest)
			w.Write([]byte("no cart id to checkout is empty"))
			return
		}
		parsed, ok := ParseCartId(cookie.Value)
		if !ok {
			w.WriteHeader(http.StatusBadRequest)
			w.Write([]byte("invalid cart id format"))
			return
		}
		cartId := parsed

		var order *CheckoutOrder
		// The callback stores its failure in err so the check below can
		// answer with a 500; on failure it returns before writing anything.
		syncedServer.ProxyHandler(func(w http.ResponseWriter, r *http.Request, cartId CartId) error {
			order, err = syncedServer.CreateOrUpdateCheckout(r.Host, cartId)
			if err != nil {
				return err
			}
			w.Header().Set("Content-Type", "text/html; charset=utf-8")
			w.Header().Set("Permissions-Policy", checkoutPermissionsPolicy)
			w.WriteHeader(http.StatusOK)
			fmt.Fprintf(w, tpl, order.HTMLSnippet)
			return nil
		})(cartId, w, r)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(err.Error()))
		}
		// v2: Apply now returns *CartGrain; order creation handled inside grain (no payload to unmarshal)
	})

	// /confirmation/{order_id} renders the order's snippet and expires the
	// "cartid" cookie once the order status is "checkout_complete".
	mux.HandleFunc("/confirmation/{order_id}", func(w http.ResponseWriter, r *http.Request) {
		orderId := r.PathValue("order_id")
		order, err := klarnaClient.GetOrder(orderId)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(err.Error()))
			return
		}

		w.Header().Set("Content-Type", "text/html; charset=utf-8")
		if order.Status == "checkout_complete" {
			http.SetCookie(w, &http.Cookie{
				Name:     "cartid",
				Value:    "",
				Path:     "/",
				Secure:   true,
				HttpOnly: true,
				Expires:  time.Unix(0, 0),
				SameSite: http.SameSiteLaxMode,
			})
		}

		w.WriteHeader(http.StatusOK)
		fmt.Fprintf(w, tpl, order.HTMLSnippet)
	})

	// /validate accepts a POSTed order as JSON and acknowledges it with 200.
	mux.HandleFunc("/validate", func(w http.ResponseWriter, r *http.Request) {
		log.Printf("Klarna order validation, method: %s", r.Method)
		if r.Method != "POST" {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}
		order := &CheckoutOrder{}
		err := json.NewDecoder(r.Body).Decode(order)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			// Stop here: continuing would log an empty order and attempt a
			// second WriteHeader after the 400 has already been sent.
			return
		}
		log.Printf("Klarna order validation: %s", order.ID)
		//err = confirmOrder(order, orderHandler)
		//if err != nil {
		//	log.Printf("Error validating order: %v\n", err)
		//	w.WriteHeader(http.StatusInternalServerError)
		//	return
		//}
		//
		//err = triggerOrderCompleted(err, syncedServer, order)
		//if err != nil {
		//	log.Printf("Error processing cart message: %v\n", err)
		//	w.WriteHeader(http.StatusInternalServerError)
		//	return
		//}
		w.WriteHeader(http.StatusOK)
	})

	// /push handles the order confirmation callback: fetch the order, publish
	// it, apply OrderCreated to the cart, then acknowledge the order.
	mux.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}
		orderId := r.URL.Query().Get("order_id")
		log.Printf("Order confirmation push: %s", orderId)

		order, err := klarnaClient.GetOrder(orderId)
		if err != nil {
			log.Printf("Error creating request: %v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		err = confirmOrder(order, orderHandler)
		if err != nil {
			log.Printf("Error confirming order: %v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		err = triggerOrderCompleted(syncedServer, order)
		if err != nil {
			log.Printf("Error processing cart message: %v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		// A failed acknowledgement is only logged; the push still gets a 200.
		err = klarnaClient.AcknowledgeOrder(orderId)
		if err != nil {
			log.Printf("Error acknowledging order: %v\n", err)
		}

		w.WriteHeader(http.StatusOK)
	})
	mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("1.0.0"))
	})

	mux.HandleFunc("/openapi.json", ServeEmbeddedOpenAPI)

	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGTERM)

	// On SIGTERM close the storage and the pool, then release main.
	go func() {
		sig := <-sigs
		fmt.Println("Shutting down due to signal:", sig)
		diskStorage.Close()
		pool.Close()
		done <- true
	}()

	log.Print("Server started at port 8080")
	go func() {
		// ListenAndServe only returns on failure (e.g. the port is taken).
		// Exit instead of idling forever without an HTTP listener.
		if err := http.ListenAndServe(":8080", mux); err != nil {
			log.Fatalf("HTTP server stopped: %v\n", err)
		}
	}()
	<-done
}

// triggerOrderCompleted applies an OrderCreated mutation, carrying the order's
// ID and status, to the cart whose id is parsed from order.MerchantReference1.
// It returns an error when that reference is not a valid cart id or when
// applying the mutation fails.
func triggerOrderCompleted(syncedServer *PoolServer, order *CheckoutOrder) error {
	mutation := &messages.OrderCreated{
		OrderId: order.ID,
		Status:  order.Status,
	}
	cid, ok := ParseCartId(order.MerchantReference1)
	if !ok {
		return fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1)
	}
	_, applyErr := syncedServer.Apply(uint64(cid), mutation)
	return applyErr
}

// confirmOrder serialises the order to JSON and publishes it through
// orderHandler, connecting before the publish and closing the handler when
// done. The first error encountered is returned.
func confirmOrder(order *CheckoutOrder, orderHandler *AmqpOrderHandler) error {
	orderToSend, err := json.Marshal(order)
	if err != nil {
		return err
	}
	if err := orderHandler.Connect(); err != nil {
		return err
	}
	defer orderHandler.Close()

	return orderHandler.OrderCompleted(orderToSend)
}