diff --git a/cart-packet-queue.go b/cart-packet-queue.go index 384cae4..e51cb25 100644 --- a/cart-packet-queue.go +++ b/cart-packet-queue.go @@ -18,22 +18,22 @@ type CartPacketWithData struct { } type CartPacketQueue struct { - mu sync.RWMutex - Packets []CartPacketWithData - connection net.Conn + mu sync.RWMutex + Packets []CartPacketWithData + //connection net.Conn } func NewCartPacketQueue(connection net.Conn) *CartPacketQueue { queue := &CartPacketQueue{ - Packets: make([]CartPacketWithData, 0), - connection: connection, + Packets: make([]CartPacketWithData, 0), + //connection: connection, } go func() { defer connection.Close() var packet CartPacket for { - err := ReadPacket(queue.connection, &packet) + err := ReadPacket(connection, &packet) ts := time.Now() if err != nil { @@ -44,7 +44,7 @@ func NewCartPacketQueue(connection net.Conn) *CartPacketQueue { //return } - data, err := GetPacketData(queue.connection, int(packet.DataLength)) + data, err := GetPacketData(connection, int(packet.DataLength)) if err != nil { log.Printf("Error receiving packet data: %v\n", err) return diff --git a/packet-queue.go b/packet-queue.go index 79712c0..2bd6fbd 100644 --- a/packet-queue.go +++ b/packet-queue.go @@ -17,22 +17,22 @@ type PacketWithData struct { } type PacketQueue struct { - mu sync.RWMutex - Packets []PacketWithData - connection net.Conn + mu sync.RWMutex + Packets []PacketWithData + //connection net.Conn } func NewPacketQueue(connection net.Conn) *PacketQueue { queue := &PacketQueue{ - Packets: make([]PacketWithData, 0), - connection: connection, + Packets: make([]PacketWithData, 0), + //connection: connection, } go func() { defer connection.Close() var packet Packet for { - err := ReadPacket(queue.connection, &packet) + err := ReadPacket(connection, &packet) ts := time.Now() if err != nil { @@ -42,10 +42,9 @@ func NewPacketQueue(connection net.Conn) *PacketQueue { log.Printf("Error receiving packet: %v\n", err) //return } - data, err := GetPacketData(queue.connection, int(packet.DataLength)) + data, err := GetPacketData(connection, int(packet.DataLength)) if err != nil { log.Printf("Error receiving packet data: %v\n", err) - return } queue.mu.Lock() diff --git a/synced-pool.go b/synced-pool.go index 2e7f52a..c37a74b 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -21,7 +21,7 @@ type RemoteHost struct { *Client Host string MissedPings int - Pool *RemoteGrainPool + //Pool *RemoteGrainPool } type SyncedPool struct { @@ -358,33 +358,38 @@ func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) error { return nil } -func (p *SyncedPool) AddRemote(address string) error { - if address == "" || p.IsKnown(address) { +func (p *SyncedPool) AddRemote(host string) error { + if host == "" || p.IsKnown(host) { return nil } - client, err := Dial(fmt.Sprintf("%s:1338", address)) + client, err := Dial(fmt.Sprintf("%s:1338", host)) + if err != nil { + log.Printf("Error connecting to remote %s: %v\n", host, err) + } + _, err = client.Call(Ping, Pong, nil) if err != nil { - log.Printf("Error connecting to remote %s: %v\n", address, err) + log.Printf("Error pinging remote %s: %v\n", host, err) return err } - pool := NewRemoteGrainPool(address) + //pool := NewRemoteGrainPool(host) remote := RemoteHost{ Client: client, - Pool: pool, - Host: address, + // Pool: pool, + Host: host, } go func() { for range client.Errors { if client.ErrorCount > 3 { + log.Printf("Error count exceeded, removing remote %s\n", host) p.RemoveHost(&remote) } } }() - return p.addRemoteHost(address, &remote) + return p.addRemoteHost(host, &remote) } func (p *SyncedPool) getGrain(id CartId) (Grain, error) { diff --git a/tcp-cart-client.go b/tcp-cart-client.go index 94f1868..c2f13a2 100644 --- a/tcp-cart-client.go +++ b/tcp-cart-client.go @@ -2,7 +2,6 @@ package main import ( "encoding/binary" - "io" "net" "time" ) @@ -83,13 +82,13 @@ func (m *CartTCPClient) SendPacket(messageType uint16, id CartId, data []byte) e return err } -func (m *CartTCPClient) SendPacketFn(messageType uint16, id CartId, datafn func(w io.Writer) error) error { - data, err := GetData(datafn) - if err != nil { - return err - } - return m.SendPacket(messageType, id, data) -} +// func (m *CartTCPClient) SendPacketFn(messageType uint16, id CartId, datafn func(w io.Writer) error) error { +// data, err := GetData(datafn) +// if err != nil { +// return err +// } +// return m.SendPacket(messageType, id, data) +// } func (m *CartTCPClient) Call(messageType uint16, id CartId, responseType uint16, data []byte) ([]byte, error) { err := m.SendPacket(messageType, id, data) diff --git a/tcp-client.go b/tcp-client.go index 24c4545..de6cd2a 100644 --- a/tcp-client.go +++ b/tcp-client.go @@ -2,7 +2,6 @@ package main import ( "encoding/binary" - "io" "net" "time" ) @@ -82,13 +81,13 @@ func (m *TCPClient) SendPacket(messageType uint16, data []byte) error { return err } -func (m *TCPClient) SendPacketFn(messageType uint16, datafn func(w io.Writer) error) error { - data, err := GetData(datafn) - if err != nil { - return err - } - return m.SendPacket(messageType, data) -} +// func (m *TCPClient) SendPacketFn(messageType uint16, datafn func(w io.Writer) error) error { +// data, err := GetData(datafn) +// if err != nil { +// return err +// } +// return m.SendPacket(messageType, data) +// } func (m *TCPClient) Call(messageType uint16, responseType uint16, data []byte) ([]byte, error) { err := m.SendPacket(messageType, data)