From 2202c149b88433b4a2f3d114dd6a9af34be3726d Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 22 Oct 2025 12:39:48 +0200 Subject: [PATCH] cleanup --- cmd/cart/checkout_server.go | 120 +++++++++++++++++++ cmd/cart/k8s-host-discovery.go | 60 ++++++++++ cmd/cart/main.go | 204 ++++----------------------------- cmd/cart/pool-server.go | 15 +++ deployment/deployment.yaml | 6 +- pkg/actor/grain_pool.go | 1 + pkg/actor/simple_grain_pool.go | 4 + 7 files changed, 230 insertions(+), 180 deletions(-) create mode 100644 cmd/cart/checkout_server.go create mode 100644 cmd/cart/k8s-host-discovery.go diff --git a/cmd/cart/checkout_server.go b/cmd/cart/checkout_server.go new file mode 100644 index 0000000..5097e35 --- /dev/null +++ b/cmd/cart/checkout_server.go @@ -0,0 +1,120 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "time" + + "git.tornberg.me/go-cart-actor/pkg/actor" + "git.tornberg.me/go-cart-actor/pkg/cart" + amqp "github.com/rabbitmq/amqp091-go" +) + +func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux) { + conn, err := amqp.Dial(amqpUrl) + if err != nil { + log.Fatalf("failed to connect to RabbitMQ: %v", err) + } + + amqpListener := actor.NewAmqpListener(conn, func(id uint64, msg []actor.ApplyResult) (any, error) { + return &CartChangeEvent{ + CartId: cart.CartId(id), + Mutations: msg, + }, nil + }) + amqpListener.DefineTopics() + a.pool.AddListener(amqpListener) + orderHandler := NewAmqpOrderHandler(conn) + orderHandler.DefineTopics() + + mux.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) { + + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + orderId := r.URL.Query().Get("order_id") + log.Printf("Order confirmation push: %s", orderId) + + order, err := a.klarnaClient.GetOrder(orderId) + + if err != nil { + log.Printf("Error creating request: %v\n", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + err = confirmOrder(order, orderHandler) + if err != nil { + log.Printf("Error confirming order: %v\n", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + err = triggerOrderCompleted(a.server, order) + if err != nil { + log.Printf("Error processing cart message: %v\n", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + err = a.klarnaClient.AcknowledgeOrder(orderId) + if err != nil { + log.Printf("Error acknowledging order: %v\n", err) + } + + w.WriteHeader(http.StatusOK) + }) + + mux.HandleFunc("/checkout", a.server.CheckoutHandler(func(order *CheckoutOrder, w http.ResponseWriter) error { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")") + w.WriteHeader(http.StatusOK) + _, err := fmt.Fprintf(w, tpl, order.HTMLSnippet) + return err + })) + + mux.HandleFunc("/confirmation/{order_id}", func(w http.ResponseWriter, r *http.Request) { + + orderId := r.PathValue("order_id") + order, err := a.klarnaClient.GetOrder(orderId) + + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + + w.Header().Set("Content-Type", "text/html; charset=utf-8") + if order.Status == "checkout_complete" { + http.SetCookie(w, &http.Cookie{ + Name: "cartid", + Value: "", + Path: "/", + Secure: true, + HttpOnly: true, + Expires: time.Unix(0, 0), + SameSite: http.SameSiteLaxMode, + }) + } + + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, tpl, order.HTMLSnippet) + }) + mux.HandleFunc("/validate", func(w http.ResponseWriter, r *http.Request) { + log.Printf("Klarna order validation, method: %s", r.Method) + if r.Method != "POST" { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + order := &CheckoutOrder{} + err := json.NewDecoder(r.Body).Decode(order) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + } + log.Printf("Klarna order validation: %s", order.ID) + + w.WriteHeader(http.StatusOK) + }) +} diff --git a/cmd/cart/k8s-host-discovery.go b/cmd/cart/k8s-host-discovery.go new file mode 100644 index 0000000..bf552ab --- /dev/null +++ b/cmd/cart/k8s-host-discovery.go @@ -0,0 +1,60 @@ +package main + +import ( + "log" + + "git.tornberg.me/go-cart-actor/pkg/actor" + "git.tornberg.me/go-cart-actor/pkg/cart" + "git.tornberg.me/go-cart-actor/pkg/discovery" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +func GetDiscovery() discovery.Discovery { + if podIp == "" { + return nil + } + + config, kerr := rest.InClusterConfig() + + if kerr != nil { + log.Fatalf("Error creating kubernetes client: %v\n", kerr) + } + client, err := kubernetes.NewForConfig(config) + if err != nil { + log.Fatalf("Error creating client: %v\n", err) + } + return discovery.NewK8sDiscovery(client) +} + +func UseDiscovery(pool actor.GrainPool[*cart.CartGrain]) { + + go func(hw discovery.Discovery) { + if hw == nil { + log.Print("No discovery service available") + return + } + ch, err := hw.Watch() + if err != nil { + log.Printf("Discovery error: %v", err) + return + } + for evt := range ch { + if evt.Host == "" { + continue + } + switch evt.Type { + case watch.Deleted: + if pool.IsKnown(evt.Host) { + pool.RemoveHost(evt.Host) + } + default: + if !pool.IsKnown(evt.Host) { + log.Printf("Discovered host %s", evt.Host) + pool.AddRemoteHost(evt.Host) + } + } + } + }(GetDiscovery()) +} diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 1f9ee8d..d81d5a1 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -14,7 +14,6 @@ import ( "git.tornberg.me/go-cart-actor/pkg/actor" "git.tornberg.me/go-cart-actor/pkg/cart" - "git.tornberg.me/go-cart-actor/pkg/discovery" messages "git.tornberg.me/go-cart-actor/pkg/messages" "git.tornberg.me/go-cart-actor/pkg/promotions" "git.tornberg.me/go-cart-actor/pkg/proxy" @@ -22,10 +21,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" - amqp "github.com/rabbitmq/amqp091-go" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" ) var ( @@ -33,14 +28,6 @@ var ( Name: "cart_grain_spawned_total", Help: "The total number of spawned grains", }) - grainMutations = promauto.NewCounter(prometheus.CounterOpts{ - Name: "cart_grain_mutations_total", - Help: "The total number of mutations", - }) - grainLookups = promauto.NewCounter(prometheus.CounterOpts{ - Name: "cart_grain_lookups_total", - Help: "The total number of lookups", - }) ) func init() { @@ -48,7 +35,9 @@ func init() { } type App struct { - pool *actor.SimpleGrainPool[cart.CartGrain] + pool *actor.SimpleGrainPool[cart.CartGrain] + server *PoolServer + klarnaClient *KlarnaClient } var podIp = os.Getenv("POD_IP") @@ -79,23 +68,6 @@ func getCountryFromHost(host string) string { return "" } -func GetDiscovery() discovery.Discovery { - if podIp == "" { - return nil - } - - config, kerr := rest.InClusterConfig() - - if kerr != nil { - log.Fatalf("Error creating kubernetes client: %v\n", kerr) - } - client, err := kubernetes.NewForConfig(config) - if err != nil { - log.Fatalf("Error creating client: %v\n", err) - } - return discovery.NewK8sDiscovery(client) -} - type MutationContext struct { VoucherService voucher.Service } @@ -121,17 +93,17 @@ func main() { reg := cart.NewCartMultationRegistry() reg.RegisterProcessor( actor.NewMutationProcessor(func(g *cart.CartGrain) error { + g.UpdateTotals() ctx := promotions.NewContextFromCart(g, promotions.WithNow(time.Now()), promotions.WithCustomerSegment("vip")) _, actions := promotionService.EvaluateAll(promotionData.State.Promotions, ctx) for _, action := range actions { - log.Printf("apply: %V", action) + log.Printf("apply: %+v", action) + g.UpdateTotals() } return nil }), - actor.NewMutationProcessor(func(g *cart.CartGrain) error { - g.UpdateTotals() - return nil - })) + ) + diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg) poolConfig := actor.GrainPoolConfig[cart.CartGrain]{ MutationRegistry: reg, @@ -157,72 +129,24 @@ func main() { if err != nil { log.Fatalf("Error creating cart pool: %v\n", err) } - app := &App{ - pool: pool, - } klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient) + app := &App{ + pool: pool, + server: syncedServer, + klarnaClient: klarnaClient, + } + mux := http.NewServeMux() + debugMux := http.NewServeMux() if amqpUrl == "" { log.Printf("no connection to amqp defined") } else { - conn, err := amqp.Dial(amqpUrl) - if err != nil { - log.Fatalf("failed to connect to RabbitMQ: %w", err) - } - - amqpListener := actor.NewAmqpListener(conn, func(id uint64, msg []actor.ApplyResult) (any, error) { - return &CartChangeEvent{ - CartId: cart.CartId(id), - Mutations: msg, - }, nil - }) - amqpListener.DefineTopics() - pool.AddListener(amqpListener) - orderHandler := NewAmqpOrderHandler(conn) - orderHandler.DefineTopics() - - mux.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) { - - if r.Method != http.MethodPost { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - orderId := r.URL.Query().Get("order_id") - log.Printf("Order confirmation push: %s", orderId) - - order, err := klarnaClient.GetOrder(orderId) - - if err != nil { - log.Printf("Error creating request: %v\n", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - err = confirmOrder(order, orderHandler) - if err != nil { - log.Printf("Error confirming order: %v\n", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - err = triggerOrderCompleted(syncedServer, order) - if err != nil { - log.Printf("Error processing cart message: %v\n", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - err = klarnaClient.AcknowledgeOrder(orderId) - if err != nil { - log.Printf("Error acknowledging order: %v\n", err) - } - - w.WriteHeader(http.StatusOK) - }) + app.HandleCheckoutRequests(amqpUrl, mux) } grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool) @@ -232,48 +156,20 @@ func main() { defer grpcSrv.GracefulStop() // go diskStorage.SaveLoop(10 * time.Second) - - go func(hw discovery.Discovery) { - if hw == nil { - log.Print("No discovery service available") - return - } - ch, err := hw.Watch() - if err != nil { - log.Printf("Discovery error: %v", err) - return - } - for evt := range ch { - if evt.Host == "" { - continue - } - switch evt.Type { - case watch.Deleted: - if pool.IsKnown(evt.Host) { - pool.RemoveHost(evt.Host) - } - default: - if !pool.IsKnown(evt.Host) { - log.Printf("Discovered host %s", evt.Host) - pool.AddRemote(evt.Host) - } - } - } - }(GetDiscovery()) + UseDiscovery(pool) mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve())) // only for local mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) { pool.AddRemote(r.PathValue("host")) }) - // mux.HandleFunc("GET /save", app.HandleSave) - //mux.HandleFunc("/", app.RewritePath) - mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - mux.HandleFunc("/debug/pprof/trace", pprof.Trace) - mux.Handle("/metrics", promhttp.Handler()) + + debugMux.HandleFunc("/debug/pprof/", pprof.Index) + debugMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + debugMux.HandleFunc("/debug/pprof/profile", pprof.Profile) + debugMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + debugMux.HandleFunc("/debug/pprof/trace", pprof.Trace) + debugMux.Handle("/metrics", promhttp.Handler()) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { // Grain pool health: simple capacity check (mirrors previous GrainHandler.IsHealthy) grainCount, capacity := app.pool.LocalUsage() @@ -299,57 +195,6 @@ func main() { w.Write([]byte("ok")) }) - mux.HandleFunc("/checkout", syncedServer.CheckoutHandler(func(order *CheckoutOrder, w http.ResponseWriter) error { - w.Header().Set("Content-Type", "text/html; charset=utf-8") - w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")") - w.WriteHeader(http.StatusOK) - _, err := fmt.Fprintf(w, tpl, order.HTMLSnippet) - return err - })) - - mux.HandleFunc("/confirmation/{order_id}", func(w http.ResponseWriter, r *http.Request) { - - orderId := r.PathValue("order_id") - order, err := klarnaClient.GetOrder(orderId) - - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - return - } - - w.Header().Set("Content-Type", "text/html; charset=utf-8") - if order.Status == "checkout_complete" { - http.SetCookie(w, &http.Cookie{ - Name: "cartid", - Value: "", - Path: "/", - Secure: true, - HttpOnly: true, - Expires: time.Unix(0, 0), - SameSite: http.SameSiteLaxMode, - }) - } - - w.WriteHeader(http.StatusOK) - fmt.Fprintf(w, tpl, order.HTMLSnippet) - }) - mux.HandleFunc("/validate", func(w http.ResponseWriter, r *http.Request) { - log.Printf("Klarna order validation, method: %s", r.Method) - if r.Method != "POST" { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - order := &CheckoutOrder{} - err := json.NewDecoder(r.Body).Decode(order) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - } - log.Printf("Klarna order validation: %s", order.ID) - - w.WriteHeader(http.StatusOK) - }) - mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("1.0.0")) @@ -372,6 +217,7 @@ func main() { log.Print("Server started at port 8080") go http.ListenAndServe(":8080", mux) + go http.ListenAndServe(":8081", debugMux) <-done } diff --git a/cmd/cart/pool-server.go b/cmd/cart/pool-server.go index bef6074..011d3a0 100644 --- a/cmd/cart/pool-server.go +++ b/cmd/cart/pool-server.go @@ -15,6 +15,19 @@ import ( messages "git.tornberg.me/go-cart-actor/pkg/messages" "git.tornberg.me/go-cart-actor/pkg/voucher" "github.com/gogo/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + grainMutations = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cart_grain_mutations_total", + Help: "The total number of mutations", + }) + grainLookups = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cart_grain_lookups_total", + Help: "The total number of lookups", + }) ) type PoolServer struct { @@ -54,6 +67,7 @@ func (s *PoolServer) AddSkuToCartHandler(w http.ResponseWriter, r *http.Request, if err != nil { return err } + grainMutations.Add(float64(len(data.Mutations))) return s.WriteResult(w, data) } @@ -436,6 +450,7 @@ func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request return func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error { if ownerHost, ok := s.OwnerHost(uint64(cartId)); ok { handled, err := ownerHost.Proxy(uint64(cartId), w, r) + grainLookups.Inc() if err == nil && handled { return nil } diff --git a/deployment/deployment.yaml b/deployment/deployment.yaml index 72097db..56f4bc9 100644 --- a/deployment/deployment.yaml +++ b/deployment/deployment.yaml @@ -143,6 +143,8 @@ spec: ports: - containerPort: 8080 name: web + - containerPort: 8081 + name: debug - containerPort: 1337 name: rpc livenessProbe: @@ -245,6 +247,8 @@ spec: ports: - containerPort: 8080 name: web + - containerPort: 8081 + name: debug - containerPort: 1337 name: rpc livenessProbe: @@ -300,7 +304,7 @@ apiVersion: v1 metadata: name: cart-actor annotations: - prometheus.io/port: "8080" + prometheus.io/port: "8081" prometheus.io/scrape: "true" prometheus.io/path: "/metrics" spec: diff --git a/pkg/actor/grain_pool.go b/pkg/actor/grain_pool.go index d81e2f3..2878493 100644 --- a/pkg/actor/grain_pool.go +++ b/pkg/actor/grain_pool.go @@ -22,6 +22,7 @@ type GrainPool[V any] interface { Negotiate(otherHosts []string) GetLocalIds() []uint64 RemoveHost(host string) + AddRemoteHost(host string) IsHealthy() bool IsKnown(string) bool Close() diff --git a/pkg/actor/simple_grain_pool.go b/pkg/actor/simple_grain_pool.go index c49111e..38a25e7 100644 --- a/pkg/actor/simple_grain_pool.go +++ b/pkg/actor/simple_grain_pool.go @@ -157,6 +157,10 @@ func (p *SimpleGrainPool[V]) TakeOwnership(id uint64) { p.broadcastOwnership([]uint64{id}) } +func (p *SimpleGrainPool[V]) AddRemoteHost(host string) { + p.AddRemote(host) +} + func (p *SimpleGrainPool[V]) AddRemote(host string) (Host, error) { if host == "" { return nil, fmt.Errorf("host is empty")