From 46d9f1dd965fe8f421baa6fbd16c38c412f50dfe Mon Sep 17 00:00:00 2001 From: matst80 Date: Mon, 11 Nov 2024 07:17:37 +0100 Subject: [PATCH] watch deletes --- discovery.go | 21 +++++++++++++-------- synced-pool.go | 30 +++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/discovery.go b/discovery.go index 26fed7e..20952a9 100644 --- a/discovery.go +++ b/discovery.go @@ -13,7 +13,7 @@ import ( type Discovery interface { Discover() ([]string, error) - Watch() (<-chan string, error) + Watch() (<-chan HostChange, error) } type K8sDiscovery struct { @@ -38,7 +38,12 @@ func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) { return hosts, nil } -func (k *K8sDiscovery) Watch() (<-chan string, error) { +type HostChange struct { + Host string + Type watch.EventType +} + +func (k *K8sDiscovery) Watch() (<-chan HostChange, error) { timeout := int64(30) watcherFn := func(options metav1.ListOptions) (watch.Interface, error) { return k.client.CoreV1().Pods("").Watch(k.ctx, metav1.ListOptions{ @@ -50,15 +55,15 @@ func (k *K8sDiscovery) Watch() (<-chan string, error) { if err != nil { return nil, err } - ch := make(chan string) + ch := make(chan HostChange) go func() { for event := range watcher.ResultChan() { - if event.Type != watch.Added { - continue - } - pod := event.Object.(*v1.Pod) - ch <- pod.Status.PodIP + pod := event.Object.(*v1.Pod) + ch <- HostChange{ + Host: pod.Status.PodIP, + Type: event.Type, + } } }() return ch, nil diff --git a/synced-pool.go b/synced-pool.go index 23d9e57..b6e8dcd 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "k8s.io/apimachinery/pkg/watch" ) type Quorum interface { @@ -183,18 +184,29 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) log.Printf("Error discovering hosts: %v", err) return } - for host := range ch { - if pool.IsKnown(host) || host == "" { + for chng := range ch { + if chng.Host == "" { continue } - go func(h string) { - log.Printf("Discovered host %s, waiting for startup", h) - time.Sleep(time.Second) - err := pool.AddRemote(h) - if err != nil { - log.Printf("Error adding remote %s: %v", h, err) + known := pool.IsKnown(chng.Host) + if chng.Type == watch.Added && !known { + go func(h string) { + log.Printf("Discovered host %s, waiting for startup", h) + time.Sleep(time.Second) + err := pool.AddRemote(h) + if err != nil { + log.Printf("Error adding remote %s: %v", h, err) + } + }(chng.Host) + } else if chng.Type == watch.Deleted && known { + log.Printf("Host removed %s, removing from index", chng.Host) + for _, r := range pool.remotes { + if r.Host == chng.Host { + pool.RemoveHost(r) + break + } } - }(host) + } } }() } else {