package discovery import ( "context" "slices" "sync" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" toolsWatch "k8s.io/client-go/tools/watch" ) type K8sDiscovery struct { ctx context.Context client *kubernetes.Clientset } func (k *K8sDiscovery) Discover() ([]string, error) { return k.DiscoverInNamespace("") } func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) { pods, err := k.client.CoreV1().Pods(namespace).List(k.ctx, metav1.ListOptions{ LabelSelector: "actor-pool=cart", }) if err != nil { return nil, err } hosts := make([]string, 0, len(pods.Items)) for _, pod := range pods.Items { hosts = append(hosts, pod.Status.PodIP) } return hosts, nil } func (k *K8sDiscovery) Watch() (<-chan HostChange, error) { timeout := int64(30) ipsThatAreReady := make(map[string]bool) m := sync.Mutex{} watcherFn := func(options metav1.ListOptions) (watch.Interface, error) { return k.client.CoreV1().Pods("").Watch(k.ctx, metav1.ListOptions{ LabelSelector: "actor-pool=cart", TimeoutSeconds: &timeout, }) } watcher, err := toolsWatch.NewRetryWatcherWithContext(k.ctx, "1", &cache.ListWatch{WatchFunc: watcherFn}) if err != nil { return nil, err } ch := make(chan HostChange) go func() { for event := range watcher.ResultChan() { pod := event.Object.(*v1.Pod) isReady := slices.ContainsFunc(pod.Status.Conditions, func(condition v1.PodCondition) bool { return condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue }) m.Lock() oldState := ipsThatAreReady[pod.Status.PodIP] ipsThatAreReady[pod.Status.PodIP] = isReady m.Unlock() if oldState != isReady { ch <- HostChange{ Host: pod.Status.PodIP, IsReady: isReady, } } ch <- HostChange{ Host: pod.Status.PodIP, IsReady: isReady, } } }() return ch, nil } func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery { return &K8sDiscovery{ ctx: context.Background(), client: client, } }