This commit is contained in:
matst80
2024-11-07 22:57:18 +01:00
parent 0f23ae0e9a
commit ae421674d2
9 changed files with 542 additions and 112 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
__debug*

141
cart-grain.go Normal file
View File

@@ -0,0 +1,141 @@
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/matst80/slask-finder/pkg/index"
)
type Message struct {
TimeStamp *int64
Type string
Content interface{}
}
type CartItem struct {
Sku string `json:"sku"`
Name string `json:"name"`
Price int `json:"price"`
Image string `json:"image"`
}
type CartGrain struct {
storageMessages []Message
Id string `json:"id"`
Items []CartItem `json:"items"`
TotalPrice int `json:"totalPrice"`
}
type Grain interface {
GetId() string
GetLastChange() int64
HandleMessage(message *Message, isReplay bool, reply *CartGrain) error
GetStorageMessage(since int64) []Message
}
func (c *CartGrain) GetId() string {
return c.Id
}
func (c *CartGrain) GetLastChange() int64 {
if len(c.storageMessages) == 0 {
return 0
}
return *c.storageMessages[len(c.storageMessages)-1].TimeStamp
}
func getItemData(sku string) (*CartItem, error) {
res, err := http.Get("https://slask-finder.tornberg.me/api/get/" + sku)
if err != nil {
return nil, err
}
defer res.Body.Close()
var item index.DataItem
err = json.NewDecoder(res.Body).Decode(&item)
if err != nil {
return nil, err
}
price := item.GetPrice()
if price == 0 {
priceField, ok := item.GetFields()[4]
if ok {
pricef, ok := priceField.(float64)
if !ok {
price, ok = priceField.(int)
if !ok {
return nil, fmt.Errorf("invalid price type")
}
} else {
price = int(pricef)
}
}
}
return &CartItem{
Sku: item.Sku,
Name: item.Title,
Price: price,
Image: item.Img,
}, nil
}
func (c *CartGrain) AddItem(sku string, reply *CartGrain) error {
cartItem, err := getItemData(sku)
if err != nil {
return err
}
return c.HandleMessage(&Message{
Type: "append",
Content: *cartItem,
}, false, reply)
}
func (c *CartGrain) GetStorageMessage(since int64) []Message {
if since == 0 {
return c.storageMessages
}
ret := make([]Message, 0)
for _, message := range c.storageMessages {
if *message.TimeStamp > since {
ret = append(ret, message)
}
}
return ret
}
func (c *CartGrain) HandleMessage(message *Message, isReplay bool, reply *CartGrain) error {
log.Printf("Handling message %s", message.Type)
if message.TimeStamp == nil {
now := time.Now().Unix()
message.TimeStamp = &now
}
var err error
switch message.Type {
case "add":
sku, ok := message.Content.(string)
if !ok {
err = fmt.Errorf("invalid content type")
} else {
return c.AddItem(sku, reply)
}
case "append":
item, ok := message.Content.(CartItem)
if !ok {
err = fmt.Errorf("invalid content type")
} else {
c.Items = append(c.Items, item)
c.TotalPrice += item.Price
}
default:
err = fmt.Errorf("unknown message type")
}
if !isReplay {
c.storageMessages = append(c.storageMessages, *message)
}
*reply = *c
return err
}

124
disk-storage.go Normal file
View File

@@ -0,0 +1,124 @@
package main
import (
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"time"
)
type DiskStorage struct {
stateFile string
lastSave int64
LastSaves map[string]int64
}
func NewDiskStorage(stateFile string) (*DiskStorage, error) {
ret := &DiskStorage{
stateFile: stateFile,
LastSaves: make(map[string]int64),
}
err := ret.loadState()
return ret, err
}
func saveMessages(messages []Message, id string) error {
log.Printf("%d messages to save for %s", len(messages), id)
if len(messages) == 0 {
return nil
}
var file *os.File
var err error
path := getCartPath(id)
file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer file.Close()
// z := gzip.NewWriter(file)
// defer z.Close()
enc := gob.NewEncoder(file)
for _, m := range messages {
err = enc.Encode(m)
}
return err
}
func getCartPath(id string) string {
return fmt.Sprintf("data/%s.gob", id)
}
func loadMessages(grain Grain, id string) error {
var err error
path := getCartPath(id)
if _, err = os.Stat(path); errors.Is(err, os.ErrNotExist) {
return err
}
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
// z, err := gzip.NewReader(file)
// if err != nil {
// return err
// }
// defer z.Close()
var reply CartGrain
decoder := gob.NewDecoder(file)
for err == nil {
var message Message
err = decoder.Decode(&message)
if err != nil {
grain.HandleMessage(&message, true, &reply)
}
}
if err.Error() == "EOF" {
return nil
}
return err
}
func (s *DiskStorage) saveState() error {
tmpFile := s.stateFile + "_tmp"
file, err := os.Create(tmpFile)
if err != nil {
return err
}
defer file.Close()
err = json.NewEncoder(file).Encode(s.LastSaves)
if err != nil {
return err
}
os.Remove(s.stateFile + ".bak")
os.Rename(s.stateFile, s.stateFile+".bak")
return os.Rename(tmpFile, s.stateFile)
}
func (s *DiskStorage) loadState() error {
file, err := os.Open("data/state.json")
if err != nil {
return err
}
defer file.Close()
return json.NewDecoder(file).Decode(&s.LastSaves)
}
func (s *DiskStorage) Store(id string, grain Grain) error {
lastSavedMessage, ok := s.LastSaves[id]
if ok && lastSavedMessage > grain.GetLastChange() {
return nil
}
err := saveMessages(grain.GetStorageMessage(lastSavedMessage), id)
if err != nil {
return err
}
ts := time.Now().Unix()
s.LastSaves[id] = ts
s.lastSave = ts
return nil
}

25
go.mod
View File

@@ -1,3 +1,26 @@
module git.tornberg.me/go-cart-actor
go 1.22.4
go 1.23.0
toolchain go1.23.2
require (
cloud.google.com/go/compute/metadata v0.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/gorilla/schema v1.4.1 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/matst80/slask-finder v0.0.0-20241104074525-3365cb1531ac // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_golang v1.20.4 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
github.com/redis/go-redis/v9 v9.5.3 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sys v0.22.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)

36
go.sum Normal file
View File

@@ -0,0 +1,36 @@
cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc=
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/gorilla/schema v1.4.1 h1:jUg5hUjCSDZpNGLuXQOgIWGdlgrIdYvgQ0wZtdK1M3E=
github.com/gorilla/schema v1.4.1/go.mod h1:Dg5SSm5PV60mhF2NFaTV1xuYYj8tV8NOPRo4FggUMnM=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/matst80/slask-finder v0.0.0-20241104074525-3365cb1531ac h1:zakA1ck6dY4mMUGoZWGCjV3YP/8TiPtdJYvvieC6v8U=
github.com/matst80/slask-finder v0.0.0-20241104074525-3365cb1531ac/go.mod h1:GCLeU45b+BNgLly5XbeB0A+47ctctp2SVHZ3NlfZqzs=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/prometheus/client_golang v1.20.4 h1:Tgh3Yr67PaOv/uTqloMsCEdeuFTatm5zIq5+qNN23vI=
github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs=
golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=

89
grain-pool.go Normal file
View File

@@ -0,0 +1,89 @@
package main
import (
"fmt"
"log"
"time"
)
type GrainPool interface {
GetOrSpawn(id string) (Grain, error)
Get(id string) (Grain, error)
}
type Ttl struct {
Expires time.Time
Item Grain
}
type GrainLocalPool struct {
grains map[string]Grain
expiry []Ttl
Ttl time.Duration
PoolSize int
}
func NewGrainLocalPool(size int, ttl time.Duration) *GrainLocalPool {
ret := &GrainLocalPool{
grains: make(map[string]Grain),
expiry: make([]Ttl, 0),
Ttl: ttl,
PoolSize: size,
}
cartPurge := time.NewTicker(time.Minute)
go func() {
<-cartPurge.C
ret.Purge()
}()
return ret
}
func (p *GrainLocalPool) Purge() {
lastChangeTime := time.Now().Add(-p.Ttl)
keepChanged := lastChangeTime.Unix()
for i := 0; i < len(p.expiry); i++ {
item := p.expiry[i]
if item.Expires.Before(time.Now()) {
if item.Item.GetLastChange() > keepChanged {
log.Printf("Changed item %s expired, keeping", item.Item.GetId())
p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
p.expiry = append(p.expiry, item)
} else {
log.Printf("Item %s expired", item.Item.GetId())
delete(p.grains, item.Item.GetId())
p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
}
} else {
break
}
}
}
func (p *GrainLocalPool) GetGrains() map[string]Grain {
return p.grains
}
func (p *GrainLocalPool) GetOrSpawn(id string, generator func(id string) Grain) (Grain, error) {
grain, ok := p.grains[id]
if !ok {
if len(p.grains) >= p.PoolSize {
if p.expiry[0].Expires.Before(time.Now()) {
delete(p.grains, p.expiry[0].Item.GetId())
p.expiry = p.expiry[1:]
} else {
return nil, fmt.Errorf("pool is full")
}
}
grain = generator(id)
p.grains[id] = grain
}
return grain, nil
}
func (p *GrainLocalPool) Get(id string) (Grain, error) {
grain, ok := p.grains[id]
if !ok {
return nil, fmt.Errorf("grain not found")
}
return grain, nil
}

23
grain-server.go Normal file
View File

@@ -0,0 +1,23 @@
package main
import (
"fmt"
"net"
"net/rpc"
)
type GrainServer struct {
Host string
}
func NewServer(hostname string) *GrainServer {
return &GrainServer{
Host: hostname,
}
}
func (s *GrainServer) Start(port int, instance Grain) (net.Listener, error) {
rpc.Register(instance)
rpc.HandleHTTP()
return net.Listen("tcp", fmt.Sprintf(":%d", port))
}

194
main.go
View File

@@ -1,143 +1,111 @@
package main
import (
"fmt"
"net"
"encoding/gob"
"encoding/json"
"log"
"net/http"
"net/rpc"
"os"
"time"
)
type GrainServer struct {
Host string
func spawn(id string) Grain {
ret := &CartGrain{
Id: id,
Items: []CartItem{},
storageMessages: []Message{},
TotalPrice: 0,
}
err := loadMessages(ret, id)
if err != nil {
log.Printf("Error loading messages for grain %s: %v\n", id, err)
}
return ret
}
type Grain interface {
HandleMessage(message *Message, reply *CartGrain) error
func init() {
os.Mkdir("data", 0755)
gob.Register(CartItem{})
gob.Register(Message{})
}
func NewServer(hostname string) *GrainServer {
return &GrainServer{
Host: hostname,
}
type App struct {
pool *GrainLocalPool
storage *DiskStorage
}
func (s *GrainServer) Start(port int, instance Grain) (net.Listener, error) {
rpc.Register(instance)
rpc.HandleHTTP()
return net.Listen("tcp", fmt.Sprintf(":%d", port))
func (a *App) HandleGet(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
grain, err := a.pool.GetOrSpawn(id, spawn)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(grain)
}
type CartGrain struct {
Skus []string
func (a *App) HandleAddSku(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
sku := r.PathValue("sku")
grain, err := a.pool.GetOrSpawn(id, spawn)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
message := &Message{
Type: "add",
Content: sku,
}
var reply CartGrain
err = grain.HandleMessage(message, false, &reply)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(reply)
}
type Message struct {
Type string
Content string
func (a *App) Save() error {
for id, grain := range a.pool.GetGrains() {
err := a.storage.Store(id, grain)
if err != nil {
log.Printf("Error saving grain %s: %v\n", id, err)
}
}
return a.storage.saveState()
}
type Registry interface {
Register(address string, id string) error
Get(id string) (*string, error)
func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) {
err := a.Save()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
} else {
w.WriteHeader(http.StatusCreated)
}
type MemoryRegistry struct {
registry map[string]string
}
func (r *MemoryRegistry) Register(address string, id string) error {
r.registry[id] = address
return nil
}
func (r *MemoryRegistry) Get(id string) (*string, error) {
addr, ok := r.registry[id]
if !ok {
return nil, fmt.Errorf("id not found")
}
return &addr, nil
}
func (c *CartGrain) HandleMessage(message *Message, reply *CartGrain) error {
fmt.Println("CartGrain received message: ", message)
c.Skus = append(c.Skus, message.Content)
*reply = *c
return nil
}
type ServerPool interface {
GetOrSpawn(id string, ttl time.Time) (*string, error)
}
type WebServer struct {
ServerPool ServerPool
}
type MemoryServerPool struct {
port int
local *GrainServer
pool map[string]*string
}
func (p *MemoryServerPool) GetOrSpawn(id string, ttl time.Duration) (*string, error) {
addr, ok := p.pool[id]
if !ok {
prt := p.port
p.port++
s := NewServer("localhost")
_, e := s.Start(prt, &CartGrain{})
if e != nil {
return nil, e
}
//go http.Serve(l, nil)
a := fmt.Sprintf("localhost:%d", prt)
p.pool[id] = &a
addr = &a
}
return addr, nil
}
func main() {
// Create a new instance of the server
pool := &MemoryServerPool{
port: 1337,
pool: make(map[string]*string),
local: NewServer("localhost"),
storage, err := NewDiskStorage("data/state.json")
if err != nil {
log.Printf("Error loading state: %v\n", err)
}
app := &App{
pool: NewGrainLocalPool(1000, 5*time.Minute),
storage: storage,
}
mux := http.NewServeMux()
mux.HandleFunc("GET /{id}", func(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
addr, err := pool.GetOrSpawn(id, time.Hour*1)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(*addr))
})
mux.HandleFunc("GET /add/{id}", func(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
addr, err := pool.GetOrSpawn(id, time.Hour*1)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.WriteHeader(http.StatusOK)
var cart CartGrain
client, err := rpc.DialHTTP("tcp", *addr)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
err = client.Call("CartGrain.HandleMessage", &Message{Type: "add", Content: "123"}, &cart)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write([]byte(fmt.Sprintf("Cart: %v", cart.Skus)))
})
mux.HandleFunc("GET /{id}", app.HandleGet)
mux.HandleFunc("GET /{id}/add/{sku}", app.HandleAddSku)
mux.HandleFunc("GET /save", app.HandleSave)
http.ListenAndServe(":8080", mux)
}

25
server-registry.go Normal file
View File

@@ -0,0 +1,25 @@
package main
import "fmt"
type Registry interface {
Register(address string, id string) error
Get(id string) (*string, error)
}
type MemoryRegistry struct {
registry map[string]string
}
func (r *MemoryRegistry) Register(address string, id string) error {
r.registry[id] = address
return nil
}
func (r *MemoryRegistry) Get(id string) (*string, error) {
addr, ok := r.registry[id]
if !ok {
return nil, fmt.Errorf("id not found")
}
return &addr, nil
}