update
All checks were successful
Build and Publish / BuildAndDeployAmd64 (push) Successful in 28s
Build and Publish / BuildAndDeploy (push) Successful in 2m13s

This commit is contained in:
matst80
2024-11-12 18:05:41 +01:00
parent 170a51fad9
commit 3f8cdec9af
4 changed files with 132 additions and 118 deletions

View File

@@ -13,7 +13,7 @@ jobs:
- name: Push to registry - name: Push to registry
run: docker push registry.knatofs.se/go-cart-actor-amd64:latest run: docker push registry.knatofs.se/go-cart-actor-amd64:latest
- name: Rollout amd64 deployment - name: Rollout amd64 deployment
run: kubectl rollout restart deployment/cart-actor-amd64 -n cart run: kubectl rollout restart deployment/cart-actor-x86 -n cart
BuildAndDeploy: BuildAndDeploy:
runs-on: arm64 runs-on: arm64

94
remote-host.go Normal file
View File

@@ -0,0 +1,94 @@
package main
import (
"fmt"
"log"
"strings"
)
type RemoteHost struct {
*Client
Host string
MissedPings int
}
func (h *RemoteHost) IsHealthy() bool {
return !h.Dead && h.MissedPings < 3
}
func (h *RemoteHost) Initialize(p *SyncedPool) {
ids, err := h.GetCartMappings()
if err != nil {
log.Printf("Error getting remote mappings: %v\n", err)
return
}
log.Printf("Remote %s has %d grains\n", h.Host, len(ids))
p.mu.Lock()
local := 0
remoteNo := 0
for _, id := range ids {
go p.SpawnRemoteGrain(id, h.Host)
remoteNo++
}
log.Printf("Removed %d local grains, added %d remote grains\n", local, remoteNo)
p.mu.Unlock()
go p.Negotiate()
}
func (h *RemoteHost) Ping() error {
_, err := h.Call(Ping, Pong, []byte{})
if err != nil {
h.MissedPings++
log.Printf("Error pinging remote %s, missed pings: %d", h.Host, h.MissedPings)
} else {
h.MissedPings = 0
}
return err
}
func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) {
reply, err := h.Call(RemoteNegotiate, RemoteNegotiateResponse, []byte(strings.Join(knownHosts, ";")))
if err != nil {
return nil, err
}
if reply.StatusCode != 200 {
return nil, fmt.Errorf("remote returned error on negotiate: %s", string(reply.Data))
}
return strings.Split(string(reply.Data), ";"), nil
}
func (g *RemoteHost) GetCartMappings() ([]CartId, error) {
reply, err := g.Call(GetCartIds, CartIdsResponse, []byte{})
if err != nil {
return nil, err
}
if reply.StatusCode != 200 {
log.Printf("Remote returned error on get cart mappings: %s", string(reply.Data))
return nil, fmt.Errorf("remote returned error: %s", string(reply.Data))
}
parts := strings.Split(string(reply.Data), ";")
ids := make([]CartId, 0, len(parts))
for _, p := range parts {
ids = append(ids, ToCartId(p))
}
return ids, nil
}
func (r *RemoteHost) ConfirmChange(id CartId, host string) error {
reply, err := r.Call(RemoteGrainChanged, AckChange, []byte(fmt.Sprintf("%s;%s", id, host)))
if err != nil {
return err
}
if string(reply.Data) != "ok" {
return fmt.Errorf("remote grain change failed %s", string(reply.Data))
}
return nil
}

View File

@@ -21,12 +21,6 @@ type HealthHandler interface {
IsHealthy() bool IsHealthy() bool
} }
type RemoteHost struct {
*Client
Host string
MissedPings int
}
type SyncedPool struct { type SyncedPool struct {
*Server *Server
mu sync.RWMutex mu sync.RWMutex
@@ -126,6 +120,12 @@ func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint32, []byte, error
return AckChange, []byte("ok"), nil return AckChange, []byte("ok"), nil
} }
func (p *SyncedPool) RemoveRemoteGrain(id CartId) {
p.mu.Lock()
defer p.mu.Unlock()
delete(p.remoteIndex, id)
}
func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
if p.local.grains[id] != nil { if p.local.grains[id] != nil {
log.Printf("Grain %s already exists locally, deleting\n", id) log.Printf("Grain %s already exists locally, deleting\n", id)
@@ -141,11 +141,9 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
} }
go func() { go func() {
<-remote.Died <-remote.Died
p.mu.Lock() p.RemoveRemoteGrain(id)
delete(p.remoteIndex, id) p.HandleHostError(host)
p.mu.Unlock()
log.Printf("Remote grain %s died, host: %s\n", id.String(), host) log.Printf("Remote grain %s died, host: %s\n", id.String(), host)
//p.RemoveHost(host)
}() }()
p.mu.Lock() p.mu.Lock()
@@ -153,6 +151,19 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
p.mu.Unlock() p.mu.Unlock()
} }
func (p *SyncedPool) HandleHostError(host string) {
for _, r := range p.remotes {
if r.Host == host {
if !r.IsHealthy() {
p.RemoveHost(r)
} else {
r.ErrorCount++
}
return
}
}
}
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) { func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
listen := fmt.Sprintf("%s:1338", hostname) listen := fmt.Sprintf("%s:1338", hostname)
@@ -165,7 +176,6 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery)
pool := &SyncedPool{ pool := &SyncedPool{
Server: server, Server: server,
//Discovery: discovery,
Hostname: hostname, Hostname: hostname,
local: local, local: local,
@@ -192,7 +202,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery)
continue continue
} }
known := pool.IsKnown(chng.Host) known := pool.IsKnown(chng.Host)
if chng.Type == watch.Added && !known { if chng.Type != watch.Deleted && !known {
go func(h string) { go func(h string) {
log.Printf("Discovered host %s, waiting for startup", h) log.Printf("Discovered host %s, waiting for startup", h)
time.Sleep(time.Second) time.Sleep(time.Second)
@@ -218,7 +228,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery)
func (p *SyncedPool) IsHealthy() bool { func (p *SyncedPool) IsHealthy() bool {
for _, r := range p.remotes { for _, r := range p.remotes {
if r.MissedPings > 3 { if !r.IsHealthy() {
return false return false
} }
} }
@@ -278,66 +288,26 @@ const (
RemoteNegotiateResponse = uint32(11) RemoteNegotiateResponse = uint32(11)
) )
func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) { func (p *SyncedPool) Negotiate() {
reply, err := h.Call(RemoteNegotiate, RemoteNegotiateResponse, []byte(strings.Join(knownHosts, ";"))) knownHosts := make([]string, 0, len(p.remotes)+1)
for _, r := range p.remotes {
knownHosts = append(knownHosts, r.Host)
}
knownHosts = append([]string{p.Hostname}, knownHosts...)
if err != nil {
return nil, err
}
if reply.StatusCode != 200 {
return nil, fmt.Errorf("remote returned error on negotiate: %s", string(reply.Data))
}
return strings.Split(string(reply.Data), ";"), nil
}
func (g *RemoteHost) GetCartMappings() ([]CartId, error) {
reply, err := g.Call(GetCartIds, CartIdsResponse, []byte{})
if err != nil {
return nil, err
}
if reply.StatusCode != 200 {
log.Printf("Remote returned error on get cart mappings: %s", string(reply.Data))
return nil, fmt.Errorf("remote returned error: %s", string(reply.Data))
}
parts := strings.Split(string(reply.Data), ";")
ids := make([]CartId, 0, len(parts))
for _, p := range parts {
ids = append(ids, ToCartId(p))
}
return ids, nil
}
func (p *SyncedPool) Negotiate(knownHosts []string) ([]string, error) {
allHosts := make(map[string]struct{}, 0)
for _, r := range p.remotes { for _, r := range p.remotes {
hosts, err := r.Negotiate(knownHosts) hosts, err := r.Negotiate(knownHosts)
if err != nil { if err != nil {
return nil, err log.Printf("Error negotiating with %s: %v\n", r.Host, err)
return
} }
for _, h := range hosts { for _, h := range hosts {
allHosts[h] = struct{}{} if !p.IsKnown(h) {
p.AddRemote(h)
}
} }
} }
ret := make([]string, 0, len(allHosts))
for h := range allHosts {
ret = append(ret, h)
}
return ret, nil
}
func (r *RemoteHost) ConfirmChange(id CartId, host string) error {
reply, err := r.Call(RemoteGrainChanged, AckChange, []byte(fmt.Sprintf("%s;%s", id, host)))
if err != nil {
return err
}
if string(reply.Data) != "ok" {
return fmt.Errorf("remote grain change failed %s", string(reply.Data))
}
return nil
} }
func (p *SyncedPool) RequestOwnership(id CartId) error { func (p *SyncedPool) RequestOwnership(id CartId) error {
@@ -373,7 +343,6 @@ func (p *SyncedPool) removeLocalGrain(id CartId) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
delete(p.local.grains, id) delete(p.local.grains, id)
delete(p.remoteIndex, id)
} }
func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) error { func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) error {
@@ -392,53 +361,10 @@ func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) error {
connectedRemotes.Set(float64(len(p.remotes))) connectedRemotes.Set(float64(len(p.remotes)))
log.Printf("Added remote %s\n", remote.Host) log.Printf("Added remote %s\n", remote.Host)
go func() { go remote.Initialize(p)
ids, err := remote.GetCartMappings()
if err != nil {
log.Printf("Error getting remote mappings: %v\n", err)
return
}
log.Printf("Remote %s has %d grains\n", remote.Host, len(ids))
p.mu.Lock()
local := 0
remoteNo := 0
for _, id := range ids {
go p.SpawnRemoteGrain(id, remote.Host)
remoteNo++
}
log.Printf("Removed %d local grains, added %d remote grains\n", local, remoteNo)
p.mu.Unlock()
go func() {
other, err := p.Negotiate(known)
if err != nil {
log.Printf("Error negotiating with remote %s: %v\n", remote.Host, err)
return
}
for _, o := range p.ExcludeKnown(other) {
p.AddRemote(o)
}
}()
}()
return nil return nil
} }
func (h *RemoteHost) Ping() error {
_, err := h.Call(Ping, Pong, []byte{})
if err != nil {
h.MissedPings++
log.Printf("Error pinging remote %s, missed pings: %d", h.Host, h.MissedPings)
} else {
h.MissedPings = 0
}
return err
}
func (p *SyncedPool) AddRemote(host string) error { func (p *SyncedPool) AddRemote(host string) error {
if host == "" || p.IsKnown(host) { if host == "" || p.IsKnown(host) {
return nil return nil

View File

@@ -25,13 +25,7 @@ func TestConnection(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("Error adding remote: %v", err) t.Errorf("Error adding remote: %v", err)
} }
allHosts, err := pool.Negotiate([]string{"kalle", "pelle"}) go pool.Negotiate()
if err != nil {
t.Errorf("Error negotiating: %v", err)
}
if len(allHosts) != 0 {
t.Errorf("Expected 0 host, (host should be known) got %d", len(allHosts))
}
data, err := pool.Get(ToCartId("kalle")) data, err := pool.Get(ToCartId("kalle"))
if err != nil { if err != nil {