refactor/checkout (#8)
Co-authored-by: matst80 <mats.tornberg@gmail.com> Reviewed-on: #8 Co-authored-by: Mats Törnberg <mats@tornberg.me> Co-committed-by: Mats Törnberg <mats@tornberg.me>
This commit was merged in pull request #8.
This commit is contained in:
196
cmd/checkout/main.go
Normal file
196
cmd/checkout/main.go
Normal file
@@ -0,0 +1,196 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
"git.k6n.net/go-cart-actor/pkg/actor"
|
||||
"git.k6n.net/go-cart-actor/pkg/checkout"
|
||||
"git.k6n.net/go-cart-actor/pkg/proxy"
|
||||
"github.com/adyen/adyen-go-api-library/v21/src/adyen"
|
||||
"github.com/adyen/adyen-go-api-library/v21/src/common"
|
||||
"github.com/matst80/go-redis-inventory/pkg/inventory"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
)
|
||||
|
||||
var (
|
||||
grainSpawns = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "checkout_grain_spawned_total",
|
||||
Help: "The total number of spawned checkout grains",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
os.Mkdir("data", 0755)
|
||||
}
|
||||
|
||||
type App struct {
|
||||
pool *actor.SimpleGrainPool[checkout.CheckoutGrain]
|
||||
server *CheckoutPoolServer
|
||||
klarnaClient *KlarnaClient
|
||||
cartClient *CartClient // For internal communication to cart
|
||||
}
|
||||
|
||||
var podIp = os.Getenv("POD_IP")
|
||||
var name = os.Getenv("POD_NAME")
|
||||
var amqpUrl = os.Getenv("AMQP_URL")
|
||||
var redisAddress = os.Getenv("REDIS_ADDRESS")
|
||||
var redisPassword = os.Getenv("REDIS_PASSWORD")
|
||||
var cartInternalUrl = os.Getenv("CART_INTERNAL_URL") // e.g., http://cart-service:8081
|
||||
|
||||
func main() {
|
||||
|
||||
controlPlaneConfig := actor.DefaultServerConfig()
|
||||
|
||||
reg := checkout.NewCheckoutMutationRegistry(checkout.NewCheckoutMutationContext())
|
||||
reg.RegisterProcessor(
|
||||
actor.NewMutationProcessor(func(ctx context.Context, g *checkout.CheckoutGrain) error {
|
||||
g.Version++
|
||||
return nil
|
||||
}),
|
||||
)
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: redisAddress,
|
||||
Password: redisPassword,
|
||||
DB: 0,
|
||||
})
|
||||
inventoryService, err := inventory.NewRedisInventoryService(rdb)
|
||||
if err != nil {
|
||||
log.Fatalf("Error creating inventory service: %v\n", err)
|
||||
}
|
||||
|
||||
diskStorage := actor.NewDiskStorage[checkout.CheckoutGrain]("data", reg)
|
||||
|
||||
poolConfig := actor.GrainPoolConfig[checkout.CheckoutGrain]{
|
||||
MutationRegistry: reg,
|
||||
Storage: diskStorage,
|
||||
Spawn: func(ctx context.Context, id uint64) (actor.Grain[checkout.CheckoutGrain], error) {
|
||||
_, span := tracer.Start(ctx, fmt.Sprintf("Spawn checkout id %d", id))
|
||||
defer span.End()
|
||||
grainSpawns.Inc()
|
||||
|
||||
ret := checkout.NewCheckoutGrain(id, 0, 0, time.Now(), nil) // version to be set later
|
||||
return ret, nil
|
||||
},
|
||||
Destroy: func(grain actor.Grain[checkout.CheckoutGrain]) error {
|
||||
return nil
|
||||
},
|
||||
SpawnHost: func(host string) (actor.Host, error) {
|
||||
return proxy.NewRemoteHost[checkout.CheckoutGrain](host)
|
||||
},
|
||||
TTL: 1 * time.Hour, // Longer TTL for checkout
|
||||
PoolSize: 65535,
|
||||
Hostname: podIp,
|
||||
}
|
||||
|
||||
pool, err := actor.NewSimpleGrainPool(poolConfig)
|
||||
if err != nil {
|
||||
log.Fatalf("Error creating checkout pool: %v\n", err)
|
||||
}
|
||||
|
||||
adyenClient := adyen.NewClient(&common.Config{
|
||||
ApiKey: os.Getenv("ADYEN_API_KEY"),
|
||||
Environment: common.TestEnv,
|
||||
})
|
||||
|
||||
klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))
|
||||
|
||||
cartClient := NewCartClient(cartInternalUrl)
|
||||
|
||||
syncedServer := NewCheckoutPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient, cartClient, adyenClient)
|
||||
syncedServer.inventoryService = inventoryService
|
||||
|
||||
mux := http.NewServeMux()
|
||||
debugMux := http.NewServeMux()
|
||||
|
||||
if amqpUrl == "" {
|
||||
log.Fatalf("no connection to amqp defined")
|
||||
}
|
||||
|
||||
grpcSrv, err := actor.NewControlServer[*checkout.CheckoutGrain](controlPlaneConfig, pool)
|
||||
if err != nil {
|
||||
log.Fatalf("Error starting control plane gRPC server: %v\n", err)
|
||||
}
|
||||
defer grpcSrv.GracefulStop()
|
||||
|
||||
UseDiscovery(pool)
|
||||
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
defer stop()
|
||||
|
||||
otelShutdown, err := setupOTelSDK(ctx)
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to start otel %v", err)
|
||||
}
|
||||
|
||||
syncedServer.Serve(mux)
|
||||
|
||||
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
|
||||
grainCount, capacity := pool.LocalUsage()
|
||||
if grainCount >= capacity {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte("grain pool at capacity"))
|
||||
return
|
||||
}
|
||||
if !pool.IsHealthy() {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte("control plane not healthy"))
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("ok"))
|
||||
})
|
||||
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("ok"))
|
||||
})
|
||||
mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("ok"))
|
||||
})
|
||||
|
||||
mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("1.0.0"))
|
||||
})
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: ":8080",
|
||||
BaseContext: func(net.Listener) context.Context { return ctx },
|
||||
ReadTimeout: 10 * time.Second,
|
||||
WriteTimeout: 20 * time.Second,
|
||||
Handler: otelhttp.NewHandler(mux, "/"),
|
||||
}
|
||||
|
||||
defer func() {
|
||||
fmt.Println("Shutting down due to signal")
|
||||
otelShutdown(context.Background())
|
||||
diskStorage.Close()
|
||||
pool.Close()
|
||||
}()
|
||||
|
||||
srvErr := make(chan error, 1)
|
||||
go func() {
|
||||
srvErr <- srv.ListenAndServe()
|
||||
}()
|
||||
|
||||
log.Print("Checkout server started at port 8080")
|
||||
|
||||
go http.ListenAndServe(":8081", debugMux)
|
||||
|
||||
select {
|
||||
case err = <-srvErr:
|
||||
log.Fatalf("Unable to start server: %v", err)
|
||||
case <-ctx.Done():
|
||||
stop()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user