package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"net/http"
	"net/http/pprof"
	"os"
	"os/signal"
	"strings"
	"time"

	"git.k6n.net/go-cart-actor/pkg/actor"
	"git.k6n.net/go-cart-actor/pkg/cart"
	"git.k6n.net/go-cart-actor/pkg/promotions"
	"git.k6n.net/go-cart-actor/pkg/proxy"
	"git.k6n.net/go-cart-actor/pkg/voucher"
	"github.com/matst80/go-redis-inventory/pkg/inventory"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/redis/go-redis/v9"
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

var (
	grainSpawns = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_spawned_total",
		Help: "The total number of spawned grains",
	})
)

func init() {
	// Ensure the directory used by the disk storage and the promotions state file exists.
	if err := os.MkdirAll("data", 0755); err != nil {
		log.Printf("unable to create data directory: %v", err)
	}
}

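// App bundles the local grain pool with the HTTP pool server that fronts it.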
type App struct {
	pool   *actor.SimpleGrainPool[cart.CartGrain]
	server *PoolServer
}

var podIp = os.Getenv("POD_IP")
var name = os.Getenv("POD_NAME")
var amqpUrl = os.Getenv("AMQP_URL")
var redisAddress = os.Getenv("REDIS_ADDRESS")
var redisPassword = os.Getenv("REDIS_PASSWORD")

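// getCountryFromHost maps a country-suffixed hostname (for example "cart-no"
// or "cart-se"; the exact hostnames are deployment-specific) to a country
// code. An empty string means no country could be derived from the host.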
func getCountryFromHost(host string) string {
	host = strings.ToLower(host)
	if strings.Contains(host, "-no") {
		return "no"
	}
	if strings.Contains(host, "-se") {
		return "se"
	}
	return ""
}

// MutationContext carries the services available to cart mutations.
type MutationContext struct {
	VoucherService voucher.Service
}

// CartChangeEvent describes the mutations applied to a single cart.
type CartChangeEvent struct {
	CartId    cart.CartId         `json:"cartId"`
	Mutations []actor.ApplyResult `json:"mutations"`
}

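// matchesSkuAndLocation reports whether an inventory update applies to a cart
// item: the SKU must match, and an item without a StoreId is matched against
// the default "se" location, otherwise the item's StoreId must equal the
// update's LocationID.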
func matchesSkuAndLocation(update inventory.InventoryResult, item cart.CartItem) bool {
	if string(update.SKU) != item.Sku {
		return false
	}
	if item.StoreId == nil {
		return update.LocationID == "se"
	}
	return *item.StoreId == string(update.LocationID)
}

func main() {
	controlPlaneConfig := actor.DefaultServerConfig()

	promotionData, err := promotions.LoadStateFile("data/promotions.json")
	if err != nil {
		log.Printf("Error loading promotions: %v", err)
	}
	if err == nil {
		log.Printf("loaded %d promotions", len(promotionData.State.Promotions))
	}

	inventoryPubSub := actor.NewPubSub[inventory.InventoryChange]()

	// promotionService := promotions.NewPromotionService(nil)
	rdb := redis.NewClient(&redis.Options{
		Addr:     redisAddress,
		Password: redisPassword,
		DB:       0,
	})
	inventoryService, err := inventory.NewRedisInventoryService(rdb)
	if err != nil {
		log.Fatalf("Error creating inventory service: %v", err)
	}

	inventoryReservationService, err := inventory.NewRedisCartReservationService(rdb)
	if err != nil {
		log.Fatalf("Error creating inventory reservation service: %v", err)
	}

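	// Every applied mutation runs through this post-processor: totals are
	// recalculated and the grain version is bumped. Promotion evaluation is
	// wired up below but currently commented out.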
	reg := cart.NewCartMultationRegistry(cart.NewCartMutationContext(inventoryReservationService))
	reg.RegisterProcessor(
		actor.NewMutationProcessor(func(ctx context.Context, g *cart.CartGrain) error {
			_, span := tracer.Start(ctx, "Totals and promotions")
			defer span.End()
			g.UpdateTotals()
			g.Version++
			// promotionCtx := promotions.NewContextFromCart(g, promotions.WithNow(time.Now()), promotions.WithCustomerSegment("vip"))
			// _, actions := promotionService.EvaluateAll(promotionData.State.Promotions, promotionCtx)
			// for _, action := range actions {
			// 	log.Printf("apply: %+v", action)
			// 	g.UpdateTotals()
			// }
			return nil
		}),
	)

	diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg)
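	// Pool lifecycle: Spawn replays persisted events from disk and refreshes
	// stock levels from the inventory service, Destroy unsubscribes the grain
	// from inventory updates, and idle grains are evicted after the TTL.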
	poolConfig := actor.GrainPoolConfig[cart.CartGrain]{
		MutationRegistry: reg,
		Storage:          diskStorage,
		Spawn: func(ctx context.Context, id uint64) (actor.Grain[cart.CartGrain], error) {
			_, span := tracer.Start(ctx, fmt.Sprintf("Spawn cart id %d", id))
			defer span.End()
			grainSpawns.Inc()
			ret := cart.NewCartGrain(id, time.Now())
			// Set baseline lastChange at spawn; replay may update it to last event timestamp.
			inventoryPubSub.Subscribe(ret.HandleInventoryChange)
			err := diskStorage.LoadEvents(ctx, id, ret)
			if err == nil && inventoryService != nil {
				refs := make([]*inventory.InventoryReference, 0, len(ret.Items))
				for _, item := range ret.Items {
					refs = append(refs, &inventory.InventoryReference{
						SKU:        inventory.SKU(item.Sku),
						LocationID: getLocationId(item),
					})
				}
				_, span := tracer.Start(ctx, "update inventory")
				defer span.End()
				res, err := inventoryService.GetInventoryBatch(ctx, refs...)
				if err != nil {
					log.Printf("unable to update inventory %v", err)
				} else {
					for _, update := range res {
						for _, item := range ret.Items {
							if matchesSkuAndLocation(update, *item) && update.Quantity != uint32(item.Stock) {
								// maybe apply an update to give visibility to the cart
								item.Stock = uint16(update.Quantity)
							}
						}
					}
				}
			}

			return ret, err
		},
		Destroy: func(grain actor.Grain[cart.CartGrain]) error {
			state, err := grain.GetCurrentState()
			if err != nil {
				return err
			}
			inventoryPubSub.Unsubscribe(state.HandleInventoryChange)
			return nil
		},
		SpawnHost: func(host string) (actor.Host, error) {
			return proxy.NewRemoteHost[cart.CartGrain](host)
		},
		TTL:      5 * time.Minute,
		PoolSize: 2 * 65535,
		Hostname: podIp,
	}

	pool, err := actor.NewSimpleGrainPool(poolConfig)
	if err != nil {
		log.Fatalf("Error creating cart pool: %v", err)
	}

	syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), inventoryService, inventoryReservationService)

	app := &App{
		pool:   pool,
		server: syncedServer,
	}

	mux := http.NewServeMux()
	debugMux := http.NewServeMux()

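	// Control-plane gRPC server for the grain pool, created with the default
	// server config and stopped gracefully on shutdown; presumably this is
	// what peer pods found via UseDiscovery talk to when forwarding grain
	// calls.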
	grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool)
	if err != nil {
		log.Fatalf("Error starting control plane gRPC server: %v", err)
	}
	defer grpcSrv.GracefulStop()

	// go diskStorage.SaveLoop(10 * time.Second)
	UseDiscovery(pool)

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	otelShutdown, err := setupOTelSDK(ctx)
	if err != nil {
		log.Fatalf("Unable to start OTel: %v", err)
	}

	syncedServer.Serve(mux)
	// only for local development: manually register a remote pool member
	mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) {
		pool.AddRemote(r.PathValue("host"))
	})

	debugMux.HandleFunc("/debug/pprof/", pprof.Index)
	debugMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	debugMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	debugMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	debugMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	debugMux.Handle("/metrics", promhttp.Handler())

	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		// Grain pool health: simple capacity check (mirrors the previous GrainHandler.IsHealthy)
		grainCount, capacity := app.pool.LocalUsage()
		if grainCount >= capacity {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte("grain pool at capacity"))
			return
		}
		if !pool.IsHealthy() {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte("control plane not healthy"))
			return
		}
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})
	mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})
	mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})

	mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("1.0.0"))
	})

	mux.HandleFunc("/openapi.json", ServeEmbeddedOpenAPI)

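	// Main HTTP server: request contexts derive from the signal-aware ctx, and
	// the mux is wrapped with otelhttp for tracing.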
	srv := &http.Server{
		Addr:         ":8080",
		BaseContext:  func(net.Listener) context.Context { return ctx },
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 20 * time.Second,
		Handler:      otelhttp.NewHandler(mux, "/"),
	}

	defer func() {
		fmt.Println("Shutting down due to signal")
		otelShutdown(context.Background())
		diskStorage.Close()
		pool.Close()
	}()

	srvErr := make(chan error, 1)
	go func() {
		srvErr <- srv.ListenAndServe()
	}()

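	// Fan inventory changes from Redis out to live cart grains via the
	// in-process pub/sub.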
	listener := inventory.NewInventoryChangeListener(rdb, context.Background(), func(changes []inventory.InventoryChange) {
		for _, change := range changes {
			log.Printf("inventory change: %v", change)
			inventoryPubSub.Publish(change)
		}
	})

	go func() {
		if err := listener.Start(); err != nil {
			log.Fatalf("Unable to start inventory listener: %v", err)
		}
	}()

	log.Print("Server started on port 8080")

	go func() {
		// Debug server (pprof and Prometheus metrics) on a separate port.
		if err := http.ListenAndServe(":8081", debugMux); err != nil {
			log.Printf("debug server stopped: %v", err)
		}
	}()

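	// Block until the HTTP server fails or the first interrupt arrives; on
	// interrupt the deferred cleanup above (OTel shutdown, disk storage and
	// pool close) runs before the process exits.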
	select {
	case err = <-srvErr:
		// Error when starting HTTP server.
		log.Fatalf("Unable to start server: %v", err)
	case <-ctx.Done():
		// Wait for first CTRL+C.
		// Stop receiving signal notifications as soon as possible.
		stop()
	}
}