change some details
All checks were successful
Build and Publish / BuildAndDeployAmd64 (push) Successful in 28s
Build and Publish / BuildAndDeploy (push) Successful in 2m24s

This commit is contained in:
matst80
2024-11-13 08:17:26 +01:00
parent 42c6536cd6
commit ce5f19d287

View File

@@ -26,7 +26,7 @@ type SyncedPool struct {
mu sync.RWMutex mu sync.RWMutex
Hostname string Hostname string
local *GrainLocalPool local *GrainLocalPool
remotes []*RemoteHost remotes map[string]*RemoteHost
remoteIndex map[CartId]*RemoteGrain remoteIndex map[CartId]*RemoteGrain
} }
@@ -126,6 +126,10 @@ func (p *SyncedPool) RemoveRemoteGrain(id CartId) {
} }
func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
if id.String() == "" {
log.Printf("Invalid grain id, %s\n", id)
return
}
if p.local.grains[id] != nil { if p.local.grains[id] != nil {
log.Printf("Grain %s already exists locally, owner is (%s)\n", id, host) log.Printf("Grain %s already exists locally, owner is (%s)\n", id, host)
p.mu.Lock() p.mu.Lock()
@@ -178,7 +182,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery)
Hostname: hostname, Hostname: hostname,
local: local, local: local,
remotes: make([]*RemoteHost, 0), remotes: make(map[string]*RemoteHost),
remoteIndex: make(map[CartId]*RemoteGrain), remoteIndex: make(map[CartId]*RemoteGrain),
} }
@@ -254,16 +258,15 @@ func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
} }
func (p *SyncedPool) RemoveHost(host *RemoteHost) { func (p *SyncedPool) RemoveHost(host *RemoteHost) {
toKeep := make([]*RemoteHost, 0, len(p.remotes)) if p.remotes[host.Host] == nil {
for _, r := range p.remotes { return
if r == host {
p.RemoveHostMappedCarts(r)
} else {
toKeep = append(toKeep, r)
}
} }
p.mu.Lock()
defer p.mu.Unlock()
p.remotes = toKeep h := p.remotes[host.Host]
h.Close()
delete(p.remotes, host.Host)
connectedRemotes.Set(float64(len(p.remotes))) connectedRemotes.Set(float64(len(p.remotes)))
} }
@@ -350,17 +353,9 @@ func (p *SyncedPool) removeLocalGrain(id CartId) {
delete(p.local.grains, id) delete(p.local.grains, id)
} }
func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) {
p.remotes = append(p.remotes, remote)
connectedRemotes.Set(float64(len(p.remotes)))
log.Printf("Added remote %s\n", remote.Host)
go remote.Initialize(p)
}
func (p *SyncedPool) AddRemote(host string) error { func (p *SyncedPool) AddRemote(host string) error {
if host == "" || p.IsKnown(host) { _, hasHost := p.remotes[host]
if host == "" || p.IsKnown(host) || hasHost {
return nil return nil
} }
client, err := Dial(fmt.Sprintf("%s:1338", host)) client, err := Dial(fmt.Sprintf("%s:1338", host))
@@ -374,6 +369,7 @@ func (p *SyncedPool) AddRemote(host string) error {
MissedPings: 0, MissedPings: 0,
Host: host, Host: host,
} }
p.remotes[host] = &remote
go func() { go func() {
<-remote.Died <-remote.Died
log.Printf("Removing host, remote died %s", host) log.Printf("Removing host, remote died %s", host)
@@ -395,7 +391,10 @@ func (p *SyncedPool) AddRemote(host string) error {
} }
}() }()
go p.addRemoteHost(host, &remote) connectedRemotes.Set(float64(len(p.remotes)))
log.Printf("Added remote %s\n", remote.Host)
go remote.Initialize(p)
return nil return nil
} }