diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8fcbb00 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__debug* \ No newline at end of file diff --git a/cart-grain.go b/cart-grain.go new file mode 100644 index 0000000..8dee653 --- /dev/null +++ b/cart-grain.go @@ -0,0 +1,141 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "time" + + "github.com/matst80/slask-finder/pkg/index" +) + +type Message struct { + TimeStamp *int64 + Type string + Content interface{} +} + +type CartItem struct { + Sku string `json:"sku"` + Name string `json:"name"` + Price int `json:"price"` + Image string `json:"image"` +} + +type CartGrain struct { + storageMessages []Message + Id string `json:"id"` + Items []CartItem `json:"items"` + TotalPrice int `json:"totalPrice"` +} + +type Grain interface { + GetId() string + GetLastChange() int64 + HandleMessage(message *Message, isReplay bool, reply *CartGrain) error + GetStorageMessage(since int64) []Message +} + +func (c *CartGrain) GetId() string { + return c.Id +} + +func (c *CartGrain) GetLastChange() int64 { + if len(c.storageMessages) == 0 { + return 0 + } + return *c.storageMessages[len(c.storageMessages)-1].TimeStamp +} + +func getItemData(sku string) (*CartItem, error) { + res, err := http.Get("https://slask-finder.tornberg.me/api/get/" + sku) + if err != nil { + return nil, err + } + defer res.Body.Close() + var item index.DataItem + err = json.NewDecoder(res.Body).Decode(&item) + if err != nil { + return nil, err + } + price := item.GetPrice() + if price == 0 { + priceField, ok := item.GetFields()[4] + if ok { + + pricef, ok := priceField.(float64) + if !ok { + price, ok = priceField.(int) + if !ok { + return nil, fmt.Errorf("invalid price type") + } + } else { + price = int(pricef) + } + } + } + return &CartItem{ + Sku: item.Sku, + Name: item.Title, + Price: price, + Image: item.Img, + }, nil +} + +func (c *CartGrain) AddItem(sku string, reply *CartGrain) error { + cartItem, err := getItemData(sku) + if err != nil { + return err + } + return c.HandleMessage(&Message{ + Type: "append", + Content: *cartItem, + }, false, reply) +} + +func (c *CartGrain) GetStorageMessage(since int64) []Message { + if since == 0 { + return c.storageMessages + } + ret := make([]Message, 0) + for _, message := range c.storageMessages { + if *message.TimeStamp > since { + ret = append(ret, message) + } + } + return ret +} + +func (c *CartGrain) HandleMessage(message *Message, isReplay bool, reply *CartGrain) error { + log.Printf("Handling message %s", message.Type) + if message.TimeStamp == nil { + now := time.Now().Unix() + message.TimeStamp = &now + } + var err error + switch message.Type { + case "add": + sku, ok := message.Content.(string) + if !ok { + err = fmt.Errorf("invalid content type") + } else { + return c.AddItem(sku, reply) + } + case "append": + item, ok := message.Content.(CartItem) + if !ok { + err = fmt.Errorf("invalid content type") + } else { + c.Items = append(c.Items, item) + c.TotalPrice += item.Price + } + default: + err = fmt.Errorf("unknown message type") + } + if !isReplay { + c.storageMessages = append(c.storageMessages, *message) + } + *reply = *c + return err +} diff --git a/disk-storage.go b/disk-storage.go new file mode 100644 index 0000000..4b81044 --- /dev/null +++ b/disk-storage.go @@ -0,0 +1,124 @@ +package main + +import ( + "encoding/gob" + "encoding/json" + "errors" + "fmt" + "log" + "os" + "time" +) + +type DiskStorage struct { + stateFile string + lastSave int64 + LastSaves map[string]int64 +} + +func NewDiskStorage(stateFile string) (*DiskStorage, error) { + ret := &DiskStorage{ + stateFile: stateFile, + LastSaves: make(map[string]int64), + } + err := ret.loadState() + return ret, err +} + +func saveMessages(messages []Message, id string) error { + log.Printf("%d messages to save for %s", len(messages), id) + if len(messages) == 0 { + return nil + } + var file *os.File + var err error + path := getCartPath(id) + file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer file.Close() + // z := gzip.NewWriter(file) + // defer z.Close() + enc := gob.NewEncoder(file) + for _, m := range messages { + err = enc.Encode(m) + } + return err +} + +func getCartPath(id string) string { + return fmt.Sprintf("data/%s.gob", id) +} + +func loadMessages(grain Grain, id string) error { + var err error + path := getCartPath(id) + if _, err = os.Stat(path); errors.Is(err, os.ErrNotExist) { + return err + } + file, err := os.Open(path) + if err != nil { + return err + } + defer file.Close() + // z, err := gzip.NewReader(file) + // if err != nil { + // return err + // } + // defer z.Close() + + var reply CartGrain + decoder := gob.NewDecoder(file) + for err == nil { + var message Message + err = decoder.Decode(&message) + if err != nil { + grain.HandleMessage(&message, true, &reply) + } + } + if err.Error() == "EOF" { + return nil + } + return err +} + +func (s *DiskStorage) saveState() error { + tmpFile := s.stateFile + "_tmp" + file, err := os.Create(tmpFile) + if err != nil { + return err + } + defer file.Close() + err = json.NewEncoder(file).Encode(s.LastSaves) + if err != nil { + return err + } + os.Remove(s.stateFile + ".bak") + os.Rename(s.stateFile, s.stateFile+".bak") + return os.Rename(tmpFile, s.stateFile) +} + +func (s *DiskStorage) loadState() error { + file, err := os.Open("data/state.json") + if err != nil { + return err + } + defer file.Close() + return json.NewDecoder(file).Decode(&s.LastSaves) +} + +func (s *DiskStorage) Store(id string, grain Grain) error { + lastSavedMessage, ok := s.LastSaves[id] + if ok && lastSavedMessage > grain.GetLastChange() { + return nil + } + err := saveMessages(grain.GetStorageMessage(lastSavedMessage), id) + if err != nil { + return err + } + ts := time.Now().Unix() + s.LastSaves[id] = ts + s.lastSave = ts + return nil +} diff --git a/go.mod b/go.mod index ec17698..61be3ee 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,26 @@ module git.tornberg.me/go-cart-actor -go 1.22.4 +go 1.23.0 + +toolchain go1.23.2 + +require ( + cloud.google.com/go/compute/metadata v0.3.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/golang-jwt/jwt v3.2.2+incompatible // indirect + github.com/gorilla/schema v1.4.1 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/matst80/slask-finder v0.0.0-20241104074525-3365cb1531ac // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_golang v1.20.4 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect + github.com/redis/go-redis/v9 v9.5.3 // indirect + golang.org/x/oauth2 v0.23.0 // indirect + golang.org/x/sys v0.22.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..005d338 --- /dev/null +++ b/go.sum @@ -0,0 +1,36 @@ +cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= +cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= +github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/gorilla/schema v1.4.1 h1:jUg5hUjCSDZpNGLuXQOgIWGdlgrIdYvgQ0wZtdK1M3E= +github.com/gorilla/schema v1.4.1/go.mod h1:Dg5SSm5PV60mhF2NFaTV1xuYYj8tV8NOPRo4FggUMnM= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/matst80/slask-finder v0.0.0-20241104074525-3365cb1531ac h1:zakA1ck6dY4mMUGoZWGCjV3YP/8TiPtdJYvvieC6v8U= +github.com/matst80/slask-finder v0.0.0-20241104074525-3365cb1531ac/go.mod h1:GCLeU45b+BNgLly5XbeB0A+47ctctp2SVHZ3NlfZqzs= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/prometheus/client_golang v1.20.4 h1:Tgh3Yr67PaOv/uTqloMsCEdeuFTatm5zIq5+qNN23vI= +github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= +github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= +golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= diff --git a/grain-pool.go b/grain-pool.go new file mode 100644 index 0000000..82a8091 --- /dev/null +++ b/grain-pool.go @@ -0,0 +1,89 @@ +package main + +import ( + "fmt" + "log" + "time" +) + +type GrainPool interface { + GetOrSpawn(id string) (Grain, error) + Get(id string) (Grain, error) +} + +type Ttl struct { + Expires time.Time + Item Grain +} + +type GrainLocalPool struct { + grains map[string]Grain + expiry []Ttl + Ttl time.Duration + PoolSize int +} + +func NewGrainLocalPool(size int, ttl time.Duration) *GrainLocalPool { + ret := &GrainLocalPool{ + grains: make(map[string]Grain), + expiry: make([]Ttl, 0), + Ttl: ttl, + PoolSize: size, + } + cartPurge := time.NewTicker(time.Minute) + go func() { + <-cartPurge.C + ret.Purge() + }() + return ret +} + +func (p *GrainLocalPool) Purge() { + lastChangeTime := time.Now().Add(-p.Ttl) + keepChanged := lastChangeTime.Unix() + for i := 0; i < len(p.expiry); i++ { + item := p.expiry[i] + if item.Expires.Before(time.Now()) { + if item.Item.GetLastChange() > keepChanged { + log.Printf("Changed item %s expired, keeping", item.Item.GetId()) + p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) + p.expiry = append(p.expiry, item) + } else { + log.Printf("Item %s expired", item.Item.GetId()) + delete(p.grains, item.Item.GetId()) + p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) + } + } else { + break + } + } +} + +func (p *GrainLocalPool) GetGrains() map[string]Grain { + return p.grains +} + +func (p *GrainLocalPool) GetOrSpawn(id string, generator func(id string) Grain) (Grain, error) { + grain, ok := p.grains[id] + if !ok { + if len(p.grains) >= p.PoolSize { + if p.expiry[0].Expires.Before(time.Now()) { + delete(p.grains, p.expiry[0].Item.GetId()) + p.expiry = p.expiry[1:] + } else { + return nil, fmt.Errorf("pool is full") + } + } + grain = generator(id) + p.grains[id] = grain + } + return grain, nil +} + +func (p *GrainLocalPool) Get(id string) (Grain, error) { + grain, ok := p.grains[id] + if !ok { + return nil, fmt.Errorf("grain not found") + } + return grain, nil +} diff --git a/grain-server.go b/grain-server.go new file mode 100644 index 0000000..297f997 --- /dev/null +++ b/grain-server.go @@ -0,0 +1,23 @@ +package main + +import ( + "fmt" + "net" + "net/rpc" +) + +type GrainServer struct { + Host string +} + +func NewServer(hostname string) *GrainServer { + return &GrainServer{ + Host: hostname, + } +} + +func (s *GrainServer) Start(port int, instance Grain) (net.Listener, error) { + rpc.Register(instance) + rpc.HandleHTTP() + return net.Listen("tcp", fmt.Sprintf(":%d", port)) +} diff --git a/main.go b/main.go index 3c908f7..b9e35bd 100644 --- a/main.go +++ b/main.go @@ -1,143 +1,111 @@ package main import ( - "fmt" - "net" + "encoding/gob" + "encoding/json" + "log" "net/http" - "net/rpc" + "os" "time" ) -type GrainServer struct { - Host string -} +func spawn(id string) Grain { -type Grain interface { - HandleMessage(message *Message, reply *CartGrain) error -} - -func NewServer(hostname string) *GrainServer { - return &GrainServer{ - Host: hostname, + ret := &CartGrain{ + Id: id, + Items: []CartItem{}, + storageMessages: []Message{}, + TotalPrice: 0, } -} - -func (s *GrainServer) Start(port int, instance Grain) (net.Listener, error) { - rpc.Register(instance) - rpc.HandleHTTP() - return net.Listen("tcp", fmt.Sprintf(":%d", port)) -} - -type CartGrain struct { - Skus []string -} - -type Message struct { - Type string - Content string -} - -type Registry interface { - Register(address string, id string) error - Get(id string) (*string, error) -} - -type MemoryRegistry struct { - registry map[string]string -} - -func (r *MemoryRegistry) Register(address string, id string) error { - r.registry[id] = address - return nil -} - -func (r *MemoryRegistry) Get(id string) (*string, error) { - addr, ok := r.registry[id] - if !ok { - return nil, fmt.Errorf("id not found") + err := loadMessages(ret, id) + if err != nil { + log.Printf("Error loading messages for grain %s: %v\n", id, err) } - return &addr, nil + return ret } -func (c *CartGrain) HandleMessage(message *Message, reply *CartGrain) error { - fmt.Println("CartGrain received message: ", message) - c.Skus = append(c.Skus, message.Content) - *reply = *c - return nil +func init() { + os.Mkdir("data", 0755) + gob.Register(CartItem{}) + gob.Register(Message{}) } -type ServerPool interface { - GetOrSpawn(id string, ttl time.Time) (*string, error) +type App struct { + pool *GrainLocalPool + storage *DiskStorage } -type WebServer struct { - ServerPool ServerPool +func (a *App) HandleGet(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + grain, err := a.pool.GetOrSpawn(id, spawn) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(grain) } -type MemoryServerPool struct { - port int - local *GrainServer - pool map[string]*string +func (a *App) HandleAddSku(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + sku := r.PathValue("sku") + grain, err := a.pool.GetOrSpawn(id, spawn) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + message := &Message{ + Type: "add", + Content: sku, + } + var reply CartGrain + err = grain.HandleMessage(message, false, &reply) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(reply) } -func (p *MemoryServerPool) GetOrSpawn(id string, ttl time.Duration) (*string, error) { - addr, ok := p.pool[id] - if !ok { - prt := p.port - p.port++ - s := NewServer("localhost") - _, e := s.Start(prt, &CartGrain{}) - if e != nil { - return nil, e +func (a *App) Save() error { + for id, grain := range a.pool.GetGrains() { + err := a.storage.Store(id, grain) + if err != nil { + log.Printf("Error saving grain %s: %v\n", id, err) } - //go http.Serve(l, nil) - a := fmt.Sprintf("localhost:%d", prt) - p.pool[id] = &a - addr = &a } - return addr, nil + return a.storage.saveState() +} + +func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) { + err := a.Save() + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + } else { + w.WriteHeader(http.StatusCreated) + } } func main() { // Create a new instance of the server - pool := &MemoryServerPool{ - port: 1337, - pool: make(map[string]*string), - local: NewServer("localhost"), + storage, err := NewDiskStorage("data/state.json") + if err != nil { + log.Printf("Error loading state: %v\n", err) + } + app := &App{ + pool: NewGrainLocalPool(1000, 5*time.Minute), + storage: storage, } mux := http.NewServeMux() - mux.HandleFunc("GET /{id}", func(w http.ResponseWriter, r *http.Request) { - id := r.PathValue("id") - addr, err := pool.GetOrSpawn(id, time.Hour*1) - if err != nil { - w.WriteHeader(http.StatusNotFound) - return - } - w.WriteHeader(http.StatusOK) - w.Write([]byte(*addr)) - }) - mux.HandleFunc("GET /add/{id}", func(w http.ResponseWriter, r *http.Request) { - id := r.PathValue("id") - addr, err := pool.GetOrSpawn(id, time.Hour*1) - if err != nil { - w.WriteHeader(http.StatusNotFound) - return - } - w.WriteHeader(http.StatusOK) - var cart CartGrain - client, err := rpc.DialHTTP("tcp", *addr) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - err = client.Call("CartGrain.HandleMessage", &Message{Type: "add", Content: "123"}, &cart) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - w.Write([]byte(fmt.Sprintf("Cart: %v", cart.Skus))) - }) + mux.HandleFunc("GET /{id}", app.HandleGet) + mux.HandleFunc("GET /{id}/add/{sku}", app.HandleAddSku) + mux.HandleFunc("GET /save", app.HandleSave) http.ListenAndServe(":8080", mux) } diff --git a/server-registry.go b/server-registry.go new file mode 100644 index 0000000..4719536 --- /dev/null +++ b/server-registry.go @@ -0,0 +1,25 @@ +package main + +import "fmt" + +type Registry interface { + Register(address string, id string) error + Get(id string) (*string, error) +} + +type MemoryRegistry struct { + registry map[string]string +} + +func (r *MemoryRegistry) Register(address string, id string) error { + r.registry[id] = address + return nil +} + +func (r *MemoryRegistry) Get(id string) (*string, error) { + addr, ok := r.registry[id] + if !ok { + return nil, fmt.Errorf("id not found") + } + return &addr, nil +}