refactor/serializing #1

Merged
mats merged 4 commits from refactor/serializing into main 2024-11-08 23:02:12 +01:00
13 changed files with 437 additions and 173 deletions
Showing only changes of commit 65a969443a - Show all commits

View File

@@ -7,10 +7,12 @@ import (
"net/http" "net/http"
"time" "time"
"git.tornberg.me/go-cart-actor/git.tornberg.me/go-cart-actor/messages" messages "git.tornberg.me/go-cart-actor/proto"
"github.com/matst80/slask-finder/pkg/index" "github.com/matst80/slask-finder/pkg/index"
) )
type CartId [16]byte
type CartItem struct { type CartItem struct {
Sku string `json:"sku"` Sku string `json:"sku"`
Name string `json:"name"` Name string `json:"name"`
@@ -20,19 +22,17 @@ type CartItem struct {
type CartGrain struct { type CartGrain struct {
storageMessages []Message storageMessages []Message
Id string `json:"id"` Id CartId `json:"id"`
Items []CartItem `json:"items"` Items []CartItem `json:"items"`
TotalPrice int64 `json:"totalPrice"` TotalPrice int64 `json:"totalPrice"`
} }
type Grain interface { type Grain interface {
GetId() string GetId() CartId
GetLastChange() int64 HandleMessage(message *Message, isReplay bool) ([]byte, error)
HandleMessage(message *Message, isReplay bool, reply *CartGrain) error
GetStorageMessage(since int64) []StorableMessage
} }
func (c *CartGrain) GetId() string { func (c *CartGrain) GetId() CartId {
return c.Id return c.Id
} }
@@ -79,15 +79,15 @@ func getItemData(sku string) (*messages.AddItem, error) {
}, nil }, nil
} }
func (c *CartGrain) AddItem(sku string, reply *CartGrain) error { func (c *CartGrain) AddItem(sku string) ([]byte, error) {
cartItem, err := getItemData(sku) cartItem, err := getItemData(sku)
if err != nil { if err != nil {
return err return nil, err
} }
return c.HandleMessage(&Message{ return c.HandleMessage(&Message{
Type: 2, Type: 2,
Content: cartItem, Content: cartItem,
}, false, reply) }, false)
} }
func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage { func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage {
@@ -101,7 +101,7 @@ func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage {
return ret return ret
} }
func (c *CartGrain) HandleMessage(message *Message, isReplay bool, reply *CartGrain) error { func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
log.Printf("Handling message %d", message.Type) log.Printf("Handling message %d", message.Type)
if message.TimeStamp == nil { if message.TimeStamp == nil {
now := time.Now().Unix() now := time.Now().Unix()
@@ -114,7 +114,7 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool, reply *CartGr
if !ok { if !ok {
err = fmt.Errorf("invalid content type") err = fmt.Errorf("invalid content type")
} else { } else {
return c.AddItem(msg.Sku, reply) return c.AddItem(msg.Sku)
} }
case AddItemType: case AddItemType:
msg, ok := message.Content.(*messages.AddItem) msg, ok := message.Content.(*messages.AddItem)
@@ -132,9 +132,11 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool, reply *CartGr
default: default:
err = fmt.Errorf("unknown message type") err = fmt.Errorf("unknown message type")
} }
if err != nil {
return nil, err
}
if !isReplay { if !isReplay {
c.storageMessages = append(c.storageMessages, *message) c.storageMessages = append(c.storageMessages, *message)
} }
*reply = *c return json.Marshal(c)
return err
} }

Binary file not shown.

View File

@@ -1 +0,0 @@
{"1":1731051371}

View File

@@ -1 +0,0 @@
{"1":1731050604}

View File

@@ -12,26 +12,26 @@ import (
type DiskStorage struct { type DiskStorage struct {
stateFile string stateFile string
lastSave int64 lastSave int64
LastSaves map[string]int64 LastSaves map[CartId]int64
} }
func NewDiskStorage(stateFile string) (*DiskStorage, error) { func NewDiskStorage(stateFile string) (*DiskStorage, error) {
ret := &DiskStorage{ ret := &DiskStorage{
stateFile: stateFile, stateFile: stateFile,
LastSaves: make(map[string]int64), LastSaves: make(map[CartId]int64),
} }
err := ret.loadState() err := ret.loadState()
return ret, err return ret, err
} }
func saveMessages(messages []StorableMessage, id string) error { func saveMessages(messages []StorableMessage, id CartId) error {
log.Printf("%d messages to save for %s", len(messages), id) log.Printf("%d messages to save for %s", len(messages), id)
if len(messages) == 0 { if len(messages) == 0 {
return nil return nil
} }
var file *os.File var file *os.File
var err error var err error
path := getCartPath(id) path := getCartPath(id.String())
file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
return err return err
@@ -39,11 +39,10 @@ func saveMessages(messages []StorableMessage, id string) error {
defer file.Close() defer file.Close()
for _, m := range messages { for _, m := range messages {
b, err := m.GetBytes() err := m.Write(file)
if err != nil { if err != nil {
return err return err
} }
file.Write(b)
} }
return err return err
} }
@@ -52,9 +51,9 @@ func getCartPath(id string) string {
return fmt.Sprintf("data/%s.prot", id) return fmt.Sprintf("data/%s.prot", id)
} }
func loadMessages(grain Grain, id string) error { func loadMessages(grain Grain, id CartId) error {
var err error var err error
path := getCartPath(id) path := getCartPath(id.String())
if _, err = os.Stat(path); errors.Is(err, os.ErrNotExist) { if _, err = os.Stat(path); errors.Is(err, os.ErrNotExist) {
return err return err
} }
@@ -64,13 +63,11 @@ func loadMessages(grain Grain, id string) error {
} }
defer file.Close() defer file.Close()
var reply CartGrain
for err == nil { for err == nil {
msg := &Message{} var msg Message
err = msg.FromReader(file, msg) err = MessageFromReader(file, &msg)
if err == nil { if err == nil {
grain.HandleMessage(msg, true, &reply) grain.HandleMessage(&msg, true)
} }
} }
@@ -105,7 +102,7 @@ func (s *DiskStorage) loadState() error {
return json.NewDecoder(file).Decode(&s.LastSaves) return json.NewDecoder(file).Decode(&s.LastSaves)
} }
func (s *DiskStorage) Store(id string, grain Grain) error { func (s *DiskStorage) Store(id CartId, grain *CartGrain) error {
lastSavedMessage, ok := s.LastSaves[id] lastSavedMessage, ok := s.LastSaves[id]
if ok && lastSavedMessage > grain.GetLastChange() { if ok && lastSavedMessage > grain.GetLastChange() {
return nil return nil

View File

@@ -7,25 +7,27 @@ import (
) )
type GrainPool interface { type GrainPool interface {
GetOrSpawn(id string) (Grain, error) Process(id CartId, messages ...Message) (interface{}, error)
Get(id string) (Grain, error) Get(id CartId) (Grain, error)
} }
type Ttl struct { type Ttl struct {
Expires time.Time Expires time.Time
Item Grain Item *CartGrain
} }
type GrainLocalPool struct { type GrainLocalPool struct {
grains map[string]Grain grains map[CartId]*CartGrain
expiry []Ttl expiry []Ttl
spawn func(id CartId) (*CartGrain, error)
Ttl time.Duration Ttl time.Duration
PoolSize int PoolSize int
} }
func NewGrainLocalPool(size int, ttl time.Duration) *GrainLocalPool { func NewGrainLocalPool(size int, ttl time.Duration, spawn func(id CartId) (*CartGrain, error)) *GrainLocalPool {
ret := &GrainLocalPool{ ret := &GrainLocalPool{
grains: make(map[string]Grain), spawn: spawn,
grains: make(map[CartId]*CartGrain),
expiry: make([]Ttl, 0), expiry: make([]Ttl, 0),
Ttl: ttl, Ttl: ttl,
PoolSize: size, PoolSize: size,
@@ -59,11 +61,12 @@ func (p *GrainLocalPool) Purge() {
} }
} }
func (p *GrainLocalPool) GetGrains() map[string]Grain { func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain {
return p.grains return p.grains
} }
func (p *GrainLocalPool) GetOrSpawn(id string, generator func(id string) Grain) (Grain, error) { func (p *GrainLocalPool) Process(id CartId, messages ...Message) (interface{}, error) {
var err error
grain, ok := p.grains[id] grain, ok := p.grains[id]
if !ok { if !ok {
if len(p.grains) >= p.PoolSize { if len(p.grains) >= p.PoolSize {
@@ -74,13 +77,19 @@ func (p *GrainLocalPool) GetOrSpawn(id string, generator func(id string) Grain)
return nil, fmt.Errorf("pool is full") return nil, fmt.Errorf("pool is full")
} }
} }
grain = generator(id) grain, err = p.spawn(id)
p.grains[id] = grain p.grains[id] = grain
} }
return grain, nil if err == nil && grain != nil {
for _, message := range messages {
_, err = grain.HandleMessage(&message, false)
}
}
return grain, err
} }
func (p *GrainLocalPool) Get(id string) (Grain, error) { func (p *GrainLocalPool) Get(id CartId) (Grain, error) {
grain, ok := p.grains[id] grain, ok := p.grains[id]
if !ok { if !ok {
return nil, fmt.Errorf("grain not found") return nil, fmt.Errorf("grain not found")

60
main.go
View File

@@ -1,18 +1,16 @@
package main package main
import ( import (
"encoding/gob"
"encoding/json" "encoding/json"
"log" "log"
"net/http" "net/http"
"os" "os"
"time" "time"
"git.tornberg.me/go-cart-actor/git.tornberg.me/go-cart-actor/messages" messages "git.tornberg.me/go-cart-actor/proto"
) )
func spawn(id string) Grain { func spawn(id CartId) (*CartGrain, error) {
ret := &CartGrain{ ret := &CartGrain{
Id: id, Id: id,
Items: []CartItem{}, Items: []CartItem{},
@@ -20,16 +18,11 @@ func spawn(id string) Grain {
TotalPrice: 0, TotalPrice: 0,
} }
err := loadMessages(ret, id) err := loadMessages(ret, id)
if err != nil { return ret, err
log.Printf("Error loading messages for grain %s: %v\n", id, err)
}
return ret
} }
func init() { func init() {
os.Mkdir("data", 0755) os.Mkdir("data", 0755)
gob.Register(CartItem{})
gob.Register(Message{})
} }
type App struct { type App struct {
@@ -39,7 +32,7 @@ type App struct {
func (a *App) HandleGet(w http.ResponseWriter, r *http.Request) { func (a *App) HandleGet(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id") id := r.PathValue("id")
grain, err := a.pool.GetOrSpawn(id, spawn) grain, err := a.pool.Get(ToCartId(id))
if err != nil { if err != nil {
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
@@ -52,25 +45,19 @@ func (a *App) HandleGet(w http.ResponseWriter, r *http.Request) {
func (a *App) HandleAddSku(w http.ResponseWriter, r *http.Request) { func (a *App) HandleAddSku(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id") id := r.PathValue("id")
sku := r.PathValue("sku") sku := r.PathValue("sku")
grain, err := a.pool.GetOrSpawn(id, spawn) grain, err := a.pool.Process(ToCartId(id), Message{
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
message := &Message{
Type: AddRequestType, Type: AddRequestType,
Content: &messages.AddRequest{Sku: sku}, Content: &messages.AddRequest{Sku: sku},
} })
var reply CartGrain
err = grain.HandleMessage(message, false, &reply)
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error())) w.Write([]byte(err.Error()))
return return
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(reply) json.NewEncoder(w).Encode(grain)
} }
func (a *App) Save() error { func (a *App) Save() error {
@@ -100,13 +87,38 @@ func main() {
log.Printf("Error loading state: %v\n", err) log.Printf("Error loading state: %v\n", err)
} }
app := &App{ app := &App{
pool: NewGrainLocalPool(1000, 5*time.Minute), pool: NewGrainLocalPool(1000, 5*time.Minute, spawn),
storage: storage, storage: storage,
} }
rpcHandler, err := NewGrainHandler(app.pool, "localhost:1337")
if err != nil {
log.Fatalf("Error creating handler: %v\n", err)
}
go rpcHandler.Serve()
remotePool := NewRemoteGrainPool("localhost:1337")
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("GET /{id}", app.HandleGet) mux.HandleFunc("GET /api/{id}", app.HandleGet)
mux.HandleFunc("GET /{id}/add/{sku}", app.HandleAddSku) mux.HandleFunc("GET /api/{id}/add/{sku}", app.HandleAddSku)
mux.HandleFunc("GET /remote/{id}", func(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
ts := time.Now().Unix()
data, err := remotePool.Process(ToCartId(id), Message{
Type: AddRequestType,
TimeStamp: &ts,
Content: &messages.AddRequest{Sku: "49565"},
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(data)
})
mux.HandleFunc("GET /save", app.HandleSave) mux.HandleFunc("GET /save", app.HandleSave)
http.ListenAndServe(":8080", mux) http.ListenAndServe(":8080", mux)

View File

@@ -1,109 +1,140 @@
package main package main
import ( import (
"bufio"
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"io" "io"
"time"
"git.tornberg.me/go-cart-actor/git.tornberg.me/go-cart-actor/messages" messages "git.tornberg.me/go-cart-actor/proto"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
type StorableMessage interface { type StorableMessage interface {
GetBytes() ([]byte, error) Write(w io.Writer) error
FromReader(io.Reader, *Message) error
} }
type Message struct { type Message struct {
Type uint64 Type uint16
TimeStamp *int64 TimeStamp *int64
Content interface{} Content interface{}
} }
type MessageWriter struct { type MessageWriter struct {
writer io.Writer io.Writer
} }
func NewMessageWriter(b *bytes.Buffer) *MessageWriter { type StorableMessageHeader struct {
return &MessageWriter{writer: bufio.NewWriter(b)} Version uint16
Type uint16
TimeStamp int64
DataLength uint64
} }
func (w *MessageWriter) WriteUint64(value uint64) error { func GetData(fn func(w io.Writer) error) ([]byte, error) {
bytes := make([]byte, 8) var buf bytes.Buffer
binary.LittleEndian.PutUint64(bytes, value) err := fn(&buf)
_, err := w.writer.Write(bytes) if err != nil {
return err return nil, err
}
func (w *MessageWriter) WriteInt64(value int64) error {
return w.WriteUint64(uint64(value))
}
func (w *MessageWriter) WriteMessage(m *Message) error {
if err := w.WriteUint64(m.Type); err != nil {
return err
} }
if err := w.WriteInt64(*m.TimeStamp); err != nil { b := buf.Bytes()
return err return b, nil
} }
var messageBytes []byte
var err error // func (w *MessageWriter) WriteUint64(value uint64) (int, error) {
// bytes := make([]byte, 8)
// binary.LittleEndian.PutUint64(bytes, value)
// return w.Write(bytes)
// }
// func (w *MessageWriter) WriteInt64(value int64) (int, error) {
// return w.WriteUint64(uint64(value))
// }
// func (w *MessageWriter) WriteMessage(m *Message) (int, error) {
// var i, l int
// var err error
// i, err = w.WriteUint64(m.Type)
// l += i
// i, err = w.WriteInt64(*m.TimeStamp)
// l += i
// var messageBytes []byte
// var err error
// if m.Type == AddRequestType {
// messageBytes, err = proto.Marshal(m.Content.(*messages.AddRequest))
// } else if m.Type == AddItemType {
// messageBytes, err = proto.Marshal(m.Content.(*messages.AddItem))
// } else {
// return fmt.Errorf("unknown message type")
// }
// if err != nil {
// return err
// }
// if err := w.WriteUint64(uint64(len(messageBytes))); err != nil {
// return err
// }
// _, err = w.Write(messageBytes)
// return err
// }
func (m Message) Write(w io.Writer) error {
data, err := GetData(func(wr io.Writer) error {
if m.Type == AddRequestType { if m.Type == AddRequestType {
messageBytes, err = proto.Marshal(m.Content.(*messages.AddRequest)) messageBytes, err := proto.Marshal(m.Content.(*messages.AddRequest))
} else if m.Type == AddItemType {
messageBytes, err = proto.Marshal(m.Content.(*messages.AddItem))
} else {
return fmt.Errorf("unknown message type")
}
if err != nil { if err != nil {
return err return err
} }
if err := w.WriteUint64(uint64(len(messageBytes))); err != nil { wr.Write(messageBytes)
} else if m.Type == AddItemType {
messageBytes, err := proto.Marshal(m.Content.(*messages.AddItem))
if err != nil {
return err return err
} }
_, err = w.writer.Write(messageBytes) wr.Write(messageBytes)
}
return nil
})
if err != nil {
return err
}
ts := time.Now().Unix()
if m.TimeStamp != nil {
ts = *m.TimeStamp
}
err = binary.Write(w, binary.LittleEndian, StorableMessageHeader{
Version: 1,
Type: m.Type,
TimeStamp: ts,
DataLength: uint64(len(data)),
})
w.Write(data)
return err return err
} }
func (m Message) GetBytes() ([]byte, error) { func MessageFromReader(reader io.Reader, m *Message) error {
var b bytes.Buffer header := StorableMessageHeader{}
mw := NewMessageWriter(&b) err := binary.Read(reader, binary.LittleEndian, &header)
err := mw.WriteMessage(&m) if err != nil {
return b.Bytes(), err
}
func (i Message) FromReader(reader io.Reader, m *Message) error {
bytes := make([]byte, 8)
if _, err := reader.Read(bytes); err != nil {
return err return err
} }
m.Type = binary.LittleEndian.Uint64(bytes) messageBytes := make([]byte, header.DataLength)
if _, err := reader.Read(bytes); err != nil { _, err = reader.Read(messageBytes)
if err != nil {
return err return err
} }
timestamp := int64(binary.LittleEndian.Uint64(bytes)) switch header.Type {
m.TimeStamp = &timestamp case AddRequestType:
if _, err := reader.Read(bytes); err != nil {
return err
}
messageBytes := make([]byte, binary.LittleEndian.Uint64(bytes))
if _, err := reader.Read(messageBytes); err != nil {
return err
}
var err error
if m.Type == AddRequestType {
msg := &messages.AddRequest{} msg := &messages.AddRequest{}
err = proto.Unmarshal(messageBytes, msg) err = proto.Unmarshal(messageBytes, msg)
m.Content = msg m.Content = msg
} else if m.Type == AddItemType { case AddItemType:
msg := &messages.AddItem{} msg := &messages.AddItem{}
err = proto.Unmarshal(messageBytes, msg) err = proto.Unmarshal(messageBytes, msg)
m.Content = msg m.Content = msg
} else { default:
return fmt.Errorf("unknown message type") return fmt.Errorf("unknown message type")
} }
if err != nil { if err != nil {

View File

@@ -2,7 +2,7 @@
// versions: // versions:
// protoc-gen-go v1.35.1 // protoc-gen-go v1.35.1
// protoc v5.28.2 // protoc v5.28.2
// source: add.proto // source: proto/messages.proto
package messages package messages
@@ -30,7 +30,7 @@ type AddRequest struct {
func (x *AddRequest) Reset() { func (x *AddRequest) Reset() {
*x = AddRequest{} *x = AddRequest{}
mi := &file_add_proto_msgTypes[0] mi := &file_proto_messages_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -42,7 +42,7 @@ func (x *AddRequest) String() string {
func (*AddRequest) ProtoMessage() {} func (*AddRequest) ProtoMessage() {}
func (x *AddRequest) ProtoReflect() protoreflect.Message { func (x *AddRequest) ProtoReflect() protoreflect.Message {
mi := &file_add_proto_msgTypes[0] mi := &file_proto_messages_proto_msgTypes[0]
if x != nil { if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -55,7 +55,7 @@ func (x *AddRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use AddRequest.ProtoReflect.Descriptor instead. // Deprecated: Use AddRequest.ProtoReflect.Descriptor instead.
func (*AddRequest) Descriptor() ([]byte, []int) { func (*AddRequest) Descriptor() ([]byte, []int) {
return file_add_proto_rawDescGZIP(), []int{0} return file_proto_messages_proto_rawDescGZIP(), []int{0}
} }
func (x *AddRequest) GetSku() string { func (x *AddRequest) GetSku() string {
@@ -79,7 +79,7 @@ type AddItem struct {
func (x *AddItem) Reset() { func (x *AddItem) Reset() {
*x = AddItem{} *x = AddItem{}
mi := &file_add_proto_msgTypes[1] mi := &file_proto_messages_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -91,7 +91,7 @@ func (x *AddItem) String() string {
func (*AddItem) ProtoMessage() {} func (*AddItem) ProtoMessage() {}
func (x *AddItem) ProtoReflect() protoreflect.Message { func (x *AddItem) ProtoReflect() protoreflect.Message {
mi := &file_add_proto_msgTypes[1] mi := &file_proto_messages_proto_msgTypes[1]
if x != nil { if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -104,7 +104,7 @@ func (x *AddItem) ProtoReflect() protoreflect.Message {
// Deprecated: Use AddItem.ProtoReflect.Descriptor instead. // Deprecated: Use AddItem.ProtoReflect.Descriptor instead.
func (*AddItem) Descriptor() ([]byte, []int) { func (*AddItem) Descriptor() ([]byte, []int) {
return file_add_proto_rawDescGZIP(), []int{1} return file_proto_messages_proto_rawDescGZIP(), []int{1}
} }
func (x *AddItem) GetQuantity() int32 { func (x *AddItem) GetQuantity() int32 {
@@ -142,43 +142,42 @@ func (x *AddItem) GetImage() string {
return "" return ""
} }
var File_add_proto protoreflect.FileDescriptor var File_proto_messages_proto protoreflect.FileDescriptor
var file_add_proto_rawDesc = []byte{ var file_proto_messages_proto_rawDesc = []byte{
0x0a, 0x09, 0x61, 0x64, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x6d, 0x65, 0x73, 0x0a, 0x14, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0x1e, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73,
0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x6b, 0x75, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x22, 0x1e, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10,
0x52, 0x03, 0x53, 0x6b, 0x75, 0x22, 0x77, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x49, 0x74, 0x65, 0x6d, 0x0a, 0x03, 0x53, 0x6b, 0x75, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x53, 0x6b, 0x75,
0x12, 0x1a, 0x0a, 0x08, 0x51, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, 0x22, 0x77, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x49, 0x74, 0x65, 0x6d, 0x12, 0x1a, 0x0a, 0x08, 0x51,
0x28, 0x05, 0x52, 0x08, 0x51, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x51,
0x50, 0x72, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x50, 0x72, 0x69, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x50, 0x72, 0x69, 0x63, 0x65,
0x63, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x6b, 0x75, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x50, 0x72, 0x69, 0x63, 0x65, 0x12, 0x10, 0x0a,
0x03, 0x53, 0x6b, 0x75, 0x12, 0x12, 0x0a, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x03, 0x53, 0x6b, 0x75, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x53, 0x6b, 0x75, 0x12,
0x28, 0x09, 0x52, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x49, 0x6d, 0x61, 0x67, 0x12, 0x0a, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x4e,
0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x49, 0x6d, 0x61, 0x67, 0x65, 0x42, 0x28, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x49, 0x6d, 0x61, 0x67, 0x65, 0x18, 0x06, 0x20, 0x01,
0x5a, 0x26, 0x67, 0x69, 0x74, 0x2e, 0x74, 0x6f, 0x72, 0x6e, 0x62, 0x65, 0x72, 0x67, 0x2e, 0x6d, 0x28, 0x09, 0x52, 0x05, 0x49, 0x6d, 0x61, 0x67, 0x65, 0x42, 0x0c, 0x5a, 0x0a, 0x2e, 0x3b, 0x6d,
0x65, 0x2f, 0x67, 0x6f, 0x2d, 0x63, 0x61, 0x72, 0x74, 0x2d, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
file_add_proto_rawDescOnce sync.Once file_proto_messages_proto_rawDescOnce sync.Once
file_add_proto_rawDescData = file_add_proto_rawDesc file_proto_messages_proto_rawDescData = file_proto_messages_proto_rawDesc
) )
func file_add_proto_rawDescGZIP() []byte { func file_proto_messages_proto_rawDescGZIP() []byte {
file_add_proto_rawDescOnce.Do(func() { file_proto_messages_proto_rawDescOnce.Do(func() {
file_add_proto_rawDescData = protoimpl.X.CompressGZIP(file_add_proto_rawDescData) file_proto_messages_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_messages_proto_rawDescData)
}) })
return file_add_proto_rawDescData return file_proto_messages_proto_rawDescData
} }
var file_add_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_proto_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_add_proto_goTypes = []any{ var file_proto_messages_proto_goTypes = []any{
(*AddRequest)(nil), // 0: messages.AddRequest (*AddRequest)(nil), // 0: messages.AddRequest
(*AddItem)(nil), // 1: messages.AddItem (*AddItem)(nil), // 1: messages.AddItem
} }
var file_add_proto_depIdxs = []int32{ var file_proto_messages_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type 0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type 0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension type_name
@@ -186,27 +185,27 @@ var file_add_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for field type_name 0, // [0:0] is the sub-list for field type_name
} }
func init() { file_add_proto_init() } func init() { file_proto_messages_proto_init() }
func file_add_proto_init() { func file_proto_messages_proto_init() {
if File_add_proto != nil { if File_proto_messages_proto != nil {
return return
} }
type x struct{} type x struct{}
out := protoimpl.TypeBuilder{ out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{ File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_add_proto_rawDesc, RawDescriptor: file_proto_messages_proto_rawDesc,
NumEnums: 0, NumEnums: 0,
NumMessages: 2, NumMessages: 2,
NumExtensions: 0, NumExtensions: 0,
NumServices: 0, NumServices: 0,
}, },
GoTypes: file_add_proto_goTypes, GoTypes: file_proto_messages_proto_goTypes,
DependencyIndexes: file_add_proto_depIdxs, DependencyIndexes: file_proto_messages_proto_depIdxs,
MessageInfos: file_add_proto_msgTypes, MessageInfos: file_proto_messages_proto_msgTypes,
}.Build() }.Build()
File_add_proto = out.File File_proto_messages_proto = out.File
file_add_proto_rawDesc = nil file_proto_messages_proto_rawDesc = nil
file_add_proto_goTypes = nil file_proto_messages_proto_goTypes = nil
file_add_proto_depIdxs = nil file_proto_messages_proto_depIdxs = nil
} }

View File

@@ -1,6 +1,6 @@
syntax = "proto3"; syntax = "proto3";
package messages; package messages;
option go_package = "git.tornberg.me/go-cart-actor/messages"; option go_package = ".;messages";
message AddRequest { message AddRequest {
string Sku = 2; string Sku = 2;

0
proto/service.proto Normal file
View File

130
rpc-pool.go Normal file
View File

@@ -0,0 +1,130 @@
package main
import (
"encoding/binary"
"net"
)
const (
RemoteGetState = uint16(0x01)
RemoteHandleMessage = uint16(0x02)
)
type RemoteGrainPool struct {
Hosts []string
grains map[CartId]RemoteGrain
}
func (id CartId) String() string {
return string(id[:])
}
func ToCartId(id string) CartId {
var result [16]byte
copy(result[:], []byte(id))
return result
}
type RemoteGrain struct {
client net.Conn
Id CartId
Address string
}
func NewRemoteGrain(id CartId, address string) *RemoteGrain {
return &RemoteGrain{
Id: id,
Address: address,
}
}
func (g *RemoteGrain) Connect() error {
if g.client == nil {
client, err := net.Dial("tcp", g.Address)
if err != nil {
return err
}
g.client = client
}
return nil
}
type Packet struct {
Version uint16
MessageType uint16
Id CartId
DataLength uint16
}
func (g *RemoteGrain) SendPacket(messageType uint16, data []byte) error {
binary.Write(g.client, binary.LittleEndian, Packet{
Version: 2,
MessageType: messageType,
Id: g.Id,
DataLength: uint16(len(data)),
})
return binary.Write(g.client, binary.LittleEndian, data)
}
func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
data, err := GetData(message.Write)
if err != nil {
return nil, err
}
err = g.SendPacket(RemoteHandleMessage, data)
result := make([]byte, 65535)
g.client.Read(result)
return result, err
}
func (g *RemoteGrain) GetId() CartId {
return g.Id
}
func (g *RemoteGrain) GetCurrentState() (Grain, error) {
var reply CartGrain
err := g.SendPacket(RemoteGetState, nil)
if err != nil {
return nil, err
}
return &reply, err
}
func NewRemoteGrainPool(addr ...string) *RemoteGrainPool {
return &RemoteGrainPool{
Hosts: addr,
grains: make(map[CartId]RemoteGrain),
}
}
func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain {
grain, ok := p.grains[id]
if !ok {
return nil
}
return &grain
}
func (p *RemoteGrainPool) Process(id CartId, messages ...Message) (interface{}, error) {
var result interface{}
var err error
grain := p.findRemoteGrain(id)
if grain == nil {
grain = NewRemoteGrain(id, p.Hosts[0])
grain.Connect()
p.grains[id] = *grain
}
for _, message := range messages {
result, err = grain.HandleMessage(&message, false)
}
return result, err
}
func (p *RemoteGrainPool) Get(id CartId) (Grain, error) {
grain := p.findRemoteGrain(id)
if grain == nil {
return nil, nil
}
return grain.GetCurrentState()
}

86
rpc-server.go Normal file
View File

@@ -0,0 +1,86 @@
package main
import (
"encoding/binary"
"fmt"
"io"
"net"
)
type GrainHandler struct {
listener net.Listener
pool GrainPool
}
func (h *GrainHandler) GetState(id CartId, reply *Grain) error {
grain, err := h.pool.Get(id)
if err != nil {
return err
}
*reply = grain
return nil
}
func NewGrainHandler(pool GrainPool, listen string) (*GrainHandler, error) {
handler := &GrainHandler{
pool: pool,
}
l, err := net.Listen("tcp", listen)
handler.listener = l
return handler, err
}
func (h *GrainHandler) Serve() {
for {
// Accept incoming connections
conn, err := h.listener.Accept()
if err != nil {
fmt.Println("Error:", err)
continue
}
// Handle client connection in a goroutine
go h.handleClient(conn)
}
}
func (h *GrainHandler) handleClient(conn net.Conn) {
fmt.Println("Handling client connection")
defer conn.Close()
var packet Packet
for {
for {
err := binary.Read(conn, binary.LittleEndian, &packet)
if err != nil {
if err == io.EOF {
break
}
fmt.Println("Error reading packet:", err)
}
if packet.Version != 2 {
fmt.Printf("Unknown version %d", packet.Version)
break
}
switch packet.MessageType {
case RemoteHandleMessage:
fmt.Printf("Handling message\n")
var msg Message
err := MessageFromReader(conn, &msg)
if err != nil {
fmt.Println("Error reading message:", err)
}
fmt.Printf("Message: %s, %v\n", packet.Id.String(), msg)
case RemoteGetState:
fmt.Printf("Package: %s %v\n", packet.Id.String(), packet)
}
}
}
}