From 356f5effba3be5e45a70ff0f0f617cafc3e6d698 Mon Sep 17 00:00:00 2001 From: matst80 Date: Fri, 8 Nov 2024 23:50:57 +0100 Subject: [PATCH] cleanup --- grain-pool.go | 32 ++++++++++++++++++++------------ grain-server.go | 23 ----------------------- packet.go | 12 ++++++++++++ rpc-server.go | 46 ++++++++++++---------------------------------- server-registry.go | 25 ------------------------- 5 files changed, 44 insertions(+), 94 deletions(-) delete mode 100644 grain-server.go delete mode 100644 server-registry.go diff --git a/grain-pool.go b/grain-pool.go index 448af04..6d43338 100644 --- a/grain-pool.go +++ b/grain-pool.go @@ -1,19 +1,20 @@ package main import ( + "encoding/json" "fmt" "log" "time" ) type GrainPool interface { - Process(id CartId, messages ...Message) (interface{}, error) - Get(id CartId) (Grain, error) + Process(id CartId, messages ...Message) ([]byte, error) + Get(id CartId) ([]byte, error) } type Ttl struct { Expires time.Time - Item *CartGrain + Grain *CartGrain } type GrainLocalPool struct { @@ -46,13 +47,13 @@ func (p *GrainLocalPool) Purge() { for i := 0; i < len(p.expiry); i++ { item := p.expiry[i] if item.Expires.Before(time.Now()) { - if item.Item.GetLastChange() > keepChanged { - log.Printf("Changed item %s expired, keeping", item.Item.GetId()) + if item.Grain.GetLastChange() > keepChanged { + log.Printf("Expired item %s changed, keeping", item.Grain.GetId()) p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) p.expiry = append(p.expiry, item) } else { - log.Printf("Item %s expired", item.Item.GetId()) - delete(p.grains, item.Item.GetId()) + log.Printf("Item %s expired", item.Grain.GetId()) + delete(p.grains, item.Grain.GetId()) p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) } } else { @@ -71,7 +72,7 @@ func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) { if !ok { if len(p.grains) >= p.PoolSize { if p.expiry[0].Expires.Before(time.Now()) { - delete(p.grains, p.expiry[0].Item.GetId()) + delete(p.grains, p.expiry[0].Grain.GetId()) p.expiry = p.expiry[1:] } else { return nil, fmt.Errorf("pool is full") @@ -84,16 +85,23 @@ func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) { return grain, err } -func (p *GrainLocalPool) Process(id CartId, messages ...Message) (interface{}, error) { +func (p *GrainLocalPool) Process(id CartId, messages ...Message) ([]byte, error) { grain, err := p.GetGrain(id) if err == nil && grain != nil { for _, message := range messages { _, err = grain.HandleMessage(&message, false) } } - return grain, err + if err != nil { + return nil, err + } + return json.Marshal(grain) } -func (p *GrainLocalPool) Get(id CartId) (Grain, error) { - return p.GetGrain(id) +func (p *GrainLocalPool) Get(id CartId) ([]byte, error) { + grain, err := p.GetGrain(id) + if err != nil { + return nil, err + } + return json.Marshal(grain) } diff --git a/grain-server.go b/grain-server.go deleted file mode 100644 index 297f997..0000000 --- a/grain-server.go +++ /dev/null @@ -1,23 +0,0 @@ -package main - -import ( - "fmt" - "net" - "net/rpc" -) - -type GrainServer struct { - Host string -} - -func NewServer(hostname string) *GrainServer { - return &GrainServer{ - Host: hostname, - } -} - -func (s *GrainServer) Start(port int, instance Grain) (net.Listener, error) { - rpc.Register(instance) - rpc.HandleHTTP() - return net.Listen("tcp", fmt.Sprintf(":%d", port)) -} diff --git a/packet.go b/packet.go index 9d047fa..b539821 100644 --- a/packet.go +++ b/packet.go @@ -2,6 +2,7 @@ package main import ( "encoding/binary" + "encoding/json" "io" ) @@ -53,6 +54,17 @@ func SendPacket(conn io.Writer, messageType uint16, datafn func(w io.Writer) err return err } +func SendProxyResponse(conn io.Writer, data any) error { + return SendPacket(conn, ResponseBody, func(w io.Writer) error { + data, err := json.Marshal(data) + if err != nil { + return err + } + w.Write(data) + return nil + }) +} + // func ReceiveCartPacket(conn io.Reader) (CartPacket, []byte, error) { // var packet CartPacket // err := binary.Read(conn, binary.LittleEndian, &packet) diff --git a/rpc-server.go b/rpc-server.go index fbb38c0..dfa0899 100644 --- a/rpc-server.go +++ b/rpc-server.go @@ -2,7 +2,6 @@ package main import ( "encoding/binary" - "encoding/json" "fmt" "io" "net" @@ -10,11 +9,11 @@ import ( type GrainHandler struct { listener net.Listener - pool GrainPool + pool *GrainLocalPool } func (h *GrainHandler) GetState(id CartId, reply *Grain) error { - grain, err := h.pool.Get(id) + grain, err := h.pool.GetGrain(id) if err != nil { return err } @@ -22,7 +21,7 @@ func (h *GrainHandler) GetState(id CartId, reply *Grain) error { return nil } -func NewGrainHandler(pool GrainPool, listen string) (*GrainHandler, error) { +func NewGrainHandler(pool *GrainLocalPool, listen string) (*GrainHandler, error) { handler := &GrainHandler{ pool: pool, } @@ -33,33 +32,30 @@ func NewGrainHandler(pool GrainPool, listen string) (*GrainHandler, error) { func (h *GrainHandler) Serve() { for { - // Accept incoming connections conn, err := h.listener.Accept() if err != nil { - fmt.Println("Error:", err) + fmt.Println("Error accepting connection:", err) continue } - // Handle client connection in a goroutine go h.handleClient(conn) } } func (h *GrainHandler) handleClient(conn net.Conn) { - + var err error fmt.Println("Handling client connection") defer conn.Close() var packet CartPacket for { - for { - err := binary.Read(conn, binary.LittleEndian, &packet) + err = binary.Read(conn, binary.LittleEndian, &packet) if err != nil { if err == io.EOF { break } - fmt.Println("Error reading packet:", err) + fmt.Println("Error in connection:", err) } if packet.Version != 2 { fmt.Printf("Unknown version %d", packet.Version) @@ -70,42 +66,24 @@ func (h *GrainHandler) handleClient(conn net.Conn) { case RemoteHandleMessage: fmt.Printf("Handling message\n") var msg Message - err := MessageFromReader(conn, &msg) + err = MessageFromReader(conn, &msg) if err != nil { fmt.Println("Error reading message:", err) } fmt.Printf("Message: %s, %v\n", packet.Id.String(), msg) - grain, err := h.pool.Get(packet.Id) - if err != nil { - fmt.Println("Error getting grain:", err) - } - _, err = grain.HandleMessage(&msg, false) + grain, err := h.pool.Process(packet.Id, msg) if err != nil { fmt.Println("Error handling message:", err) } - SendPacket(conn, ResponseBody, func(w io.Writer) error { - data, err := json.Marshal(grain) - if err != nil { - return err - } - w.Write(data) - return nil - }) - case RemoteGetState: + SendProxyResponse(conn, grain) + case RemoteGetState: fmt.Printf("Package: %s %v\n", packet.Id.String(), packet) grain, err := h.pool.Get(packet.Id) if err != nil { fmt.Println("Error getting grain:", err) } - SendPacket(conn, ResponseBody, func(w io.Writer) error { - data, err := json.Marshal(grain) - if err != nil { - return err - } - w.Write(data) - return nil - }) + SendProxyResponse(conn, grain) } } diff --git a/server-registry.go b/server-registry.go deleted file mode 100644 index 4719536..0000000 --- a/server-registry.go +++ /dev/null @@ -1,25 +0,0 @@ -package main - -import "fmt" - -type Registry interface { - Register(address string, id string) error - Get(id string) (*string, error) -} - -type MemoryRegistry struct { - registry map[string]string -} - -func (r *MemoryRegistry) Register(address string, id string) error { - r.registry[id] = address - return nil -} - -func (r *MemoryRegistry) Get(id string) (*string, error) { - addr, ok := r.registry[id] - if !ok { - return nil, fmt.Errorf("id not found") - } - return &addr, nil -}