update tests and discovery
This commit is contained in:
@@ -2,6 +2,8 @@ package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -33,13 +35,10 @@ func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) {
|
||||
return hosts, nil
|
||||
}
|
||||
|
||||
type HostChange struct {
|
||||
Host string
|
||||
Type watch.EventType
|
||||
}
|
||||
|
||||
func (k *K8sDiscovery) Watch() (<-chan HostChange, error) {
|
||||
timeout := int64(30)
|
||||
ipsThatAreReady := make(map[string]bool)
|
||||
m := sync.Mutex{}
|
||||
watcherFn := func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
return k.client.CoreV1().Pods("").Watch(k.ctx, metav1.ListOptions{
|
||||
LabelSelector: "actor-pool=cart",
|
||||
@@ -55,10 +54,22 @@ func (k *K8sDiscovery) Watch() (<-chan HostChange, error) {
|
||||
for event := range watcher.ResultChan() {
|
||||
|
||||
pod := event.Object.(*v1.Pod)
|
||||
// log.Printf("pod change %+v", pod.Status.Phase == v1.PodRunning)
|
||||
isReady := slices.ContainsFunc(pod.Status.Conditions, func(condition v1.PodCondition) bool {
|
||||
return condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue
|
||||
})
|
||||
m.Lock()
|
||||
oldState := ipsThatAreReady[pod.Status.PodIP]
|
||||
ipsThatAreReady[pod.Status.PodIP] = isReady
|
||||
m.Unlock()
|
||||
if oldState != isReady {
|
||||
ch <- HostChange{
|
||||
Host: pod.Status.PodIP,
|
||||
IsReady: isReady,
|
||||
}
|
||||
}
|
||||
ch <- HostChange{
|
||||
Host: pod.Status.PodIP,
|
||||
Type: event.Type,
|
||||
Host: pod.Status.PodIP,
|
||||
IsReady: isReady,
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -2,9 +2,8 @@ package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
// MockDiscovery is an in-memory Discovery implementation for tests.
|
||||
@@ -56,14 +55,12 @@ func (m *MockDiscovery) AddHost(host string) {
|
||||
if m.closed {
|
||||
return
|
||||
}
|
||||
for _, h := range m.hosts {
|
||||
if h == host {
|
||||
return
|
||||
}
|
||||
if slices.Contains(m.hosts, host) {
|
||||
return
|
||||
}
|
||||
m.hosts = append(m.hosts, host)
|
||||
if m.started {
|
||||
m.events <- HostChange{Host: host, Type: watch.Added}
|
||||
m.events <- HostChange{Host: host, IsReady: true}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,7 +83,7 @@ func (m *MockDiscovery) RemoveHost(host string) {
|
||||
}
|
||||
m.hosts = append(m.hosts[:idx], m.hosts[idx+1:]...)
|
||||
if m.started {
|
||||
m.events <- HostChange{Host: host, Type: watch.Deleted}
|
||||
m.events <- HostChange{Host: host, IsReady: false}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
package discovery
|
||||
|
||||
type HostChange struct {
|
||||
Host string
|
||||
IsReady bool
|
||||
}
|
||||
|
||||
type Discovery interface {
|
||||
Discover() ([]string, error)
|
||||
Watch() (<-chan HostChange, error)
|
||||
|
||||
Reference in New Issue
Block a user