diff --git a/.gitea/workflows/build.yaml b/.gitea/workflows/build.yaml index f4ff248..ca880a9 100644 --- a/.gitea/workflows/build.yaml +++ b/.gitea/workflows/build.yaml @@ -13,7 +13,7 @@ jobs: - name: Push to registry run: docker push registry.knatofs.se/go-cart-actor-amd64:latest - name: Rollout amd64 deployment - run: kubectl rollout restart deployment/cart-actor-amd64 -n cart + run: kubectl rollout restart deployment/cart-actor-x86 -n cart BuildAndDeploy: runs-on: arm64 diff --git a/remote-host.go b/remote-host.go new file mode 100644 index 0000000..88e2837 --- /dev/null +++ b/remote-host.go @@ -0,0 +1,94 @@ +package main + +import ( + "fmt" + "log" + "strings" +) + +type RemoteHost struct { + *Client + Host string + MissedPings int +} + +func (h *RemoteHost) IsHealthy() bool { + return !h.Dead && h.MissedPings < 3 +} + +func (h *RemoteHost) Initialize(p *SyncedPool) { + + ids, err := h.GetCartMappings() + if err != nil { + log.Printf("Error getting remote mappings: %v\n", err) + return + } + log.Printf("Remote %s has %d grains\n", h.Host, len(ids)) + p.mu.Lock() + local := 0 + remoteNo := 0 + for _, id := range ids { + go p.SpawnRemoteGrain(id, h.Host) + remoteNo++ + } + log.Printf("Removed %d local grains, added %d remote grains\n", local, remoteNo) + p.mu.Unlock() + go p.Negotiate() + +} + +func (h *RemoteHost) Ping() error { + _, err := h.Call(Ping, Pong, []byte{}) + + if err != nil { + h.MissedPings++ + log.Printf("Error pinging remote %s, missed pings: %d", h.Host, h.MissedPings) + + } else { + h.MissedPings = 0 + } + return err +} + +func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) { + reply, err := h.Call(RemoteNegotiate, RemoteNegotiateResponse, []byte(strings.Join(knownHosts, ";"))) + + if err != nil { + return nil, err + } + if reply.StatusCode != 200 { + return nil, fmt.Errorf("remote returned error on negotiate: %s", string(reply.Data)) + } + + return strings.Split(string(reply.Data), ";"), nil +} + +func (g *RemoteHost) GetCartMappings() ([]CartId, error) { + reply, err := g.Call(GetCartIds, CartIdsResponse, []byte{}) + if err != nil { + return nil, err + } + if reply.StatusCode != 200 { + log.Printf("Remote returned error on get cart mappings: %s", string(reply.Data)) + return nil, fmt.Errorf("remote returned error: %s", string(reply.Data)) + } + parts := strings.Split(string(reply.Data), ";") + ids := make([]CartId, 0, len(parts)) + for _, p := range parts { + ids = append(ids, ToCartId(p)) + } + return ids, nil +} + +func (r *RemoteHost) ConfirmChange(id CartId, host string) error { + reply, err := r.Call(RemoteGrainChanged, AckChange, []byte(fmt.Sprintf("%s;%s", id, host))) + + if err != nil { + return err + } + if string(reply.Data) != "ok" { + return fmt.Errorf("remote grain change failed %s", string(reply.Data)) + } + + return nil +} diff --git a/synced-pool.go b/synced-pool.go index 1f2edcc..30c055d 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -21,12 +21,6 @@ type HealthHandler interface { IsHealthy() bool } -type RemoteHost struct { - *Client - Host string - MissedPings int -} - type SyncedPool struct { *Server mu sync.RWMutex @@ -126,6 +120,12 @@ func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint32, []byte, error return AckChange, []byte("ok"), nil } +func (p *SyncedPool) RemoveRemoteGrain(id CartId) { + p.mu.Lock() + defer p.mu.Unlock() + delete(p.remoteIndex, id) +} + func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { if p.local.grains[id] != nil { log.Printf("Grain %s already exists locally, deleting\n", id) @@ -141,11 +141,9 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { } go func() { <-remote.Died - p.mu.Lock() - delete(p.remoteIndex, id) - p.mu.Unlock() + p.RemoveRemoteGrain(id) + p.HandleHostError(host) log.Printf("Remote grain %s died, host: %s\n", id.String(), host) - //p.RemoveHost(host) }() p.mu.Lock() @@ -153,6 +151,19 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { p.mu.Unlock() } +func (p *SyncedPool) HandleHostError(host string) { + for _, r := range p.remotes { + if r.Host == host { + if !r.IsHealthy() { + p.RemoveHost(r) + } else { + r.ErrorCount++ + } + return + } + } +} + func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) { listen := fmt.Sprintf("%s:1338", hostname) @@ -164,8 +175,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) log.Printf("Listening on %s", listen) pool := &SyncedPool{ - Server: server, - //Discovery: discovery, + Server: server, Hostname: hostname, local: local, @@ -192,7 +202,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) continue } known := pool.IsKnown(chng.Host) - if chng.Type == watch.Added && !known { + if chng.Type != watch.Deleted && !known { go func(h string) { log.Printf("Discovered host %s, waiting for startup", h) time.Sleep(time.Second) @@ -218,7 +228,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) func (p *SyncedPool) IsHealthy() bool { for _, r := range p.remotes { - if r.MissedPings > 3 { + if !r.IsHealthy() { return false } } @@ -278,66 +288,26 @@ const ( RemoteNegotiateResponse = uint32(11) ) -func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) { - reply, err := h.Call(RemoteNegotiate, RemoteNegotiateResponse, []byte(strings.Join(knownHosts, ";"))) +func (p *SyncedPool) Negotiate() { + knownHosts := make([]string, 0, len(p.remotes)+1) + for _, r := range p.remotes { + knownHosts = append(knownHosts, r.Host) + } + knownHosts = append([]string{p.Hostname}, knownHosts...) - if err != nil { - return nil, err - } - if reply.StatusCode != 200 { - return nil, fmt.Errorf("remote returned error on negotiate: %s", string(reply.Data)) - } - - return strings.Split(string(reply.Data), ";"), nil -} - -func (g *RemoteHost) GetCartMappings() ([]CartId, error) { - reply, err := g.Call(GetCartIds, CartIdsResponse, []byte{}) - if err != nil { - return nil, err - } - if reply.StatusCode != 200 { - log.Printf("Remote returned error on get cart mappings: %s", string(reply.Data)) - return nil, fmt.Errorf("remote returned error: %s", string(reply.Data)) - } - parts := strings.Split(string(reply.Data), ";") - ids := make([]CartId, 0, len(parts)) - for _, p := range parts { - ids = append(ids, ToCartId(p)) - } - return ids, nil -} - -func (p *SyncedPool) Negotiate(knownHosts []string) ([]string, error) { - allHosts := make(map[string]struct{}, 0) for _, r := range p.remotes { hosts, err := r.Negotiate(knownHosts) if err != nil { - return nil, err + log.Printf("Error negotiating with %s: %v\n", r.Host, err) + return } for _, h := range hosts { - allHosts[h] = struct{}{} + if !p.IsKnown(h) { + p.AddRemote(h) + } } } - ret := make([]string, 0, len(allHosts)) - for h := range allHosts { - ret = append(ret, h) - } - return ret, nil -} - -func (r *RemoteHost) ConfirmChange(id CartId, host string) error { - reply, err := r.Call(RemoteGrainChanged, AckChange, []byte(fmt.Sprintf("%s;%s", id, host))) - - if err != nil { - return err - } - if string(reply.Data) != "ok" { - return fmt.Errorf("remote grain change failed %s", string(reply.Data)) - } - - return nil } func (p *SyncedPool) RequestOwnership(id CartId) error { @@ -373,7 +343,6 @@ func (p *SyncedPool) removeLocalGrain(id CartId) { p.mu.Lock() defer p.mu.Unlock() delete(p.local.grains, id) - delete(p.remoteIndex, id) } func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) error { @@ -392,53 +361,10 @@ func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) error { connectedRemotes.Set(float64(len(p.remotes))) log.Printf("Added remote %s\n", remote.Host) - go func() { - - ids, err := remote.GetCartMappings() - if err != nil { - log.Printf("Error getting remote mappings: %v\n", err) - return - } - log.Printf("Remote %s has %d grains\n", remote.Host, len(ids)) - p.mu.Lock() - local := 0 - remoteNo := 0 - for _, id := range ids { - - go p.SpawnRemoteGrain(id, remote.Host) - - remoteNo++ - - } - log.Printf("Removed %d local grains, added %d remote grains\n", local, remoteNo) - p.mu.Unlock() - go func() { - other, err := p.Negotiate(known) - if err != nil { - log.Printf("Error negotiating with remote %s: %v\n", remote.Host, err) - return - } - for _, o := range p.ExcludeKnown(other) { - p.AddRemote(o) - } - }() - }() + go remote.Initialize(p) return nil } -func (h *RemoteHost) Ping() error { - _, err := h.Call(Ping, Pong, []byte{}) - - if err != nil { - h.MissedPings++ - log.Printf("Error pinging remote %s, missed pings: %d", h.Host, h.MissedPings) - - } else { - h.MissedPings = 0 - } - return err -} - func (p *SyncedPool) AddRemote(host string) error { if host == "" || p.IsKnown(host) { return nil diff --git a/synced-pool_test.go b/synced-pool_test.go index 127cdf8..5937f0f 100644 --- a/synced-pool_test.go +++ b/synced-pool_test.go @@ -25,13 +25,7 @@ func TestConnection(t *testing.T) { if err != nil { t.Errorf("Error adding remote: %v", err) } - allHosts, err := pool.Negotiate([]string{"kalle", "pelle"}) - if err != nil { - t.Errorf("Error negotiating: %v", err) - } - if len(allHosts) != 0 { - t.Errorf("Expected 0 host, (host should be known) got %d", len(allHosts)) - } + go pool.Negotiate() data, err := pool.Get(ToCartId("kalle")) if err != nil {