more stuff

This commit is contained in:
matst80
2024-11-09 01:02:17 +01:00
parent 356f5effba
commit c3d30ea0b3
12 changed files with 153 additions and 102 deletions

View File

@@ -3,7 +3,6 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"time" "time"
messages "git.tornberg.me/go-cart-actor/proto" messages "git.tornberg.me/go-cart-actor/proto"
@@ -63,7 +62,6 @@ func getItemData(sku string) (*messages.AddItem, error) {
price := 0 price := 0
priceField, ok := item.Fields[4] priceField, ok := item.Fields[4]
if ok { if ok {
priceFloat, ok := priceField.(float64) priceFloat, ok := priceField.(float64)
if !ok { if !ok {
price, ok = priceField.(int) price, ok = priceField.(int)
@@ -110,7 +108,6 @@ func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage {
} }
func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) { func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
log.Printf("Handling message %d", message.Type)
if message.TimeStamp == nil { if message.TimeStamp == nil {
now := time.Now().Unix() now := time.Now().Unix()
message.TimeStamp = &now message.TimeStamp = &now
@@ -120,14 +117,14 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, erro
case AddRequestType: case AddRequestType:
msg, ok := message.Content.(*messages.AddRequest) msg, ok := message.Content.(*messages.AddRequest)
if !ok { if !ok {
err = fmt.Errorf("invalid content type") err = fmt.Errorf("expected AddRequest")
} else { } else {
return c.AddItem(msg.Sku) return c.AddItem(msg.Sku)
} }
case AddItemType: case AddItemType:
msg, ok := message.Content.(*messages.AddItem) msg, ok := message.Content.(*messages.AddItem)
if !ok { if !ok {
err = fmt.Errorf("invalid content type") err = fmt.Errorf("expected AddItem")
} else { } else {
c.Items = append(c.Items, CartItem{ c.Items = append(c.Items, CartItem{
Sku: msg.Sku, Sku: msg.Sku,
@@ -138,7 +135,7 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, erro
c.TotalPrice += msg.Price c.TotalPrice += msg.Price
} }
default: default:
err = fmt.Errorf("unknown message type") err = fmt.Errorf("unknown message type %d", message.Type)
} }
if err != nil { if err != nil {
return nil, err return nil, err

BIN
data/5.prot Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -65,7 +65,7 @@ func loadMessages(grain Grain, id CartId) error {
for err == nil { for err == nil {
var msg Message var msg Message
err = MessageFromReader(file, &msg) err = ReadMessage(file, &msg)
if err == nil { if err == nil {
grain.HandleMessage(&msg, true) grain.HandleMessage(&msg, true)
} }

View File

@@ -41,6 +41,18 @@ func NewGrainLocalPool(size int, ttl time.Duration, spawn func(id CartId) (*Cart
return ret return ret
} }
func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) {
for id := range availableWithLastChangeUnix {
if _, ok := p.grains[id]; !ok {
p.grains[id] = nil
p.expiry = append(p.expiry, Ttl{
Expires: time.Now().Add(p.Ttl),
Grain: nil,
})
}
}
}
func (p *GrainLocalPool) Purge() { func (p *GrainLocalPool) Purge() {
lastChangeTime := time.Now().Add(-p.Ttl) lastChangeTime := time.Now().Add(-p.Ttl)
keepChanged := lastChangeTime.Unix() keepChanged := lastChangeTime.Unix()
@@ -69,7 +81,7 @@ func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain {
func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) { func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) {
var err error var err error
grain, ok := p.grains[id] grain, ok := p.grains[id]
if !ok { if grain == nil || !ok {
if len(p.grains) >= p.PoolSize { if len(p.grains) >= p.PoolSize {
if p.expiry[0].Expires.Before(time.Now()) { if p.expiry[0].Expires.Before(time.Now()) {
delete(p.grains, p.expiry[0].Grain.GetId()) delete(p.grains, p.expiry[0].Grain.GetId())

117
main.go
View File

@@ -1,7 +1,6 @@
package main package main
import ( import (
"encoding/json"
"log" "log"
"net/http" "net/http"
"os" "os"
@@ -30,38 +29,11 @@ type App struct {
storage *DiskStorage storage *DiskStorage
} }
func (a *App) HandleGet(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
grain, err := a.pool.Get(ToCartId(id))
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(grain)
}
func (a *App) HandleAddSku(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
sku := r.PathValue("sku")
grain, err := a.pool.Process(ToCartId(id), Message{
Type: AddRequestType,
Content: &messages.AddRequest{Sku: sku},
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(grain)
}
func (a *App) Save() error { func (a *App) Save() error {
for id, grain := range a.pool.GetGrains() { for id, grain := range a.pool.GetGrains() {
if grain == nil {
continue
}
err := a.storage.Store(id, grain) err := a.storage.Store(id, grain)
if err != nil { if err != nil {
log.Printf("Error saving grain %s: %v\n", id, err) log.Printf("Error saving grain %s: %v\n", id, err)
@@ -80,6 +52,53 @@ func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) {
} }
} }
type PoolServer struct {
pool GrainPool
}
func NewPoolServer(pool GrainPool) *PoolServer {
return &PoolServer{
pool: pool,
}
}
func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
data, err := s.pool.Get(ToCartId(id))
if err != nil {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(data)
}
func (s *PoolServer) HandleAddSku(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
sku := r.PathValue("sku")
data, err := s.pool.Process(ToCartId(id), Message{
Type: AddRequestType,
Content: &messages.AddRequest{Sku: sku},
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(data)
}
func (s *PoolServer) Serve() *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("GET /{id}", s.HandleGet)
mux.HandleFunc("GET /{id}/add/{sku}", s.HandleAddSku)
return mux
}
func main() { func main() {
// Create a new instance of the server // Create a new instance of the server
storage, err := NewDiskStorage("data/state.gob") storage, err := NewDiskStorage("data/state.gob")
@@ -98,39 +117,11 @@ func main() {
go rpcHandler.Serve() go rpcHandler.Serve()
remotePool := NewRemoteGrainPool("localhost:1337") remotePool := NewRemoteGrainPool("localhost:1337")
remoteServer := NewPoolServer(remotePool)
localServer := NewPoolServer(app.pool)
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("GET /api/{id}", app.HandleGet) mux.Handle("/remote/", http.StripPrefix("/remote", remoteServer.Serve()))
mux.HandleFunc("GET /api/{id}/add/{sku}", app.HandleAddSku) mux.Handle("/local/", http.StripPrefix("/local", localServer.Serve()))
mux.HandleFunc("GET /remote/{id}/add", func(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
ts := time.Now().Unix()
data, err := remotePool.Process(ToCartId(id), Message{
Type: AddRequestType,
TimeStamp: &ts,
Content: &messages.AddRequest{Sku: "49565"},
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(data)
})
mux.HandleFunc("GET /remote/{id}", func(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
data, err := remotePool.Get(ToCartId(id))
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(data)
})
mux.HandleFunc("GET /save", app.HandleSave) mux.HandleFunc("GET /save", app.HandleSave)
http.ListenAndServe(":8080", mux) http.ListenAndServe(":8080", mux)

View File

@@ -78,7 +78,7 @@ func (m Message) Write(w io.Writer) error {
return err return err
} }
func MessageFromReader(reader io.Reader, m *Message) error { func ReadMessage(reader io.Reader, m *Message) error {
header := StorableMessageHeader{} header := StorableMessageHeader{}
err := binary.Read(reader, binary.LittleEndian, &header) err := binary.Read(reader, binary.LittleEndian, &header)
if err != nil { if err != nil {

View File

@@ -54,6 +54,16 @@ func SendPacket(conn io.Writer, messageType uint16, datafn func(w io.Writer) err
return err return err
} }
func SendRawResponse(conn io.Writer, data []byte) error {
binary.Write(conn, binary.LittleEndian, ResponsePacket{
Version: 1,
MessageType: ResponseBody,
DataLength: uint16(len(data)),
})
_, err := conn.Write(data)
return err
}
func SendProxyResponse(conn io.Writer, data any) error { func SendProxyResponse(conn io.Writer, data any) error {
return SendPacket(conn, ResponseBody, func(w io.Writer) error { return SendPacket(conn, ResponseBody, func(w io.Writer) error {
data, err := json.Marshal(data) data, err := json.Marshal(data)
@@ -65,17 +75,6 @@ func SendProxyResponse(conn io.Writer, data any) error {
}) })
} }
// func ReceiveCartPacket(conn io.Reader) (CartPacket, []byte, error) {
// var packet CartPacket
// err := binary.Read(conn, binary.LittleEndian, &packet)
// if err != nil {
// return packet, nil, err
// }
// data := make([]byte, packet.DataLength)
// _, err = conn.Read(data)
// return packet, data, err
// }
func ReceivePacket(conn io.Reader) (uint16, []byte, error) { func ReceivePacket(conn io.Reader) (uint16, []byte, error) {
var packet ResponsePacket var packet ResponsePacket
err := binary.Read(conn, binary.LittleEndian, &packet) err := binary.Read(conn, binary.LittleEndian, &packet)

View File

@@ -1,13 +1,14 @@
package main package main
import ( import (
"fmt"
"io" "io"
"net" "net"
"strings" "strings"
) )
type RemoteGrainPool struct { type RemoteGrainPool struct {
Hosts []string Host string
grains map[CartId]RemoteGrain grains map[CartId]RemoteGrain
} }
@@ -46,7 +47,6 @@ func (g *RemoteGrain) Connect() error {
} }
func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) { func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
err := SendCartPacket(g.client, g.Id, RemoteHandleMessage, message.Write) err := SendCartPacket(g.client, g.Id, RemoteHandleMessage, message.Write)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -60,7 +60,6 @@ func (g *RemoteGrain) GetId() CartId {
} }
func (g *RemoteGrain) GetCurrentState() ([]byte, error) { func (g *RemoteGrain) GetCurrentState() ([]byte, error) {
err := SendCartPacket(g.client, g.Id, RemoteGetState, func(w io.Writer) error { err := SendCartPacket(g.client, g.Id, RemoteGetState, func(w io.Writer) error {
return nil return nil
}) })
@@ -71,9 +70,9 @@ func (g *RemoteGrain) GetCurrentState() ([]byte, error) {
return data, err return data, err
} }
func NewRemoteGrainPool(addr ...string) *RemoteGrainPool { func NewRemoteGrainPool(addr string) *RemoteGrainPool {
return &RemoteGrainPool{ return &RemoteGrainPool{
Hosts: addr, Host: addr,
grains: make(map[CartId]RemoteGrain), grains: make(map[CartId]RemoteGrain),
} }
} }
@@ -83,17 +82,26 @@ func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain {
if !ok { if !ok {
return nil return nil
} }
grain.Connect()
return &grain return &grain
} }
func (p *RemoteGrainPool) findOrCreateGrain(id CartId) *RemoteGrain {
grain := p.findRemoteGrain(id)
if grain == nil {
grain = NewRemoteGrain(id, p.Host)
p.grains[id] = *grain
grain.Connect()
}
return grain
}
func (p *RemoteGrainPool) Process(id CartId, messages ...Message) ([]byte, error) { func (p *RemoteGrainPool) Process(id CartId, messages ...Message) ([]byte, error) {
var result []byte var result []byte
var err error var err error
grain := p.findRemoteGrain(id) grain := p.findOrCreateGrain(id)
if grain == nil { if grain == nil {
grain = NewRemoteGrain(id, p.Hosts[0]) return nil, fmt.Errorf("grain not found")
grain.Connect()
p.grains[id] = *grain
} }
for _, message := range messages { for _, message := range messages {
result, err = grain.HandleMessage(&message, false) result, err = grain.HandleMessage(&message, false)
@@ -102,9 +110,9 @@ func (p *RemoteGrainPool) Process(id CartId, messages ...Message) ([]byte, error
} }
func (p *RemoteGrainPool) Get(id CartId) ([]byte, error) { func (p *RemoteGrainPool) Get(id CartId) ([]byte, error) {
grain := p.findRemoteGrain(id) grain := p.findOrCreateGrain(id)
if grain == nil { if grain == nil {
return nil, nil return nil, fmt.Errorf("grain not found")
} }
return grain.GetCurrentState() return grain.GetCurrentState()
} }

View File

@@ -64,26 +64,24 @@ func (h *GrainHandler) handleClient(conn net.Conn) {
switch packet.MessageType { switch packet.MessageType {
case RemoteHandleMessage: case RemoteHandleMessage:
fmt.Printf("Handling message\n")
var msg Message var msg Message
err = MessageFromReader(conn, &msg) err = ReadMessage(conn, &msg)
if err != nil { if err != nil {
fmt.Println("Error reading message:", err) fmt.Println("Error reading message:", err)
} }
fmt.Printf("Message: %s, %v\n", packet.Id.String(), msg)
grain, err := h.pool.Process(packet.Id, msg) data, err := h.pool.Process(packet.Id, msg)
if err != nil { if err != nil {
fmt.Println("Error handling message:", err) fmt.Println("Error handling message:", err)
} }
SendProxyResponse(conn, grain) SendRawResponse(conn, data)
case RemoteGetState: case RemoteGetState:
fmt.Printf("Package: %s %v\n", packet.Id.String(), packet) data, err := h.pool.Get(packet.Id)
grain, err := h.pool.Get(packet.Id)
if err != nil { if err != nil {
fmt.Println("Error getting grain:", err) fmt.Println("Error getting grain:", err)
} }
SendProxyResponse(conn, grain) SendRawResponse(conn, data)
} }
} }

46
synced-pool.go Normal file
View File

@@ -0,0 +1,46 @@
package main
type SyncedPool struct {
local *GrainLocalPool
remotes []RemoteGrainPool
remoteIndex map[CartId]*RemoteGrainPool
}
func NewSyncedPool(local *GrainLocalPool) *SyncedPool {
return &SyncedPool{
local: local,
remotes: make([]RemoteGrainPool, 0),
remoteIndex: make(map[CartId]*RemoteGrainPool),
}
}
func (p *SyncedPool) AddRemote(remote RemoteGrainPool) {
p.remotes = append(p.remotes, remote)
// get all available grains from remote, and start syncing
}
func (p *SyncedPool) Process(id CartId, messages ...Message) ([]byte, error) {
// check if local grain exists
_, ok := p.local.grains[id]
if !ok {
// check if remote grain exists
remoteGrain, ok := p.remoteIndex[id]
if ok {
return remoteGrain.Process(id, messages...)
}
}
return p.local.Process(id, messages...)
}
func (p *SyncedPool) Get(id CartId) ([]byte, error) {
// check if local grain exists
_, ok := p.local.grains[id]
if !ok {
// check if remote grain exists
remoteGrain, ok := p.remoteIndex[id]
if ok {
return remoteGrain.Get(id)
}
}
return p.local.Get(id)
}