Files
go-cart-actor/main.go
matst80 7de9693083
Some checks failed
Build and Publish / BuildAndDeploy (push) Has been cancelled
log only saved grains
2024-11-10 00:13:21 +01:00

192 lines
4.4 KiB
Go

package main
import (
"fmt"
"log"
"net/http"
"os"
"time"
messages "git.tornberg.me/go-cart-actor/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// Prometheus counters describing cart grain activity. Each one is created
// through promauto, so it is registered with the default registry when the
// package is initialised.
var (
	// grainSpawns is incremented once per call to spawn.
	grainSpawns = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_spawned_total",
		Help: "The total number of spawned grains",
	})

	// grainMutations is not updated by the code in this file; presumably it
	// is incremented where grains apply mutations (verify against the pool
	// implementation).
	grainMutations = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_mutations_total",
		Help: "The total number of mutations",
	})

	// grainLookups is not updated by the code in this file; presumably it
	// is incremented where grains are looked up (verify against the pool
	// implementation).
	grainLookups = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_lookups_total",
		Help: "The total number of lookups",
	})
)
// spawn creates an empty CartGrain for id, counts the spawn in the
// grainSpawns metric and then lets loadMessages populate the grain.
//
// The grain is returned together with whatever error loadMessages reports,
// so callers receive a non-nil grain even when loading failed.
func spawn(id CartId) (*CartGrain, error) {
	grainSpawns.Inc()

	grain := new(CartGrain)
	grain.Id = id
	grain.Items = []CartItem{}
	grain.storageMessages = []Message{}
	grain.TotalPrice = 0

	loadErr := loadMessages(grain, id)
	return grain, loadErr
}
// init makes sure the relative "data" directory exists before main runs;
// main builds its state file path underneath it.
//
// MkdirAll is used so that an already existing directory is not an error.
// Any other failure is logged rather than fatal, keeping start-up
// best-effort as before.
func init() {
	if err := os.MkdirAll("data", 0755); err != nil {
		log.Printf("Error creating data directory: %v\n", err)
	}
}
// App ties the process-local grain pool to the disk storage that persists
// its grains.
type App struct {
	// pool holds the grains that Save iterates over.
	pool *GrainLocalPool
	// storage receives every grain on Save and writes the saved state.
	storage *DiskStorage
}
// Save stores every non-nil grain of the local pool and then writes the
// storage state.
//
// A failure to store an individual grain is logged and does not stop the
// loop; only the error from saveState is returned to the caller.
func (a *App) Save() error {
	for cartID, g := range a.pool.GetGrains() {
		if g != nil {
			log.Printf("Saving grain %s\n", cartID)
			if storeErr := a.storage.Store(cartID, g); storeErr != nil {
				log.Printf("Error saving grain %s: %v\n", cartID, storeErr)
			}
		}
	}
	return a.storage.saveState()
}
// HandleSave is the HTTP handler that triggers Save.
//
// It answers 201 Created with an empty body when Save succeeds, and
// 500 Internal Server Error with the error text as body otherwise.
func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) {
	saveErr := a.Save()
	if saveErr == nil {
		w.WriteHeader(http.StatusCreated)
		return
	}
	w.WriteHeader(http.StatusInternalServerError)
	w.Write([]byte(saveErr.Error()))
}
// PoolServer exposes a GrainPool over HTTP.
type PoolServer struct {
	// pool serves the Get and Process calls made by the handlers.
	pool GrainPool
}
// NewPoolServer returns a PoolServer whose handlers operate on pool.
func NewPoolServer(pool GrainPool) *PoolServer {
	srv := new(PoolServer)
	srv.pool = pool
	return srv
}
// HandleGet writes the pool's data for the cart named by the "id" path
// value.
//
// On success the bytes returned by the pool are sent as-is with status 200
// and Content-Type application/json. Any error from the pool is answered
// with 404 Not Found and the error text as body.
func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request) {
	cartID := ToCartId(r.PathValue("id"))

	body, err := s.pool.Get(cartID)
	if err != nil {
		w.WriteHeader(http.StatusNotFound)
		w.Write([]byte(err.Error()))
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write(body)
}
// HandleAddSku sends an AddRequest for the "sku" path value to the cart
// named by the "id" path value.
//
// On success the bytes returned by the pool are sent as-is with status 200
// and Content-Type application/json. Any error from the pool is answered
// with 500 Internal Server Error and the error text as body.
func (s *PoolServer) HandleAddSku(w http.ResponseWriter, r *http.Request) {
	cartID := ToCartId(r.PathValue("id"))
	addMsg := Message{
		Type:    AddRequestType,
		Content: &messages.AddRequest{Sku: r.PathValue("sku")},
	}

	body, err := s.pool.Process(cartID, addMsg)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write(body)
}
// Serve builds a fresh ServeMux with the cart routes registered:
//
//	GET /{id}            -> HandleGet
//	GET /{id}/add/{sku}  -> HandleAddSku
//
// The mux is returned to the caller; nothing starts listening here.
func (s *PoolServer) Serve() *http.ServeMux {
	routes := http.NewServeMux()
	routes.HandleFunc("GET /{id}", s.HandleGet)
	routes.HandleFunc("GET /{id}/add/{sku}", s.HandleAddSku)
	return routes
}
// Identity of this instance, read from the environment once at package
// initialisation.
var (
	// clientName is the POD_IP value. main passes it to NewSyncedPool and
	// treats an empty value as "not running inside the cluster".
	clientName = os.Getenv("POD_IP")

	// name is the POD_NAME value; main uses it in the state file name.
	name = os.Getenv("POD_NAME")
)
// main wires the cart service together and then blocks serving HTTP on
// :8080 until the listener fails.
//
// Start-up order: disk storage and local grain pool, Kubernetes client and
// discovery, synced pool, RPC grain handler on :1337 (own goroutine), a
// once-per-minute save loop (own goroutine), and finally the HTTP mux.
func main() {
	// A storage error is only logged; start-up continues with whatever
	// NewDiskStorage returned.
	storage, err := NewDiskStorage(fmt.Sprintf("data/%s_state.gob", name))
	if err != nil {
		log.Printf("Error loading state: %v\n", err)
	}
	app := &App{
		pool:    NewGrainLocalPool(1000, 5*time.Minute, spawn),
		storage: storage,
	}

	// Without POD_IP fall back to a kubeconfig file on disk; otherwise use
	// the in-cluster configuration.
	// NOTE(review): the kubeconfig path is hard-coded to one developer's
	// home directory.
	var config *rest.Config
	var kerr error
	if clientName == "" {
		config, kerr = clientcmd.BuildConfigFromFlags("", "/Users/mats/.kube/config")
	} else {
		config, kerr = rest.InClusterConfig()
	}
	if kerr != nil {
		// Report kerr here: err still holds the storage result from above
		// and would print the wrong (possibly nil) error.
		log.Fatalf("Error creating kubernetes client: %v\n", kerr)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error creating client: %v\n", err)
	}

	d := NewK8sDiscovery(client)
	syncedPool, err := NewSyncedPool(app.pool, clientName, d)
	if err != nil {
		log.Fatalf("Error creating synced pool: %v\n", err)
	}
	// if local
	//syncedPool.AddRemote("localhost")

	rpcHandler, err := NewGrainHandler(app.pool, ":1337")
	if err != nil {
		log.Fatalf("Error creating handler: %v\n", err)
	}
	go rpcHandler.Serve()

	// Persist the pool once a minute for the lifetime of the process.
	go func() {
		for range time.Tick(time.Minute) {
			if err := app.Save(); err != nil {
				log.Printf("Error saving: %v\n", err)
			}
		}
	}()

	syncedServer := NewPoolServer(syncedPool)
	mux := http.NewServeMux()
	// The cart routes are mounted under /api with the prefix stripped.
	mux.Handle("/api/", http.StripPrefix("/api", syncedServer.Serve()))
	mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) {
		syncedPool.AddRemote(r.PathValue("host"))
	})
	mux.HandleFunc("GET /save", app.HandleSave)
	mux.Handle("/metrics", promhttp.Handler())

	// ListenAndServe only returns on failure (e.g. the port is taken), so
	// surface the error instead of exiting silently with status 0.
	if err := http.ListenAndServe(":8080", mux); err != nil {
		log.Fatalf("Error serving http: %v\n", err)
	}
}