test/net-pool #2

Merged
mats merged 11 commits from test/net-pool into main 2024-11-23 15:30:38 +01:00
3 changed files with 28 additions and 21 deletions
Showing only changes of commit 933f0bb04f - Show all commits

View File

@@ -394,7 +394,7 @@ func (p *SyncedPool) AddRemote(host string) {
host_pool, err := netpool.New(func() (net.Conn, error) { host_pool, err := netpool.New(func() (net.Conn, error) {
return net.Dial("tcp", fmt.Sprintf("%s:1338", host)) return net.Dial("tcp", fmt.Sprintf("%s:1338", host))
}, netpool.WithMaxPool(256)) }, netpool.WithMaxPool(256), netpool.WithMinPool(5))
if err != nil { if err != nil {
log.Printf("Error creating pool: %v\n", err) log.Printf("Error creating pool: %v\n", err)
@@ -418,7 +418,7 @@ func (p *SyncedPool) AddRemote(host string) {
cart_pool, err := netpool.New(func() (net.Conn, error) { cart_pool, err := netpool.New(func() (net.Conn, error) {
return net.Dial("tcp", fmt.Sprintf("%s:1337", host)) return net.Dial("tcp", fmt.Sprintf("%s:1337", host))
}, netpool.WithMaxPool(512)) }, netpool.WithMaxPool(1024), netpool.WithMinPool(5))
remote := RemoteHost{ remote := RemoteHost{
HostPool: cart_pool, HostPool: cart_pool,

View File

@@ -96,7 +96,7 @@ func (c *Connection) CallAsync(msg FrameType, payload []byte, ch chan<- FrameWit
}(MakeFrameWithPayload(msg, 1, payload)) }(MakeFrameWithPayload(msg, 1, payload))
c.count++ c.count++
return conn, nil return conn, err
} }
func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error) { func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error) {
@@ -107,14 +107,14 @@ func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error)
return nil, err return nil, err
} }
defer c.pool.Put(conn, nil) // conn.Close() defer c.pool.Put(conn, err) // conn.Close()
defer close(ch) defer close(ch)
select { select {
case ret := <-ch: case ret := <-ch:
return &ret, nil return &ret, nil
case <-time.After(MaxCallDuration): case <-time.After(MaxCallDuration):
return nil, fmt.Errorf("timeout") return nil, fmt.Errorf("timeout waiting for frame")
} }
} }
@@ -169,22 +169,21 @@ func (c *Connection) Listen() (*GenericListener, error) {
} }
const ( const (
MaxCallDuration = 500 * time.Millisecond MaxCallDuration = 300 * time.Millisecond
ListenerKeepalive = 5 * time.Second
) )
func (l *GenericListener) HandleConnection(conn net.Conn) { func (l *GenericListener) HandleConnection(conn net.Conn) {
for !l.StopListener {
ch := make(chan FrameWithPayload, 1) ch := make(chan FrameWithPayload, 1)
conn.SetReadDeadline(time.Now().Add(MaxCallDuration)) //conn.SetReadDeadline(time.Now().Add(MaxCallDuration))
go WaitForFrame(conn, ch) go WaitForFrame(conn, ch)
select { frame := <-ch
case frame := <-ch:
err := l.HandleFrame(conn, &frame) err := l.HandleFrame(conn, &frame)
if err != nil { if err != nil {
log.Fatalf("Error in handler: %v\n", err) log.Fatalf("Error in handler: %v\n", err)
} }
case <-time.After(MaxCallDuration):
close(ch)
log.Printf("Timeout waiting for frame\n")
} }
} }
@@ -200,13 +199,15 @@ func (l *GenericListener) HandleFrame(conn net.Conn, frame *FrameWithPayload) er
defer close(resultChan) defer close(resultChan)
err := handler(frame, resultChan) err := handler(frame, resultChan)
if err != nil { if err != nil {
resultChan <- MakeFrameWithPayload(frame.Type, 500, []byte(err.Error())) errFrame := MakeFrameWithPayload(frame.Type, 500, []byte(err.Error()))
SendFrame(conn, &errFrame)
log.Printf("Handler returned error: %s", err) log.Printf("Handler returned error: %s", err)
return
} }
result := <-resultChan result := <-resultChan
err = SendFrame(conn, &result) err = SendFrame(conn, &result)
if err != nil { if err != nil {
log.Fatalf("Error sending frame: %s\n", err) log.Printf("Error sending frame: %s", err)
} }
}() }()
} else { } else {

View File

@@ -17,7 +17,7 @@ func TestGenericConnection(t *testing.T) {
} }
pool, err := netpool.New(func() (net.Conn, error) { pool, err := netpool.New(func() (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:51337") return net.Dial("tcp", "127.0.0.1:51337")
}, netpool.WithMaxPool(512)) }, netpool.WithMaxPool(512), netpool.WithMinPool(5))
if err != nil { if err != nil {
t.Errorf("Error creating pool: %v\n", err) t.Errorf("Error creating pool: %v\n", err)
} }
@@ -42,11 +42,12 @@ func TestGenericConnection(t *testing.T) {
if r.Type != 2 { if r.Type != 2 {
t.Errorf("Expected type 2, got %d\n", r.Type) t.Errorf("Expected type 2, got %d\n", r.Type)
} }
response, err := conn.Call(Ping, nil)
if err != nil || response.StatusCode != 200 || response.Type != Pong {
t.Errorf("Error connecting to remote %v\n", response)
}
res, err := conn.Call(3, datta) res, err := conn.Call(3, datta)
if err != nil {
t.Errorf("Did not expect error, got %v\n", err)
return
}
if res.StatusCode == 200 { if res.StatusCode == 200 {
t.Errorf("Expected error, got %v\n", res) t.Errorf("Expected error, got %v\n", res)
} }
@@ -65,4 +66,9 @@ func TestGenericConnection(t *testing.T) {
i++ i++
} }
response, err := conn.Call(Ping, nil)
if err != nil || response.StatusCode != 200 || response.Type != Pong {
t.Errorf("Error connecting to remote %v, err: %v\n", response, err)
}
} }