update connection reuse
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m48s

This commit is contained in:
matst80
2024-11-09 20:00:21 +01:00
parent f8c1f07989
commit 84cfc142e1

View File

@@ -188,7 +188,7 @@ func (p *SyncedPool) handleConnection(conn net.Conn) {
log.Printf("Negotiated with remote, found %v hosts\n", knownHosts) log.Printf("Negotiated with remote, found %v hosts\n", knownHosts)
for _, h := range knownHosts { for _, h := range knownHosts {
err = p.AddRemote(h) err = p.AddRemoteWithConnection(h, conn)
if err != nil { if err != nil {
log.Printf("Error adding remote %s: %v\n", h, err) log.Printf("Error adding remote %s: %v\n", h, err)
} }
@@ -230,14 +230,14 @@ func (p *SyncedPool) handleConnection(conn net.Conn) {
if !found { if !found {
log.Printf("Remote host %s not found\n", idAndHostParts[1]) log.Printf("Remote host %s not found\n", idAndHostParts[1])
log.Printf("Remotes %v\n", p.remotes) log.Printf("Remotes %v\n", p.remotes)
SendPacket(conn, AckError, func(w io.Writer) error { err = SendPacket(conn, AckError, func(w io.Writer) error {
w.Write([]byte("remote host not found")) w.Write([]byte("remote host not found"))
return nil return nil
}) })
} else { } else {
SendPacket(conn, AckChange, func(w io.Writer) error { err = SendPacket(conn, AckChange, func(w io.Writer) error {
w.Write([]byte("ok")) _, err := w.Write([]byte("ok"))
return nil return err
}) })
} }
} }
@@ -280,9 +280,8 @@ func (p *SyncedPool) Negotiate(knownHosts []string) ([]string, error) {
func (r *RemoteHost) ConfirmChange(id CartId, host string) error { func (r *RemoteHost) ConfirmChange(id CartId, host string) error {
SendPacket(r.connection, RemoteGrainChanged, func(w io.Writer) error { SendPacket(r.connection, RemoteGrainChanged, func(w io.Writer) error {
_, err := w.Write([]byte(fmt.Sprintf("%s;%s", id, host)))
w.Write([]byte(fmt.Sprintf("%s;%s", id, host))) return err
return nil
}) })
t, data, err := ReceivePacket(r.connection) t, data, err := ReceivePacket(r.connection)
if err != nil { if err != nil {
@@ -309,15 +308,36 @@ func (p *SyncedPool) OwnerChanged(id CartId, host string) error {
return nil return nil
} }
func (p *SyncedPool) AddRemote(address string) error { func (p *SyncedPool) AddRemoteWithConnection(address string, connection net.Conn) error {
pool := NewRemoteGrainPool(fmt.Sprintf(address, 1337))
remote := RemoteHost{
connection: connection,
Pool: pool,
Host: address,
}
return p.addRemoteHost(address, remote)
}
func (p *SyncedPool) addRemoteHost(address string, remote RemoteHost) error {
for _, r := range p.remotes { for _, r := range p.remotes {
if r.Host == address { if r.Host == address {
log.Printf("Remote %s already exists\n", address) log.Printf("Remote %s already exists\n", address)
return fmt.Errorf("remote %s already exists", address) return fmt.Errorf("remote %s already exists", address)
} }
} }
p.remotes = append(p.remotes, remote)
connectedRemotes.Set(float64(len(p.remotes)))
log.Printf("Added remote %s\n", remote.Host)
return nil
}
func (p *SyncedPool) AddRemote(address string) error {
connection, err := net.Dial("tcp", fmt.Sprintf("%s:1338", address)) connection, err := net.Dial("tcp", fmt.Sprintf("%s:1338", address))
if err != nil { if err != nil {
log.Printf("Error connecting to remote %s: %v\n", address, err)
return err return err
} }
@@ -328,11 +348,7 @@ func (p *SyncedPool) AddRemote(address string) error {
Host: address, Host: address,
} }
p.remotes = append(p.remotes, remote) return p.addRemoteHost(address, remote)
connectedRemotes.Set(float64(len(p.remotes)))
log.Printf("Added remote %s\n", remote.Host)
return nil
} }
func (p *SyncedPool) Process(id CartId, messages ...Message) ([]byte, error) { func (p *SyncedPool) Process(id CartId, messages ...Message) ([]byte, error) {