diff --git a/synced-pool.go b/synced-pool.go index a1b2da4..83ed3fb 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -394,7 +394,7 @@ func (p *SyncedPool) AddRemote(host string) { host_pool, err := netpool.New(func() (net.Conn, error) { return net.Dial("tcp", fmt.Sprintf("%s:1338", host)) - }, netpool.WithMaxPool(256)) + }, netpool.WithMaxPool(256), netpool.WithMinPool(5)) if err != nil { log.Printf("Error creating pool: %v\n", err) @@ -418,7 +418,7 @@ func (p *SyncedPool) AddRemote(host string) { cart_pool, err := netpool.New(func() (net.Conn, error) { return net.Dial("tcp", fmt.Sprintf("%s:1337", host)) - }, netpool.WithMaxPool(512)) + }, netpool.WithMaxPool(1024), netpool.WithMinPool(5)) remote := RemoteHost{ HostPool: cart_pool, diff --git a/tcp-connection.go b/tcp-connection.go index 57a82f8..c8a82e8 100644 --- a/tcp-connection.go +++ b/tcp-connection.go @@ -96,7 +96,7 @@ func (c *Connection) CallAsync(msg FrameType, payload []byte, ch chan<- FrameWit }(MakeFrameWithPayload(msg, 1, payload)) c.count++ - return conn, nil + return conn, err } func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error) { @@ -107,14 +107,14 @@ func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error) return nil, err } - defer c.pool.Put(conn, nil) // conn.Close() + defer c.pool.Put(conn, err) // conn.Close() defer close(ch) select { case ret := <-ch: return &ret, nil case <-time.After(MaxCallDuration): - return nil, fmt.Errorf("timeout") + return nil, fmt.Errorf("timeout waiting for frame") } } @@ -169,22 +169,21 @@ func (c *Connection) Listen() (*GenericListener, error) { } const ( - MaxCallDuration = 500 * time.Millisecond + MaxCallDuration = 300 * time.Millisecond + ListenerKeepalive = 5 * time.Second ) func (l *GenericListener) HandleConnection(conn net.Conn) { - ch := make(chan FrameWithPayload, 1) - conn.SetReadDeadline(time.Now().Add(MaxCallDuration)) - go WaitForFrame(conn, ch) - select { - case frame := <-ch: + for !l.StopListener { + ch := make(chan FrameWithPayload, 1) + //conn.SetReadDeadline(time.Now().Add(MaxCallDuration)) + go WaitForFrame(conn, ch) + frame := <-ch err := l.HandleFrame(conn, &frame) if err != nil { log.Fatalf("Error in handler: %v\n", err) } - case <-time.After(MaxCallDuration): - close(ch) - log.Printf("Timeout waiting for frame\n") + } } @@ -200,13 +199,15 @@ func (l *GenericListener) HandleFrame(conn net.Conn, frame *FrameWithPayload) er defer close(resultChan) err := handler(frame, resultChan) if err != nil { - resultChan <- MakeFrameWithPayload(frame.Type, 500, []byte(err.Error())) + errFrame := MakeFrameWithPayload(frame.Type, 500, []byte(err.Error())) + SendFrame(conn, &errFrame) log.Printf("Handler returned error: %s", err) + return } result := <-resultChan err = SendFrame(conn, &result) if err != nil { - log.Fatalf("Error sending frame: %s\n", err) + log.Printf("Error sending frame: %s", err) } }() } else { diff --git a/tcp-connection_test.go b/tcp-connection_test.go index 2f1d9f5..7f72270 100644 --- a/tcp-connection_test.go +++ b/tcp-connection_test.go @@ -17,7 +17,7 @@ func TestGenericConnection(t *testing.T) { } pool, err := netpool.New(func() (net.Conn, error) { return net.Dial("tcp", "127.0.0.1:51337") - }, netpool.WithMaxPool(512)) + }, netpool.WithMaxPool(512), netpool.WithMinPool(5)) if err != nil { t.Errorf("Error creating pool: %v\n", err) } @@ -42,11 +42,12 @@ func TestGenericConnection(t *testing.T) { if r.Type != 2 { t.Errorf("Expected type 2, got %d\n", r.Type) } - response, err := conn.Call(Ping, nil) - if err != nil || response.StatusCode != 200 || response.Type != Pong { - t.Errorf("Error connecting to remote %v\n", response) - } + res, err := conn.Call(3, datta) + if err != nil { + t.Errorf("Did not expect error, got %v\n", err) + return + } if res.StatusCode == 200 { t.Errorf("Expected error, got %v\n", res) } @@ -65,4 +66,9 @@ func TestGenericConnection(t *testing.T) { i++ } + response, err := conn.Call(Ping, nil) + if err != nil || response.StatusCode != 200 || response.Type != Pong { + t.Errorf("Error connecting to remote %v, err: %v\n", response, err) + } + }