Files
go-cart-actor/main.go
matst80 ef9d191625
Some checks failed
Build and Publish / BuildAndDeploy (push) Successful in 3m13s
Build and Publish / BuildAndDeployAmd64 (push) Has been cancelled
update
2025-04-18 19:04:39 +02:00

317 lines
7.8 KiB
Go

package main
import (
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"
messages "git.tornberg.me/go-cart-actor/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// Prometheus counters for grain activity. Each one is created through
// promauto when the package is initialised.

// grainSpawns is incremented by spawn every time a grain is created.
var grainSpawns = promauto.NewCounter(prometheus.CounterOpts{
	Name: "cart_grain_spawned_total",
	Help: "The total number of spawned grains",
})

// grainMutations is exported as cart_grain_mutations_total.
var grainMutations = promauto.NewCounter(prometheus.CounterOpts{
	Name: "cart_grain_mutations_total",
	Help: "The total number of mutations",
})

// grainLookups is exported as cart_grain_lookups_total.
var grainLookups = promauto.NewCounter(prometheus.CounterOpts{
	Name: "cart_grain_lookups_total",
	Help: "The total number of lookups",
})
// spawn creates an empty cart grain for id, bumps the spawn counter and hands
// the new grain to loadMessages.
//
// The grain is returned even when loadMessages reports an error; that error is
// passed back unchanged as the second result.
func spawn(id CartId) (*CartGrain, error) {
	grainSpawns.Inc()

	// lastItemId, lastDeliveryId and TotalPrice keep their zero values.
	grain := new(CartGrain)
	grain.Id = id
	// The collections start out empty but non-nil.
	grain.Items = []*CartItem{}
	grain.Deliveries = []*CartDelivery{}
	grain.storageMessages = []Message{}

	loadErr := loadMessages(grain, id)
	return grain, loadErr
}
// init makes sure the relative "data" directory exists before main runs.
//
// An already existing directory is the normal case on restart and is not
// reported. Any other failure (for example a read-only working directory) is
// logged instead of being silently dropped, so later storage errors can be
// traced back to it. Start-up still continues, as it did before.
func init() {
	if err := os.Mkdir("data", 0755); err != nil && !os.IsExist(err) {
		log.Printf("Error creating data directory: %v\n", err)
	}
}
// App bundles the local grain pool with the disk storage used to persist it.
type App struct {
// pool holds the grains that Save iterates over.
pool *GrainLocalPool
// storage receives changed grains and records the last save per grain id.
storage *DiskStorage
}
// Save stores every grain in the pool whose last change is newer than the
// last save recorded for its id, then writes the storage state.
//
// The pool's read lock is held for the whole pass. A failure to store one
// grain is logged and does not stop the pass. The storage state is written
// only if at least one grain was newer than its last save, and the result of
// that write is the return value; otherwise Save returns nil.
func (a *App) Save() error {
	a.pool.mu.RLock()
	defer a.pool.mu.RUnlock()

	dirty := false
	for id, grain := range a.pool.GetGrains() {
		if grain != nil && grain.GetLastChange() > a.storage.LastSaves[id] {
			// Marked dirty before storing, so a failed Store still
			// triggers the state write below.
			dirty = true
			if storeErr := a.storage.Store(id, grain); storeErr != nil {
				log.Printf("Error saving grain %s: %v\n", id, storeErr)
			}
		}
	}

	if dirty {
		return a.storage.saveState()
	}
	return nil
}
// HandleSave is an HTTP handler that runs Save. It answers 201 Created on
// success and 500 Internal Server Error with the error text as the body when
// Save fails.
func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) {
	if saveErr := a.Save(); saveErr != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(saveErr.Error()))
		return
	}
	w.WriteHeader(http.StatusCreated)
}
// Process configuration, read from the environment once at package
// initialisation.
var (
	// podIp comes from POD_IP; GetDiscovery returns nil when it is empty.
	podIp = os.Getenv("POD_IP")
	// name comes from POD_NAME; main uses it in the state file name.
	name = os.Getenv("POD_NAME")
	// amqpUrl comes from AMQP_URL; main passes it to the order handler.
	amqpUrl = os.Getenv("AMQP_URL")
)
// GetDiscovery returns the Discovery implementation for this process.
//
// When POD_IP was not set (podIp is empty) it returns nil. Otherwise it builds
// a Kubernetes client from the in-cluster configuration and wraps it with
// NewK8sDiscovery. Failing to obtain the configuration or to create the
// client terminates the process via log.Fatalf.
func GetDiscovery() Discovery {
	if podIp == "" {
		return nil
	}

	cfg, cfgErr := rest.InClusterConfig()
	if cfgErr != nil {
		log.Fatalf("Error creating kubernetes client: %v\n", cfgErr)
	}

	clientset, clientErr := kubernetes.NewForConfig(cfg)
	if clientErr != nil {
		log.Fatalf("Error creating client: %v\n", clientErr)
	}

	return NewK8sDiscovery(clientset)
}
// main wires the service together and blocks until SIGTERM.
//
// It loads the disk state, creates the local and synced grain pools and the
// grain handler on :1337, starts a save every ten minutes, registers the HTTP
// routes (cart API, pprof, metrics, health probes, Klarna push callback,
// version) on :8080 and starts a small TCP echo listener on :1234. On SIGTERM
// it closes the synced pool in the background, saves once more and exits.
func main() {
	// NOTE(review): a failed state load is only logged and start-up goes on
	// with whatever NewDiskStorage returned. Save dereferences app.storage, so
	// confirm NewDiskStorage never returns a nil storage together with an error.
	storage, err := NewDiskStorage(fmt.Sprintf("data/%s_state.gob", name))
	if err != nil {
		log.Printf("Error loading state: %v\n", err)
	}
	app := &App{
		pool:    NewGrainLocalPool(65535, 5*time.Minute, spawn),
		storage: storage,
	}

	syncedPool, err := NewSyncedPool(app.pool, podIp, GetDiscovery())
	if err != nil {
		log.Fatalf("Error creating synced pool: %v\n", err)
	}

	hg, err := NewGrainHandler(app.pool, ":1337")
	if err != nil {
		log.Fatalf("Error creating handler: %v\n", err)
	}

	// Periodic persistence; runs for the lifetime of the process.
	go func() {
		for range time.Tick(time.Minute * 10) {
			if err := app.Save(); err != nil {
				log.Printf("Error saving: %v\n", err)
			}
		}
	}()

	// NOTE(review): this single handler is shared by every /push request and
	// each request calls Connect and Close on it — confirm AmqpOrderHandler
	// tolerates concurrent use.
	orderHandler := &AmqpOrderHandler{
		Url: amqpUrl,
	}

	// Dedicated client for the Klarna lookup. http.DefaultClient has no
	// timeout, so a stalled upstream would pin the handler goroutine forever.
	klarnaClient := &http.Client{Timeout: 10 * time.Second}

	// reply writes a status code and, when body is non-empty, a plain body.
	reply := func(w http.ResponseWriter, status int, body string) {
		w.WriteHeader(status)
		if body != "" {
			// Best effort: nothing useful can be done if the peer is gone.
			_, _ = w.Write([]byte(body))
		}
	}

	// isSafeOrderID reports whether id can be placed in a URL path segment
	// verbatim: non-empty, not a dot segment, and made up only of RFC 3986
	// unreserved characters (letters, digits, '-', '.', '_', '~').
	// NOTE(review): assumes Klarna order ids never contain other characters
	// (they look like UUIDs) — confirm.
	isSafeOrderID := func(id string) bool {
		if id == "" || id == "." || id == ".." {
			return false
		}
		return strings.IndexFunc(id, func(r rune) bool {
			switch {
			case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9':
				return false
			case r == '-', r == '.', r == '_', r == '~':
				return false
			}
			return true
		}) < 0
	}

	syncedServer := NewPoolServer(syncedPool, fmt.Sprintf("%s, %s", name, podIp))
	mux := http.NewServeMux()
	mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve()))
	// only for local
	// mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) {
	// 	syncedPool.AddRemote(r.PathValue("host"))
	// })
	// mux.HandleFunc("GET /save", app.HandleSave)
	//mux.HandleFunc("/", app.RewritePath)

	// NOTE(review): the pprof endpoints share the mux with /cart/ — confirm
	// port 8080 is not reachable from outside the cluster.
	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	mux.Handle("/metrics", promhttp.Handler())

	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		switch {
		case !hg.IsHealthy():
			reply(w, http.StatusInternalServerError, "handler not healthy")
		case !syncedPool.IsHealthy():
			reply(w, http.StatusInternalServerError, "pool not healthy")
		default:
			reply(w, http.StatusOK, "ok")
		}
	})
	alwaysOK := func(w http.ResponseWriter, r *http.Request) {
		reply(w, http.StatusOK, "ok")
	}
	mux.HandleFunc("/readyz", alwaysOK)
	mux.HandleFunc("/livez", alwaysOK)

	// /push is the order confirmation callback: it looks the order up at
	// Klarna, publishes it through the order handler and records the
	// completion on the cart named by the order's merchant reference.
	mux.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			reply(w, http.StatusMethodNotAllowed, "")
			return
		}
		orderId := r.URL.Query().Get("order_id")
		// order_id is untrusted and ends up in the path of an authenticated
		// upstream request; reject anything that could alter that URL.
		if !isSafeOrderID(orderId) {
			log.Printf("Rejecting push with invalid order_id %q", orderId)
			reply(w, http.StatusBadRequest, "invalid order_id")
			return
		}
		log.Printf("Order confirmation push: %s", orderId)

		req, err := http.NewRequest("GET", fmt.Sprintf("https://api.playground.klarna.com/checkout/v3/orders/%s", orderId), nil)
		if err != nil {
			log.Printf("Error creating request: %v\n", err)
			reply(w, http.StatusInternalServerError, err.Error())
			return
		}
		req.Header.Add("Content-Type", "application/json")
		req.SetBasicAuth(APIUsername, APIPassword)

		res, err := klarnaClient.Do(req)
		if err != nil {
			log.Printf("Error making request: %v\n", err)
			reply(w, http.StatusInternalServerError, err.Error())
			return
		}
		defer res.Body.Close()

		// Without this check an error payload would decode into an empty
		// order that is then published and applied to a cart.
		if res.StatusCode != http.StatusOK {
			log.Printf("Unexpected status fetching order %s: %s\n", orderId, res.Status)
			reply(w, http.StatusInternalServerError, "unexpected status from order lookup: "+res.Status)
			return
		}

		var klarnaOrderResponse KlarnaOrderResponse
		if err := json.NewDecoder(res.Body).Decode(&klarnaOrderResponse); err != nil {
			log.Printf("Error decoding response: %v\n", err)
			reply(w, http.StatusInternalServerError, err.Error())
			return
		}

		orderToSend, err := json.Marshal(klarnaOrderResponse)
		if err != nil {
			// A marshal failure only skips publishing; the cart is still updated.
			log.Printf("Error marshaling order: %v\n", err)
		} else {
			if err := orderHandler.Connect(); err != nil {
				log.Printf("Error connecting to order handler: %v\n", err)
				reply(w, http.StatusInternalServerError, err.Error())
				return
			}
			defer orderHandler.Close()
			if err := orderHandler.OrderCompleted(orderToSend); err != nil {
				log.Printf("Error sending order: %v\n", err)
				reply(w, http.StatusInternalServerError, err.Error())
				return
			}
		}

		_, err = syncedServer.pool.Process(ToCartId(klarnaOrderResponse.MerchantReference1), Message{
			Type: OrderCompletedType,
			Content: messages.OrderCreated{
				OrderId: klarnaOrderResponse.OrderID,
				Status:  klarnaOrderResponse.Status,
			},
		})
		if err != nil {
			// Logged only: the push is still acknowledged with 200.
			log.Printf("Error processing cart message: %v\n", err)
		}
		reply(w, http.StatusOK, "")
	})

	mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
		reply(w, http.StatusOK, "1.0.0")
	})

	// Auxiliary TCP echo listener. Failing to create it is not fatal, and
	// neither is a later accept error: the accept loop just stops.
	l, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Printf("Error creating echo server: %v\n", err)
	} else {
		go func(listener net.Listener) {
			for {
				conn, err := listener.Accept()
				// Check the error before touching conn; it is nil on failure.
				if err != nil {
					log.Printf("Error accepting echo connection: %v\n", err)
					return
				}
				log.Printf("Accepted connection from %s\n", conn.RemoteAddr())
				go func(c net.Conn) {
					defer c.Close()
					buf := make([]byte, 1024)
					for {
						n, err := c.Read(buf)
						if err != nil {
							log.Printf("Error reading from connection: %v\n", err)
							break
						}
						if n == 0 {
							break
						}
						str := string(buf[:n])
						// A chunk starting with "exit" ends the session.
						if strings.HasPrefix(str, "exit") {
							break
						}
						log.Println("Echoing", str)
						if _, err := c.Write([]byte(str)); err != nil {
							log.Printf("Error writing to connection: %v\n", err)
							break
						}
					}
				}(conn)
			}
		}(l)
	}

	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGTERM)
	go func() {
		sig := <-sigs
		fmt.Println("Shutting down due to signal:", sig)
		// The pool is closed in the background while the final save runs.
		go syncedPool.Close()
		if err := app.Save(); err != nil {
			log.Printf("Error saving: %v\n", err)
		}
		done <- true
	}()

	go func() {
		// ListenAndServe only returns on failure; without a server the
		// process is useless, so stop instead of idling until SIGTERM.
		log.Fatalf("Error serving http: %v\n", http.ListenAndServe(":8080", mux))
	}()
	<-done
}