From 5348c33f3bcb09b08cb54b3f16129ef983e4faca Mon Sep 17 00:00:00 2001 From: matst80 Date: Thu, 21 Nov 2024 21:23:38 +0100 Subject: [PATCH 1/7] netpool test. wip --- go.mod | 1 + go.sum | 2 ++ main.go | 2 +- remote-grain-pool._go | 67 ++++++++++++++++++++++++++++++++++++++++++ remote-grain-pool.go | 67 ------------------------------------------ remote-grain.go | 9 ++++-- remote-host.go | 3 ++ rpc-server.go | 2 +- synced-pool.go | 38 +++++++++++++++++++----- tcp-connection.go | 18 ++++++++---- tcp-connection_test.go | 20 ++++++++++--- 11 files changed, 140 insertions(+), 89 deletions(-) create mode 100644 remote-grain-pool._go delete mode 100644 remote-grain-pool.go diff --git a/go.mod b/go.mod index 9f69bec..28dee33 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/prometheus/procfs v0.15.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/x448/float16 v0.8.4 // indirect + github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect golang.org/x/sys v0.22.0 // indirect diff --git a/go.sum b/go.sum index 4cdb221..709846b 100644 --- a/go.sum +++ b/go.sum @@ -100,6 +100,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e h1:fAzVSmKQkWflN25ED65CH/C1T3iVWq2BQfN7eQsg4E4= +github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e/go.mod h1:gQsFrHrY6nviQu+VX7zKWDyhtLPNzngtYZ+C+7cywdk= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/main.go b/main.go index 7d2c717..f8fb35d 100644 --- a/main.go +++ b/main.go @@ -223,7 +223,7 @@ func main() { }(conn) }(l) } else { - log.Println("Error creating echo server: %v\n", err) + log.Printf("Error creating echo server: %v\n", err) } sigs := make(chan os.Signal, 1) diff --git a/remote-grain-pool._go b/remote-grain-pool._go new file mode 100644 index 0000000..bc13b41 --- /dev/null +++ b/remote-grain-pool._go @@ -0,0 +1,67 @@ +// package main + +// import "sync" + +// type RemoteGrainPool struct { +// mu sync.RWMutex +// Host string +// grains map[CartId]*RemoteGrain +// } + +// func NewRemoteGrainPool(addr string) *RemoteGrainPool { +// return &RemoteGrainPool{ +// Host: addr, +// grains: make(map[CartId]*RemoteGrain), +// } +// } + +// func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain { +// p.mu.RLock() +// grain, ok := p.grains[id] +// p.mu.RUnlock() +// if !ok { +// return nil +// } +// return grain +// } + +// func (p *RemoteGrainPool) findOrCreateGrain(id CartId) (*RemoteGrain, error) { +// grain := p.findRemoteGrain(id) + +// if grain == nil { +// grain, err := NewRemoteGrain(id, p.Host) +// if err != nil { +// return nil, err +// } +// p.mu.Lock() +// p.grains[id] = grain +// p.mu.Unlock() +// } +// return grain, nil +// } + +// func (p *RemoteGrainPool) Delete(id CartId) { +// p.mu.Lock() +// delete(p.grains, id) +// p.mu.Unlock() +// } + +// func (p *RemoteGrainPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) { +// var result *FrameWithPayload +// grain, err := p.findOrCreateGrain(id) +// if err != nil { +// return nil, err +// } +// for _, message := range messages { +// result, err = grain.HandleMessage(&message, false) +// } +// return result, err +// } + +// func (p *RemoteGrainPool) Get(id CartId) (*FrameWithPayload, error) { +// grain, err := p.findOrCreateGrain(id) +// if err != nil { +// return nil, err +// } +// return grain.GetCurrentState() +// } diff --git a/remote-grain-pool.go b/remote-grain-pool.go deleted file mode 100644 index 3be28ff..0000000 --- a/remote-grain-pool.go +++ /dev/null @@ -1,67 +0,0 @@ -package main - -import "sync" - -type RemoteGrainPool struct { - mu sync.RWMutex - Host string - grains map[CartId]*RemoteGrain -} - -func NewRemoteGrainPool(addr string) *RemoteGrainPool { - return &RemoteGrainPool{ - Host: addr, - grains: make(map[CartId]*RemoteGrain), - } -} - -func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain { - p.mu.RLock() - grain, ok := p.grains[id] - p.mu.RUnlock() - if !ok { - return nil - } - return grain -} - -func (p *RemoteGrainPool) findOrCreateGrain(id CartId) (*RemoteGrain, error) { - grain := p.findRemoteGrain(id) - - if grain == nil { - grain, err := NewRemoteGrain(id, p.Host) - if err != nil { - return nil, err - } - p.mu.Lock() - p.grains[id] = grain - p.mu.Unlock() - } - return grain, nil -} - -func (p *RemoteGrainPool) Delete(id CartId) { - p.mu.Lock() - delete(p.grains, id) - p.mu.Unlock() -} - -func (p *RemoteGrainPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) { - var result *FrameWithPayload - grain, err := p.findOrCreateGrain(id) - if err != nil { - return nil, err - } - for _, message := range messages { - result, err = grain.HandleMessage(&message, false) - } - return result, err -} - -func (p *RemoteGrainPool) Get(id CartId) (*FrameWithPayload, error) { - grain, err := p.findOrCreateGrain(id) - if err != nil { - return nil, err - } - return grain.GetCurrentState() -} diff --git a/remote-grain.go b/remote-grain.go index 5dad844..e6cfffb 100644 --- a/remote-grain.go +++ b/remote-grain.go @@ -3,6 +3,8 @@ package main import ( "fmt" "strings" + + "github.com/yudhasubki/netpool" ) func (id CartId) String() string { @@ -43,12 +45,13 @@ type RemoteGrain struct { Host string } -func NewRemoteGrain(id CartId, host string) (*RemoteGrain, error) { +func NewRemoteGrain(id CartId, host string, pool netpool.Netpooler) *RemoteGrain { + addr := fmt.Sprintf("%s:1337", host) return &RemoteGrain{ Id: id, Host: host, - Connection: NewConnection(fmt.Sprintf("%s:1337", host)), - }, nil + Connection: NewConnection(addr, pool), + } } func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) (*FrameWithPayload, error) { diff --git a/remote-host.go b/remote-host.go index 385dfba..5ca06c8 100644 --- a/remote-host.go +++ b/remote-host.go @@ -4,10 +4,13 @@ import ( "fmt" "log" "strings" + + "github.com/yudhasubki/netpool" ) type RemoteHost struct { *Connection + HostPool netpool.Netpooler Host string MissedPings int } diff --git a/rpc-server.go b/rpc-server.go index ef9d419..7492a85 100644 --- a/rpc-server.go +++ b/rpc-server.go @@ -20,7 +20,7 @@ func (h *GrainHandler) GetState(id CartId, reply *Grain) error { } func NewGrainHandler(pool *GrainLocalPool, listen string) (*GrainHandler, error) { - conn := NewConnection(listen) + conn := NewConnection(listen, nil) server, err := conn.Listen() handler := &GrainHandler{ GenericListener: server, diff --git a/synced-pool.go b/synced-pool.go index 884d1ac..a1b2da4 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -3,12 +3,14 @@ package main import ( "fmt" "log" + "net" "strings" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/yudhasubki/netpool" "k8s.io/apimachinery/pkg/watch" ) @@ -154,14 +156,23 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { } go func(i CartId, h string) { - remote, err := NewRemoteGrain(i, h) - if err != nil { - log.Printf("Error creating remote grain %v", err) + var pool netpool.Netpooler + p.mu.RLock() + for _, r := range p.remotes { + if r.Host == h { + pool = r.HostPool + break + } + } + p.mu.RUnlock() + if pool == nil { + log.Printf("Error spawning remote grain, no pool for %s", h) return } + remoteGrain := NewRemoteGrain(i, h, pool) p.mu.Lock() - p.remoteIndex[i] = remote + p.remoteIndex[i] = remoteGrain p.mu.Unlock() }(id, host) } @@ -181,7 +192,7 @@ func (p *SyncedPool) HandleHostError(host string) { func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) { listen := fmt.Sprintf("%s:1338", hostname) - conn := NewConnection(listen) + conn := NewConnection(listen, nil) server, err := conn.Listen() if err != nil { return nil, err @@ -381,9 +392,17 @@ func (p *SyncedPool) AddRemote(host string) { return } - client := NewConnection(fmt.Sprintf("%s:1338", host)) + host_pool, err := netpool.New(func() (net.Conn, error) { + return net.Dial("tcp", fmt.Sprintf("%s:1338", host)) + }, netpool.WithMaxPool(256)) + + if err != nil { + log.Printf("Error creating pool: %v\n", err) + return + } + + client := NewConnection(fmt.Sprintf("%s:1338", host), host_pool) - var err error pings := 3 for pings >= 0 { _, err = client.Call(Ping, nil) @@ -397,7 +416,12 @@ func (p *SyncedPool) AddRemote(host string) { } log.Printf("Connected to remote %s", host) + cart_pool, err := netpool.New(func() (net.Conn, error) { + return net.Dial("tcp", fmt.Sprintf("%s:1337", host)) + }, netpool.WithMaxPool(512)) + remote := RemoteHost{ + HostPool: cart_pool, Connection: client, MissedPings: 0, Host: host, diff --git a/tcp-connection.go b/tcp-connection.go index 6c4cf6f..b447b0f 100644 --- a/tcp-connection.go +++ b/tcp-connection.go @@ -6,10 +6,13 @@ import ( "log" "net" "time" + + "github.com/yudhasubki/netpool" ) type Connection struct { address string + pool netpool.Netpooler count uint64 } @@ -56,9 +59,10 @@ type FrameData interface { FromBytes([]byte) error } -func NewConnection(address string) *Connection { +func NewConnection(address string, pool netpool.Netpooler) *Connection { return &Connection{ count: 0, + pool: pool, address: address, } } @@ -75,7 +79,8 @@ func SendFrame(conn net.Conn, data *FrameWithPayload) error { } func (c *Connection) CallAsync(msg FrameType, payload []byte, ch chan<- FrameWithPayload) (net.Conn, error) { - conn, err := net.Dial("tcp", c.address) + conn, err := c.pool.Get() + //conn, err := net.Dial("tcp", c.address) if err != nil { return conn, err } @@ -102,7 +107,7 @@ func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error) return nil, err } - defer conn.Close() + defer c.pool.Put(conn, nil) // conn.Close() defer close(ch) select { @@ -154,7 +159,8 @@ func (c *Connection) Listen() (*GenericListener, error) { for !ret.StopListener { connection, err := l.Accept() if err != nil { - log.Fatalf("Error accepting connection: %v\n", err) + log.Printf("Error accepting connection: %v\n", err) + continue } go ret.HandleConnection(connection) } @@ -194,12 +200,12 @@ func (l *GenericListener) HandleFrame(conn net.Conn, frame *FrameWithPayload) er defer close(resultChan) err := handler(frame, resultChan) if err != nil { - log.Fatalf("Error handling frame: %v\n", err) + log.Fatalf("Error handling frame: %s\n", err) } result := <-resultChan err = SendFrame(conn, &result) if err != nil { - log.Fatalf("Error sending frame: %v\n", err) + log.Fatalf("Error sending frame: %s\n", err) } }() } else { diff --git a/tcp-connection_test.go b/tcp-connection_test.go index a1da4af..2300330 100644 --- a/tcp-connection_test.go +++ b/tcp-connection_test.go @@ -1,16 +1,27 @@ package main import ( - "fmt" + "net" "testing" + + "github.com/yudhasubki/netpool" ) func TestGenericConnection(t *testing.T) { - conn := NewConnection("localhost:51337") - listener, err := conn.Listen() + + listenConn := NewConnection("127.0.0.1:51337", nil) + listener, err := listenConn.Listen() if err != nil { t.Errorf("Error listening: %v\n", err) } + pool, err := netpool.New(func() (net.Conn, error) { + return net.Dial("tcp", "127.0.0.1:51337") + }, netpool.WithMaxPool(512)) + if err != nil { + t.Errorf("Error creating pool: %v\n", err) + } + conn := NewConnection("127.0.0.1:51337", pool) + datta := []byte("Hello, world!") listener.AddHandler(Ping, func(input *FrameWithPayload, resultChan chan<- FrameWithPayload) error { resultChan <- MakeFrameWithPayload(Pong, 200, nil) @@ -21,7 +32,8 @@ func TestGenericConnection(t *testing.T) { return nil }) listener.AddHandler(3, func(input *FrameWithPayload, resultChan chan<- FrameWithPayload) error { - return fmt.Errorf("Error") + resultChan <- MakeFrameWithPayload(2, 200, datta) + return nil //fmt.Errorf("Error") }) r, err := conn.Call(1, datta) if err != nil { -- 2.49.1 From 933f0bb04ff533199ee852d50d9a2c231450fc8e Mon Sep 17 00:00:00 2001 From: matst80 Date: Thu, 21 Nov 2024 22:20:41 +0100 Subject: [PATCH 2/7] test connection pools --- synced-pool.go | 4 ++-- tcp-connection.go | 29 +++++++++++++++-------------- tcp-connection_test.go | 16 +++++++++++----- 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/synced-pool.go b/synced-pool.go index a1b2da4..83ed3fb 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -394,7 +394,7 @@ func (p *SyncedPool) AddRemote(host string) { host_pool, err := netpool.New(func() (net.Conn, error) { return net.Dial("tcp", fmt.Sprintf("%s:1338", host)) - }, netpool.WithMaxPool(256)) + }, netpool.WithMaxPool(256), netpool.WithMinPool(5)) if err != nil { log.Printf("Error creating pool: %v\n", err) @@ -418,7 +418,7 @@ func (p *SyncedPool) AddRemote(host string) { cart_pool, err := netpool.New(func() (net.Conn, error) { return net.Dial("tcp", fmt.Sprintf("%s:1337", host)) - }, netpool.WithMaxPool(512)) + }, netpool.WithMaxPool(1024), netpool.WithMinPool(5)) remote := RemoteHost{ HostPool: cart_pool, diff --git a/tcp-connection.go b/tcp-connection.go index 57a82f8..c8a82e8 100644 --- a/tcp-connection.go +++ b/tcp-connection.go @@ -96,7 +96,7 @@ func (c *Connection) CallAsync(msg FrameType, payload []byte, ch chan<- FrameWit }(MakeFrameWithPayload(msg, 1, payload)) c.count++ - return conn, nil + return conn, err } func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error) { @@ -107,14 +107,14 @@ func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error) return nil, err } - defer c.pool.Put(conn, nil) // conn.Close() + defer c.pool.Put(conn, err) // conn.Close() defer close(ch) select { case ret := <-ch: return &ret, nil case <-time.After(MaxCallDuration): - return nil, fmt.Errorf("timeout") + return nil, fmt.Errorf("timeout waiting for frame") } } @@ -169,22 +169,21 @@ func (c *Connection) Listen() (*GenericListener, error) { } const ( - MaxCallDuration = 500 * time.Millisecond + MaxCallDuration = 300 * time.Millisecond + ListenerKeepalive = 5 * time.Second ) func (l *GenericListener) HandleConnection(conn net.Conn) { - ch := make(chan FrameWithPayload, 1) - conn.SetReadDeadline(time.Now().Add(MaxCallDuration)) - go WaitForFrame(conn, ch) - select { - case frame := <-ch: + for !l.StopListener { + ch := make(chan FrameWithPayload, 1) + //conn.SetReadDeadline(time.Now().Add(MaxCallDuration)) + go WaitForFrame(conn, ch) + frame := <-ch err := l.HandleFrame(conn, &frame) if err != nil { log.Fatalf("Error in handler: %v\n", err) } - case <-time.After(MaxCallDuration): - close(ch) - log.Printf("Timeout waiting for frame\n") + } } @@ -200,13 +199,15 @@ func (l *GenericListener) HandleFrame(conn net.Conn, frame *FrameWithPayload) er defer close(resultChan) err := handler(frame, resultChan) if err != nil { - resultChan <- MakeFrameWithPayload(frame.Type, 500, []byte(err.Error())) + errFrame := MakeFrameWithPayload(frame.Type, 500, []byte(err.Error())) + SendFrame(conn, &errFrame) log.Printf("Handler returned error: %s", err) + return } result := <-resultChan err = SendFrame(conn, &result) if err != nil { - log.Fatalf("Error sending frame: %s\n", err) + log.Printf("Error sending frame: %s", err) } }() } else { diff --git a/tcp-connection_test.go b/tcp-connection_test.go index 2f1d9f5..7f72270 100644 --- a/tcp-connection_test.go +++ b/tcp-connection_test.go @@ -17,7 +17,7 @@ func TestGenericConnection(t *testing.T) { } pool, err := netpool.New(func() (net.Conn, error) { return net.Dial("tcp", "127.0.0.1:51337") - }, netpool.WithMaxPool(512)) + }, netpool.WithMaxPool(512), netpool.WithMinPool(5)) if err != nil { t.Errorf("Error creating pool: %v\n", err) } @@ -42,11 +42,12 @@ func TestGenericConnection(t *testing.T) { if r.Type != 2 { t.Errorf("Expected type 2, got %d\n", r.Type) } - response, err := conn.Call(Ping, nil) - if err != nil || response.StatusCode != 200 || response.Type != Pong { - t.Errorf("Error connecting to remote %v\n", response) - } + res, err := conn.Call(3, datta) + if err != nil { + t.Errorf("Did not expect error, got %v\n", err) + return + } if res.StatusCode == 200 { t.Errorf("Expected error, got %v\n", res) } @@ -65,4 +66,9 @@ func TestGenericConnection(t *testing.T) { i++ } + response, err := conn.Call(Ping, nil) + if err != nil || response.StatusCode != 200 || response.Type != Pong { + t.Errorf("Error connecting to remote %v, err: %v\n", response, err) + } + } -- 2.49.1 From c17a8a4219ed84617670c5583d23c99a4b655616 Mon Sep 17 00:00:00 2001 From: matst80 Date: Thu, 21 Nov 2024 22:27:09 +0100 Subject: [PATCH 3/7] allow longer wait --- tcp-connection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tcp-connection.go b/tcp-connection.go index c8a82e8..05e3a4a 100644 --- a/tcp-connection.go +++ b/tcp-connection.go @@ -169,7 +169,7 @@ func (c *Connection) Listen() (*GenericListener, error) { } const ( - MaxCallDuration = 300 * time.Millisecond + MaxCallDuration = 900 * time.Millisecond ListenerKeepalive = 5 * time.Second ) -- 2.49.1 From 9b137e2ff1c6bdfc4276893cf062329ecd0f3098 Mon Sep 17 00:00:00 2001 From: matst80 Date: Thu, 21 Nov 2024 22:41:49 +0100 Subject: [PATCH 4/7] update --- tcp-connection.go | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/tcp-connection.go b/tcp-connection.go index 05e3a4a..e9700ac 100644 --- a/tcp-connection.go +++ b/tcp-connection.go @@ -1,8 +1,10 @@ package main import ( + "bufio" "encoding/binary" "fmt" + "io" "log" "net" "time" @@ -130,6 +132,7 @@ func WaitForFrame(conn net.Conn, resultChan chan<- FrameWithPayload) error { payload := make([]byte, frame.Length) _, err = conn.Read(payload) if err != nil { + conn.Close() return err } resultChan <- FrameWithPayload{ @@ -169,21 +172,28 @@ func (c *Connection) Listen() (*GenericListener, error) { } const ( - MaxCallDuration = 900 * time.Millisecond + MaxCallDuration = 300 * time.Millisecond ListenerKeepalive = 5 * time.Second ) func (l *GenericListener) HandleConnection(conn net.Conn) { - for !l.StopListener { - ch := make(chan FrameWithPayload, 1) - //conn.SetReadDeadline(time.Now().Add(MaxCallDuration)) - go WaitForFrame(conn, ch) - frame := <-ch - err := l.HandleFrame(conn, &frame) - if err != nil { - log.Fatalf("Error in handler: %v\n", err) - } + var err error + var frame Frame + b := bufio.NewReader(conn) + for err != io.EOF { + err = binary.Read(b, binary.LittleEndian, &frame) + + if err == nil && frame.IsValid() { + payload := make([]byte, frame.Length) + _, err = b.Read(payload) + if err == nil { + err = l.HandleFrame(conn, &FrameWithPayload{ + Frame: frame, + Payload: payload, + }) + } + } } } -- 2.49.1 From 006d3ab4d8a0f87f1428ab865f49d7ba823a504e Mon Sep 17 00:00:00 2001 From: matst80 Date: Fri, 22 Nov 2024 08:39:01 +0100 Subject: [PATCH 5/7] lock --- synced-pool.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/synced-pool.go b/synced-pool.go index 83ed3fb..860de7c 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -335,8 +335,8 @@ func (p *SyncedPool) Negotiate() { } func (p *SyncedPool) GetHealthyRemotes() []*RemoteHost { - // p.mu.RLock() - // defer p.mu.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() remotes := make([]*RemoteHost, 0, len(p.remotes)) for _, r := range p.remotes { if r.IsHealthy() { @@ -351,10 +351,7 @@ func (p *SyncedPool) RequestOwnership(id CartId) error { all := 0 for _, r := range p.GetHealthyRemotes() { - if !r.IsHealthy() { - continue - } - //log.Printf("Asking for confirmation change of %s to %s (me) with %s\n", id, p.Hostname, r.Host) + err := r.ConfirmChange(id, p.Hostname) all++ if err != nil { -- 2.49.1 From af5b060d095d9c3d2095ae7d3f53e36c935a9573 Mon Sep 17 00:00:00 2001 From: matst80 Date: Fri, 22 Nov 2024 08:46:07 +0100 Subject: [PATCH 6/7] always wait, no timeout --- synced-pool.go | 2 +- tcp-connection.go | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/synced-pool.go b/synced-pool.go index 860de7c..3a4761b 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -391,7 +391,7 @@ func (p *SyncedPool) AddRemote(host string) { host_pool, err := netpool.New(func() (net.Conn, error) { return net.Dial("tcp", fmt.Sprintf("%s:1338", host)) - }, netpool.WithMaxPool(256), netpool.WithMinPool(5)) + }, netpool.WithMaxPool(1024), netpool.WithMinPool(5)) if err != nil { log.Printf("Error creating pool: %v\n", err) diff --git a/tcp-connection.go b/tcp-connection.go index e9700ac..0078256 100644 --- a/tcp-connection.go +++ b/tcp-connection.go @@ -112,12 +112,14 @@ func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error) defer c.pool.Put(conn, err) // conn.Close() defer close(ch) - select { - case ret := <-ch: - return &ret, nil - case <-time.After(MaxCallDuration): - return nil, fmt.Errorf("timeout waiting for frame") - } + ret := <-ch + return &ret, nil + // select { + // case ret := <-ch: + // return &ret, nil + // case <-time.After(MaxCallDuration): + // return nil, fmt.Errorf("timeout waiting for frame") + // } } func WaitForFrame(conn net.Conn, resultChan chan<- FrameWithPayload) error { -- 2.49.1 From 354964040a7c3c5359162e080fba0aada6087db7 Mon Sep 17 00:00:00 2001 From: matst80 Date: Fri, 22 Nov 2024 08:53:09 +0100 Subject: [PATCH 7/7] send close message --- main.go | 1 + synced-pool.go | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/main.go b/main.go index f8fb35d..ef33359 100644 --- a/main.go +++ b/main.go @@ -233,6 +233,7 @@ func main() { go func() { sig := <-sigs fmt.Println("Shutting down due to signal:", sig) + go syncedPool.Close() app.Save() done <- true }() diff --git a/synced-pool.go b/synced-pool.go index 3a4761b..2d6bf76 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -213,6 +213,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) server.AddHandler(GetCartIds, pool.GetCartIdHandler) server.AddHandler(RemoteNegotiate, pool.NegotiateHandler) server.AddHandler(RemoteGrainChanged, pool.GrainOwnerChangeHandler) + server.AddHandler(Closing, pool.HostTerminatingHandler) if discovery != nil { go func() { @@ -252,6 +253,21 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) return pool, nil } +func (p *SyncedPool) HostTerminatingHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error { + log.Printf("Remote host terminating") + host := string(data.Payload) + p.mu.RLock() + defer p.mu.RUnlock() + for _, r := range p.remotes { + if r.Host == host { + go p.RemoveHost(r) + break + } + } + resultChan <- MakeFrameWithPayload(Pong, 200, []byte("ok")) + return nil +} + func (p *SyncedPool) IsHealthy() bool { for _, r := range p.remotes { if !r.IsHealthy() { @@ -310,6 +326,7 @@ const ( GetCartIds = FrameType(9) CartIdsResponse = FrameType(10) RemoteNegotiateResponse = FrameType(11) + Closing = FrameType(12) ) func (p *SyncedPool) Negotiate() { @@ -479,6 +496,15 @@ func (p *SyncedPool) getGrain(id CartId) (Grain, error) { return localGrain, nil } +func (p *SyncedPool) Close() { + p.mu.Lock() + defer p.mu.Unlock() + payload := []byte(p.Hostname) + for _, r := range p.remotes { + go r.Call(Closing, payload) + } +} + func (p *SyncedPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) { pool, err := p.getGrain(id) var res *FrameWithPayload -- 2.49.1