diff --git a/synced-pool.go b/synced-pool.go index b39747d..0b1c8dd 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -26,7 +26,7 @@ type SyncedPool struct { mu sync.RWMutex Hostname string local *GrainLocalPool - remotes []*RemoteHost + remotes map[string]*RemoteHost remoteIndex map[CartId]*RemoteGrain } @@ -126,6 +126,10 @@ func (p *SyncedPool) RemoveRemoteGrain(id CartId) { } func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { + if id.String() == "" { + log.Printf("Invalid grain id, %s\n", id) + return + } if p.local.grains[id] != nil { log.Printf("Grain %s already exists locally, owner is (%s)\n", id, host) p.mu.Lock() @@ -178,7 +182,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) Hostname: hostname, local: local, - remotes: make([]*RemoteHost, 0), + remotes: make(map[string]*RemoteHost), remoteIndex: make(map[CartId]*RemoteGrain), } @@ -254,16 +258,15 @@ func (p *SyncedPool) ExcludeKnown(hosts []string) []string { } func (p *SyncedPool) RemoveHost(host *RemoteHost) { - toKeep := make([]*RemoteHost, 0, len(p.remotes)) - for _, r := range p.remotes { - if r == host { - p.RemoveHostMappedCarts(r) - } else { - toKeep = append(toKeep, r) - } + if p.remotes[host.Host] == nil { + return } + p.mu.Lock() + defer p.mu.Unlock() - p.remotes = toKeep + h := p.remotes[host.Host] + h.Close() + delete(p.remotes, host.Host) connectedRemotes.Set(float64(len(p.remotes))) } @@ -350,17 +353,9 @@ func (p *SyncedPool) removeLocalGrain(id CartId) { delete(p.local.grains, id) } -func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) { - - p.remotes = append(p.remotes, remote) - connectedRemotes.Set(float64(len(p.remotes))) - log.Printf("Added remote %s\n", remote.Host) - - go remote.Initialize(p) -} - func (p *SyncedPool) AddRemote(host string) error { - if host == "" || p.IsKnown(host) { + _, hasHost := p.remotes[host] + if host == "" || p.IsKnown(host) || hasHost { return nil } client, err := Dial(fmt.Sprintf("%s:1338", host)) @@ -374,6 +369,7 @@ func (p *SyncedPool) AddRemote(host string) error { MissedPings: 0, Host: host, } + p.remotes[host] = &remote go func() { <-remote.Died log.Printf("Removing host, remote died %s", host) @@ -395,7 +391,10 @@ func (p *SyncedPool) AddRemote(host string) error { } }() - go p.addRemoteHost(host, &remote) + connectedRemotes.Set(float64(len(p.remotes))) + log.Printf("Added remote %s\n", remote.Host) + + go remote.Initialize(p) return nil }