diff --git a/cart-grain.go b/cart-grain.go index 0a218ae..67b1814 100644 --- a/cart-grain.go +++ b/cart-grain.go @@ -1,95 +1,29 @@ package main import ( - "bufio" - "bytes" - "encoding/binary" "encoding/json" "fmt" - "io" "log" "net/http" "time" - "git.tornberg.me/go-cart-actor/git.tornberg.me/go-cart-actor/messages" + messages "git.tornberg.me/go-cart-actor/proto" "github.com/matst80/slask-finder/pkg/index" - "google.golang.org/protobuf/proto" ) -type StorableMessage interface { - GetBytes() ([]byte, error) - FromReader(io.Reader, *Message) error +type CartId [16]byte + +func (id CartId) MarshalJSON() ([]byte, error) { + return json.Marshal(id.String()) } -type Message struct { - Type uint64 - TimeStamp *int64 - Content interface{} -} - -func (m Message) GetBytes() ([]byte, error) { - var b bytes.Buffer - var err error - w := bufio.NewWriter(&b) - bytes := make([]byte, 8) - binary.LittleEndian.PutUint64(bytes, m.Type) - w.Write(bytes) - binary.LittleEndian.PutUint64(bytes, uint64(*m.TimeStamp)) - w.Write(bytes) - var messageBytes []byte - if m.Type == 1 { - messageBytes, err = proto.Marshal(m.Content.(*messages.AddRequest)) - } else if m.Type == 2 { - messageBytes, err = proto.Marshal(m.Content.(*messages.AddItem)) - } else { - return nil, fmt.Errorf("unknown message type") - } - if err != nil { - return nil, err - } - binary.LittleEndian.PutUint64(bytes, uint64(len(messageBytes))) - w.Write(bytes) - w.Write(messageBytes) - w.Flush() - return b.Bytes(), nil -} - -func (i Message) FromReader(reader io.Reader, m *Message) error { - - bytes := make([]byte, 8) - if _, err := reader.Read(bytes); err != nil { - return err - } - m.Type = binary.LittleEndian.Uint64(bytes) - if _, err := reader.Read(bytes); err != nil { - return err - } - timestamp := int64(binary.LittleEndian.Uint64(bytes)) - m.TimeStamp = ×tamp - if _, err := reader.Read(bytes); err != nil { - return err - } - messageBytes := make([]byte, binary.LittleEndian.Uint64(bytes)) - if _, err := reader.Read(messageBytes); err != nil { - return err - } - var err error - - if m.Type == 1 { - msg := &messages.AddRequest{} - err = proto.Unmarshal(messageBytes, msg) - m.Content = msg - } else if m.Type == 2 { - msg := &messages.AddItem{} - err = proto.Unmarshal(messageBytes, msg) - m.Content = msg - } else { - return fmt.Errorf("unknown message type") - } +func (id *CartId) UnmarshalJSON(data []byte) error { + var str string + err := json.Unmarshal(data, &str) if err != nil { return err } - + copy(id[:], []byte(str)) return nil } @@ -102,19 +36,17 @@ type CartItem struct { type CartGrain struct { storageMessages []Message - Id string `json:"id"` + Id CartId `json:"id"` Items []CartItem `json:"items"` TotalPrice int64 `json:"totalPrice"` } type Grain interface { - GetId() string - GetLastChange() int64 - HandleMessage(message *Message, isReplay bool, reply *CartGrain) error - GetStorageMessage(since int64) []StorableMessage + GetId() CartId + HandleMessage(message *Message, isReplay bool) ([]byte, error) } -func (c *CartGrain) GetId() string { +func (c *CartGrain) GetId() CartId { return c.Id } @@ -141,14 +73,14 @@ func getItemData(sku string) (*messages.AddItem, error) { priceField, ok := item.GetFields()[4] if ok { - pricef, ok := priceField.(float64) + priceFloat, ok := priceField.(float64) if !ok { price, ok = priceField.(int) if !ok { return nil, fmt.Errorf("invalid price type") } } else { - price = int(pricef) + price = int(priceFloat) } } } @@ -161,15 +93,15 @@ func getItemData(sku string) (*messages.AddItem, error) { }, nil } -func (c *CartGrain) AddItem(sku string, reply *CartGrain) error { +func (c *CartGrain) AddItem(sku string) ([]byte, error) { cartItem, err := getItemData(sku) if err != nil { - return err + return nil, err } return c.HandleMessage(&Message{ Type: 2, Content: cartItem, - }, false, reply) + }, false) } func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage { @@ -183,7 +115,7 @@ func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage { return ret } -func (c *CartGrain) HandleMessage(message *Message, isReplay bool, reply *CartGrain) error { +func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) { log.Printf("Handling message %d", message.Type) if message.TimeStamp == nil { now := time.Now().Unix() @@ -191,14 +123,14 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool, reply *CartGr } var err error switch message.Type { - case 1: + case AddRequestType: msg, ok := message.Content.(*messages.AddRequest) if !ok { err = fmt.Errorf("invalid content type") } else { - return c.AddItem(msg.Sku, reply) + return c.AddItem(msg.Sku) } - case 2: + case AddItemType: msg, ok := message.Content.(*messages.AddItem) if !ok { err = fmt.Errorf("invalid content type") @@ -214,9 +146,11 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool, reply *CartGr default: err = fmt.Errorf("unknown message type") } + if err != nil { + return nil, err + } if !isReplay { c.storageMessages = append(c.storageMessages, *message) } - *reply = *c - return err + return json.Marshal(c) } diff --git a/data/1.prot b/data/1.prot index 07c9ed6..9557f27 100644 Binary files a/data/1.prot and b/data/1.prot differ diff --git a/data/4.prot b/data/4.prot new file mode 100644 index 0000000..927516f Binary files /dev/null and b/data/4.prot differ diff --git a/data/state.gob b/data/state.gob new file mode 100644 index 0000000..b481792 Binary files /dev/null and b/data/state.gob differ diff --git a/data/state.gob.bak b/data/state.gob.bak new file mode 100644 index 0000000..f590136 Binary files /dev/null and b/data/state.gob.bak differ diff --git a/data/state.json b/data/state.json deleted file mode 100644 index f086c5a..0000000 --- a/data/state.json +++ /dev/null @@ -1 +0,0 @@ -{"1":1731051371} diff --git a/data/state.json.bak b/data/state.json.bak deleted file mode 100644 index 09eb1ad..0000000 --- a/data/state.json.bak +++ /dev/null @@ -1 +0,0 @@ -{"1":1731050604} diff --git a/disk-storage.go b/disk-storage.go index 4203ad2..ec26330 100644 --- a/disk-storage.go +++ b/disk-storage.go @@ -1,7 +1,7 @@ package main import ( - "encoding/json" + "encoding/gob" "errors" "fmt" "log" @@ -12,26 +12,26 @@ import ( type DiskStorage struct { stateFile string lastSave int64 - LastSaves map[string]int64 + LastSaves map[CartId]int64 } func NewDiskStorage(stateFile string) (*DiskStorage, error) { ret := &DiskStorage{ stateFile: stateFile, - LastSaves: make(map[string]int64), + LastSaves: make(map[CartId]int64), } err := ret.loadState() return ret, err } -func saveMessages(messages []StorableMessage, id string) error { +func saveMessages(messages []StorableMessage, id CartId) error { log.Printf("%d messages to save for %s", len(messages), id) if len(messages) == 0 { return nil } var file *os.File var err error - path := getCartPath(id) + path := getCartPath(id.String()) file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err @@ -39,11 +39,10 @@ func saveMessages(messages []StorableMessage, id string) error { defer file.Close() for _, m := range messages { - b, err := m.GetBytes() + err := m.Write(file) if err != nil { return err } - file.Write(b) } return err } @@ -52,9 +51,9 @@ func getCartPath(id string) string { return fmt.Sprintf("data/%s.prot", id) } -func loadMessages(grain Grain, id string) error { +func loadMessages(grain Grain, id CartId) error { var err error - path := getCartPath(id) + path := getCartPath(id.String()) if _, err = os.Stat(path); errors.Is(err, os.ErrNotExist) { return err } @@ -64,13 +63,11 @@ func loadMessages(grain Grain, id string) error { } defer file.Close() - var reply CartGrain - for err == nil { - msg := &Message{} - err = msg.FromReader(file, msg) + var msg Message + err = MessageFromReader(file, &msg) if err == nil { - grain.HandleMessage(msg, true, &reply) + grain.HandleMessage(&msg, true) } } @@ -87,7 +84,7 @@ func (s *DiskStorage) saveState() error { return err } defer file.Close() - err = json.NewEncoder(file).Encode(s.LastSaves) + err = gob.NewEncoder(file).Encode(s.LastSaves) if err != nil { return err } @@ -97,15 +94,15 @@ func (s *DiskStorage) saveState() error { } func (s *DiskStorage) loadState() error { - file, err := os.Open("data/state.json") + file, err := os.Open(s.stateFile) if err != nil { return err } defer file.Close() - return json.NewDecoder(file).Decode(&s.LastSaves) + return gob.NewDecoder(file).Decode(&s.LastSaves) } -func (s *DiskStorage) Store(id string, grain Grain) error { +func (s *DiskStorage) Store(id CartId, grain *CartGrain) error { lastSavedMessage, ok := s.LastSaves[id] if ok && lastSavedMessage > grain.GetLastChange() { return nil diff --git a/git.tornberg.me/go-cart-actor/add.pb.go b/git.tornberg.me/go-cart-actor/add.pb.go deleted file mode 100644 index 0211997..0000000 --- a/git.tornberg.me/go-cart-actor/add.pb.go +++ /dev/null @@ -1,212 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.35.1 -// protoc v5.28.2 -// source: add.proto - -package go_cart_actor - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type AddRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Sku string `protobuf:"bytes,2,opt,name=Sku,proto3" json:"Sku,omitempty"` -} - -func (x *AddRequest) Reset() { - *x = AddRequest{} - mi := &file_add_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *AddRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*AddRequest) ProtoMessage() {} - -func (x *AddRequest) ProtoReflect() protoreflect.Message { - mi := &file_add_proto_msgTypes[0] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use AddRequest.ProtoReflect.Descriptor instead. -func (*AddRequest) Descriptor() ([]byte, []int) { - return file_add_proto_rawDescGZIP(), []int{0} -} - -func (x *AddRequest) GetSku() string { - if x != nil { - return x.Sku - } - return "" -} - -type AddItem struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Quantity int32 `protobuf:"varint,2,opt,name=Quantity,proto3" json:"Quantity,omitempty"` - Price int64 `protobuf:"varint,3,opt,name=Price,proto3" json:"Price,omitempty"` - Sku string `protobuf:"bytes,4,opt,name=Sku,proto3" json:"Sku,omitempty"` - Name string `protobuf:"bytes,5,opt,name=Name,proto3" json:"Name,omitempty"` - Image string `protobuf:"bytes,6,opt,name=Image,proto3" json:"Image,omitempty"` -} - -func (x *AddItem) Reset() { - *x = AddItem{} - mi := &file_add_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *AddItem) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*AddItem) ProtoMessage() {} - -func (x *AddItem) ProtoReflect() protoreflect.Message { - mi := &file_add_proto_msgTypes[1] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use AddItem.ProtoReflect.Descriptor instead. -func (*AddItem) Descriptor() ([]byte, []int) { - return file_add_proto_rawDescGZIP(), []int{1} -} - -func (x *AddItem) GetQuantity() int32 { - if x != nil { - return x.Quantity - } - return 0 -} - -func (x *AddItem) GetPrice() int64 { - if x != nil { - return x.Price - } - return 0 -} - -func (x *AddItem) GetSku() string { - if x != nil { - return x.Sku - } - return "" -} - -func (x *AddItem) GetName() string { - if x != nil { - return x.Name - } - return "" -} - -func (x *AddItem) GetImage() string { - if x != nil { - return x.Image - } - return "" -} - -var File_add_proto protoreflect.FileDescriptor - -var file_add_proto_rawDesc = []byte{ - 0x0a, 0x09, 0x61, 0x64, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0x1e, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x6b, 0x75, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x03, 0x53, 0x6b, 0x75, 0x22, 0x77, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x49, 0x74, 0x65, 0x6d, - 0x12, 0x1a, 0x0a, 0x08, 0x51, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x08, 0x51, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x14, 0x0a, 0x05, - 0x50, 0x72, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x50, 0x72, 0x69, - 0x63, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x6b, 0x75, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x03, 0x53, 0x6b, 0x75, 0x12, 0x12, 0x0a, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x49, 0x6d, 0x61, 0x67, - 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x49, 0x6d, 0x61, 0x67, 0x65, 0x42, 0x1f, - 0x5a, 0x1d, 0x67, 0x69, 0x74, 0x2e, 0x74, 0x6f, 0x72, 0x6e, 0x62, 0x65, 0x72, 0x67, 0x2e, 0x6d, - 0x65, 0x2f, 0x67, 0x6f, 0x2d, 0x63, 0x61, 0x72, 0x74, 0x2d, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_add_proto_rawDescOnce sync.Once - file_add_proto_rawDescData = file_add_proto_rawDesc -) - -func file_add_proto_rawDescGZIP() []byte { - file_add_proto_rawDescOnce.Do(func() { - file_add_proto_rawDescData = protoimpl.X.CompressGZIP(file_add_proto_rawDescData) - }) - return file_add_proto_rawDescData -} - -var file_add_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_add_proto_goTypes = []any{ - (*AddRequest)(nil), // 0: messages.AddRequest - (*AddItem)(nil), // 1: messages.AddItem -} -var file_add_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_add_proto_init() } -func file_add_proto_init() { - if File_add_proto != nil { - return - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_add_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_add_proto_goTypes, - DependencyIndexes: file_add_proto_depIdxs, - MessageInfos: file_add_proto_msgTypes, - }.Build() - File_add_proto = out.File - file_add_proto_rawDesc = nil - file_add_proto_goTypes = nil - file_add_proto_depIdxs = nil -} diff --git a/grain-pool.go b/grain-pool.go index 82a8091..448af04 100644 --- a/grain-pool.go +++ b/grain-pool.go @@ -7,25 +7,27 @@ import ( ) type GrainPool interface { - GetOrSpawn(id string) (Grain, error) - Get(id string) (Grain, error) + Process(id CartId, messages ...Message) (interface{}, error) + Get(id CartId) (Grain, error) } type Ttl struct { Expires time.Time - Item Grain + Item *CartGrain } type GrainLocalPool struct { - grains map[string]Grain + grains map[CartId]*CartGrain expiry []Ttl + spawn func(id CartId) (*CartGrain, error) Ttl time.Duration PoolSize int } -func NewGrainLocalPool(size int, ttl time.Duration) *GrainLocalPool { +func NewGrainLocalPool(size int, ttl time.Duration, spawn func(id CartId) (*CartGrain, error)) *GrainLocalPool { ret := &GrainLocalPool{ - grains: make(map[string]Grain), + spawn: spawn, + grains: make(map[CartId]*CartGrain), expiry: make([]Ttl, 0), Ttl: ttl, PoolSize: size, @@ -59,11 +61,12 @@ func (p *GrainLocalPool) Purge() { } } -func (p *GrainLocalPool) GetGrains() map[string]Grain { +func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain { return p.grains } -func (p *GrainLocalPool) GetOrSpawn(id string, generator func(id string) Grain) (Grain, error) { +func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) { + var err error grain, ok := p.grains[id] if !ok { if len(p.grains) >= p.PoolSize { @@ -74,16 +77,23 @@ func (p *GrainLocalPool) GetOrSpawn(id string, generator func(id string) Grain) return nil, fmt.Errorf("pool is full") } } - grain = generator(id) + grain, err = p.spawn(id) + p.grains[id] = grain } - return grain, nil + return grain, err } -func (p *GrainLocalPool) Get(id string) (Grain, error) { - grain, ok := p.grains[id] - if !ok { - return nil, fmt.Errorf("grain not found") +func (p *GrainLocalPool) Process(id CartId, messages ...Message) (interface{}, error) { + grain, err := p.GetGrain(id) + if err == nil && grain != nil { + for _, message := range messages { + _, err = grain.HandleMessage(&message, false) + } } - return grain, nil + return grain, err +} + +func (p *GrainLocalPool) Get(id CartId) (Grain, error) { + return p.GetGrain(id) } diff --git a/main.go b/main.go index 2083dbf..0bb6c56 100644 --- a/main.go +++ b/main.go @@ -1,18 +1,16 @@ package main import ( - "encoding/gob" "encoding/json" "log" "net/http" "os" "time" - "git.tornberg.me/go-cart-actor/git.tornberg.me/go-cart-actor/messages" + messages "git.tornberg.me/go-cart-actor/proto" ) -func spawn(id string) Grain { - +func spawn(id CartId) (*CartGrain, error) { ret := &CartGrain{ Id: id, Items: []CartItem{}, @@ -20,16 +18,11 @@ func spawn(id string) Grain { TotalPrice: 0, } err := loadMessages(ret, id) - if err != nil { - log.Printf("Error loading messages for grain %s: %v\n", id, err) - } - return ret + return ret, err } func init() { os.Mkdir("data", 0755) - gob.Register(CartItem{}) - gob.Register(Message{}) } type App struct { @@ -39,7 +32,7 @@ type App struct { func (a *App) HandleGet(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") - grain, err := a.pool.GetOrSpawn(id, spawn) + grain, err := a.pool.Get(ToCartId(id)) if err != nil { w.WriteHeader(http.StatusNotFound) return @@ -52,25 +45,19 @@ func (a *App) HandleGet(w http.ResponseWriter, r *http.Request) { func (a *App) HandleAddSku(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") sku := r.PathValue("sku") - grain, err := a.pool.GetOrSpawn(id, spawn) - if err != nil { - w.WriteHeader(http.StatusNotFound) - return - } - message := &Message{ - Type: 1, + grain, err := a.pool.Process(ToCartId(id), Message{ + Type: AddRequestType, Content: &messages.AddRequest{Sku: sku}, - } - var reply CartGrain - err = grain.HandleMessage(message, false, &reply) + }) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) return } + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(reply) + json.NewEncoder(w).Encode(grain) } func (a *App) Save() error { @@ -95,18 +82,55 @@ func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) { func main() { // Create a new instance of the server - storage, err := NewDiskStorage("data/state.json") + storage, err := NewDiskStorage("data/state.gob") if err != nil { log.Printf("Error loading state: %v\n", err) } app := &App{ - pool: NewGrainLocalPool(1000, 5*time.Minute), + pool: NewGrainLocalPool(1000, 5*time.Minute, spawn), storage: storage, } + rpcHandler, err := NewGrainHandler(app.pool, "localhost:1337") + if err != nil { + log.Fatalf("Error creating handler: %v\n", err) + } + go rpcHandler.Serve() + + remotePool := NewRemoteGrainPool("localhost:1337") + mux := http.NewServeMux() - mux.HandleFunc("GET /{id}", app.HandleGet) - mux.HandleFunc("GET /{id}/add/{sku}", app.HandleAddSku) + mux.HandleFunc("GET /api/{id}", app.HandleGet) + mux.HandleFunc("GET /api/{id}/add/{sku}", app.HandleAddSku) + mux.HandleFunc("GET /remote/{id}/add", func(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + ts := time.Now().Unix() + data, err := remotePool.Process(ToCartId(id), Message{ + Type: AddRequestType, + TimeStamp: &ts, + Content: &messages.AddRequest{Sku: "49565"}, + }) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(data) + }) + mux.HandleFunc("GET /remote/{id}", func(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + data, err := remotePool.Get(ToCartId(id)) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(data) + }) mux.HandleFunc("GET /save", app.HandleSave) http.ListenAndServe(":8080", mux) diff --git a/message-types.go b/message-types.go new file mode 100644 index 0000000..9d5aac6 --- /dev/null +++ b/message-types.go @@ -0,0 +1,6 @@ +package main + +const ( + AddRequestType = 1 + AddItemType = 2 +) diff --git a/message.go b/message.go new file mode 100644 index 0000000..fc40015 --- /dev/null +++ b/message.go @@ -0,0 +1,111 @@ +package main + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "time" + + messages "git.tornberg.me/go-cart-actor/proto" + "google.golang.org/protobuf/proto" +) + +type StorableMessage interface { + Write(w io.Writer) error +} + +type Message struct { + Type uint16 + TimeStamp *int64 + Content interface{} +} + +type MessageWriter struct { + io.Writer +} + +type StorableMessageHeader struct { + Version uint16 + Type uint16 + TimeStamp int64 + DataLength uint64 +} + +func GetData(fn func(w io.Writer) error) ([]byte, error) { + var buf bytes.Buffer + err := fn(&buf) + if err != nil { + return nil, err + } + b := buf.Bytes() + return b, nil +} + +func (m Message) Write(w io.Writer) error { + data, err := GetData(func(wr io.Writer) error { + if m.Type == AddRequestType { + messageBytes, err := proto.Marshal(m.Content.(*messages.AddRequest)) + if err != nil { + return err + } + wr.Write(messageBytes) + } else if m.Type == AddItemType { + messageBytes, err := proto.Marshal(m.Content.(*messages.AddItem)) + if err != nil { + return err + } + wr.Write(messageBytes) + } + return nil + }) + if err != nil { + return err + } + ts := time.Now().Unix() + if m.TimeStamp != nil { + ts = *m.TimeStamp + } + + err = binary.Write(w, binary.LittleEndian, StorableMessageHeader{ + Version: 1, + Type: m.Type, + TimeStamp: ts, + DataLength: uint64(len(data)), + }) + w.Write(data) + + return err +} + +func MessageFromReader(reader io.Reader, m *Message) error { + header := StorableMessageHeader{} + err := binary.Read(reader, binary.LittleEndian, &header) + if err != nil { + return err + } + messageBytes := make([]byte, header.DataLength) + _, err = reader.Read(messageBytes) + if err != nil { + return err + } + switch header.Type { + case AddRequestType: + msg := &messages.AddRequest{} + err = proto.Unmarshal(messageBytes, msg) + m.Content = msg + case AddItemType: + msg := &messages.AddItem{} + err = proto.Unmarshal(messageBytes, msg) + m.Content = msg + default: + return fmt.Errorf("unknown message type") + } + if err != nil { + return err + } + m.Type = header.Type + m.TimeStamp = &header.TimeStamp + + return nil +} diff --git a/packet.go b/packet.go new file mode 100644 index 0000000..9d047fa --- /dev/null +++ b/packet.go @@ -0,0 +1,76 @@ +package main + +import ( + "encoding/binary" + "io" +) + +const ( + RemoteGetState = uint16(0x01) + RemoteHandleMessage = uint16(0x02) + ResponseBody = uint16(0x03) +) + +type CartPacket struct { + Version uint16 + MessageType uint16 + Id CartId + DataLength uint16 +} + +type ResponsePacket struct { + Version uint16 + MessageType uint16 + DataLength uint16 +} + +func SendCartPacket(conn io.Writer, id CartId, messageType uint16, datafn func(w io.Writer) error) error { + data, err := GetData(datafn) + if err != nil { + return err + } + binary.Write(conn, binary.LittleEndian, CartPacket{ + Version: 2, + MessageType: messageType, + Id: id, + DataLength: uint16(len(data)), + }) + _, err = conn.Write(data) + return err +} + +func SendPacket(conn io.Writer, messageType uint16, datafn func(w io.Writer) error) error { + data, err := GetData(datafn) + if err != nil { + return err + } + binary.Write(conn, binary.LittleEndian, ResponsePacket{ + Version: 1, + MessageType: messageType, + DataLength: uint16(len(data)), + }) + _, err = conn.Write(data) + return err +} + +// func ReceiveCartPacket(conn io.Reader) (CartPacket, []byte, error) { +// var packet CartPacket +// err := binary.Read(conn, binary.LittleEndian, &packet) +// if err != nil { +// return packet, nil, err +// } +// data := make([]byte, packet.DataLength) +// _, err = conn.Read(data) +// return packet, data, err +// } + +func ReceivePacket(conn io.Reader) (uint16, []byte, error) { + var packet ResponsePacket + err := binary.Read(conn, binary.LittleEndian, &packet) + if err != nil { + return packet.MessageType, nil, err + } + data := make([]byte, packet.DataLength) + _, err = conn.Read(data) + return packet.MessageType, data, err +} diff --git a/git.tornberg.me/go-cart-actor/messages/add.pb.go b/proto/messages.pb.go similarity index 56% rename from git.tornberg.me/go-cart-actor/messages/add.pb.go rename to proto/messages.pb.go index be563af..83a6f4f 100644 --- a/git.tornberg.me/go-cart-actor/messages/add.pb.go +++ b/proto/messages.pb.go @@ -2,7 +2,7 @@ // versions: // protoc-gen-go v1.35.1 // protoc v5.28.2 -// source: add.proto +// source: proto/messages.proto package messages @@ -30,7 +30,7 @@ type AddRequest struct { func (x *AddRequest) Reset() { *x = AddRequest{} - mi := &file_add_proto_msgTypes[0] + mi := &file_proto_messages_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -42,7 +42,7 @@ func (x *AddRequest) String() string { func (*AddRequest) ProtoMessage() {} func (x *AddRequest) ProtoReflect() protoreflect.Message { - mi := &file_add_proto_msgTypes[0] + mi := &file_proto_messages_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -55,7 +55,7 @@ func (x *AddRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use AddRequest.ProtoReflect.Descriptor instead. func (*AddRequest) Descriptor() ([]byte, []int) { - return file_add_proto_rawDescGZIP(), []int{0} + return file_proto_messages_proto_rawDescGZIP(), []int{0} } func (x *AddRequest) GetSku() string { @@ -79,7 +79,7 @@ type AddItem struct { func (x *AddItem) Reset() { *x = AddItem{} - mi := &file_add_proto_msgTypes[1] + mi := &file_proto_messages_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -91,7 +91,7 @@ func (x *AddItem) String() string { func (*AddItem) ProtoMessage() {} func (x *AddItem) ProtoReflect() protoreflect.Message { - mi := &file_add_proto_msgTypes[1] + mi := &file_proto_messages_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -104,7 +104,7 @@ func (x *AddItem) ProtoReflect() protoreflect.Message { // Deprecated: Use AddItem.ProtoReflect.Descriptor instead. func (*AddItem) Descriptor() ([]byte, []int) { - return file_add_proto_rawDescGZIP(), []int{1} + return file_proto_messages_proto_rawDescGZIP(), []int{1} } func (x *AddItem) GetQuantity() int32 { @@ -142,43 +142,42 @@ func (x *AddItem) GetImage() string { return "" } -var File_add_proto protoreflect.FileDescriptor +var File_proto_messages_proto protoreflect.FileDescriptor -var file_add_proto_rawDesc = []byte{ - 0x0a, 0x09, 0x61, 0x64, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0x1e, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x6b, 0x75, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x03, 0x53, 0x6b, 0x75, 0x22, 0x77, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x49, 0x74, 0x65, 0x6d, - 0x12, 0x1a, 0x0a, 0x08, 0x51, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x08, 0x51, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x14, 0x0a, 0x05, - 0x50, 0x72, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x50, 0x72, 0x69, - 0x63, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x6b, 0x75, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x03, 0x53, 0x6b, 0x75, 0x12, 0x12, 0x0a, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x49, 0x6d, 0x61, 0x67, - 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x49, 0x6d, 0x61, 0x67, 0x65, 0x42, 0x28, - 0x5a, 0x26, 0x67, 0x69, 0x74, 0x2e, 0x74, 0x6f, 0x72, 0x6e, 0x62, 0x65, 0x72, 0x67, 0x2e, 0x6d, - 0x65, 0x2f, 0x67, 0x6f, 0x2d, 0x63, 0x61, 0x72, 0x74, 0x2d, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2f, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +var file_proto_messages_proto_rawDesc = []byte{ + 0x0a, 0x14, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, + 0x22, 0x1e, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, + 0x0a, 0x03, 0x53, 0x6b, 0x75, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x53, 0x6b, 0x75, + 0x22, 0x77, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x49, 0x74, 0x65, 0x6d, 0x12, 0x1a, 0x0a, 0x08, 0x51, + 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x51, + 0x75, 0x61, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x50, 0x72, 0x69, 0x63, 0x65, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x50, 0x72, 0x69, 0x63, 0x65, 0x12, 0x10, 0x0a, + 0x03, 0x53, 0x6b, 0x75, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x53, 0x6b, 0x75, 0x12, + 0x12, 0x0a, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x4e, + 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x49, 0x6d, 0x61, 0x67, 0x65, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x49, 0x6d, 0x61, 0x67, 0x65, 0x42, 0x0c, 0x5a, 0x0a, 0x2e, 0x3b, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( - file_add_proto_rawDescOnce sync.Once - file_add_proto_rawDescData = file_add_proto_rawDesc + file_proto_messages_proto_rawDescOnce sync.Once + file_proto_messages_proto_rawDescData = file_proto_messages_proto_rawDesc ) -func file_add_proto_rawDescGZIP() []byte { - file_add_proto_rawDescOnce.Do(func() { - file_add_proto_rawDescData = protoimpl.X.CompressGZIP(file_add_proto_rawDescData) +func file_proto_messages_proto_rawDescGZIP() []byte { + file_proto_messages_proto_rawDescOnce.Do(func() { + file_proto_messages_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_messages_proto_rawDescData) }) - return file_add_proto_rawDescData + return file_proto_messages_proto_rawDescData } -var file_add_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_add_proto_goTypes = []any{ +var file_proto_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_proto_messages_proto_goTypes = []any{ (*AddRequest)(nil), // 0: messages.AddRequest (*AddItem)(nil), // 1: messages.AddItem } -var file_add_proto_depIdxs = []int32{ +var file_proto_messages_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type 0, // [0:0] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name @@ -186,27 +185,27 @@ var file_add_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for field type_name } -func init() { file_add_proto_init() } -func file_add_proto_init() { - if File_add_proto != nil { +func init() { file_proto_messages_proto_init() } +func file_proto_messages_proto_init() { + if File_proto_messages_proto != nil { return } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_add_proto_rawDesc, + RawDescriptor: file_proto_messages_proto_rawDesc, NumEnums: 0, NumMessages: 2, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_add_proto_goTypes, - DependencyIndexes: file_add_proto_depIdxs, - MessageInfos: file_add_proto_msgTypes, + GoTypes: file_proto_messages_proto_goTypes, + DependencyIndexes: file_proto_messages_proto_depIdxs, + MessageInfos: file_proto_messages_proto_msgTypes, }.Build() - File_add_proto = out.File - file_add_proto_rawDesc = nil - file_add_proto_goTypes = nil - file_add_proto_depIdxs = nil + File_proto_messages_proto = out.File + file_proto_messages_proto_rawDesc = nil + file_proto_messages_proto_goTypes = nil + file_proto_messages_proto_depIdxs = nil } diff --git a/messages/add.proto b/proto/messages.proto similarity index 76% rename from messages/add.proto rename to proto/messages.proto index 8a3f4a8..49a899d 100644 --- a/messages/add.proto +++ b/proto/messages.proto @@ -1,6 +1,6 @@ syntax = "proto3"; package messages; -option go_package = "git.tornberg.me/go-cart-actor/messages"; +option go_package = ".;messages"; message AddRequest { string Sku = 2; diff --git a/proto/service.proto b/proto/service.proto new file mode 100644 index 0000000..e69de29 diff --git a/rpc-pool.go b/rpc-pool.go new file mode 100644 index 0000000..fd00d89 --- /dev/null +++ b/rpc-pool.go @@ -0,0 +1,110 @@ +package main + +import ( + "io" + "net" + "strings" +) + +type RemoteGrainPool struct { + Hosts []string + grains map[CartId]RemoteGrain +} + +func (id CartId) String() string { + return strings.Trim(string(id[:]), "\x00") +} + +func ToCartId(id string) CartId { + var result [16]byte + copy(result[:], []byte(id)) + return result +} + +type RemoteGrain struct { + client net.Conn + Id CartId + Address string +} + +func NewRemoteGrain(id CartId, address string) *RemoteGrain { + return &RemoteGrain{ + Id: id, + Address: address, + } +} + +func (g *RemoteGrain) Connect() error { + if g.client == nil { + client, err := net.Dial("tcp", g.Address) + if err != nil { + return err + } + g.client = client + } + return nil +} + +func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) { + + err := SendCartPacket(g.client, g.Id, RemoteHandleMessage, message.Write) + if err != nil { + return nil, err + } + _, data, err := ReceivePacket(g.client) + return data, err +} + +func (g *RemoteGrain) GetId() CartId { + return g.Id +} + +func (g *RemoteGrain) GetCurrentState() ([]byte, error) { + + err := SendCartPacket(g.client, g.Id, RemoteGetState, func(w io.Writer) error { + return nil + }) + if err != nil { + return nil, err + } + _, data, err := ReceivePacket(g.client) + return data, err +} + +func NewRemoteGrainPool(addr ...string) *RemoteGrainPool { + return &RemoteGrainPool{ + Hosts: addr, + grains: make(map[CartId]RemoteGrain), + } +} + +func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain { + grain, ok := p.grains[id] + if !ok { + return nil + } + return &grain +} + +func (p *RemoteGrainPool) Process(id CartId, messages ...Message) ([]byte, error) { + var result []byte + var err error + grain := p.findRemoteGrain(id) + if grain == nil { + grain = NewRemoteGrain(id, p.Hosts[0]) + grain.Connect() + p.grains[id] = *grain + } + for _, message := range messages { + result, err = grain.HandleMessage(&message, false) + } + return result, err +} + +func (p *RemoteGrainPool) Get(id CartId) ([]byte, error) { + grain := p.findRemoteGrain(id) + if grain == nil { + return nil, nil + } + return grain.GetCurrentState() +} diff --git a/rpc-server.go b/rpc-server.go new file mode 100644 index 0000000..fbb38c0 --- /dev/null +++ b/rpc-server.go @@ -0,0 +1,114 @@ +package main + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "io" + "net" +) + +type GrainHandler struct { + listener net.Listener + pool GrainPool +} + +func (h *GrainHandler) GetState(id CartId, reply *Grain) error { + grain, err := h.pool.Get(id) + if err != nil { + return err + } + *reply = grain + return nil +} + +func NewGrainHandler(pool GrainPool, listen string) (*GrainHandler, error) { + handler := &GrainHandler{ + pool: pool, + } + l, err := net.Listen("tcp", listen) + handler.listener = l + return handler, err +} + +func (h *GrainHandler) Serve() { + for { + // Accept incoming connections + conn, err := h.listener.Accept() + if err != nil { + fmt.Println("Error:", err) + continue + } + + // Handle client connection in a goroutine + go h.handleClient(conn) + } +} + +func (h *GrainHandler) handleClient(conn net.Conn) { + + fmt.Println("Handling client connection") + defer conn.Close() + + var packet CartPacket + for { + + for { + err := binary.Read(conn, binary.LittleEndian, &packet) + if err != nil { + if err == io.EOF { + break + } + fmt.Println("Error reading packet:", err) + } + if packet.Version != 2 { + fmt.Printf("Unknown version %d", packet.Version) + break + } + + switch packet.MessageType { + case RemoteHandleMessage: + fmt.Printf("Handling message\n") + var msg Message + err := MessageFromReader(conn, &msg) + if err != nil { + fmt.Println("Error reading message:", err) + } + fmt.Printf("Message: %s, %v\n", packet.Id.String(), msg) + grain, err := h.pool.Get(packet.Id) + if err != nil { + fmt.Println("Error getting grain:", err) + } + _, err = grain.HandleMessage(&msg, false) + if err != nil { + fmt.Println("Error handling message:", err) + } + SendPacket(conn, ResponseBody, func(w io.Writer) error { + data, err := json.Marshal(grain) + if err != nil { + return err + } + w.Write(data) + return nil + }) + case RemoteGetState: + + fmt.Printf("Package: %s %v\n", packet.Id.String(), packet) + grain, err := h.pool.Get(packet.Id) + if err != nil { + fmt.Println("Error getting grain:", err) + } + SendPacket(conn, ResponseBody, func(w io.Writer) error { + data, err := json.Marshal(grain) + if err != nil { + return err + } + w.Write(data) + return nil + }) + } + + } + + } +}