From 0fe6cb092040448b2ac094dde9cb6563de170361 Mon Sep 17 00:00:00 2001 From: matst80 Date: Thu, 14 Nov 2024 18:52:51 +0100 Subject: [PATCH] discarded host handler --- discarded-host.go | 79 ++++++++++++++++++++++++++++++++++++++++++ discarded-host_test.go | 17 +++++++++ synced-pool.go | 47 +++++++++++-------------- 3 files changed, 117 insertions(+), 26 deletions(-) create mode 100644 discarded-host.go create mode 100644 discarded-host_test.go diff --git a/discarded-host.go b/discarded-host.go new file mode 100644 index 0000000..f1b6780 --- /dev/null +++ b/discarded-host.go @@ -0,0 +1,79 @@ +package main + +import ( + "fmt" + "log" + "net" + "sync" + "time" +) + +type DiscardedHost struct { + *Connection + Host string + Tries int +} + +type DiscardedHostHandler struct { + mu sync.RWMutex + port int + hosts []*DiscardedHost + onConnection *func(string) +} + +func (d *DiscardedHostHandler) run() { + for range time.Tick(time.Second) { + d.mu.RLock() + lst := make([]*DiscardedHost, 0, len(d.hosts)) + for _, host := range d.hosts { + if host.Tries >= 0 || host.Tries < 5 { + go d.testConnection(host) + lst = append(lst, host) + } + } + d.mu.RUnlock() + d.mu.Lock() + d.hosts = lst + d.mu.Unlock() + } + +} + +func (d *DiscardedHostHandler) testConnection(host *DiscardedHost) { + addr := fmt.Sprintf("%s:%d", host.Host, d.port) + conn, err := net.Dial("tcp", addr) + + if err != nil { + host.Tries++ + host.Tries = -1 + } else { + conn.Close() + if d.onConnection != nil { + fn := *d.onConnection + fn(host.Host) + } + } +} + +func NewDiscardedHostHandler(port int) *DiscardedHostHandler { + ret := &DiscardedHostHandler{ + hosts: make([]*DiscardedHost, 0), + port: port, + } + go ret.run() + return ret +} + +func (d *DiscardedHostHandler) SetReconnectHandler(fn func(string)) { + d.onConnection = &fn +} + +func (d *DiscardedHostHandler) AppendHost(host string) { + d.mu.Lock() + defer d.mu.Unlock() + log.Printf("Retrying host %s", host) + d.hosts = append(d.hosts, &DiscardedHost{ + Host: host, + Tries: 0, + }) +} diff --git a/discarded-host_test.go b/discarded-host_test.go new file mode 100644 index 0000000..a4303ff --- /dev/null +++ b/discarded-host_test.go @@ -0,0 +1,17 @@ +package main + +import ( + "testing" + "time" +) + +func TestDiscardedHost(t *testing.T) { + dh := NewDiscardedHostHandler(func(host string) { + t.Log(host) + }, 8080) + dh.AppendHost("localhost") + time.Sleep(2 * time.Second) + if dh.hosts[0].Tries == 0 { + t.Error("Host not tested") + } +} diff --git a/synced-pool.go b/synced-pool.go index 037cef9..aa91021 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -22,12 +22,13 @@ type HealthHandler interface { } type SyncedPool struct { - Server *GenericListener - mu sync.RWMutex - Hostname string - local *GrainLocalPool - remotes map[string]*RemoteHost - remoteIndex map[CartId]*RemoteGrain + Server *GenericListener + mu sync.RWMutex + discardedHostHandler *DiscardedHostHandler + Hostname string + local *GrainLocalPool + remotes map[string]*RemoteHost + remoteIndex map[CartId]*RemoteGrain } var ( @@ -110,13 +111,13 @@ func (p *SyncedPool) GrainOwnerChangeHandler(data *FrameWithPayload, resultChan idAndHostParts := strings.Split(string(data.Payload), ";") if len(idAndHostParts) != 2 { - log.Printf("Invalid remote grain change message\n") + log.Printf("Invalid remote grain change message") resultChan <- MakeFrameWithPayload(AckError, 500, []byte("invalid")) return nil } id := ToCartId(idAndHostParts[0]) host := idAndHostParts[1] - log.Printf("Handling remote grain owner change to %s for id %s\n", host, id) + log.Printf("Handling remote grain owner change to %s for id %s", host, id) for _, r := range p.remotes { if r.Host == host && r.IsHealthy() { go p.SpawnRemoteGrain(id, host) @@ -136,11 +137,11 @@ func (p *SyncedPool) RemoveRemoteGrain(id CartId) { func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { if id.String() == "" { - log.Printf("Invalid grain id, %s\n", id) + log.Printf("Invalid grain id, %s", id) return } if p.local.grains[id] != nil { - log.Printf("Grain %s already exists locally, owner is (%s)\n", id, host) + log.Printf("Grain %s already exists locally, owner is (%s)", id, host) p.mu.Lock() delete(p.local.grains, id) p.mu.Unlock() @@ -148,15 +149,9 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { remote, err := NewRemoteGrain(id, host) if err != nil { - log.Printf("Error creating remote grain %v\n", err) + log.Printf("Error creating remote grain %v", err) return } - // go func() { - // <-remote.Died - // p.RemoveRemoteGrain(id) - // p.HandleHostError(host) - // log.Printf("Remote grain %s died, host: %s\n", id.String(), host) - // }() p.mu.Lock() p.remoteIndex[id] = remote @@ -183,16 +178,16 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) } log.Printf("Listening on %s", listen) - + dh := NewDiscardedHostHandler(1338) pool := &SyncedPool{ - Server: server, - Hostname: hostname, - local: local, - - remotes: make(map[string]*RemoteHost), - remoteIndex: make(map[CartId]*RemoteGrain), + Server: server, + Hostname: hostname, + local: local, + discardedHostHandler: dh, + remotes: make(map[string]*RemoteHost), + remoteIndex: make(map[CartId]*RemoteGrain), } - + dh.SetReconnectHandler(pool.AddRemote) server.AddHandler(Ping, pool.PongHandler) server.AddHandler(GetCartIds, pool.GetCartIdHandler) server.AddHandler(RemoteNegotiate, pool.NegotiateHandler) @@ -266,11 +261,11 @@ func (p *SyncedPool) ExcludeKnown(hosts []string) []string { } func (p *SyncedPool) RemoveHost(host *RemoteHost) { - p.mu.Lock() delete(p.remotes, host.Host) p.mu.Unlock() p.RemoveHostMappedCarts(host) + p.discardedHostHandler.AppendHost(host.Host) connectedRemotes.Set(float64(len(p.remotes))) }