diff --git a/cart-grain.go b/cart-grain.go index 44e604b..50ab0a0 100644 --- a/cart-grain.go +++ b/cart-grain.go @@ -7,10 +7,12 @@ import ( "net/http" "time" - "git.tornberg.me/go-cart-actor/git.tornberg.me/go-cart-actor/messages" + messages "git.tornberg.me/go-cart-actor/proto" "github.com/matst80/slask-finder/pkg/index" ) +type CartId [16]byte + type CartItem struct { Sku string `json:"sku"` Name string `json:"name"` @@ -20,19 +22,17 @@ type CartItem struct { type CartGrain struct { storageMessages []Message - Id string `json:"id"` + Id CartId `json:"id"` Items []CartItem `json:"items"` TotalPrice int64 `json:"totalPrice"` } type Grain interface { - GetId() string - GetLastChange() int64 - HandleMessage(message *Message, isReplay bool, reply *CartGrain) error - GetStorageMessage(since int64) []StorableMessage + GetId() CartId + HandleMessage(message *Message, isReplay bool) ([]byte, error) } -func (c *CartGrain) GetId() string { +func (c *CartGrain) GetId() CartId { return c.Id } @@ -79,15 +79,15 @@ func getItemData(sku string) (*messages.AddItem, error) { }, nil } -func (c *CartGrain) AddItem(sku string, reply *CartGrain) error { +func (c *CartGrain) AddItem(sku string) ([]byte, error) { cartItem, err := getItemData(sku) if err != nil { - return err + return nil, err } return c.HandleMessage(&Message{ Type: 2, Content: cartItem, - }, false, reply) + }, false) } func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage { @@ -101,7 +101,7 @@ func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage { return ret } -func (c *CartGrain) HandleMessage(message *Message, isReplay bool, reply *CartGrain) error { +func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) { log.Printf("Handling message %d", message.Type) if message.TimeStamp == nil { now := time.Now().Unix() @@ -114,7 +114,7 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool, reply *CartGr if !ok { err = fmt.Errorf("invalid content type") } else { - return c.AddItem(msg.Sku, reply) + return c.AddItem(msg.Sku) } case AddItemType: msg, ok := message.Content.(*messages.AddItem) @@ -132,9 +132,11 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool, reply *CartGr default: err = fmt.Errorf("unknown message type") } + if err != nil { + return nil, err + } if !isReplay { c.storageMessages = append(c.storageMessages, *message) } - *reply = *c - return err + return json.Marshal(c) } diff --git a/data/1.prot b/data/1.prot deleted file mode 100644 index 07c9ed6..0000000 Binary files a/data/1.prot and /dev/null differ diff --git a/data/state.json b/data/state.json deleted file mode 100644 index f086c5a..0000000 --- a/data/state.json +++ /dev/null @@ -1 +0,0 @@ -{"1":1731051371} diff --git a/data/state.json.bak b/data/state.json.bak deleted file mode 100644 index 09eb1ad..0000000 --- a/data/state.json.bak +++ /dev/null @@ -1 +0,0 @@ -{"1":1731050604} diff --git a/disk-storage.go b/disk-storage.go index 4203ad2..d230ec7 100644 --- a/disk-storage.go +++ b/disk-storage.go @@ -12,26 +12,26 @@ import ( type DiskStorage struct { stateFile string lastSave int64 - LastSaves map[string]int64 + LastSaves map[CartId]int64 } func NewDiskStorage(stateFile string) (*DiskStorage, error) { ret := &DiskStorage{ stateFile: stateFile, - LastSaves: make(map[string]int64), + LastSaves: make(map[CartId]int64), } err := ret.loadState() return ret, err } -func saveMessages(messages []StorableMessage, id string) error { +func saveMessages(messages []StorableMessage, id CartId) error { log.Printf("%d messages to save for %s", len(messages), id) if len(messages) == 0 { return nil } var file *os.File var err error - path := getCartPath(id) + path := getCartPath(id.String()) file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err @@ -39,11 +39,10 @@ func saveMessages(messages []StorableMessage, id string) error { defer file.Close() for _, m := range messages { - b, err := m.GetBytes() + err := m.Write(file) if err != nil { return err } - file.Write(b) } return err } @@ -52,9 +51,9 @@ func getCartPath(id string) string { return fmt.Sprintf("data/%s.prot", id) } -func loadMessages(grain Grain, id string) error { +func loadMessages(grain Grain, id CartId) error { var err error - path := getCartPath(id) + path := getCartPath(id.String()) if _, err = os.Stat(path); errors.Is(err, os.ErrNotExist) { return err } @@ -64,13 +63,11 @@ func loadMessages(grain Grain, id string) error { } defer file.Close() - var reply CartGrain - for err == nil { - msg := &Message{} - err = msg.FromReader(file, msg) + var msg Message + err = MessageFromReader(file, &msg) if err == nil { - grain.HandleMessage(msg, true, &reply) + grain.HandleMessage(&msg, true) } } @@ -105,7 +102,7 @@ func (s *DiskStorage) loadState() error { return json.NewDecoder(file).Decode(&s.LastSaves) } -func (s *DiskStorage) Store(id string, grain Grain) error { +func (s *DiskStorage) Store(id CartId, grain *CartGrain) error { lastSavedMessage, ok := s.LastSaves[id] if ok && lastSavedMessage > grain.GetLastChange() { return nil diff --git a/grain-pool.go b/grain-pool.go index 82a8091..b331948 100644 --- a/grain-pool.go +++ b/grain-pool.go @@ -7,25 +7,27 @@ import ( ) type GrainPool interface { - GetOrSpawn(id string) (Grain, error) - Get(id string) (Grain, error) + Process(id CartId, messages ...Message) (interface{}, error) + Get(id CartId) (Grain, error) } type Ttl struct { Expires time.Time - Item Grain + Item *CartGrain } type GrainLocalPool struct { - grains map[string]Grain + grains map[CartId]*CartGrain expiry []Ttl + spawn func(id CartId) (*CartGrain, error) Ttl time.Duration PoolSize int } -func NewGrainLocalPool(size int, ttl time.Duration) *GrainLocalPool { +func NewGrainLocalPool(size int, ttl time.Duration, spawn func(id CartId) (*CartGrain, error)) *GrainLocalPool { ret := &GrainLocalPool{ - grains: make(map[string]Grain), + spawn: spawn, + grains: make(map[CartId]*CartGrain), expiry: make([]Ttl, 0), Ttl: ttl, PoolSize: size, @@ -59,11 +61,12 @@ func (p *GrainLocalPool) Purge() { } } -func (p *GrainLocalPool) GetGrains() map[string]Grain { +func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain { return p.grains } -func (p *GrainLocalPool) GetOrSpawn(id string, generator func(id string) Grain) (Grain, error) { +func (p *GrainLocalPool) Process(id CartId, messages ...Message) (interface{}, error) { + var err error grain, ok := p.grains[id] if !ok { if len(p.grains) >= p.PoolSize { @@ -74,13 +77,19 @@ func (p *GrainLocalPool) GetOrSpawn(id string, generator func(id string) Grain) return nil, fmt.Errorf("pool is full") } } - grain = generator(id) + grain, err = p.spawn(id) + p.grains[id] = grain } - return grain, nil + if err == nil && grain != nil { + for _, message := range messages { + _, err = grain.HandleMessage(&message, false) + } + } + return grain, err } -func (p *GrainLocalPool) Get(id string) (Grain, error) { +func (p *GrainLocalPool) Get(id CartId) (Grain, error) { grain, ok := p.grains[id] if !ok { return nil, fmt.Errorf("grain not found") diff --git a/main.go b/main.go index 040853e..ee351e9 100644 --- a/main.go +++ b/main.go @@ -1,18 +1,16 @@ package main import ( - "encoding/gob" "encoding/json" "log" "net/http" "os" "time" - "git.tornberg.me/go-cart-actor/git.tornberg.me/go-cart-actor/messages" + messages "git.tornberg.me/go-cart-actor/proto" ) -func spawn(id string) Grain { - +func spawn(id CartId) (*CartGrain, error) { ret := &CartGrain{ Id: id, Items: []CartItem{}, @@ -20,16 +18,11 @@ func spawn(id string) Grain { TotalPrice: 0, } err := loadMessages(ret, id) - if err != nil { - log.Printf("Error loading messages for grain %s: %v\n", id, err) - } - return ret + return ret, err } func init() { os.Mkdir("data", 0755) - gob.Register(CartItem{}) - gob.Register(Message{}) } type App struct { @@ -39,7 +32,7 @@ type App struct { func (a *App) HandleGet(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") - grain, err := a.pool.GetOrSpawn(id, spawn) + grain, err := a.pool.Get(ToCartId(id)) if err != nil { w.WriteHeader(http.StatusNotFound) return @@ -52,25 +45,19 @@ func (a *App) HandleGet(w http.ResponseWriter, r *http.Request) { func (a *App) HandleAddSku(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") sku := r.PathValue("sku") - grain, err := a.pool.GetOrSpawn(id, spawn) - if err != nil { - w.WriteHeader(http.StatusNotFound) - return - } - message := &Message{ + grain, err := a.pool.Process(ToCartId(id), Message{ Type: AddRequestType, Content: &messages.AddRequest{Sku: sku}, - } - var reply CartGrain - err = grain.HandleMessage(message, false, &reply) + }) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) return } + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(reply) + json.NewEncoder(w).Encode(grain) } func (a *App) Save() error { @@ -100,13 +87,38 @@ func main() { log.Printf("Error loading state: %v\n", err) } app := &App{ - pool: NewGrainLocalPool(1000, 5*time.Minute), + pool: NewGrainLocalPool(1000, 5*time.Minute, spawn), storage: storage, } + rpcHandler, err := NewGrainHandler(app.pool, "localhost:1337") + if err != nil { + log.Fatalf("Error creating handler: %v\n", err) + } + go rpcHandler.Serve() + + remotePool := NewRemoteGrainPool("localhost:1337") + mux := http.NewServeMux() - mux.HandleFunc("GET /{id}", app.HandleGet) - mux.HandleFunc("GET /{id}/add/{sku}", app.HandleAddSku) + mux.HandleFunc("GET /api/{id}", app.HandleGet) + mux.HandleFunc("GET /api/{id}/add/{sku}", app.HandleAddSku) + mux.HandleFunc("GET /remote/{id}", func(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + ts := time.Now().Unix() + data, err := remotePool.Process(ToCartId(id), Message{ + Type: AddRequestType, + TimeStamp: &ts, + Content: &messages.AddRequest{Sku: "49565"}, + }) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(data) + }) mux.HandleFunc("GET /save", app.HandleSave) http.ListenAndServe(":8080", mux) diff --git a/message.go b/message.go index 518f2f8..d38ab27 100644 --- a/message.go +++ b/message.go @@ -1,109 +1,140 @@ package main import ( - "bufio" "bytes" "encoding/binary" "fmt" "io" + "time" - "git.tornberg.me/go-cart-actor/git.tornberg.me/go-cart-actor/messages" + messages "git.tornberg.me/go-cart-actor/proto" "google.golang.org/protobuf/proto" ) type StorableMessage interface { - GetBytes() ([]byte, error) - FromReader(io.Reader, *Message) error + Write(w io.Writer) error } type Message struct { - Type uint64 + Type uint16 TimeStamp *int64 Content interface{} } type MessageWriter struct { - writer io.Writer + io.Writer } -func NewMessageWriter(b *bytes.Buffer) *MessageWriter { - return &MessageWriter{writer: bufio.NewWriter(b)} +type StorableMessageHeader struct { + Version uint16 + Type uint16 + TimeStamp int64 + DataLength uint64 } -func (w *MessageWriter) WriteUint64(value uint64) error { - bytes := make([]byte, 8) - binary.LittleEndian.PutUint64(bytes, value) - _, err := w.writer.Write(bytes) - return err -} - -func (w *MessageWriter) WriteInt64(value int64) error { - return w.WriteUint64(uint64(value)) -} - -func (w *MessageWriter) WriteMessage(m *Message) error { - if err := w.WriteUint64(m.Type); err != nil { - return err - } - if err := w.WriteInt64(*m.TimeStamp); err != nil { - return err - } - var messageBytes []byte - var err error - if m.Type == AddRequestType { - messageBytes, err = proto.Marshal(m.Content.(*messages.AddRequest)) - } else if m.Type == AddItemType { - messageBytes, err = proto.Marshal(m.Content.(*messages.AddItem)) - } else { - return fmt.Errorf("unknown message type") +func GetData(fn func(w io.Writer) error) ([]byte, error) { + var buf bytes.Buffer + err := fn(&buf) + if err != nil { + return nil, err } + b := buf.Bytes() + return b, nil +} + +// func (w *MessageWriter) WriteUint64(value uint64) (int, error) { +// bytes := make([]byte, 8) +// binary.LittleEndian.PutUint64(bytes, value) +// return w.Write(bytes) +// } + +// func (w *MessageWriter) WriteInt64(value int64) (int, error) { +// return w.WriteUint64(uint64(value)) +// } + +// func (w *MessageWriter) WriteMessage(m *Message) (int, error) { +// var i, l int +// var err error +// i, err = w.WriteUint64(m.Type) +// l += i +// i, err = w.WriteInt64(*m.TimeStamp) +// l += i +// var messageBytes []byte +// var err error +// if m.Type == AddRequestType { +// messageBytes, err = proto.Marshal(m.Content.(*messages.AddRequest)) +// } else if m.Type == AddItemType { +// messageBytes, err = proto.Marshal(m.Content.(*messages.AddItem)) +// } else { +// return fmt.Errorf("unknown message type") +// } +// if err != nil { +// return err +// } +// if err := w.WriteUint64(uint64(len(messageBytes))); err != nil { +// return err +// } +// _, err = w.Write(messageBytes) +// return err +// } + +func (m Message) Write(w io.Writer) error { + data, err := GetData(func(wr io.Writer) error { + if m.Type == AddRequestType { + messageBytes, err := proto.Marshal(m.Content.(*messages.AddRequest)) + if err != nil { + return err + } + wr.Write(messageBytes) + } else if m.Type == AddItemType { + messageBytes, err := proto.Marshal(m.Content.(*messages.AddItem)) + if err != nil { + return err + } + wr.Write(messageBytes) + } + return nil + }) if err != nil { return err } - if err := w.WriteUint64(uint64(len(messageBytes))); err != nil { - return err + ts := time.Now().Unix() + if m.TimeStamp != nil { + ts = *m.TimeStamp } - _, err = w.writer.Write(messageBytes) + + err = binary.Write(w, binary.LittleEndian, StorableMessageHeader{ + Version: 1, + Type: m.Type, + TimeStamp: ts, + DataLength: uint64(len(data)), + }) + w.Write(data) + return err } -func (m Message) GetBytes() ([]byte, error) { - var b bytes.Buffer - mw := NewMessageWriter(&b) - err := mw.WriteMessage(&m) - return b.Bytes(), err -} - -func (i Message) FromReader(reader io.Reader, m *Message) error { - - bytes := make([]byte, 8) - if _, err := reader.Read(bytes); err != nil { +func MessageFromReader(reader io.Reader, m *Message) error { + header := StorableMessageHeader{} + err := binary.Read(reader, binary.LittleEndian, &header) + if err != nil { return err } - m.Type = binary.LittleEndian.Uint64(bytes) - if _, err := reader.Read(bytes); err != nil { + messageBytes := make([]byte, header.DataLength) + _, err = reader.Read(messageBytes) + if err != nil { return err } - timestamp := int64(binary.LittleEndian.Uint64(bytes)) - m.TimeStamp = ×tamp - if _, err := reader.Read(bytes); err != nil { - return err - } - messageBytes := make([]byte, binary.LittleEndian.Uint64(bytes)) - if _, err := reader.Read(messageBytes); err != nil { - return err - } - var err error - - if m.Type == AddRequestType { + switch header.Type { + case AddRequestType: msg := &messages.AddRequest{} err = proto.Unmarshal(messageBytes, msg) m.Content = msg - } else if m.Type == AddItemType { + case AddItemType: msg := &messages.AddItem{} err = proto.Unmarshal(messageBytes, msg) m.Content = msg - } else { + default: return fmt.Errorf("unknown message type") } if err != nil { diff --git a/git.tornberg.me/go-cart-actor/messages/add.pb.go b/proto/messages.pb.go similarity index 56% rename from git.tornberg.me/go-cart-actor/messages/add.pb.go rename to proto/messages.pb.go index be563af..83a6f4f 100644 --- a/git.tornberg.me/go-cart-actor/messages/add.pb.go +++ b/proto/messages.pb.go @@ -2,7 +2,7 @@ // versions: // protoc-gen-go v1.35.1 // protoc v5.28.2 -// source: add.proto +// source: proto/messages.proto package messages @@ -30,7 +30,7 @@ type AddRequest struct { func (x *AddRequest) Reset() { *x = AddRequest{} - mi := &file_add_proto_msgTypes[0] + mi := &file_proto_messages_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -42,7 +42,7 @@ func (x *AddRequest) String() string { func (*AddRequest) ProtoMessage() {} func (x *AddRequest) ProtoReflect() protoreflect.Message { - mi := &file_add_proto_msgTypes[0] + mi := &file_proto_messages_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -55,7 +55,7 @@ func (x *AddRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use AddRequest.ProtoReflect.Descriptor instead. func (*AddRequest) Descriptor() ([]byte, []int) { - return file_add_proto_rawDescGZIP(), []int{0} + return file_proto_messages_proto_rawDescGZIP(), []int{0} } func (x *AddRequest) GetSku() string { @@ -79,7 +79,7 @@ type AddItem struct { func (x *AddItem) Reset() { *x = AddItem{} - mi := &file_add_proto_msgTypes[1] + mi := &file_proto_messages_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -91,7 +91,7 @@ func (x *AddItem) String() string { func (*AddItem) ProtoMessage() {} func (x *AddItem) ProtoReflect() protoreflect.Message { - mi := &file_add_proto_msgTypes[1] + mi := &file_proto_messages_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -104,7 +104,7 @@ func (x *AddItem) ProtoReflect() protoreflect.Message { // Deprecated: Use AddItem.ProtoReflect.Descriptor instead. func (*AddItem) Descriptor() ([]byte, []int) { - return file_add_proto_rawDescGZIP(), []int{1} + return file_proto_messages_proto_rawDescGZIP(), []int{1} } func (x *AddItem) GetQuantity() int32 { @@ -142,43 +142,42 @@ func (x *AddItem) GetImage() string { return "" } -var File_add_proto protoreflect.FileDescriptor +var File_proto_messages_proto protoreflect.FileDescriptor -var file_add_proto_rawDesc = []byte{ - 0x0a, 0x09, 0x61, 0x64, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0x1e, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x6b, 0x75, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x03, 0x53, 0x6b, 0x75, 0x22, 0x77, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x49, 0x74, 0x65, 0x6d, - 0x12, 0x1a, 0x0a, 0x08, 0x51, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x08, 0x51, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x14, 0x0a, 0x05, - 0x50, 0x72, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x50, 0x72, 0x69, - 0x63, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x6b, 0x75, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x03, 0x53, 0x6b, 0x75, 0x12, 0x12, 0x0a, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x49, 0x6d, 0x61, 0x67, - 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x49, 0x6d, 0x61, 0x67, 0x65, 0x42, 0x28, - 0x5a, 0x26, 0x67, 0x69, 0x74, 0x2e, 0x74, 0x6f, 0x72, 0x6e, 0x62, 0x65, 0x72, 0x67, 0x2e, 0x6d, - 0x65, 0x2f, 0x67, 0x6f, 0x2d, 0x63, 0x61, 0x72, 0x74, 0x2d, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2f, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +var file_proto_messages_proto_rawDesc = []byte{ + 0x0a, 0x14, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, + 0x22, 0x1e, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, + 0x0a, 0x03, 0x53, 0x6b, 0x75, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x53, 0x6b, 0x75, + 0x22, 0x77, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x49, 0x74, 0x65, 0x6d, 0x12, 0x1a, 0x0a, 0x08, 0x51, + 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x51, + 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x50, 0x72, 0x69, 0x63, 0x65, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x50, 0x72, 0x69, 0x63, 0x65, 0x12, 0x10, 0x0a, + 0x03, 0x53, 0x6b, 0x75, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x53, 0x6b, 0x75, 0x12, + 0x12, 0x0a, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x4e, + 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x49, 0x6d, 0x61, 0x67, 0x65, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x49, 0x6d, 0x61, 0x67, 0x65, 0x42, 0x0c, 0x5a, 0x0a, 0x2e, 0x3b, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( - file_add_proto_rawDescOnce sync.Once - file_add_proto_rawDescData = file_add_proto_rawDesc + file_proto_messages_proto_rawDescOnce sync.Once + file_proto_messages_proto_rawDescData = file_proto_messages_proto_rawDesc ) -func file_add_proto_rawDescGZIP() []byte { - file_add_proto_rawDescOnce.Do(func() { - file_add_proto_rawDescData = protoimpl.X.CompressGZIP(file_add_proto_rawDescData) +func file_proto_messages_proto_rawDescGZIP() []byte { + file_proto_messages_proto_rawDescOnce.Do(func() { + file_proto_messages_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_messages_proto_rawDescData) }) - return file_add_proto_rawDescData + return file_proto_messages_proto_rawDescData } -var file_add_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_add_proto_goTypes = []any{ +var file_proto_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_proto_messages_proto_goTypes = []any{ (*AddRequest)(nil), // 0: messages.AddRequest (*AddItem)(nil), // 1: messages.AddItem } -var file_add_proto_depIdxs = []int32{ +var file_proto_messages_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type 0, // [0:0] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name @@ -186,27 +185,27 @@ var file_add_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for field type_name } -func init() { file_add_proto_init() } -func file_add_proto_init() { - if File_add_proto != nil { +func init() { file_proto_messages_proto_init() } +func file_proto_messages_proto_init() { + if File_proto_messages_proto != nil { return } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_add_proto_rawDesc, + RawDescriptor: file_proto_messages_proto_rawDesc, NumEnums: 0, NumMessages: 2, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_add_proto_goTypes, - DependencyIndexes: file_add_proto_depIdxs, - MessageInfos: file_add_proto_msgTypes, + GoTypes: file_proto_messages_proto_goTypes, + DependencyIndexes: file_proto_messages_proto_depIdxs, + MessageInfos: file_proto_messages_proto_msgTypes, }.Build() - File_add_proto = out.File - file_add_proto_rawDesc = nil - file_add_proto_goTypes = nil - file_add_proto_depIdxs = nil + File_proto_messages_proto = out.File + file_proto_messages_proto_rawDesc = nil + file_proto_messages_proto_goTypes = nil + file_proto_messages_proto_depIdxs = nil } diff --git a/messages/add.proto b/proto/messages.proto similarity index 76% rename from messages/add.proto rename to proto/messages.proto index 8a3f4a8..49a899d 100644 --- a/messages/add.proto +++ b/proto/messages.proto @@ -1,6 +1,6 @@ syntax = "proto3"; package messages; -option go_package = "git.tornberg.me/go-cart-actor/messages"; +option go_package = ".;messages"; message AddRequest { string Sku = 2; diff --git a/proto/service.proto b/proto/service.proto new file mode 100644 index 0000000..e69de29 diff --git a/rpc-pool.go b/rpc-pool.go new file mode 100644 index 0000000..9232418 --- /dev/null +++ b/rpc-pool.go @@ -0,0 +1,130 @@ +package main + +import ( + "encoding/binary" + "net" +) + +const ( + RemoteGetState = uint16(0x01) + RemoteHandleMessage = uint16(0x02) +) + +type RemoteGrainPool struct { + Hosts []string + grains map[CartId]RemoteGrain +} + +func (id CartId) String() string { + return string(id[:]) +} + +func ToCartId(id string) CartId { + var result [16]byte + copy(result[:], []byte(id)) + return result +} + +type RemoteGrain struct { + client net.Conn + Id CartId + Address string +} + +func NewRemoteGrain(id CartId, address string) *RemoteGrain { + return &RemoteGrain{ + Id: id, + Address: address, + } +} + +func (g *RemoteGrain) Connect() error { + if g.client == nil { + client, err := net.Dial("tcp", g.Address) + if err != nil { + return err + } + g.client = client + } + return nil +} + +type Packet struct { + Version uint16 + MessageType uint16 + Id CartId + DataLength uint16 +} + +func (g *RemoteGrain) SendPacket(messageType uint16, data []byte) error { + binary.Write(g.client, binary.LittleEndian, Packet{ + Version: 2, + MessageType: messageType, + Id: g.Id, + DataLength: uint16(len(data)), + }) + return binary.Write(g.client, binary.LittleEndian, data) +} + +func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) { + data, err := GetData(message.Write) + if err != nil { + return nil, err + } + err = g.SendPacket(RemoteHandleMessage, data) + result := make([]byte, 65535) + g.client.Read(result) + return result, err +} + +func (g *RemoteGrain) GetId() CartId { + return g.Id +} + +func (g *RemoteGrain) GetCurrentState() (Grain, error) { + + var reply CartGrain + err := g.SendPacket(RemoteGetState, nil) + if err != nil { + return nil, err + } + return &reply, err +} + +func NewRemoteGrainPool(addr ...string) *RemoteGrainPool { + return &RemoteGrainPool{ + Hosts: addr, + grains: make(map[CartId]RemoteGrain), + } +} + +func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain { + grain, ok := p.grains[id] + if !ok { + return nil + } + return &grain +} + +func (p *RemoteGrainPool) Process(id CartId, messages ...Message) (interface{}, error) { + var result interface{} + var err error + grain := p.findRemoteGrain(id) + if grain == nil { + grain = NewRemoteGrain(id, p.Hosts[0]) + grain.Connect() + p.grains[id] = *grain + } + for _, message := range messages { + result, err = grain.HandleMessage(&message, false) + } + return result, err +} + +func (p *RemoteGrainPool) Get(id CartId) (Grain, error) { + grain := p.findRemoteGrain(id) + if grain == nil { + return nil, nil + } + return grain.GetCurrentState() +} diff --git a/rpc-server.go b/rpc-server.go new file mode 100644 index 0000000..2d479ec --- /dev/null +++ b/rpc-server.go @@ -0,0 +1,86 @@ +package main + +import ( + "encoding/binary" + "fmt" + "io" + "net" +) + +type GrainHandler struct { + listener net.Listener + pool GrainPool +} + +func (h *GrainHandler) GetState(id CartId, reply *Grain) error { + grain, err := h.pool.Get(id) + if err != nil { + return err + } + *reply = grain + return nil +} + +func NewGrainHandler(pool GrainPool, listen string) (*GrainHandler, error) { + handler := &GrainHandler{ + pool: pool, + } + l, err := net.Listen("tcp", listen) + handler.listener = l + return handler, err +} + +func (h *GrainHandler) Serve() { + for { + // Accept incoming connections + conn, err := h.listener.Accept() + if err != nil { + fmt.Println("Error:", err) + continue + } + + // Handle client connection in a goroutine + go h.handleClient(conn) + } +} + +func (h *GrainHandler) handleClient(conn net.Conn) { + + fmt.Println("Handling client connection") + defer conn.Close() + + var packet Packet + for { + + for { + err := binary.Read(conn, binary.LittleEndian, &packet) + if err != nil { + if err == io.EOF { + break + } + fmt.Println("Error reading packet:", err) + } + if packet.Version != 2 { + fmt.Printf("Unknown version %d", packet.Version) + break + } + + switch packet.MessageType { + case RemoteHandleMessage: + fmt.Printf("Handling message\n") + var msg Message + err := MessageFromReader(conn, &msg) + if err != nil { + fmt.Println("Error reading message:", err) + } + fmt.Printf("Message: %s, %v\n", packet.Id.String(), msg) + case RemoteGetState: + + fmt.Printf("Package: %s %v\n", packet.Id.String(), packet) + + } + + } + + } +}