package main import ( "fmt" "log" "net/http" "os" "os/signal" "syscall" "time" messages "git.tornberg.me/go-cart-actor/proto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) var ( grainSpawns = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_grain_spawned_total", Help: "The total number of spawned grains", }) grainMutations = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_grain_mutations_total", Help: "The total number of mutations", }) grainLookups = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_grain_lookups_total", Help: "The total number of lookups", }) ) func spawn(id CartId) (*CartGrain, error) { grainSpawns.Inc() ret := &CartGrain{ Id: id, Items: []CartItem{}, storageMessages: []Message{}, TotalPrice: 0, } err := loadMessages(ret, id) return ret, err } func init() { os.Mkdir("data", 0755) } type App struct { pool *GrainLocalPool storage *DiskStorage } func (a *App) Save() error { hasChanges := false for id, grain := range a.pool.GetGrains() { if grain == nil { continue } if grain.GetLastChange() > a.storage.LastSaves[id] { hasChanges = true err := a.storage.Store(id, grain) if err != nil { log.Printf("Error saving grain %s: %v\n", id, err) } } } if !hasChanges { return nil } return a.storage.saveState() } func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) { err := a.Save() if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) } else { w.WriteHeader(http.StatusCreated) } } type PoolServer struct { pod_name string pool GrainPool } func NewPoolServer(pool GrainPool, pod_name string) *PoolServer { return &PoolServer{ pod_name: pod_name, pool: pool, } } func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") data, err := s.pool.Get(ToCartId(id)) if err != nil { w.WriteHeader(http.StatusNotFound) w.Write([]byte(err.Error())) return } w.Header().Set("Content-Type", "application/json") w.Header().Set("X-Pod-Name", s.pod_name) w.WriteHeader(http.StatusOK) w.Write(data) } func (s *PoolServer) HandleAddSku(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") sku := r.PathValue("sku") data, err := s.pool.Process(ToCartId(id), Message{ Type: AddRequestType, Content: &messages.AddRequest{Sku: sku}, }) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) return } w.Header().Set("Content-Type", "application/json") w.Header().Set("X-Pod-Name", s.pod_name) w.WriteHeader(http.StatusOK) w.Write(data) } func (s *PoolServer) Serve() *http.ServeMux { mux := http.NewServeMux() mux.HandleFunc("GET /{id}", s.HandleGet) mux.HandleFunc("GET /{id}/add/{sku}", s.HandleAddSku) return mux } var podIp = os.Getenv("POD_IP") var name = os.Getenv("POD_NAME") func main() { // Create a new instance of the server storage, err := NewDiskStorage(fmt.Sprintf("data/%s_state.gob", name)) if err != nil { log.Printf("Error loading state: %v\n", err) } app := &App{ pool: NewGrainLocalPool(1000, 5*time.Minute, spawn), storage: storage, } var config *rest.Config var kerr error if podIp == "" { config, kerr = clientcmd.BuildConfigFromFlags("", "/Users/mats/.kube/config") } else { config, kerr = rest.InClusterConfig() } if kerr != nil { log.Fatalf("Error creating kubernetes client: %v\n", err) } client, err := kubernetes.NewForConfig(config) if err != nil { log.Fatalf("Error creating client: %v\n", err) } d := NewK8sDiscovery(client) syncedPool, err := NewSyncedPool(app.pool, podIp, d) if err != nil { log.Fatalf("Error creating synced pool: %v\n", err) } // if local //syncedPool.AddRemote("localhost") _, err = NewGrainHandler(app.pool, ":1337") if err != nil { log.Fatalf("Error creating handler: %v\n", err) } go func() { for range time.Tick(time.Minute) { err := app.Save() if err != nil { log.Printf("Error saving: %v\n", err) } } }() syncedServer := NewPoolServer(syncedPool, name) mux := http.NewServeMux() mux.Handle("/api/", http.StripPrefix("/api", syncedServer.Serve())) // only for local // mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) { // syncedPool.AddRemote(r.PathValue("host")) // }) // mux.HandleFunc("GET /save", app.HandleSave) mux.Handle("/metrics", promhttp.Handler()) sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) done := make(chan bool, 1) go func() { sig := <-sigs fmt.Println("Shutting down due to signal:", sig) app.Save() done <- true }() go http.ListenAndServe(":8080", mux) <-done }