package main import ( "context" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" toolsWatch "k8s.io/client-go/tools/watch" ) type Discovery interface { Discover() ([]string, error) Watch() (<-chan HostChange, error) } type K8sDiscovery struct { ctx context.Context client *kubernetes.Clientset } func (k *K8sDiscovery) Discover() ([]string, error) { return k.DiscoverInNamespace("") } func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) { pods, err := k.client.CoreV1().Pods(namespace).List(k.ctx, metav1.ListOptions{ LabelSelector: "actor-pool=cart", }) if err != nil { return nil, err } hosts := make([]string, 0, len(pods.Items)) for _, pod := range pods.Items { hosts = append(hosts, pod.Status.PodIP) } return hosts, nil } type HostChange struct { Host string Type watch.EventType } func (k *K8sDiscovery) Watch() (<-chan HostChange, error) { timeout := int64(30) watcherFn := func(options metav1.ListOptions) (watch.Interface, error) { return k.client.CoreV1().Pods("").Watch(k.ctx, metav1.ListOptions{ LabelSelector: "actor-pool=cart", TimeoutSeconds: &timeout, }) } watcher, err := toolsWatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watcherFn}) if err != nil { return nil, err } ch := make(chan HostChange) go func() { for event := range watcher.ResultChan() { pod := event.Object.(*v1.Pod) ch <- HostChange{ Host: pod.Status.PodIP, Type: event.Type, } } }() return ch, nil } func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery { return &K8sDiscovery{ ctx: context.Background(), client: client, } }