package discovery import ( "context" "slices" "sync" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" toolsWatch "k8s.io/client-go/tools/watch" ) type K8sDiscovery struct { ctx context.Context client *kubernetes.Clientset listOptions metav1.ListOptions } func (k *K8sDiscovery) Discover() ([]string, error) { return k.DiscoverInNamespace("") } func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) { pods, err := k.client.CoreV1().Pods(namespace).List(k.ctx, k.listOptions) if err != nil { return nil, err } hosts := make([]string, 0, len(pods.Items)) for _, pod := range pods.Items { if hasReadyCondition(&pod) { hosts = append(hosts, pod.Status.PodIP) } } return hosts, nil } func hasReadyCondition(pod *v1.Pod) bool { return slices.ContainsFunc(pod.Status.Conditions, func(condition v1.PodCondition) bool { return condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue }) } func (k *K8sDiscovery) Watch() (<-chan HostChange, error) { ipsThatAreReady := make(map[string]bool) m := sync.Mutex{} watcherFn := func(options metav1.ListOptions) (watch.Interface, error) { return k.client.CoreV1().Pods("").Watch(k.ctx, k.listOptions) } watcher, err := toolsWatch.NewRetryWatcherWithContext(k.ctx, "1", &cache.ListWatch{WatchFunc: watcherFn}) if err != nil { return nil, err } ch := make(chan HostChange) go func() { for event := range watcher.ResultChan() { pod := event.Object.(*v1.Pod) isReady := hasReadyCondition(pod) m.Lock() oldState := ipsThatAreReady[pod.Status.PodIP] ipsThatAreReady[pod.Status.PodIP] = isReady m.Unlock() if oldState != isReady { ch <- HostChange{ Host: pod.Status.PodIP, IsReady: isReady, } } ch <- HostChange{ Host: pod.Status.PodIP, IsReady: isReady, } } }() return ch, nil } func NewK8sDiscovery(client *kubernetes.Clientset, listOptions metav1.ListOptions) *K8sDiscovery { return &K8sDiscovery{ ctx: context.Background(), client: client, listOptions: listOptions, } }