watch deletes
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m58s
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m58s
This commit is contained in:
21
discovery.go
21
discovery.go
@@ -13,7 +13,7 @@ import (
|
||||
|
||||
type Discovery interface {
|
||||
Discover() ([]string, error)
|
||||
Watch() (<-chan string, error)
|
||||
Watch() (<-chan HostChange, error)
|
||||
}
|
||||
|
||||
type K8sDiscovery struct {
|
||||
@@ -38,7 +38,12 @@ func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) {
|
||||
return hosts, nil
|
||||
}
|
||||
|
||||
func (k *K8sDiscovery) Watch() (<-chan string, error) {
|
||||
type HostChange struct {
|
||||
Host string
|
||||
Type watch.EventType
|
||||
}
|
||||
|
||||
func (k *K8sDiscovery) Watch() (<-chan HostChange, error) {
|
||||
timeout := int64(30)
|
||||
watcherFn := func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
return k.client.CoreV1().Pods("").Watch(k.ctx, metav1.ListOptions{
|
||||
@@ -50,15 +55,15 @@ func (k *K8sDiscovery) Watch() (<-chan string, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ch := make(chan string)
|
||||
ch := make(chan HostChange)
|
||||
go func() {
|
||||
for event := range watcher.ResultChan() {
|
||||
if event.Type != watch.Added {
|
||||
continue
|
||||
}
|
||||
pod := event.Object.(*v1.Pod)
|
||||
|
||||
ch <- pod.Status.PodIP
|
||||
pod := event.Object.(*v1.Pod)
|
||||
ch <- HostChange{
|
||||
Host: pod.Status.PodIP,
|
||||
Type: event.Type,
|
||||
}
|
||||
}
|
||||
}()
|
||||
return ch, nil
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
type Quorum interface {
|
||||
@@ -183,10 +184,12 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery)
|
||||
log.Printf("Error discovering hosts: %v", err)
|
||||
return
|
||||
}
|
||||
for host := range ch {
|
||||
if pool.IsKnown(host) || host == "" {
|
||||
for chng := range ch {
|
||||
if chng.Host == "" {
|
||||
continue
|
||||
}
|
||||
known := pool.IsKnown(chng.Host)
|
||||
if chng.Type == watch.Added && !known {
|
||||
go func(h string) {
|
||||
log.Printf("Discovered host %s, waiting for startup", h)
|
||||
time.Sleep(time.Second)
|
||||
@@ -194,7 +197,16 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery)
|
||||
if err != nil {
|
||||
log.Printf("Error adding remote %s: %v", h, err)
|
||||
}
|
||||
}(host)
|
||||
}(chng.Host)
|
||||
} else if chng.Type == watch.Deleted && known {
|
||||
log.Printf("Host removed %s, removing from index", chng.Host)
|
||||
for _, r := range pool.remotes {
|
||||
if r.Host == chng.Host {
|
||||
pool.RemoveHost(r)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user