Files
go-cart-actor/main.go
matst80 ed7b025134
Some checks failed
Build and Publish / BuildAndDeploy (push) Failing after 2m55s
Build and Publish / BuildAndDeployAmd64 (push) Has been cancelled
validate
2025-05-13 20:40:48 +02:00

385 lines
9.9 KiB
Go

package main
import (
"encoding/json"
"fmt"
messages "git.tornberg.me/go-cart-actor/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"log"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
)
// grainSpawns is incremented by spawn each time a new grain is created.
var grainSpawns = promauto.NewCounter(prometheus.CounterOpts{
	Help: "The total number of spawned grains",
	Name: "cart_grain_spawned_total",
})

// grainMutations is exported as cart_grain_mutations_total.
// NOTE(review): it is not incremented in this part of the file — presumably
// updated wherever grain mutations are applied; confirm.
var grainMutations = promauto.NewCounter(prometheus.CounterOpts{
	Help: "The total number of mutations",
	Name: "cart_grain_mutations_total",
})

// grainLookups is exported as cart_grain_lookups_total.
// NOTE(review): it is not incremented in this part of the file — presumably
// updated wherever grains are looked up; confirm.
var grainLookups = promauto.NewCounter(prometheus.CounterOpts{
	Help: "The total number of lookups",
	Name: "cart_grain_lookups_total",
})
// spawn builds an empty cart grain for id, bumps the spawn counter and then
// hands the grain to loadMessages. The grain is returned together with
// whatever error loadMessages reports, so callers receive the grain even
// when loading fails.
func spawn(id CartId) (*CartGrain, error) {
	grainSpawns.Inc()

	// Counters and the total start at their zero values; the slices are
	// created empty rather than left nil.
	grain := new(CartGrain)
	grain.Id = id
	grain.Items = []*CartItem{}
	grain.Deliveries = []*CartDelivery{}
	grain.storageMessages = []Message{}

	return grain, loadMessages(grain, id)
}
// init makes sure the "data" directory used for persisted state exists
// before main runs. A failure is logged instead of being silently dropped;
// startup continues either way, as it did before.
func init() {
	if err := ensureDataDir(); err != nil {
		log.Printf("Error creating data directory: %v\n", err)
	}
}

// ensureDataDir creates the "data" directory with mode 0755. It returns nil
// when the directory already exists and an error when it cannot be created
// (for example when "data" exists but is not a directory).
func ensureDataDir() error {
	return os.MkdirAll("data", 0755)
}
// App ties the local grain pool to the disk storage it is saved into.
type App struct {
// pool holds the grains that Save iterates over.
pool *GrainLocalPool
// storage receives changed grains and the overall save state.
storage *DiskStorage
}
// Save stores every grain whose last change is newer than its recorded save
// time, holding the pool's read lock for the duration. Per-grain store
// failures are logged and do not stop the loop. If at least one grain was
// newer, the storage state is written afterwards and that result is
// returned; otherwise Save returns nil without touching the state.
func (a *App) Save() error {
	a.pool.mu.RLock()
	defer a.pool.mu.RUnlock()

	dirty := false
	for cartID, g := range a.pool.GetGrains() {
		if g != nil && g.GetLastChange() > a.storage.LastSaves[cartID] {
			// Mark as dirty even if the store below fails, so the state
			// file is still rewritten.
			dirty = true
			if storeErr := a.storage.Store(cartID, g); storeErr != nil {
				log.Printf("Error saving grain %s: %v\n", cartID, storeErr)
			}
		}
	}

	if dirty {
		return a.storage.saveState()
	}
	return nil
}
// HandleSave is an HTTP handler that triggers Save. It answers 201 Created
// on success and 500 with the error text in the body on failure.
func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) {
	if saveErr := a.Save(); saveErr != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(saveErr.Error()))
		return
	}
	w.WriteHeader(http.StatusCreated)
}
// Process-wide configuration, read once from the environment at start-up.
var (
	// podIp comes from POD_IP; when empty, GetDiscovery returns nil.
	podIp = os.Getenv("POD_IP")
	// name comes from POD_NAME and is used in the state file name.
	name = os.Getenv("POD_NAME")
	// amqpUrl comes from AMQP_URL and is passed to the AMQP order handler.
	amqpUrl = os.Getenv("AMQP_URL")
	// KlarnaInstance is the shared Klarna client, built against
	// KlarnaPlaygroundUrl with credentials from KLARNA_API_USERNAME and
	// KLARNA_API_PASSWORD.
	KlarnaInstance = NewKlarnaClient(
		KlarnaPlaygroundUrl,
		os.Getenv("KLARNA_API_USERNAME"),
		os.Getenv("KLARNA_API_PASSWORD"),
	)
)
// GetDiscovery returns a Kubernetes-backed Discovery built from the
// in-cluster configuration, or nil when POD_IP was not set. Failing to
// obtain the in-cluster config or to build the client terminates the
// process via log.Fatalf.
func GetDiscovery() Discovery {
	if podIp == "" {
		return nil
	}

	cfg, cfgErr := rest.InClusterConfig()
	if cfgErr != nil {
		log.Fatalf("Error creating kubernetes client: %v\n", cfgErr)
	}

	clientset, clientErr := kubernetes.NewForConfig(cfg)
	if clientErr != nil {
		log.Fatalf("Error creating client: %v\n", clientErr)
	}

	return NewK8sDiscovery(clientset)
}
// tpl is the HTML page shell used by the /checkout and /confirmation
// handlers. It is a fmt format string: the single %s verb is replaced with
// the order's HTML snippet, inserted as-is (no escaping is applied here).
var tpl = `<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>s10r testing - checkout</title>
</head>
<body>
%s
</body>
</html>
`
// main wires the cart service together: it loads persisted state, creates
// the local and synced grain pools and the grain handler, schedules a
// periodic save, registers the HTTP routes (cart API, pprof, metrics,
// health probes, checkout/confirmation pages and the Klarna validate/push
// callbacks), serves them on :8080, and blocks until SIGTERM, at which
// point it closes the synced pool and saves once more before exiting.
func main() {
	// A failure to load previous state is only logged; start-up continues.
	// NOTE(review): if NewDiskStorage can return a nil storage together with
	// an error, App.Save will dereference it later — confirm.
	storage, err := NewDiskStorage(fmt.Sprintf("data/%s_state.gob", name))
	if err != nil {
		log.Printf("Error loading state: %v\n", err)
	}
	app := &App{
		pool:    NewGrainLocalPool(65535, 5*time.Minute, spawn),
		storage: storage,
	}

	syncedPool, err := NewSyncedPool(app.pool, podIp, GetDiscovery())
	if err != nil {
		log.Fatalf("Error creating synced pool: %v\n", err)
	}

	hg, err := NewGrainHandler(app.pool, ":1337")
	if err != nil {
		log.Fatalf("Error creating handler: %v\n", err)
	}

	// Persist changed grains every ten minutes for the life of the process.
	go func() {
		for range time.Tick(time.Minute * 10) {
			err := app.Save()
			if err != nil {
				log.Printf("Error saving: %v\n", err)
			}
		}
	}()

	orderHandler := &AmqpOrderHandler{
		Url: amqpUrl,
	}

	syncedServer := NewPoolServer(syncedPool, fmt.Sprintf("%s, %s", name, podIp))
	mux := http.NewServeMux()
	mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve()))
	// only for local
	// mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) {
	// syncedPool.AddRemote(r.PathValue("host"))
	// })
	// mux.HandleFunc("GET /save", app.HandleSave)
	//mux.HandleFunc("/", app.RewritePath)
	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	mux.Handle("/metrics", promhttp.Handler())

	// /healthz reports 500 when either the grain handler or the synced pool
	// says it is unhealthy.
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		if !hg.IsHealthy() {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte("handler not healthy"))
			return
		}
		if !syncedPool.IsHealthy() {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte("pool not healthy"))
			return
		}
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})
	mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})
	mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})

	// /checkout renders the checkout page. Without an order_id query
	// parameter a new checkout order is created for the cart named by the
	// "cartid" cookie; with one, the existing order is fetched from Klarna.
	mux.HandleFunc("/checkout", func(w http.ResponseWriter, r *http.Request) {
		orderId := r.URL.Query().Get("order_id")
		order := &CheckoutOrder{}
		if orderId == "" {
			cookie, err := r.Cookie("cartid")
			if err != nil {
				w.WriteHeader(http.StatusBadRequest)
				w.Write([]byte(err.Error()))
				return
			}
			if cookie.Value == "" {
				w.WriteHeader(http.StatusBadRequest)
				w.Write([]byte("no cart id to checkout is empty"))
				return
			}
			cartId := ToCartId(cookie.Value)
			reply, err := syncedServer.pool.Process(cartId, Message{
				Type: CreateCheckoutOrderType,
				Content: &messages.CreateCheckoutOrder{
					Terms:        "https://slask-finder.tornberg.me/terms",
					Checkout:     "https://slask-finder.tornberg.me/checkout?order_id={checkout.order.id}",
					Confirmation: "https://slask-finder.tornberg.me/confirmation/{checkout.order.id}",
					Validation:   "https://cart.tornberg.me/validation",
					Push:         "https://cart.tornberg.me/push?order_id={checkout.order.id}",
				},
			})
			if err != nil {
				w.WriteHeader(http.StatusInternalServerError)
				w.Write([]byte(err.Error()))
				// Stop here: reply must not be used after a failed Process,
				// and the response has already been written.
				return
			}
			// Decode into the existing struct (not a pointer to the pointer)
			// so a JSON null payload cannot nil out order.
			err = json.Unmarshal(reply.Payload, order)
			if err != nil {
				w.WriteHeader(http.StatusInternalServerError)
				w.Write([]byte(err.Error()))
				return
			}
		} else {
			prevOrder, err := KlarnaInstance.GetOrder(orderId)
			if err != nil {
				w.WriteHeader(http.StatusBadRequest)
				w.Write([]byte(err.Error()))
				return
			}
			order = prevOrder
		}
		w.Header().Set("Content-Type", "text/html; charset=utf-8")
		w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")")
		w.WriteHeader(http.StatusOK)
		fmt.Fprintf(w, tpl, order.HTMLSnippet)
	})

	// /confirmation/{order_id} renders the order's snippet and expires the
	// "cartid" cookie once the order status is "checkout_complete".
	mux.HandleFunc("/confirmation/{order_id}", func(w http.ResponseWriter, r *http.Request) {
		orderId := r.PathValue("order_id")
		order, err := KlarnaInstance.GetOrder(orderId)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(err.Error()))
			return
		}
		w.Header().Set("Content-Type", "text/html; charset=utf-8")
		if order.Status == "checkout_complete" {
			http.SetCookie(w, &http.Cookie{
				Name:     "cartid",
				Value:    "",
				Path:     "/",
				Expires:  time.Unix(0, 0),
				SameSite: http.SameSiteLaxMode,
			})
		}
		w.WriteHeader(http.StatusOK)
		fmt.Fprintf(w, tpl, order.HTMLSnippet)
	})

	// /validate accepts a POSTed order as JSON, forwards it to the order
	// handler and then notifies the cart that the order completed.
	mux.HandleFunc("/validate", func(w http.ResponseWriter, r *http.Request) {
		log.Printf("Klarna order validation, method: %s", r.Method)
		if r.Method != http.MethodPost {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}
		order := &CheckoutOrder{}
		err := json.NewDecoder(r.Body).Decode(order)
		if err != nil {
			// Reject undecodable bodies outright instead of confirming a
			// partially populated order.
			log.Printf("Error decoding order: %v\n", err)
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		log.Printf("Klarna order validation: %s", order.ID)
		err = confirmOrder(order, orderHandler)
		if err != nil {
			log.Printf("Error validating order: %v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		err = triggerOrderCompleted(err, syncedServer, order)
		if err != nil {
			log.Printf("Error processing cart message: %v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	})

	// /push accepts a POST with an order_id query parameter, fetches that
	// order from Klarna and runs the same confirm/complete steps.
	mux.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}
		orderId := r.URL.Query().Get("order_id")
		log.Printf("Order confirmation push: %s", orderId)
		order, err := KlarnaInstance.GetOrder(orderId)
		if err != nil {
			log.Printf("Error creating request: %v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		err = confirmOrder(order, orderHandler)
		if err != nil {
			log.Printf("Error confirming order: %v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		err = triggerOrderCompleted(err, syncedServer, order)
		if err != nil {
			log.Printf("Error processing cart message: %v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	})
	mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("1.0.0"))
	})

	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGTERM)
	go func() {
		sig := <-sigs
		fmt.Println("Shutting down due to signal:", sig)
		// The pool is closed in the background while the final save runs.
		go syncedPool.Close()
		if err := app.Save(); err != nil {
			log.Printf("Error saving: %v\n", err)
		}
		done <- true
	}()

	// Serve in the background; a listener failure (e.g. port already in
	// use) is logged rather than silently lost.
	go func() {
		if err := http.ListenAndServe(":8080", mux); err != nil {
			log.Printf("Error serving http: %v\n", err)
		}
	}()

	<-done
}
// triggerOrderCompleted sends an OrderCompletedType message, carrying the
// order's ID and status, to the cart identified by the order's
// MerchantReference1, and returns the error from processing it. The
// incoming err argument is not consulted; it is kept for signature
// compatibility.
func triggerOrderCompleted(err error, syncedServer *PoolServer, order *CheckoutOrder) error {
	cartID := ToCartId(order.MerchantReference1)
	completed := Message{
		Type: OrderCompletedType,
		Content: &messages.OrderCreated{
			OrderId: order.ID,
			Status:  order.Status,
		},
	}
	_, processErr := syncedServer.pool.Process(cartID, completed)
	return processErr
}
// confirmOrder serialises order to JSON, connects the order handler and
// passes the payload to its OrderCompleted method. The handler is closed
// again before returning once the connection succeeded. The first error
// encountered (marshal, connect or send) is returned.
func confirmOrder(order *CheckoutOrder, orderHandler *AmqpOrderHandler) error {
	payload, marshalErr := json.Marshal(order)
	if marshalErr != nil {
		return marshalErr
	}

	if connectErr := orderHandler.Connect(); connectErr != nil {
		return connectErr
	}
	defer orderHandler.Close()

	return orderHandler.OrderCompleted(payload)
}