diff --git a/synced-pool.go b/synced-pool.go index 50716e5..76b0b3d 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -188,7 +188,7 @@ func (p *SyncedPool) handleConnection(conn net.Conn) { log.Printf("Negotiated with remote, found %v hosts\n", knownHosts) for _, h := range knownHosts { - err = p.AddRemote(h) + err = p.AddRemoteWithConnection(h, conn) if err != nil { log.Printf("Error adding remote %s: %v\n", h, err) } @@ -230,14 +230,14 @@ func (p *SyncedPool) handleConnection(conn net.Conn) { if !found { log.Printf("Remote host %s not found\n", idAndHostParts[1]) log.Printf("Remotes %v\n", p.remotes) - SendPacket(conn, AckError, func(w io.Writer) error { + err = SendPacket(conn, AckError, func(w io.Writer) error { w.Write([]byte("remote host not found")) return nil }) } else { - SendPacket(conn, AckChange, func(w io.Writer) error { - w.Write([]byte("ok")) - return nil + err = SendPacket(conn, AckChange, func(w io.Writer) error { + _, err := w.Write([]byte("ok")) + return err }) } } @@ -280,9 +280,8 @@ func (p *SyncedPool) Negotiate(knownHosts []string) ([]string, error) { func (r *RemoteHost) ConfirmChange(id CartId, host string) error { SendPacket(r.connection, RemoteGrainChanged, func(w io.Writer) error { - - w.Write([]byte(fmt.Sprintf("%s;%s", id, host))) - return nil + _, err := w.Write([]byte(fmt.Sprintf("%s;%s", id, host))) + return err }) t, data, err := ReceivePacket(r.connection) if err != nil { @@ -309,15 +308,36 @@ func (p *SyncedPool) OwnerChanged(id CartId, host string) error { return nil } -func (p *SyncedPool) AddRemote(address string) error { +func (p *SyncedPool) AddRemoteWithConnection(address string, connection net.Conn) error { + pool := NewRemoteGrainPool(fmt.Sprintf(address, 1337)) + remote := RemoteHost{ + connection: connection, + Pool: pool, + Host: address, + } + return p.addRemoteHost(address, remote) +} + +func (p *SyncedPool) addRemoteHost(address string, remote RemoteHost) error { for _, r := range p.remotes { if r.Host == address { log.Printf("Remote %s already exists\n", address) return fmt.Errorf("remote %s already exists", address) } } + + p.remotes = append(p.remotes, remote) + connectedRemotes.Set(float64(len(p.remotes))) + log.Printf("Added remote %s\n", remote.Host) + + return nil +} + +func (p *SyncedPool) AddRemote(address string) error { + connection, err := net.Dial("tcp", fmt.Sprintf("%s:1338", address)) if err != nil { + log.Printf("Error connecting to remote %s: %v\n", address, err) return err } @@ -328,11 +348,7 @@ func (p *SyncedPool) AddRemote(address string) error { Host: address, } - p.remotes = append(p.remotes, remote) - connectedRemotes.Set(float64(len(p.remotes))) - log.Printf("Added remote %s\n", remote.Host) - - return nil + return p.addRemoteHost(address, remote) } func (p *SyncedPool) Process(id CartId, messages ...Message) ([]byte, error) {