435 lines
11 KiB
Go
435 lines
11 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"net/http/pprof"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"git.tornberg.me/go-cart-actor/pkg/actor"
|
|
"git.tornberg.me/go-cart-actor/pkg/cart"
|
|
"git.tornberg.me/go-cart-actor/pkg/discovery"
|
|
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
|
"git.tornberg.me/go-cart-actor/pkg/proxy"
|
|
"git.tornberg.me/go-cart-actor/pkg/voucher"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
)
|
|
|
|
var (
|
|
grainSpawns = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "cart_grain_spawned_total",
|
|
Help: "The total number of spawned grains",
|
|
})
|
|
grainMutations = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "cart_grain_mutations_total",
|
|
Help: "The total number of mutations",
|
|
})
|
|
grainLookups = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "cart_grain_lookups_total",
|
|
Help: "The total number of lookups",
|
|
})
|
|
)
|
|
|
|
func init() {
|
|
os.Mkdir("data", 0755)
|
|
}
|
|
|
|
type App struct {
|
|
pool *actor.SimpleGrainPool[cart.CartGrain]
|
|
}
|
|
|
|
var podIp = os.Getenv("POD_IP")
|
|
var name = os.Getenv("POD_NAME")
|
|
var amqpUrl = os.Getenv("AMQP_URL")
|
|
|
|
var tpl = `<!DOCTYPE html>
|
|
<html lang="en">
|
|
<head>
|
|
<meta charset="UTF-8" />
|
|
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
|
<title>s10r testing - checkout</title>
|
|
</head>
|
|
|
|
<body>
|
|
%s
|
|
</body>
|
|
</html>
|
|
`
|
|
|
|
func getCountryFromHost(host string) string {
|
|
if strings.Contains(strings.ToLower(host), "-no") {
|
|
return "no"
|
|
}
|
|
if strings.Contains(strings.ToLower(host), "-se") {
|
|
return "se"
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func GetDiscovery() discovery.Discovery {
|
|
if podIp == "" {
|
|
return nil
|
|
}
|
|
|
|
config, kerr := rest.InClusterConfig()
|
|
|
|
if kerr != nil {
|
|
log.Fatalf("Error creating kubernetes client: %v\n", kerr)
|
|
}
|
|
client, err := kubernetes.NewForConfig(config)
|
|
if err != nil {
|
|
log.Fatalf("Error creating client: %v\n", err)
|
|
}
|
|
return discovery.NewK8sDiscovery(client)
|
|
}
|
|
|
|
type MutationContext struct {
|
|
VoucherService voucher.Service
|
|
}
|
|
|
|
type CartChangeEvent struct {
|
|
CartId cart.CartId `json:"cartId"`
|
|
Mutations []actor.ApplyResult `json:"mutations"`
|
|
}
|
|
|
|
func main() {
|
|
|
|
controlPlaneConfig := actor.DefaultServerConfig()
|
|
|
|
reg := cart.NewCartMultationRegistry()
|
|
diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg)
|
|
poolConfig := actor.GrainPoolConfig[cart.CartGrain]{
|
|
MutationRegistry: reg,
|
|
Storage: diskStorage,
|
|
Spawn: func(id uint64) (actor.Grain[cart.CartGrain], error) {
|
|
grainSpawns.Inc()
|
|
ret := cart.NewCartGrain(id, time.Now())
|
|
// Set baseline lastChange at spawn; replay may update it to last event timestamp.
|
|
|
|
err := diskStorage.LoadEvents(id, ret)
|
|
|
|
return ret, err
|
|
},
|
|
SpawnHost: func(host string) (actor.Host, error) {
|
|
return proxy.NewRemoteHost(host)
|
|
},
|
|
TTL: 15 * time.Minute,
|
|
PoolSize: 2 * 65535,
|
|
Hostname: podIp,
|
|
}
|
|
|
|
pool, err := actor.NewSimpleGrainPool(poolConfig)
|
|
if err != nil {
|
|
log.Fatalf("Error creating cart pool: %v\n", err)
|
|
}
|
|
app := &App{
|
|
pool: pool,
|
|
}
|
|
|
|
conn, err := amqp.Dial(amqpUrl)
|
|
if err != nil {
|
|
fmt.Errorf("failed to connect to RabbitMQ: %w", err)
|
|
}
|
|
|
|
amqpListener := actor.NewAmqpListener(conn, func(id uint64, msg []actor.ApplyResult) (any, error) {
|
|
return &CartChangeEvent{
|
|
CartId: cart.CartId(id),
|
|
Mutations: msg,
|
|
}, nil
|
|
})
|
|
amqpListener.DefineTopics()
|
|
pool.AddListener(amqpListener)
|
|
|
|
grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool)
|
|
if err != nil {
|
|
log.Fatalf("Error starting control plane gRPC server: %v\n", err)
|
|
}
|
|
defer grpcSrv.GracefulStop()
|
|
|
|
// go diskStorage.SaveLoop(10 * time.Second)
|
|
|
|
go func(hw discovery.Discovery) {
|
|
if hw == nil {
|
|
log.Print("No discovery service available")
|
|
return
|
|
}
|
|
ch, err := hw.Watch()
|
|
if err != nil {
|
|
log.Printf("Discovery error: %v", err)
|
|
return
|
|
}
|
|
for evt := range ch {
|
|
if evt.Host == "" {
|
|
continue
|
|
}
|
|
switch evt.Type {
|
|
case watch.Deleted:
|
|
if pool.IsKnown(evt.Host) {
|
|
pool.RemoveHost(evt.Host)
|
|
}
|
|
default:
|
|
if !pool.IsKnown(evt.Host) {
|
|
log.Printf("Discovered host %s", evt.Host)
|
|
pool.AddRemote(evt.Host)
|
|
}
|
|
}
|
|
}
|
|
}(GetDiscovery())
|
|
|
|
orderHandler := NewAmqpOrderHandler(conn)
|
|
orderHandler.DefineTopics()
|
|
klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))
|
|
|
|
syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient)
|
|
|
|
mux := http.NewServeMux()
|
|
mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve()))
|
|
// only for local
|
|
mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) {
|
|
pool.AddRemote(r.PathValue("host"))
|
|
})
|
|
// mux.HandleFunc("GET /save", app.HandleSave)
|
|
//mux.HandleFunc("/", app.RewritePath)
|
|
mux.HandleFunc("/debug/pprof/", pprof.Index)
|
|
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
|
|
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
|
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
|
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
|
|
mux.Handle("/metrics", promhttp.Handler())
|
|
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
|
|
// Grain pool health: simple capacity check (mirrors previous GrainHandler.IsHealthy)
|
|
grainCount, capacity := app.pool.LocalUsage()
|
|
if grainCount >= capacity {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
w.Write([]byte("grain pool at capacity"))
|
|
return
|
|
}
|
|
if !pool.IsHealthy() {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
w.Write([]byte("control plane not healthy"))
|
|
return
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write([]byte("ok"))
|
|
})
|
|
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write([]byte("ok"))
|
|
})
|
|
mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write([]byte("ok"))
|
|
})
|
|
|
|
mux.HandleFunc("/checkout", func(w http.ResponseWriter, r *http.Request) {
|
|
orderId := r.URL.Query().Get("order_id")
|
|
order := &CheckoutOrder{}
|
|
|
|
if orderId == "" {
|
|
cookie, err := r.Cookie("cartid")
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
if cookie.Value == "" {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
w.Write([]byte("no cart id to checkout is empty"))
|
|
return
|
|
}
|
|
parsed, ok := cart.ParseCartId(cookie.Value)
|
|
if !ok {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
w.Write([]byte("invalid cart id format"))
|
|
return
|
|
}
|
|
cartId := parsed
|
|
syncedServer.ProxyHandler(func(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
|
|
order, err = syncedServer.CreateOrUpdateCheckout(r.Host, cartId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")")
|
|
w.WriteHeader(http.StatusOK)
|
|
fmt.Fprintf(w, tpl, order.HTMLSnippet)
|
|
return nil
|
|
})(cartId, w, r)
|
|
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
w.Write([]byte(err.Error()))
|
|
}
|
|
|
|
// v2: Apply now returns *CartGrain; order creation handled inside grain (no payload to unmarshal)
|
|
} else {
|
|
order, err = klarnaClient.GetOrder(orderId)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")")
|
|
w.WriteHeader(http.StatusOK)
|
|
fmt.Fprintf(w, tpl, order.HTMLSnippet)
|
|
}
|
|
|
|
})
|
|
mux.HandleFunc("/confirmation/{order_id}", func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
orderId := r.PathValue("order_id")
|
|
order, err := klarnaClient.GetOrder(orderId)
|
|
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
if order.Status == "checkout_complete" {
|
|
http.SetCookie(w, &http.Cookie{
|
|
Name: "cartid",
|
|
Value: "",
|
|
Path: "/",
|
|
Secure: true,
|
|
HttpOnly: true,
|
|
Expires: time.Unix(0, 0),
|
|
SameSite: http.SameSiteLaxMode,
|
|
})
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
fmt.Fprintf(w, tpl, order.HTMLSnippet)
|
|
})
|
|
mux.HandleFunc("/validate", func(w http.ResponseWriter, r *http.Request) {
|
|
log.Printf("Klarna order validation, method: %s", r.Method)
|
|
if r.Method != "POST" {
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
order := &CheckoutOrder{}
|
|
err := json.NewDecoder(r.Body).Decode(order)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
}
|
|
log.Printf("Klarna order validation: %s", order.ID)
|
|
//err = confirmOrder(order, orderHandler)
|
|
//if err != nil {
|
|
// log.Printf("Error validating order: %v\n", err)
|
|
// w.WriteHeader(http.StatusInternalServerError)
|
|
// return
|
|
//}
|
|
//
|
|
//err = triggerOrderCompleted(err, syncedServer, order)
|
|
//if err != nil {
|
|
// log.Printf("Error processing cart message: %v\n", err)
|
|
// w.WriteHeader(http.StatusInternalServerError)
|
|
// return
|
|
//}
|
|
w.WriteHeader(http.StatusOK)
|
|
})
|
|
mux.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
if r.Method != http.MethodPost {
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
orderId := r.URL.Query().Get("order_id")
|
|
log.Printf("Order confirmation push: %s", orderId)
|
|
|
|
order, err := klarnaClient.GetOrder(orderId)
|
|
|
|
if err != nil {
|
|
log.Printf("Error creating request: %v\n", err)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
err = confirmOrder(order, orderHandler)
|
|
if err != nil {
|
|
log.Printf("Error confirming order: %v\n", err)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
err = triggerOrderCompleted(syncedServer, order)
|
|
if err != nil {
|
|
log.Printf("Error processing cart message: %v\n", err)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
err = klarnaClient.AcknowledgeOrder(orderId)
|
|
if err != nil {
|
|
log.Printf("Error acknowledging order: %v\n", err)
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
})
|
|
mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write([]byte("1.0.0"))
|
|
})
|
|
|
|
mux.HandleFunc("/openapi.json", ServeEmbeddedOpenAPI)
|
|
|
|
sigs := make(chan os.Signal, 1)
|
|
done := make(chan bool, 1)
|
|
signal.Notify(sigs, syscall.SIGTERM)
|
|
|
|
go func() {
|
|
sig := <-sigs
|
|
fmt.Println("Shutting down due to signal:", sig)
|
|
diskStorage.Close()
|
|
pool.Close()
|
|
|
|
done <- true
|
|
}()
|
|
|
|
log.Print("Server started at port 8080")
|
|
go http.ListenAndServe(":8080", mux)
|
|
<-done
|
|
|
|
}
|
|
|
|
func triggerOrderCompleted(syncedServer *PoolServer, order *CheckoutOrder) error {
|
|
mutation := &messages.OrderCreated{
|
|
OrderId: order.ID,
|
|
Status: order.Status,
|
|
}
|
|
cid, ok := cart.ParseCartId(order.MerchantReference1)
|
|
if !ok {
|
|
return fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1)
|
|
}
|
|
_, applyErr := syncedServer.Apply(uint64(cid), mutation)
|
|
|
|
return applyErr
|
|
}
|
|
|
|
func confirmOrder(order *CheckoutOrder, orderHandler *AmqpOrderHandler) error {
|
|
orderToSend, err := json.Marshal(order)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = orderHandler.OrderCompleted(orderToSend)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|