From 8cf8be277810d52e472537470eaf04037ab3f2c4 Mon Sep 17 00:00:00 2001 From: matst80 Date: Sat, 9 Nov 2024 19:16:01 +0100 Subject: [PATCH] more sync stuff --- synced-pool.go | 69 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 64 insertions(+), 5 deletions(-) diff --git a/synced-pool.go b/synced-pool.go index e6b53a6..15ffb67 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -49,7 +49,7 @@ func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery { type Quorum interface { Negotiate(knownHosts []string) ([]string, error) - ElectOwner(CartId) error + OwnerChanged(CartId, host string) error } type RemoteHost struct { @@ -119,6 +119,8 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, d Discovery) (*Synced const ( RemoteNegotiate = uint16(3) RemoteGrainChanged = uint16(4) + AckChange = uint16(5) + AckError = uint16(6) ) func (p *SyncedPool) handleConnection(conn net.Conn) { @@ -162,9 +164,38 @@ func (p *SyncedPool) handleConnection(conn net.Conn) { // remote grain changed log.Printf("Remote grain changed\n") for err == nil { - id := make([]byte, 16) - _, err = conn.Read(id) - log.Printf("Remote grain %s changed\n", id) + idAndHost := make([]byte, packet.DataLength) + _, err = conn.Read(idAndHost) + log.Printf("Remote grain %s changed\n", idAndHost) + if err != nil { + break + } + idAndHostParts := strings.Split(string(idAndHost), ";") + if len(idAndHostParts) != 2 { + log.Printf("Invalid remote grain change message\n") + break + } + found := false + for _, r := range p.remotes { + if r.Host == string(idAndHostParts[1]) { + found = true + log.Printf("Remote grain %s changed to %s\n", idAndHostParts[0], idAndHostParts[1]) + p.remoteIndex[ToCartId(idAndHostParts[0])] = r.Pool + } + } + + if !found { + log.Printf("Remote host %s not found\n", idAndHostParts[1]) + SendPacket(conn, AckError, func(w io.Writer) error { + w.Write([]byte("remote host not found")) + return nil + }) + } else { + SendPacket(conn, AckChange, func(w io.Writer) error { + w.Write([]byte("ok")) + return nil + }) + } } } } @@ -203,7 +234,31 @@ func (p *SyncedPool) Negotiate(knownHosts []string) ([]string, error) { return ret, nil } -func (p *SyncedPool) ListChanged(ids []CartId) error { +func (r *RemoteHost) ConfirmChange(id CartId, host string) error { + SendPacket(r.connection, RemoteGrainChanged, func(w io.Writer) error { + + w.Write([]byte(fmt.Sprintf("%s;%s", id, host))) + return nil + }) + t, _, err := ReceivePacket(r.connection) + if err != nil { + return err + } + if t != AckChange { + return fmt.Errorf("unexpected message type %d", t) + } + return nil +} + +func (p *SyncedPool) OwnerChanged(id CartId, host string) error { + for _, r := range p.remotes { + err := r.ConfirmChange(id, host) + + if err != nil { + log.Printf("Error confirming change: %v\n", err) + return err + } + } return nil } @@ -241,6 +296,10 @@ func (p *SyncedPool) Process(id CartId, messages ...Message) ([]byte, error) { return remoteGrain.Process(id, messages...) } } + err := p.OwnerChanged(id, p.Hostname) + if err != nil { + return nil, err + } return p.local.Process(id, messages...) }