better node removal
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m45s
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m45s
This commit is contained in:
5
.gitignore
vendored
5
.gitignore
vendored
@@ -1 +1,4 @@
|
|||||||
__debug*
|
__debug*
|
||||||
|
go-cart-actor
|
||||||
|
data/*.prot
|
||||||
|
data/*.go*
|
||||||
@@ -3,6 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -30,6 +31,9 @@ func SendCartPacket(conn io.Writer, id CartId, messageType uint16, datafn func(w
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if conn == nil {
|
||||||
|
return fmt.Errorf("no connection to send to")
|
||||||
|
}
|
||||||
binary.Write(conn, binary.LittleEndian, CartPacket{
|
binary.Write(conn, binary.LittleEndian, CartPacket{
|
||||||
Version: 2,
|
Version: 2,
|
||||||
MessageType: messageType,
|
MessageType: messageType,
|
||||||
|
|||||||
@@ -56,16 +56,15 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, d Discovery) (*Synced
|
|||||||
go func(pingTimer *time.Ticker) {
|
go func(pingTimer *time.Ticker) {
|
||||||
for {
|
for {
|
||||||
<-pingTimer.C
|
<-pingTimer.C
|
||||||
log.Printf("Pinging remotes %d\n", len(pool.remotes))
|
for _, r := range pool.remotes {
|
||||||
for i, r := range pool.remotes {
|
|
||||||
err := DoPing(r)
|
err := DoPing(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.MissedPings++
|
r.MissedPings++
|
||||||
log.Printf("Error pinging remote %s: %v\n, missed pings: %d", r.Host, err, r.MissedPings)
|
log.Printf("Error pinging remote %s: %v\n, missed pings: %d", r.Host, err, r.MissedPings)
|
||||||
if r.MissedPings > 3 {
|
if r.MissedPings > 3 {
|
||||||
log.Printf("Removing remote %s\n", r.Host)
|
log.Printf("Removing remote %s\n", r.Host)
|
||||||
|
go pool.RemoveHost(r)
|
||||||
pool.remotes = append(pool.remotes[:i], pool.remotes[i+1:]...)
|
//pool.remotes = append(pool.remotes[:i], pool.remotes[i+1:]...)
|
||||||
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -133,6 +132,28 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, d Discovery) (*Synced
|
|||||||
return pool, nil
|
return pool, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *SyncedPool) RemoveHost(host *RemoteHost) {
|
||||||
|
|
||||||
|
for i, r := range p.remotes {
|
||||||
|
if r == host {
|
||||||
|
p.RemoveHostMappedCarts(r)
|
||||||
|
p.remotes = append(p.remotes[:i], p.remotes[i+1:]...)
|
||||||
|
connectedRemotes.Set(float64(len(p.remotes)))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
for id, r := range p.remoteIndex {
|
||||||
|
if r == host.Pool {
|
||||||
|
delete(p.remoteIndex, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
|
negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||||
Name: "cart_remote_negotiation_total",
|
Name: "cart_remote_negotiation_total",
|
||||||
|
|||||||
Reference in New Issue
Block a user