cleanup
All checks were successful
Build and Publish / Metadata (push) Successful in 10s
Build and Publish / BuildAndDeployAmd64 (push) Successful in 1m10s
Build and Publish / BuildAndDeployArm64 (push) Successful in 4m35s

This commit is contained in:
matst80
2025-10-22 12:39:48 +02:00
parent e91433eda7
commit 2202c149b8
7 changed files with 230 additions and 180 deletions

120
cmd/cart/checkout_server.go Normal file
View File

@@ -0,0 +1,120 @@
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"git.tornberg.me/go-cart-actor/pkg/actor"
"git.tornberg.me/go-cart-actor/pkg/cart"
amqp "github.com/rabbitmq/amqp091-go"
)
func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux) {
conn, err := amqp.Dial(amqpUrl)
if err != nil {
log.Fatalf("failed to connect to RabbitMQ: %v", err)
}
amqpListener := actor.NewAmqpListener(conn, func(id uint64, msg []actor.ApplyResult) (any, error) {
return &CartChangeEvent{
CartId: cart.CartId(id),
Mutations: msg,
}, nil
})
amqpListener.DefineTopics()
a.pool.AddListener(amqpListener)
orderHandler := NewAmqpOrderHandler(conn)
orderHandler.DefineTopics()
mux.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
orderId := r.URL.Query().Get("order_id")
log.Printf("Order confirmation push: %s", orderId)
order, err := a.klarnaClient.GetOrder(orderId)
if err != nil {
log.Printf("Error creating request: %v\n", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = confirmOrder(order, orderHandler)
if err != nil {
log.Printf("Error confirming order: %v\n", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = triggerOrderCompleted(a.server, order)
if err != nil {
log.Printf("Error processing cart message: %v\n", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = a.klarnaClient.AcknowledgeOrder(orderId)
if err != nil {
log.Printf("Error acknowledging order: %v\n", err)
}
w.WriteHeader(http.StatusOK)
})
mux.HandleFunc("/checkout", a.server.CheckoutHandler(func(order *CheckoutOrder, w http.ResponseWriter) error {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")")
w.WriteHeader(http.StatusOK)
_, err := fmt.Fprintf(w, tpl, order.HTMLSnippet)
return err
}))
mux.HandleFunc("/confirmation/{order_id}", func(w http.ResponseWriter, r *http.Request) {
orderId := r.PathValue("order_id")
order, err := a.klarnaClient.GetOrder(orderId)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if order.Status == "checkout_complete" {
http.SetCookie(w, &http.Cookie{
Name: "cartid",
Value: "",
Path: "/",
Secure: true,
HttpOnly: true,
Expires: time.Unix(0, 0),
SameSite: http.SameSiteLaxMode,
})
}
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, tpl, order.HTMLSnippet)
})
mux.HandleFunc("/validate", func(w http.ResponseWriter, r *http.Request) {
log.Printf("Klarna order validation, method: %s", r.Method)
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
order := &CheckoutOrder{}
err := json.NewDecoder(r.Body).Decode(order)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
}
log.Printf("Klarna order validation: %s", order.ID)
w.WriteHeader(http.StatusOK)
})
}

View File

@@ -0,0 +1,60 @@
package main
import (
"log"
"git.tornberg.me/go-cart-actor/pkg/actor"
"git.tornberg.me/go-cart-actor/pkg/cart"
"git.tornberg.me/go-cart-actor/pkg/discovery"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
func GetDiscovery() discovery.Discovery {
if podIp == "" {
return nil
}
config, kerr := rest.InClusterConfig()
if kerr != nil {
log.Fatalf("Error creating kubernetes client: %v\n", kerr)
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error creating client: %v\n", err)
}
return discovery.NewK8sDiscovery(client)
}
func UseDiscovery(pool actor.GrainPool[*cart.CartGrain]) {
go func(hw discovery.Discovery) {
if hw == nil {
log.Print("No discovery service available")
return
}
ch, err := hw.Watch()
if err != nil {
log.Printf("Discovery error: %v", err)
return
}
for evt := range ch {
if evt.Host == "" {
continue
}
switch evt.Type {
case watch.Deleted:
if pool.IsKnown(evt.Host) {
pool.RemoveHost(evt.Host)
}
default:
if !pool.IsKnown(evt.Host) {
log.Printf("Discovered host %s", evt.Host)
pool.AddRemoteHost(evt.Host)
}
}
}
}(GetDiscovery())
}

View File

@@ -14,7 +14,6 @@ import (
"git.tornberg.me/go-cart-actor/pkg/actor"
"git.tornberg.me/go-cart-actor/pkg/cart"
"git.tornberg.me/go-cart-actor/pkg/discovery"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
"git.tornberg.me/go-cart-actor/pkg/promotions"
"git.tornberg.me/go-cart-actor/pkg/proxy"
@@ -22,10 +21,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
amqp "github.com/rabbitmq/amqp091-go"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
var (
@@ -33,14 +28,6 @@ var (
Name: "cart_grain_spawned_total",
Help: "The total number of spawned grains",
})
grainMutations = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_grain_mutations_total",
Help: "The total number of mutations",
})
grainLookups = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_grain_lookups_total",
Help: "The total number of lookups",
})
)
func init() {
@@ -48,7 +35,9 @@ func init() {
}
type App struct {
pool *actor.SimpleGrainPool[cart.CartGrain]
pool *actor.SimpleGrainPool[cart.CartGrain]
server *PoolServer
klarnaClient *KlarnaClient
}
var podIp = os.Getenv("POD_IP")
@@ -79,23 +68,6 @@ func getCountryFromHost(host string) string {
return ""
}
func GetDiscovery() discovery.Discovery {
if podIp == "" {
return nil
}
config, kerr := rest.InClusterConfig()
if kerr != nil {
log.Fatalf("Error creating kubernetes client: %v\n", kerr)
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error creating client: %v\n", err)
}
return discovery.NewK8sDiscovery(client)
}
type MutationContext struct {
VoucherService voucher.Service
}
@@ -121,17 +93,17 @@ func main() {
reg := cart.NewCartMultationRegistry()
reg.RegisterProcessor(
actor.NewMutationProcessor(func(g *cart.CartGrain) error {
g.UpdateTotals()
ctx := promotions.NewContextFromCart(g, promotions.WithNow(time.Now()), promotions.WithCustomerSegment("vip"))
_, actions := promotionService.EvaluateAll(promotionData.State.Promotions, ctx)
for _, action := range actions {
log.Printf("apply: %V", action)
log.Printf("apply: %+v", action)
g.UpdateTotals()
}
return nil
}),
actor.NewMutationProcessor(func(g *cart.CartGrain) error {
g.UpdateTotals()
return nil
}))
)
diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg)
poolConfig := actor.GrainPoolConfig[cart.CartGrain]{
MutationRegistry: reg,
@@ -157,72 +129,24 @@ func main() {
if err != nil {
log.Fatalf("Error creating cart pool: %v\n", err)
}
app := &App{
pool: pool,
}
klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))
syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient)
app := &App{
pool: pool,
server: syncedServer,
klarnaClient: klarnaClient,
}
mux := http.NewServeMux()
debugMux := http.NewServeMux()
if amqpUrl == "" {
log.Printf("no connection to amqp defined")
} else {
conn, err := amqp.Dial(amqpUrl)
if err != nil {
log.Fatalf("failed to connect to RabbitMQ: %w", err)
}
amqpListener := actor.NewAmqpListener(conn, func(id uint64, msg []actor.ApplyResult) (any, error) {
return &CartChangeEvent{
CartId: cart.CartId(id),
Mutations: msg,
}, nil
})
amqpListener.DefineTopics()
pool.AddListener(amqpListener)
orderHandler := NewAmqpOrderHandler(conn)
orderHandler.DefineTopics()
mux.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
orderId := r.URL.Query().Get("order_id")
log.Printf("Order confirmation push: %s", orderId)
order, err := klarnaClient.GetOrder(orderId)
if err != nil {
log.Printf("Error creating request: %v\n", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = confirmOrder(order, orderHandler)
if err != nil {
log.Printf("Error confirming order: %v\n", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = triggerOrderCompleted(syncedServer, order)
if err != nil {
log.Printf("Error processing cart message: %v\n", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = klarnaClient.AcknowledgeOrder(orderId)
if err != nil {
log.Printf("Error acknowledging order: %v\n", err)
}
w.WriteHeader(http.StatusOK)
})
app.HandleCheckoutRequests(amqpUrl, mux)
}
grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool)
@@ -232,48 +156,20 @@ func main() {
defer grpcSrv.GracefulStop()
// go diskStorage.SaveLoop(10 * time.Second)
go func(hw discovery.Discovery) {
if hw == nil {
log.Print("No discovery service available")
return
}
ch, err := hw.Watch()
if err != nil {
log.Printf("Discovery error: %v", err)
return
}
for evt := range ch {
if evt.Host == "" {
continue
}
switch evt.Type {
case watch.Deleted:
if pool.IsKnown(evt.Host) {
pool.RemoveHost(evt.Host)
}
default:
if !pool.IsKnown(evt.Host) {
log.Printf("Discovered host %s", evt.Host)
pool.AddRemote(evt.Host)
}
}
}
}(GetDiscovery())
UseDiscovery(pool)
mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve()))
// only for local
mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) {
pool.AddRemote(r.PathValue("host"))
})
// mux.HandleFunc("GET /save", app.HandleSave)
//mux.HandleFunc("/", app.RewritePath)
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.Handle("/metrics", promhttp.Handler())
debugMux.HandleFunc("/debug/pprof/", pprof.Index)
debugMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
debugMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
debugMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
debugMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
debugMux.Handle("/metrics", promhttp.Handler())
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
// Grain pool health: simple capacity check (mirrors previous GrainHandler.IsHealthy)
grainCount, capacity := app.pool.LocalUsage()
@@ -299,57 +195,6 @@ func main() {
w.Write([]byte("ok"))
})
mux.HandleFunc("/checkout", syncedServer.CheckoutHandler(func(order *CheckoutOrder, w http.ResponseWriter) error {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")")
w.WriteHeader(http.StatusOK)
_, err := fmt.Fprintf(w, tpl, order.HTMLSnippet)
return err
}))
mux.HandleFunc("/confirmation/{order_id}", func(w http.ResponseWriter, r *http.Request) {
orderId := r.PathValue("order_id")
order, err := klarnaClient.GetOrder(orderId)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if order.Status == "checkout_complete" {
http.SetCookie(w, &http.Cookie{
Name: "cartid",
Value: "",
Path: "/",
Secure: true,
HttpOnly: true,
Expires: time.Unix(0, 0),
SameSite: http.SameSiteLaxMode,
})
}
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, tpl, order.HTMLSnippet)
})
mux.HandleFunc("/validate", func(w http.ResponseWriter, r *http.Request) {
log.Printf("Klarna order validation, method: %s", r.Method)
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
order := &CheckoutOrder{}
err := json.NewDecoder(r.Body).Decode(order)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
}
log.Printf("Klarna order validation: %s", order.ID)
w.WriteHeader(http.StatusOK)
})
mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("1.0.0"))
@@ -372,6 +217,7 @@ func main() {
log.Print("Server started at port 8080")
go http.ListenAndServe(":8080", mux)
go http.ListenAndServe(":8081", debugMux)
<-done
}

View File

@@ -15,6 +15,19 @@ import (
messages "git.tornberg.me/go-cart-actor/pkg/messages"
"git.tornberg.me/go-cart-actor/pkg/voucher"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
grainMutations = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_grain_mutations_total",
Help: "The total number of mutations",
})
grainLookups = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_grain_lookups_total",
Help: "The total number of lookups",
})
)
type PoolServer struct {
@@ -54,6 +67,7 @@ func (s *PoolServer) AddSkuToCartHandler(w http.ResponseWriter, r *http.Request,
if err != nil {
return err
}
grainMutations.Add(float64(len(data.Mutations)))
return s.WriteResult(w, data)
}
@@ -436,6 +450,7 @@ func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request
return func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error {
if ownerHost, ok := s.OwnerHost(uint64(cartId)); ok {
handled, err := ownerHost.Proxy(uint64(cartId), w, r)
grainLookups.Inc()
if err == nil && handled {
return nil
}

View File

@@ -143,6 +143,8 @@ spec:
ports:
- containerPort: 8080
name: web
- containerPort: 8081
name: debug
- containerPort: 1337
name: rpc
livenessProbe:
@@ -245,6 +247,8 @@ spec:
ports:
- containerPort: 8080
name: web
- containerPort: 8081
name: debug
- containerPort: 1337
name: rpc
livenessProbe:
@@ -300,7 +304,7 @@ apiVersion: v1
metadata:
name: cart-actor
annotations:
prometheus.io/port: "8080"
prometheus.io/port: "8081"
prometheus.io/scrape: "true"
prometheus.io/path: "/metrics"
spec:

View File

@@ -22,6 +22,7 @@ type GrainPool[V any] interface {
Negotiate(otherHosts []string)
GetLocalIds() []uint64
RemoveHost(host string)
AddRemoteHost(host string)
IsHealthy() bool
IsKnown(string) bool
Close()

View File

@@ -157,6 +157,10 @@ func (p *SimpleGrainPool[V]) TakeOwnership(id uint64) {
p.broadcastOwnership([]uint64{id})
}
func (p *SimpleGrainPool[V]) AddRemoteHost(host string) {
p.AddRemote(host)
}
func (p *SimpleGrainPool[V]) AddRemote(host string) (Host, error) {
if host == "" {
return nil, fmt.Errorf("host is empty")