package main import ( "encoding/json" "fmt" messages "git.tornberg.me/go-cart-actor/proto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "log" "net/http" "net/http/pprof" "os" "os/signal" "syscall" "time" ) var ( grainSpawns = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_grain_spawned_total", Help: "The total number of spawned grains", }) grainMutations = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_grain_mutations_total", Help: "The total number of mutations", }) grainLookups = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_grain_lookups_total", Help: "The total number of lookups", }) ) func spawn(id CartId) (*CartGrain, error) { grainSpawns.Inc() ret := &CartGrain{ lastItemId: 0, lastDeliveryId: 0, Deliveries: []*CartDelivery{}, Id: id, Items: []*CartItem{}, storageMessages: []Message{}, TotalPrice: 0, } err := loadMessages(ret, id) return ret, err } func init() { os.Mkdir("data", 0755) } type App struct { pool *GrainLocalPool storage *DiskStorage } func (a *App) Save() error { hasChanges := false a.pool.mu.RLock() defer a.pool.mu.RUnlock() for id, grain := range a.pool.GetGrains() { if grain == nil { continue } if grain.GetLastChange() > a.storage.LastSaves[id] { hasChanges = true err := a.storage.Store(id, grain) if err != nil { log.Printf("Error saving grain %s: %v\n", id, err) } } } if !hasChanges { return nil } return a.storage.saveState() } func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) { err := a.Save() if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) } else { w.WriteHeader(http.StatusCreated) } } var podIp = os.Getenv("POD_IP") var name = os.Getenv("POD_NAME") var amqpUrl = os.Getenv("AMQP_URL") var KlarnaInstance = NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) func GetDiscovery() Discovery { if podIp == "" { return nil } config, kerr := rest.InClusterConfig() if kerr != nil { log.Fatalf("Error creating kubernetes client: %v\n", kerr) } client, err := kubernetes.NewForConfig(config) if err != nil { log.Fatalf("Error creating client: %v\n", err) } return NewK8sDiscovery(client) } var tpl = ` s10r testing - checkout %s ` func main() { // Create a new instance of the server storage, err := NewDiskStorage(fmt.Sprintf("data/%s_state.gob", name)) if err != nil { log.Printf("Error loading state: %v\n", err) } app := &App{ pool: NewGrainLocalPool(65535, 5*time.Minute, spawn), storage: storage, } syncedPool, err := NewSyncedPool(app.pool, podIp, GetDiscovery()) if err != nil { log.Fatalf("Error creating synced pool: %v\n", err) } hg, err := NewGrainHandler(app.pool, ":1337") if err != nil { log.Fatalf("Error creating handler: %v\n", err) } go func() { for range time.Tick(time.Minute * 10) { err := app.Save() if err != nil { log.Printf("Error saving: %v\n", err) } } }() orderHandler := &AmqpOrderHandler{ Url: amqpUrl, } syncedServer := NewPoolServer(syncedPool, fmt.Sprintf("%s, %s", name, podIp)) mux := http.NewServeMux() mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve())) // only for local // mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) { // syncedPool.AddRemote(r.PathValue("host")) // }) // mux.HandleFunc("GET /save", app.HandleSave) //mux.HandleFunc("/", app.RewritePath) mux.HandleFunc("/debug/pprof/", pprof.Index) mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) mux.HandleFunc("/debug/pprof/profile", pprof.Profile) mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) mux.HandleFunc("/debug/pprof/trace", pprof.Trace) mux.Handle("/metrics", promhttp.Handler()) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { if !hg.IsHealthy() { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("handler not healthy")) return } if !syncedPool.IsHealthy() { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("pool not healthy")) return } w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) mux.HandleFunc("/checkout", func(w http.ResponseWriter, r *http.Request) { orderId := r.URL.Query().Get("order_id") order := &CheckoutOrder{} if orderId == "" { cookie, err := r.Cookie("cartid") if err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(err.Error())) return } if cookie.Value == "" { w.WriteHeader(http.StatusBadRequest) w.Write([]byte("no cart id to checkout is empty")) return } cartId := ToCartId(cookie.Value) reply, err := syncedServer.pool.Process(cartId, Message{ Type: CreateCheckoutOrderType, Content: &messages.CreateCheckoutOrder{ Terms: "https://slask-finder.tornberg.me/terms", Checkout: "https://slask-finder.tornberg.me/checkout?order_id={checkout.order.id}", Confirmation: "https://slask-finder.tornberg.me/confirmation/{checkout.order.id}", Validation: "https://cart.tornberg.me/validation", Push: "https://cart.tornberg.me/push?order_id={checkout.order.id}", }, }) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) } err = json.Unmarshal(reply.Payload, &order) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) return } } else { prevOrder, err := KlarnaInstance.GetOrder(orderId) if err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(err.Error())) return } order = prevOrder } w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")") w.WriteHeader(http.StatusOK) w.Write([]byte(fmt.Sprintf(tpl, order.HTMLSnippet))) }) mux.HandleFunc("/confirmation/{order_id}", func(w http.ResponseWriter, r *http.Request) { orderId := r.PathValue("order_id") order, err := KlarnaInstance.GetOrder(orderId) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) return } w.Header().Set("Content-Type", "text/html; charset=utf-8") if order.Status == "checkout_complete" { http.SetCookie(w, &http.Cookie{ Name: "cartid", Value: "", Path: "/", Expires: time.Unix(0, 0), SameSite: http.SameSiteLaxMode, }) } w.WriteHeader(http.StatusOK) w.Write([]byte(fmt.Sprintf(tpl, order.HTMLSnippet))) }) mux.HandleFunc("/validate", func(w http.ResponseWriter, r *http.Request) { log.Printf("Klarna order validation, method: %s", r.Method) if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } order := &CheckoutOrder{} err := json.NewDecoder(r.Body).Decode(order) if err != nil { w.WriteHeader(http.StatusBadRequest) } log.Printf("Klarna order validation: %s", order.ID) err = confirmOrder(order, orderHandler) if err != nil { log.Printf("Error confirming order: %v\n", err) w.WriteHeader(http.StatusInternalServerError) return } err = triggerOrderCompleted(err, syncedServer, order) if err != nil { log.Printf("Error processing cart message: %v\n", err) w.WriteHeader(http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) }) mux.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } orderId := r.URL.Query().Get("order_id") log.Printf("Order confirmation push: %s", orderId) order, err := KlarnaInstance.GetOrder(orderId) if err != nil { log.Printf("Error creating request: %v\n", err) w.WriteHeader(http.StatusInternalServerError) return } //err = confirmOrder(order, orderHandler) //if err != nil { // log.Printf("Error confirming order: %v\n", err) // w.WriteHeader(http.StatusInternalServerError) // return //} err = triggerOrderCompleted(err, syncedServer, order) if err != nil { log.Printf("Error processing cart message: %v\n", err) w.WriteHeader(http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) }) mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("1.0.0")) }) sigs := make(chan os.Signal, 1) done := make(chan bool, 1) signal.Notify(sigs, syscall.SIGTERM) go func() { sig := <-sigs fmt.Println("Shutting down due to signal:", sig) go syncedPool.Close() app.Save() done <- true }() go http.ListenAndServe(":8080", mux) <-done } func triggerOrderCompleted(err error, syncedServer *PoolServer, order *CheckoutOrder) error { _, err = syncedServer.pool.Process(ToCartId(order.MerchantReference1), Message{ Type: OrderCompletedType, Content: &messages.OrderCreated{ OrderId: order.ID, Status: order.Status, }, }) return err } func confirmOrder(order *CheckoutOrder, orderHandler *AmqpOrderHandler) error { orderToSend, err := json.Marshal(order) if err != nil { return err } err = orderHandler.Connect() if err != nil { return err } defer orderHandler.Close() err = orderHandler.OrderCompleted(orderToSend) if err != nil { return err } return nil }