test/net-pool #2
1
go.mod
1
go.mod
@@ -41,6 +41,7 @@ require (
|
||||
github.com/prometheus/procfs v0.15.1 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
github.com/x448/float16 v0.8.4 // indirect
|
||||
github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e // indirect
|
||||
golang.org/x/net v0.26.0 // indirect
|
||||
golang.org/x/oauth2 v0.21.0 // indirect
|
||||
golang.org/x/sys v0.22.0 // indirect
|
||||
|
||||
2
go.sum
2
go.sum
@@ -100,6 +100,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
||||
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
|
||||
github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e h1:fAzVSmKQkWflN25ED65CH/C1T3iVWq2BQfN7eQsg4E4=
|
||||
github.com/yudhasubki/netpool v0.0.0-20230717065341-3c1353ca328e/go.mod h1:gQsFrHrY6nviQu+VX7zKWDyhtLPNzngtYZ+C+7cywdk=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
|
||||
2
main.go
2
main.go
@@ -223,7 +223,7 @@ func main() {
|
||||
}(conn)
|
||||
}(l)
|
||||
} else {
|
||||
log.Println("Error creating echo server: %v\n", err)
|
||||
log.Printf("Error creating echo server: %v\n", err)
|
||||
}
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
|
||||
67
remote-grain-pool._go
Normal file
67
remote-grain-pool._go
Normal file
@@ -0,0 +1,67 @@
|
||||
// package main
|
||||
|
||||
// import "sync"
|
||||
|
||||
// type RemoteGrainPool struct {
|
||||
// mu sync.RWMutex
|
||||
// Host string
|
||||
// grains map[CartId]*RemoteGrain
|
||||
// }
|
||||
|
||||
// func NewRemoteGrainPool(addr string) *RemoteGrainPool {
|
||||
// return &RemoteGrainPool{
|
||||
// Host: addr,
|
||||
// grains: make(map[CartId]*RemoteGrain),
|
||||
// }
|
||||
// }
|
||||
|
||||
// func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain {
|
||||
// p.mu.RLock()
|
||||
// grain, ok := p.grains[id]
|
||||
// p.mu.RUnlock()
|
||||
// if !ok {
|
||||
// return nil
|
||||
// }
|
||||
// return grain
|
||||
// }
|
||||
|
||||
// func (p *RemoteGrainPool) findOrCreateGrain(id CartId) (*RemoteGrain, error) {
|
||||
// grain := p.findRemoteGrain(id)
|
||||
|
||||
// if grain == nil {
|
||||
// grain, err := NewRemoteGrain(id, p.Host)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// p.mu.Lock()
|
||||
// p.grains[id] = grain
|
||||
// p.mu.Unlock()
|
||||
// }
|
||||
// return grain, nil
|
||||
// }
|
||||
|
||||
// func (p *RemoteGrainPool) Delete(id CartId) {
|
||||
// p.mu.Lock()
|
||||
// delete(p.grains, id)
|
||||
// p.mu.Unlock()
|
||||
// }
|
||||
|
||||
// func (p *RemoteGrainPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) {
|
||||
// var result *FrameWithPayload
|
||||
// grain, err := p.findOrCreateGrain(id)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// for _, message := range messages {
|
||||
// result, err = grain.HandleMessage(&message, false)
|
||||
// }
|
||||
// return result, err
|
||||
// }
|
||||
|
||||
// func (p *RemoteGrainPool) Get(id CartId) (*FrameWithPayload, error) {
|
||||
// grain, err := p.findOrCreateGrain(id)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// return grain.GetCurrentState()
|
||||
// }
|
||||
@@ -1,67 +0,0 @@
|
||||
package main
|
||||
|
||||
import "sync"
|
||||
|
||||
type RemoteGrainPool struct {
|
||||
mu sync.RWMutex
|
||||
Host string
|
||||
grains map[CartId]*RemoteGrain
|
||||
}
|
||||
|
||||
func NewRemoteGrainPool(addr string) *RemoteGrainPool {
|
||||
return &RemoteGrainPool{
|
||||
Host: addr,
|
||||
grains: make(map[CartId]*RemoteGrain),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain {
|
||||
p.mu.RLock()
|
||||
grain, ok := p.grains[id]
|
||||
p.mu.RUnlock()
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return grain
|
||||
}
|
||||
|
||||
func (p *RemoteGrainPool) findOrCreateGrain(id CartId) (*RemoteGrain, error) {
|
||||
grain := p.findRemoteGrain(id)
|
||||
|
||||
if grain == nil {
|
||||
grain, err := NewRemoteGrain(id, p.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.mu.Lock()
|
||||
p.grains[id] = grain
|
||||
p.mu.Unlock()
|
||||
}
|
||||
return grain, nil
|
||||
}
|
||||
|
||||
func (p *RemoteGrainPool) Delete(id CartId) {
|
||||
p.mu.Lock()
|
||||
delete(p.grains, id)
|
||||
p.mu.Unlock()
|
||||
}
|
||||
|
||||
func (p *RemoteGrainPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) {
|
||||
var result *FrameWithPayload
|
||||
grain, err := p.findOrCreateGrain(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, message := range messages {
|
||||
result, err = grain.HandleMessage(&message, false)
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (p *RemoteGrainPool) Get(id CartId) (*FrameWithPayload, error) {
|
||||
grain, err := p.findOrCreateGrain(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return grain.GetCurrentState()
|
||||
}
|
||||
@@ -3,6 +3,8 @@ package main
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/yudhasubki/netpool"
|
||||
)
|
||||
|
||||
func (id CartId) String() string {
|
||||
@@ -43,12 +45,13 @@ type RemoteGrain struct {
|
||||
Host string
|
||||
}
|
||||
|
||||
func NewRemoteGrain(id CartId, host string) (*RemoteGrain, error) {
|
||||
func NewRemoteGrain(id CartId, host string, pool netpool.Netpooler) *RemoteGrain {
|
||||
addr := fmt.Sprintf("%s:1337", host)
|
||||
return &RemoteGrain{
|
||||
Id: id,
|
||||
Host: host,
|
||||
Connection: NewConnection(fmt.Sprintf("%s:1337", host)),
|
||||
}, nil
|
||||
Connection: NewConnection(addr, pool),
|
||||
}
|
||||
}
|
||||
|
||||
func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) (*FrameWithPayload, error) {
|
||||
|
||||
@@ -4,10 +4,13 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"github.com/yudhasubki/netpool"
|
||||
)
|
||||
|
||||
type RemoteHost struct {
|
||||
*Connection
|
||||
HostPool netpool.Netpooler
|
||||
Host string
|
||||
MissedPings int
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ func (h *GrainHandler) GetState(id CartId, reply *Grain) error {
|
||||
}
|
||||
|
||||
func NewGrainHandler(pool *GrainLocalPool, listen string) (*GrainHandler, error) {
|
||||
conn := NewConnection(listen)
|
||||
conn := NewConnection(listen, nil)
|
||||
server, err := conn.Listen()
|
||||
handler := &GrainHandler{
|
||||
GenericListener: server,
|
||||
|
||||
@@ -3,12 +3,14 @@ package main
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"github.com/yudhasubki/netpool"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
@@ -154,14 +156,23 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
|
||||
}
|
||||
|
||||
go func(i CartId, h string) {
|
||||
remote, err := NewRemoteGrain(i, h)
|
||||
if err != nil {
|
||||
log.Printf("Error creating remote grain %v", err)
|
||||
var pool netpool.Netpooler
|
||||
p.mu.RLock()
|
||||
for _, r := range p.remotes {
|
||||
if r.Host == h {
|
||||
pool = r.HostPool
|
||||
break
|
||||
}
|
||||
}
|
||||
p.mu.RUnlock()
|
||||
if pool == nil {
|
||||
log.Printf("Error spawning remote grain, no pool for %s", h)
|
||||
return
|
||||
}
|
||||
remoteGrain := NewRemoteGrain(i, h, pool)
|
||||
|
||||
p.mu.Lock()
|
||||
p.remoteIndex[i] = remote
|
||||
p.remoteIndex[i] = remoteGrain
|
||||
p.mu.Unlock()
|
||||
}(id, host)
|
||||
}
|
||||
@@ -181,7 +192,7 @@ func (p *SyncedPool) HandleHostError(host string) {
|
||||
|
||||
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
|
||||
listen := fmt.Sprintf("%s:1338", hostname)
|
||||
conn := NewConnection(listen)
|
||||
conn := NewConnection(listen, nil)
|
||||
server, err := conn.Listen()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -381,9 +392,17 @@ func (p *SyncedPool) AddRemote(host string) {
|
||||
return
|
||||
}
|
||||
|
||||
client := NewConnection(fmt.Sprintf("%s:1338", host))
|
||||
host_pool, err := netpool.New(func() (net.Conn, error) {
|
||||
return net.Dial("tcp", fmt.Sprintf("%s:1338", host))
|
||||
}, netpool.WithMaxPool(256))
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Error creating pool: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
client := NewConnection(fmt.Sprintf("%s:1338", host), host_pool)
|
||||
|
||||
var err error
|
||||
pings := 3
|
||||
for pings >= 0 {
|
||||
_, err = client.Call(Ping, nil)
|
||||
@@ -397,7 +416,12 @@ func (p *SyncedPool) AddRemote(host string) {
|
||||
}
|
||||
log.Printf("Connected to remote %s", host)
|
||||
|
||||
cart_pool, err := netpool.New(func() (net.Conn, error) {
|
||||
return net.Dial("tcp", fmt.Sprintf("%s:1337", host))
|
||||
}, netpool.WithMaxPool(512))
|
||||
|
||||
remote := RemoteHost{
|
||||
HostPool: cart_pool,
|
||||
Connection: client,
|
||||
MissedPings: 0,
|
||||
Host: host,
|
||||
|
||||
@@ -6,10 +6,13 @@ import (
|
||||
"log"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/yudhasubki/netpool"
|
||||
)
|
||||
|
||||
type Connection struct {
|
||||
address string
|
||||
pool netpool.Netpooler
|
||||
count uint64
|
||||
}
|
||||
|
||||
@@ -56,9 +59,10 @@ type FrameData interface {
|
||||
FromBytes([]byte) error
|
||||
}
|
||||
|
||||
func NewConnection(address string) *Connection {
|
||||
func NewConnection(address string, pool netpool.Netpooler) *Connection {
|
||||
return &Connection{
|
||||
count: 0,
|
||||
pool: pool,
|
||||
address: address,
|
||||
}
|
||||
}
|
||||
@@ -75,7 +79,8 @@ func SendFrame(conn net.Conn, data *FrameWithPayload) error {
|
||||
}
|
||||
|
||||
func (c *Connection) CallAsync(msg FrameType, payload []byte, ch chan<- FrameWithPayload) (net.Conn, error) {
|
||||
conn, err := net.Dial("tcp", c.address)
|
||||
conn, err := c.pool.Get()
|
||||
//conn, err := net.Dial("tcp", c.address)
|
||||
if err != nil {
|
||||
return conn, err
|
||||
}
|
||||
@@ -102,7 +107,7 @@ func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer conn.Close()
|
||||
defer c.pool.Put(conn, nil) // conn.Close()
|
||||
defer close(ch)
|
||||
|
||||
select {
|
||||
@@ -154,7 +159,8 @@ func (c *Connection) Listen() (*GenericListener, error) {
|
||||
for !ret.StopListener {
|
||||
connection, err := l.Accept()
|
||||
if err != nil {
|
||||
log.Fatalf("Error accepting connection: %v\n", err)
|
||||
log.Printf("Error accepting connection: %v\n", err)
|
||||
continue
|
||||
}
|
||||
go ret.HandleConnection(connection)
|
||||
}
|
||||
@@ -194,12 +200,12 @@ func (l *GenericListener) HandleFrame(conn net.Conn, frame *FrameWithPayload) er
|
||||
defer close(resultChan)
|
||||
err := handler(frame, resultChan)
|
||||
if err != nil {
|
||||
log.Fatalf("Error handling frame: %v\n", err)
|
||||
log.Fatalf("Error handling frame: %s\n", err)
|
||||
}
|
||||
result := <-resultChan
|
||||
err = SendFrame(conn, &result)
|
||||
if err != nil {
|
||||
log.Fatalf("Error sending frame: %v\n", err)
|
||||
log.Fatalf("Error sending frame: %s\n", err)
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
|
||||
@@ -1,16 +1,27 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/yudhasubki/netpool"
|
||||
)
|
||||
|
||||
func TestGenericConnection(t *testing.T) {
|
||||
conn := NewConnection("localhost:51337")
|
||||
listener, err := conn.Listen()
|
||||
|
||||
listenConn := NewConnection("127.0.0.1:51337", nil)
|
||||
listener, err := listenConn.Listen()
|
||||
if err != nil {
|
||||
t.Errorf("Error listening: %v\n", err)
|
||||
}
|
||||
pool, err := netpool.New(func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:51337")
|
||||
}, netpool.WithMaxPool(512))
|
||||
if err != nil {
|
||||
t.Errorf("Error creating pool: %v\n", err)
|
||||
}
|
||||
conn := NewConnection("127.0.0.1:51337", pool)
|
||||
|
||||
datta := []byte("Hello, world!")
|
||||
listener.AddHandler(Ping, func(input *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
|
||||
resultChan <- MakeFrameWithPayload(Pong, 200, nil)
|
||||
@@ -21,7 +32,8 @@ func TestGenericConnection(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
listener.AddHandler(3, func(input *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
|
||||
return fmt.Errorf("Error")
|
||||
resultChan <- MakeFrameWithPayload(2, 200, datta)
|
||||
return nil //fmt.Errorf("Error")
|
||||
})
|
||||
r, err := conn.Call(1, datta)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user