diff --git a/go.mod b/go.mod index 9f69bec..28dee33 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/prometheus/procfs v0.15.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/x448/float16 v0.8.4 // indirect + github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect golang.org/x/sys v0.22.0 // indirect diff --git a/go.sum b/go.sum index 4cdb221..709846b 100644 --- a/go.sum +++ b/go.sum @@ -100,6 +100,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e h1:fAzVSmKQkWflN25ED65CH/C1T3iVWq2BQfN7eQsg4E4= +github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e/go.mod h1:gQsFrHrY6nviQu+VX7zKWDyhtLPNzngtYZ+C+7cywdk= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/main.go b/main.go index 7d2c717..f8fb35d 100644 --- a/main.go +++ b/main.go @@ -223,7 +223,7 @@ func main() { }(conn) }(l) } else { - log.Println("Error creating echo server: %v\n", err) + log.Printf("Error creating echo server: %v\n", err) } sigs := make(chan os.Signal, 1) diff --git a/remote-grain-pool._go b/remote-grain-pool._go new file mode 100644 index 0000000..bc13b41 --- /dev/null +++ b/remote-grain-pool._go @@ -0,0 +1,67 @@ +// package main + +// import "sync" + +// type RemoteGrainPool struct { +// mu sync.RWMutex +// Host string +// grains map[CartId]*RemoteGrain +// } + +// func NewRemoteGrainPool(addr string) *RemoteGrainPool { +// return &RemoteGrainPool{ +// Host: addr, +// grains: make(map[CartId]*RemoteGrain), +// } +// } + +// func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain { +// p.mu.RLock() +// grain, ok := p.grains[id] +// p.mu.RUnlock() +// if !ok { +// return nil +// } +// return grain +// } + +// func (p *RemoteGrainPool) findOrCreateGrain(id CartId) (*RemoteGrain, error) { +// grain := p.findRemoteGrain(id) + +// if grain == nil { +// grain, err := NewRemoteGrain(id, p.Host) +// if err != nil { +// return nil, err +// } +// p.mu.Lock() +// p.grains[id] = grain +// p.mu.Unlock() +// } +// return grain, nil +// } + +// func (p *RemoteGrainPool) Delete(id CartId) { +// p.mu.Lock() +// delete(p.grains, id) +// p.mu.Unlock() +// } + +// func (p *RemoteGrainPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) { +// var result *FrameWithPayload +// grain, err := p.findOrCreateGrain(id) +// if err != nil { +// return nil, err +// } +// for _, message := range messages { +// result, err = grain.HandleMessage(&message, false) +// } +// return result, err +// } + +// func (p *RemoteGrainPool) Get(id CartId) (*FrameWithPayload, error) { +// grain, err := p.findOrCreateGrain(id) +// if err != nil { +// return nil, err +// } +// return grain.GetCurrentState() +// } diff --git a/remote-grain-pool.go b/remote-grain-pool.go deleted file mode 100644 index 3be28ff..0000000 --- a/remote-grain-pool.go +++ /dev/null @@ -1,67 +0,0 @@ -package main - -import "sync" - -type RemoteGrainPool struct { - mu sync.RWMutex - Host string - grains map[CartId]*RemoteGrain -} - -func NewRemoteGrainPool(addr string) *RemoteGrainPool { - return &RemoteGrainPool{ - Host: addr, - grains: make(map[CartId]*RemoteGrain), - } -} - -func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain { - p.mu.RLock() - grain, ok := p.grains[id] - p.mu.RUnlock() - if !ok { - return nil - } - return grain -} - -func (p *RemoteGrainPool) findOrCreateGrain(id CartId) (*RemoteGrain, error) { - grain := p.findRemoteGrain(id) - - if grain == nil { - grain, err := NewRemoteGrain(id, p.Host) - if err != nil { - return nil, err - } - p.mu.Lock() - p.grains[id] = grain - p.mu.Unlock() - } - return grain, nil -} - -func (p *RemoteGrainPool) Delete(id CartId) { - p.mu.Lock() - delete(p.grains, id) - p.mu.Unlock() -} - -func (p *RemoteGrainPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) { - var result *FrameWithPayload - grain, err := p.findOrCreateGrain(id) - if err != nil { - return nil, err - } - for _, message := range messages { - result, err = grain.HandleMessage(&message, false) - } - return result, err -} - -func (p *RemoteGrainPool) Get(id CartId) (*FrameWithPayload, error) { - grain, err := p.findOrCreateGrain(id) - if err != nil { - return nil, err - } - return grain.GetCurrentState() -} diff --git a/remote-grain.go b/remote-grain.go index 5dad844..e6cfffb 100644 --- a/remote-grain.go +++ b/remote-grain.go @@ -3,6 +3,8 @@ package main import ( "fmt" "strings" + + "github.com/yudhasubki/netpool" ) func (id CartId) String() string { @@ -43,12 +45,13 @@ type RemoteGrain struct { Host string } -func NewRemoteGrain(id CartId, host string) (*RemoteGrain, error) { +func NewRemoteGrain(id CartId, host string, pool netpool.Netpooler) *RemoteGrain { + addr := fmt.Sprintf("%s:1337", host) return &RemoteGrain{ Id: id, Host: host, - Connection: NewConnection(fmt.Sprintf("%s:1337", host)), - }, nil + Connection: NewConnection(addr, pool), + } } func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) (*FrameWithPayload, error) { diff --git a/remote-host.go b/remote-host.go index 385dfba..5ca06c8 100644 --- a/remote-host.go +++ b/remote-host.go @@ -4,10 +4,13 @@ import ( "fmt" "log" "strings" + + "github.com/yudhasubki/netpool" ) type RemoteHost struct { *Connection + HostPool netpool.Netpooler Host string MissedPings int } diff --git a/rpc-server.go b/rpc-server.go index ef9d419..7492a85 100644 --- a/rpc-server.go +++ b/rpc-server.go @@ -20,7 +20,7 @@ func (h *GrainHandler) GetState(id CartId, reply *Grain) error { } func NewGrainHandler(pool *GrainLocalPool, listen string) (*GrainHandler, error) { - conn := NewConnection(listen) + conn := NewConnection(listen, nil) server, err := conn.Listen() handler := &GrainHandler{ GenericListener: server, diff --git a/synced-pool.go b/synced-pool.go index 884d1ac..a1b2da4 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -3,12 +3,14 @@ package main import ( "fmt" "log" + "net" "strings" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/yudhasubki/netpool" "k8s.io/apimachinery/pkg/watch" ) @@ -154,14 +156,23 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { } go func(i CartId, h string) { - remote, err := NewRemoteGrain(i, h) - if err != nil { - log.Printf("Error creating remote grain %v", err) + var pool netpool.Netpooler + p.mu.RLock() + for _, r := range p.remotes { + if r.Host == h { + pool = r.HostPool + break + } + } + p.mu.RUnlock() + if pool == nil { + log.Printf("Error spawning remote grain, no pool for %s", h) return } + remoteGrain := NewRemoteGrain(i, h, pool) p.mu.Lock() - p.remoteIndex[i] = remote + p.remoteIndex[i] = remoteGrain p.mu.Unlock() }(id, host) } @@ -181,7 +192,7 @@ func (p *SyncedPool) HandleHostError(host string) { func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) { listen := fmt.Sprintf("%s:1338", hostname) - conn := NewConnection(listen) + conn := NewConnection(listen, nil) server, err := conn.Listen() if err != nil { return nil, err @@ -381,9 +392,17 @@ func (p *SyncedPool) AddRemote(host string) { return } - client := NewConnection(fmt.Sprintf("%s:1338", host)) + host_pool, err := netpool.New(func() (net.Conn, error) { + return net.Dial("tcp", fmt.Sprintf("%s:1338", host)) + }, netpool.WithMaxPool(256)) + + if err != nil { + log.Printf("Error creating pool: %v\n", err) + return + } + + client := NewConnection(fmt.Sprintf("%s:1338", host), host_pool) - var err error pings := 3 for pings >= 0 { _, err = client.Call(Ping, nil) @@ -397,7 +416,12 @@ func (p *SyncedPool) AddRemote(host string) { } log.Printf("Connected to remote %s", host) + cart_pool, err := netpool.New(func() (net.Conn, error) { + return net.Dial("tcp", fmt.Sprintf("%s:1337", host)) + }, netpool.WithMaxPool(512)) + remote := RemoteHost{ + HostPool: cart_pool, Connection: client, MissedPings: 0, Host: host, diff --git a/tcp-connection.go b/tcp-connection.go index 6c4cf6f..b447b0f 100644 --- a/tcp-connection.go +++ b/tcp-connection.go @@ -6,10 +6,13 @@ import ( "log" "net" "time" + + "github.com/yudhasubki/netpool" ) type Connection struct { address string + pool netpool.Netpooler count uint64 } @@ -56,9 +59,10 @@ type FrameData interface { FromBytes([]byte) error } -func NewConnection(address string) *Connection { +func NewConnection(address string, pool netpool.Netpooler) *Connection { return &Connection{ count: 0, + pool: pool, address: address, } } @@ -75,7 +79,8 @@ func SendFrame(conn net.Conn, data *FrameWithPayload) error { } func (c *Connection) CallAsync(msg FrameType, payload []byte, ch chan<- FrameWithPayload) (net.Conn, error) { - conn, err := net.Dial("tcp", c.address) + conn, err := c.pool.Get() + //conn, err := net.Dial("tcp", c.address) if err != nil { return conn, err } @@ -102,7 +107,7 @@ func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error) return nil, err } - defer conn.Close() + defer c.pool.Put(conn, nil) // conn.Close() defer close(ch) select { @@ -154,7 +159,8 @@ func (c *Connection) Listen() (*GenericListener, error) { for !ret.StopListener { connection, err := l.Accept() if err != nil { - log.Fatalf("Error accepting connection: %v\n", err) + log.Printf("Error accepting connection: %v\n", err) + continue } go ret.HandleConnection(connection) } @@ -194,12 +200,12 @@ func (l *GenericListener) HandleFrame(conn net.Conn, frame *FrameWithPayload) er defer close(resultChan) err := handler(frame, resultChan) if err != nil { - log.Fatalf("Error handling frame: %v\n", err) + log.Fatalf("Error handling frame: %s\n", err) } result := <-resultChan err = SendFrame(conn, &result) if err != nil { - log.Fatalf("Error sending frame: %v\n", err) + log.Fatalf("Error sending frame: %s\n", err) } }() } else { diff --git a/tcp-connection_test.go b/tcp-connection_test.go index a1da4af..2300330 100644 --- a/tcp-connection_test.go +++ b/tcp-connection_test.go @@ -1,16 +1,27 @@ package main import ( - "fmt" + "net" "testing" + + "github.com/yudhasubki/netpool" ) func TestGenericConnection(t *testing.T) { - conn := NewConnection("localhost:51337") - listener, err := conn.Listen() + + listenConn := NewConnection("127.0.0.1:51337", nil) + listener, err := listenConn.Listen() if err != nil { t.Errorf("Error listening: %v\n", err) } + pool, err := netpool.New(func() (net.Conn, error) { + return net.Dial("tcp", "127.0.0.1:51337") + }, netpool.WithMaxPool(512)) + if err != nil { + t.Errorf("Error creating pool: %v\n", err) + } + conn := NewConnection("127.0.0.1:51337", pool) + datta := []byte("Hello, world!") listener.AddHandler(Ping, func(input *FrameWithPayload, resultChan chan<- FrameWithPayload) error { resultChan <- MakeFrameWithPayload(Pong, 200, nil) @@ -21,7 +32,8 @@ func TestGenericConnection(t *testing.T) { return nil }) listener.AddHandler(3, func(input *FrameWithPayload, resultChan chan<- FrameWithPayload) error { - return fmt.Errorf("Error") + resultChan <- MakeFrameWithPayload(2, 200, datta) + return nil //fmt.Errorf("Error") }) r, err := conn.Call(1, datta) if err != nil {