222 lines
5.1 KiB
Go
222 lines
5.1 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"net/http/pprof"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
messages "git.tornberg.me/go-cart-actor/proto"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
)
|
|
|
|
var (
|
|
grainSpawns = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "cart_grain_spawned_total",
|
|
Help: "The total number of spawned grains",
|
|
})
|
|
grainMutations = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "cart_grain_mutations_total",
|
|
Help: "The total number of mutations",
|
|
})
|
|
grainLookups = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "cart_grain_lookups_total",
|
|
Help: "The total number of lookups",
|
|
})
|
|
)
|
|
|
|
func spawn(id CartId) (*CartGrain, error) {
|
|
grainSpawns.Inc()
|
|
ret := &CartGrain{
|
|
Id: id,
|
|
Items: []CartItem{},
|
|
storageMessages: []Message{},
|
|
TotalPrice: 0,
|
|
}
|
|
err := loadMessages(ret, id)
|
|
return ret, err
|
|
}
|
|
|
|
func init() {
|
|
os.Mkdir("data", 0755)
|
|
}
|
|
|
|
type App struct {
|
|
pool *GrainLocalPool
|
|
storage *DiskStorage
|
|
}
|
|
|
|
func (a *App) Save() error {
|
|
hasChanges := false
|
|
for id, grain := range a.pool.GetGrains() {
|
|
if grain == nil {
|
|
continue
|
|
}
|
|
if grain.GetLastChange() > a.storage.LastSaves[id] {
|
|
hasChanges = true
|
|
err := a.storage.Store(id, grain)
|
|
if err != nil {
|
|
log.Printf("Error saving grain %s: %v\n", id, err)
|
|
}
|
|
}
|
|
}
|
|
if !hasChanges {
|
|
return nil
|
|
}
|
|
return a.storage.saveState()
|
|
}
|
|
|
|
func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) {
|
|
err := a.Save()
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
w.Write([]byte(err.Error()))
|
|
} else {
|
|
w.WriteHeader(http.StatusCreated)
|
|
}
|
|
}
|
|
|
|
type PoolServer struct {
|
|
pod_name string
|
|
pool GrainPool
|
|
}
|
|
|
|
func NewPoolServer(pool GrainPool, pod_name string) *PoolServer {
|
|
return &PoolServer{
|
|
pod_name: pod_name,
|
|
pool: pool,
|
|
}
|
|
}
|
|
|
|
func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request) {
|
|
id := r.PathValue("id")
|
|
data, err := s.pool.Get(ToCartId(id))
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("X-Pod-Name", s.pod_name)
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write(data)
|
|
}
|
|
|
|
func (s *PoolServer) HandleAddSku(w http.ResponseWriter, r *http.Request) {
|
|
id := r.PathValue("id")
|
|
sku := r.PathValue("sku")
|
|
data, err := s.pool.Process(ToCartId(id), Message{
|
|
Type: AddRequestType,
|
|
Content: &messages.AddRequest{Sku: sku},
|
|
})
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("X-Pod-Name", s.pod_name)
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write(data)
|
|
}
|
|
|
|
func (s *PoolServer) Serve() *http.ServeMux {
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("GET /{id}", s.HandleGet)
|
|
mux.HandleFunc("GET /{id}/add/{sku}", s.HandleAddSku)
|
|
return mux
|
|
}
|
|
|
|
var podIp = os.Getenv("POD_IP")
|
|
var name = os.Getenv("POD_NAME")
|
|
|
|
func main() {
|
|
// Create a new instance of the server
|
|
storage, err := NewDiskStorage(fmt.Sprintf("data/%s_state.gob", name))
|
|
if err != nil {
|
|
log.Printf("Error loading state: %v\n", err)
|
|
}
|
|
app := &App{
|
|
pool: NewGrainLocalPool(1000, 5*time.Minute, spawn),
|
|
storage: storage,
|
|
}
|
|
|
|
var config *rest.Config
|
|
var kerr error
|
|
if podIp == "" {
|
|
config, kerr = clientcmd.BuildConfigFromFlags("", "/Users/mats/.kube/config")
|
|
} else {
|
|
config, kerr = rest.InClusterConfig()
|
|
}
|
|
if kerr != nil {
|
|
log.Fatalf("Error creating kubernetes client: %v\n", err)
|
|
}
|
|
client, err := kubernetes.NewForConfig(config)
|
|
if err != nil {
|
|
log.Fatalf("Error creating client: %v\n", err)
|
|
}
|
|
d := NewK8sDiscovery(client)
|
|
|
|
syncedPool, err := NewSyncedPool(app.pool, podIp, d)
|
|
if err != nil {
|
|
log.Fatalf("Error creating synced pool: %v\n", err)
|
|
}
|
|
|
|
// if local
|
|
//syncedPool.AddRemote("localhost")
|
|
|
|
_, err = NewGrainHandler(app.pool, ":1337")
|
|
if err != nil {
|
|
log.Fatalf("Error creating handler: %v\n", err)
|
|
}
|
|
|
|
go func() {
|
|
for range time.Tick(time.Minute) {
|
|
|
|
err := app.Save()
|
|
if err != nil {
|
|
log.Printf("Error saving: %v\n", err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
syncedServer := NewPoolServer(syncedPool, name)
|
|
mux := http.NewServeMux()
|
|
mux.Handle("/api/", http.StripPrefix("/api", syncedServer.Serve()))
|
|
// only for local
|
|
// mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) {
|
|
// syncedPool.AddRemote(r.PathValue("host"))
|
|
// })
|
|
// mux.HandleFunc("GET /save", app.HandleSave)
|
|
|
|
mux.HandleFunc("/debug/pprof/", pprof.Index)
|
|
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
|
|
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
|
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
|
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
|
|
mux.Handle("/metrics", promhttp.Handler())
|
|
|
|
sigs := make(chan os.Signal, 1)
|
|
signal.Notify(sigs, syscall.SIGTERM)
|
|
done := make(chan bool, 1)
|
|
go func() {
|
|
sig := <-sigs
|
|
fmt.Println("Shutting down due to signal:", sig)
|
|
app.Save()
|
|
done <- true
|
|
}()
|
|
|
|
go http.ListenAndServe(":8080", mux)
|
|
<-done
|
|
|
|
}
|