discarded host handler
All checks were successful
Build and Publish / BuildAndDeployAmd64 (push) Successful in 29s
Build and Publish / BuildAndDeploy (push) Successful in 2m29s

This commit is contained in:
matst80
2024-11-14 18:52:51 +01:00
parent d9563460f7
commit 0fe6cb0920
3 changed files with 117 additions and 26 deletions

79
discarded-host.go Normal file
View File

@@ -0,0 +1,79 @@
package main
import (
"fmt"
"log"
"net"
"sync"
"time"
)
type DiscardedHost struct {
*Connection
Host string
Tries int
}
type DiscardedHostHandler struct {
mu sync.RWMutex
port int
hosts []*DiscardedHost
onConnection *func(string)
}
func (d *DiscardedHostHandler) run() {
for range time.Tick(time.Second) {
d.mu.RLock()
lst := make([]*DiscardedHost, 0, len(d.hosts))
for _, host := range d.hosts {
if host.Tries >= 0 || host.Tries < 5 {
go d.testConnection(host)
lst = append(lst, host)
}
}
d.mu.RUnlock()
d.mu.Lock()
d.hosts = lst
d.mu.Unlock()
}
}
func (d *DiscardedHostHandler) testConnection(host *DiscardedHost) {
addr := fmt.Sprintf("%s:%d", host.Host, d.port)
conn, err := net.Dial("tcp", addr)
if err != nil {
host.Tries++
host.Tries = -1
} else {
conn.Close()
if d.onConnection != nil {
fn := *d.onConnection
fn(host.Host)
}
}
}
func NewDiscardedHostHandler(port int) *DiscardedHostHandler {
ret := &DiscardedHostHandler{
hosts: make([]*DiscardedHost, 0),
port: port,
}
go ret.run()
return ret
}
func (d *DiscardedHostHandler) SetReconnectHandler(fn func(string)) {
d.onConnection = &fn
}
func (d *DiscardedHostHandler) AppendHost(host string) {
d.mu.Lock()
defer d.mu.Unlock()
log.Printf("Retrying host %s", host)
d.hosts = append(d.hosts, &DiscardedHost{
Host: host,
Tries: 0,
})
}

17
discarded-host_test.go Normal file
View File

@@ -0,0 +1,17 @@
package main
import (
"testing"
"time"
)
func TestDiscardedHost(t *testing.T) {
dh := NewDiscardedHostHandler(func(host string) {
t.Log(host)
}, 8080)
dh.AppendHost("localhost")
time.Sleep(2 * time.Second)
if dh.hosts[0].Tries == 0 {
t.Error("Host not tested")
}
}

View File

@@ -22,12 +22,13 @@ type HealthHandler interface {
}
type SyncedPool struct {
Server *GenericListener
mu sync.RWMutex
Hostname string
local *GrainLocalPool
remotes map[string]*RemoteHost
remoteIndex map[CartId]*RemoteGrain
Server *GenericListener
mu sync.RWMutex
discardedHostHandler *DiscardedHostHandler
Hostname string
local *GrainLocalPool
remotes map[string]*RemoteHost
remoteIndex map[CartId]*RemoteGrain
}
var (
@@ -110,13 +111,13 @@ func (p *SyncedPool) GrainOwnerChangeHandler(data *FrameWithPayload, resultChan
idAndHostParts := strings.Split(string(data.Payload), ";")
if len(idAndHostParts) != 2 {
log.Printf("Invalid remote grain change message\n")
log.Printf("Invalid remote grain change message")
resultChan <- MakeFrameWithPayload(AckError, 500, []byte("invalid"))
return nil
}
id := ToCartId(idAndHostParts[0])
host := idAndHostParts[1]
log.Printf("Handling remote grain owner change to %s for id %s\n", host, id)
log.Printf("Handling remote grain owner change to %s for id %s", host, id)
for _, r := range p.remotes {
if r.Host == host && r.IsHealthy() {
go p.SpawnRemoteGrain(id, host)
@@ -136,11 +137,11 @@ func (p *SyncedPool) RemoveRemoteGrain(id CartId) {
func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
if id.String() == "" {
log.Printf("Invalid grain id, %s\n", id)
log.Printf("Invalid grain id, %s", id)
return
}
if p.local.grains[id] != nil {
log.Printf("Grain %s already exists locally, owner is (%s)\n", id, host)
log.Printf("Grain %s already exists locally, owner is (%s)", id, host)
p.mu.Lock()
delete(p.local.grains, id)
p.mu.Unlock()
@@ -148,15 +149,9 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
remote, err := NewRemoteGrain(id, host)
if err != nil {
log.Printf("Error creating remote grain %v\n", err)
log.Printf("Error creating remote grain %v", err)
return
}
// go func() {
// <-remote.Died
// p.RemoveRemoteGrain(id)
// p.HandleHostError(host)
// log.Printf("Remote grain %s died, host: %s\n", id.String(), host)
// }()
p.mu.Lock()
p.remoteIndex[id] = remote
@@ -183,16 +178,16 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery)
}
log.Printf("Listening on %s", listen)
dh := NewDiscardedHostHandler(1338)
pool := &SyncedPool{
Server: server,
Hostname: hostname,
local: local,
remotes: make(map[string]*RemoteHost),
remoteIndex: make(map[CartId]*RemoteGrain),
Server: server,
Hostname: hostname,
local: local,
discardedHostHandler: dh,
remotes: make(map[string]*RemoteHost),
remoteIndex: make(map[CartId]*RemoteGrain),
}
dh.SetReconnectHandler(pool.AddRemote)
server.AddHandler(Ping, pool.PongHandler)
server.AddHandler(GetCartIds, pool.GetCartIdHandler)
server.AddHandler(RemoteNegotiate, pool.NegotiateHandler)
@@ -266,11 +261,11 @@ func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
}
func (p *SyncedPool) RemoveHost(host *RemoteHost) {
p.mu.Lock()
delete(p.remotes, host.Host)
p.mu.Unlock()
p.RemoveHostMappedCarts(host)
p.discardedHostHandler.AppendHost(host.Host)
connectedRemotes.Set(float64(len(p.remotes)))
}