Files
go-cart-actor/cmd/cart/main.go
matst80 00fcacf1be
All checks were successful
Build and Publish / Metadata (push) Successful in 11s
Build and Publish / BuildAndDeployAmd64 (push) Successful in 53s
Build and Publish / BuildAndDeployArm64 (push) Successful in 4m23s
test again
2025-11-05 15:02:24 +01:00

259 lines
6.6 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"
"git.tornberg.me/go-cart-actor/pkg/actor"
"git.tornberg.me/go-cart-actor/pkg/cart"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
"git.tornberg.me/go-cart-actor/pkg/promotions"
"git.tornberg.me/go-cart-actor/pkg/proxy"
"git.tornberg.me/go-cart-actor/pkg/voucher"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
grainSpawns = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_grain_spawned_total",
Help: "The total number of spawned grains",
})
)
func init() {
os.Mkdir("data", 0755)
}
type App struct {
pool *actor.SimpleGrainPool[cart.CartGrain]
server *PoolServer
klarnaClient *KlarnaClient
}
var podIp = os.Getenv("POD_IP")
var name = os.Getenv("POD_NAME")
var amqpUrl = os.Getenv("AMQP_URL")
var tpl = `<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>s10r testing - checkout</title>
</head>
<body>
%s
</body>
</html>
`
func getCountryFromHost(host string) string {
if strings.Contains(strings.ToLower(host), "-no") {
return "no"
}
if strings.Contains(strings.ToLower(host), "-se") {
return "se"
}
return ""
}
type MutationContext struct {
VoucherService voucher.Service
}
type CartChangeEvent struct {
CartId cart.CartId `json:"cartId"`
Mutations []actor.ApplyResult `json:"mutations"`
}
func main() {
controlPlaneConfig := actor.DefaultServerConfig()
promotionData, err := promotions.LoadStateFile("data/promotions.json")
if err != nil {
log.Printf("Error loading promotions: %v\n", err)
}
log.Printf("loaded %d promotions", len(promotionData.State.Promotions))
promotionService := promotions.NewPromotionService(nil)
reg := cart.NewCartMultationRegistry()
reg.RegisterProcessor(
actor.NewMutationProcessor(func(g *cart.CartGrain) error {
g.UpdateTotals()
ctx := promotions.NewContextFromCart(g, promotions.WithNow(time.Now()), promotions.WithCustomerSegment("vip"))
_, actions := promotionService.EvaluateAll(promotionData.State.Promotions, ctx)
for _, action := range actions {
log.Printf("apply: %+v", action)
g.UpdateTotals()
}
return nil
}),
)
diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg)
poolConfig := actor.GrainPoolConfig[cart.CartGrain]{
MutationRegistry: reg,
Storage: diskStorage,
Spawn: func(id uint64) (actor.Grain[cart.CartGrain], error) {
grainSpawns.Inc()
ret := cart.NewCartGrain(id, time.Now())
// Set baseline lastChange at spawn; replay may update it to last event timestamp.
err := diskStorage.LoadEvents(id, ret)
return ret, err
},
SpawnHost: func(host string) (actor.Host, error) {
return proxy.NewRemoteHost(host)
},
TTL: 15 * time.Minute,
PoolSize: 2 * 65535,
Hostname: podIp,
}
pool, err := actor.NewSimpleGrainPool(poolConfig)
if err != nil {
log.Fatalf("Error creating cart pool: %v\n", err)
}
klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))
syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient)
app := &App{
pool: pool,
server: syncedServer,
klarnaClient: klarnaClient,
}
mux := http.NewServeMux()
debugMux := http.NewServeMux()
if amqpUrl == "" {
log.Printf("no connection to amqp defined")
} else {
app.HandleCheckoutRequests(amqpUrl, mux)
}
grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool)
if err != nil {
log.Fatalf("Error starting control plane gRPC server: %v\n", err)
}
defer grpcSrv.GracefulStop()
// go diskStorage.SaveLoop(10 * time.Second)
UseDiscovery(pool)
mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve()))
// only for local
mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) {
pool.AddRemote(r.PathValue("host"))
})
debugMux.HandleFunc("/debug/pprof/", pprof.Index)
debugMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
debugMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
debugMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
debugMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
debugMux.Handle("/metrics", promhttp.Handler())
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
// Grain pool health: simple capacity check (mirrors previous GrainHandler.IsHealthy)
grainCount, capacity := app.pool.LocalUsage()
if grainCount >= capacity {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("grain pool at capacity"))
return
}
if !pool.IsHealthy() {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("control plane not healthy"))
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
})
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
})
mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
})
mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("1.0.0"))
})
mux.HandleFunc("/openapi.json", ServeEmbeddedOpenAPI)
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGTERM)
otelShutdown, err := setupOTelSDK(context.Background())
if err != nil {
log.Fatalf("Unable to start otel %v", err)
}
go func() {
sig := <-sigs
fmt.Println("Shutting down due to signal:", sig)
otelShutdown(context.Background())
diskStorage.Close()
pool.Close()
done <- true
}()
log.Print("Server started at port 8080")
go http.ListenAndServe(":8080", mux)
go http.ListenAndServe(":8081", debugMux)
<-done
}
func triggerOrderCompleted(syncedServer *PoolServer, order *CheckoutOrder) error {
mutation := &messages.OrderCreated{
OrderId: order.ID,
Status: order.Status,
}
cid, ok := cart.ParseCartId(order.MerchantReference1)
if !ok {
return fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1)
}
_, applyErr := syncedServer.Apply(uint64(cid), mutation)
return applyErr
}
func confirmOrder(order *CheckoutOrder, orderHandler *AmqpOrderHandler) error {
orderToSend, err := json.Marshal(order)
if err != nil {
return err
}
err = orderHandler.OrderCompleted(orderToSend)
if err != nil {
return err
}
return nil
}