Files
go-cart-actor/main.go
matst80 c70c5cd930
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m58s
maybe
2024-11-10 21:43:40 +01:00

171 lines
3.8 KiB
Go

package main
import (
"fmt"
"log"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// Prometheus counters for cart grain activity, created through promauto.
var (
// grainSpawns is incremented once per call to spawn.
grainSpawns = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_grain_spawned_total",
Help: "The total number of spawned grains",
})
// grainMutations counts mutations; it is not incremented in this file's
// visible code — presumably updated by the grain/pool code elsewhere.
grainMutations = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_grain_mutations_total",
Help: "The total number of mutations",
})
// grainLookups counts lookups; it is not incremented in this file's
// visible code — presumably updated by the grain/pool code elsewhere.
grainLookups = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_grain_lookups_total",
Help: "The total number of lookups",
})
)
// spawn builds an empty CartGrain for id, bumps the spawn counter, and then
// asks loadMessages to populate the grain. The grain is returned together with
// whatever error loadMessages reports, so the caller receives the grain even
// when loading fails.
func spawn(id CartId) (*CartGrain, error) {
	grainSpawns.Inc()

	grain := new(CartGrain)
	grain.Id = id
	grain.Items = []CartItem{}
	grain.storageMessages = []Message{}
	grain.TotalPrice = 0

	return grain, loadMessages(grain, id)
}
// init makes sure the relative "data" directory exists before main runs; main
// builds its state file path under that directory.
//
// An "already exists" error is expected on every start after the first and is
// ignored. Any other failure (for example a permission problem) is logged
// instead of being silently dropped, so a later storage error can be traced
// back to its cause. Start-up is not aborted, matching the previous
// best-effort behaviour.
func init() {
	if err := os.Mkdir("data", 0755); err != nil && !os.IsExist(err) {
		log.Printf("Error creating data directory: %v\n", err)
	}
}
// App ties the local grain pool to the disk storage that persists its grains.
type App struct {
// pool is the local grain pool; Save iterates over pool.GetGrains().
pool *GrainLocalPool
// storage is where Save writes changed grains (Store) and the overall
// state (saveState); its LastSaves map is compared against each grain's
// last change to decide whether the grain needs saving.
storage *DiskStorage
}
// Save stores every grain in the pool whose last change is newer than the
// value recorded for it in storage.LastSaves. Nil grains are skipped.
//
// A failure to store an individual grain is logged and does not stop the
// loop. If at least one grain was considered changed, the storage state is
// written with saveState and its error is returned; otherwise Save returns
// nil without touching the state.
func (a *App) Save() error {
	dirty := false
	for cartID, g := range a.pool.GetGrains() {
		if g != nil && g.GetLastChange() > a.storage.LastSaves[cartID] {
			// Mark as dirty before storing so a failed Store still
			// triggers saveState below.
			dirty = true
			if storeErr := a.storage.Store(cartID, g); storeErr != nil {
				log.Printf("Error saving grain %s: %v\n", cartID, storeErr)
			}
		}
	}
	if dirty {
		return a.storage.saveState()
	}
	return nil
}
// HandleSave is an HTTP handler that runs Save on demand. It responds with
// 500 and the error text when Save fails, and with 201 (no body) when it
// succeeds. The request itself is not inspected.
func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) {
	if saveErr := a.Save(); saveErr != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(saveErr.Error()))
		return
	}
	w.WriteHeader(http.StatusCreated)
}
// Pod identity, read from the environment once at package initialisation.
var (
	// podIp holds POD_IP. main treats an empty value as "not running inside
	// the cluster" and falls back to a kubeconfig file.
	podIp = os.Getenv("POD_IP")
	// name holds POD_NAME. main uses it in the state file name and in the
	// label passed to the pool server.
	name = os.Getenv("POD_NAME")
)
// main wires the process together and then blocks until SIGTERM:
//
//  1. opens the per-pod disk storage and creates the local grain pool,
//  2. builds a Kubernetes client (kubeconfig file when POD_IP is empty,
//     in-cluster config otherwise) and uses it for discovery,
//  3. creates the synced pool and the grain handler on ":1337",
//  4. starts a once-a-minute background save,
//  5. serves the API, pprof and Prometheus metrics on ":8080",
//  6. on SIGTERM performs a final save and returns.
func main() {
	// A storage load failure is logged but not fatal (unchanged behaviour).
	// NOTE(review): if NewDiskStorage can return a nil storage together with
	// an error, App.Save will dereference nil — confirm against DiskStorage.
	storage, err := NewDiskStorage(fmt.Sprintf("data/%s_state.gob", name))
	if err != nil {
		log.Printf("Error loading state: %v\n", err)
	}
	app := &App{
		pool:    NewGrainLocalPool(1000, 5*time.Minute, spawn),
		storage: storage,
	}

	// No POD_IP means we are not running as a pod, so read a kubeconfig file
	// instead of the in-cluster service account.
	// NOTE(review): the kubeconfig path is hard-coded to one developer's
	// home directory.
	var config *rest.Config
	var kerr error
	if podIp == "" {
		config, kerr = clientcmd.BuildConfigFromFlags("", "/home/mats/.kube/config")
	} else {
		config, kerr = rest.InClusterConfig()
	}
	if kerr != nil {
		// Report kerr, the error that was actually checked; the previous
		// code logged the unrelated err from NewDiskStorage here.
		log.Fatalf("Error creating kubernetes client: %v\n", kerr)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error creating client: %v\n", err)
	}
	d := NewK8sDiscovery(client)

	syncedPool, err := NewSyncedPool(app.pool, podIp, d)
	if err != nil {
		log.Fatalf("Error creating synced pool: %v\n", err)
	}

	// if local
	//syncedPool.AddRemote("localhost")

	_, err = NewGrainHandler(app.pool, ":1337")
	if err != nil {
		log.Fatalf("Error creating handler: %v\n", err)
	}

	// Periodic background save; runs for the lifetime of the process.
	go func() {
		for range time.Tick(time.Minute) {
			err := app.Save()
			if err != nil {
				log.Printf("Error saving: %v\n", err)
			}
		}
	}()

	syncedServer := NewPoolServer(syncedPool, fmt.Sprintf("%s, %s", name, podIp))
	mux := http.NewServeMux()
	mux.Handle("/api/", http.StripPrefix("/api", syncedServer.Serve()))
	// only for local
	// mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) {
	// syncedPool.AddRemote(r.PathValue("host"))
	// })
	// mux.HandleFunc("GET /save", app.HandleSave)
	mux.HandleFunc("/pprof/", pprof.Index)
	mux.HandleFunc("/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/pprof/profile", pprof.Profile)
	mux.HandleFunc("/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/pprof/trace", pprof.Trace)
	mux.Handle("/metrics", promhttp.Handler())

	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGTERM)
	go func() {
		sig := <-sigs
		fmt.Println("Shutting down due to signal:", sig)
		// Final save before exit; a failure here would otherwise be lost.
		if err := app.Save(); err != nil {
			log.Printf("Error saving on shutdown: %v\n", err)
		}
		done <- true
	}()

	// ListenAndServe only returns on failure (e.g. the port is already in
	// use). Log it rather than discarding it; the process keeps running so
	// the grain handler and the shutdown save still work as before.
	go func() {
		if err := http.ListenAndServe(":8080", mux); err != nil {
			log.Printf("HTTP server stopped: %v\n", err)
		}
	}()
	<-done
}