From 170a51fad94b9fdf35e5941ce80d1109cfe1fa79 Mon Sep 17 00:00:00 2001 From: matst80 Date: Tue, 12 Nov 2024 17:30:19 +0100 Subject: [PATCH] use persisted connections and handle died --- synced-pool.go | 62 ++++++++++++++++++++++++++++++++------------------ tcp-client.go | 18 +++++++++++---- 2 files changed, 54 insertions(+), 26 deletions(-) diff --git a/synced-pool.go b/synced-pool.go index abec84b..1f2edcc 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -115,18 +115,9 @@ func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint32, []byte, error for _, r := range p.remotes { if r.Host == host { // log.Printf("Remote grain %s changed to %s\n", id, host) - p.mu.Lock() - if p.local.grains[id] != nil { - log.Printf("Grain %s already exists locally, deleting\n", id) - delete(p.local.grains, id) - } - grain, err := NewRemoteGrain(id, r.Host) - if err != nil { - log.Printf("Owner change failed %s: %v\n", id, err) - return AckChange, []byte("error"), err - } - p.remoteIndex[id] = grain - p.mu.Unlock() + + p.SpawnRemoteGrain(id, host) + return AckChange, []byte("ok"), nil } } @@ -135,6 +126,33 @@ func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint32, []byte, error return AckChange, []byte("ok"), nil } +func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { + if p.local.grains[id] != nil { + log.Printf("Grain %s already exists locally, deleting\n", id) + p.mu.Lock() + delete(p.local.grains, id) + p.mu.Unlock() + } + + remote, err := NewRemoteGrain(id, host) + if err != nil { + log.Printf("Error creating remote grain %v\n", err) + return + } + go func() { + <-remote.Died + p.mu.Lock() + delete(p.remoteIndex, id) + p.mu.Unlock() + log.Printf("Remote grain %s died, host: %s\n", id.String(), host) + //p.RemoveHost(host) + }() + + p.mu.Lock() + p.remoteIndex[id] = remote + p.mu.Unlock() +} + func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) { listen := fmt.Sprintf("%s:1338", hostname) @@ -242,6 +260,7 @@ func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) { defer p.mu.Unlock() for id, r := range p.remoteIndex { if r.Host == host.Host { + p.remoteIndex[id].Close() delete(p.remoteIndex, id) } } @@ -385,17 +404,11 @@ func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) error { local := 0 remoteNo := 0 for _, id := range ids { - if p.local.grains[id] != nil { - local++ - delete(p.local.grains, id) - } - grain, err := NewRemoteGrain(id, remote.Host) - if err != nil { - log.Printf("Error creating remote grain %s: %v\n", id, err) - continue - } + + go p.SpawnRemoteGrain(id, remote.Host) + remoteNo++ - p.remoteIndex[id] = grain + } log.Printf("Removed %d local grains, added %d remote grains\n", local, remoteNo) p.mu.Unlock() @@ -447,6 +460,11 @@ func (p *SyncedPool) AddRemote(host string) error { MissedPings: 0, Host: host, } + go func() { + <-remote.Died + log.Printf("Removing host, remote died %s", host) + p.RemoveHost(&remote) + }() go func() { for range time.Tick(time.Second * 3) { var err error diff --git a/tcp-client.go b/tcp-client.go index 8146525..d01679d 100644 --- a/tcp-client.go +++ b/tcp-client.go @@ -33,6 +33,8 @@ type TCPClient struct { type PersistentConnection struct { net.Conn + Died chan bool + Dead bool address string } @@ -43,21 +45,29 @@ func NewPersistentConnection(address string) (*PersistentConnection, error) { } return &PersistentConnection{ Conn: connection, + Died: make(chan bool, 1), + Dead: false, address: address, }, nil } func (m *PersistentConnection) Connect() error { - connection, err := net.Dial("tcp", m.address) - if err != nil { - return err + if !m.Dead { + connection, err := net.Dial("tcp", m.address) + if err != nil { + m.Died <- true + m.Dead = true + return err + } + m.Conn = connection } - m.Conn = connection return nil } func (m *PersistentConnection) Close() { m.Conn.Close() + m.Died <- true + m.Dead = true } func (m *PersistentConnection) HandleConnectionError(err error) error {