173 lines
3.7 KiB
Go
173 lines
3.7 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/tools/cache"
|
|
toolsWatch "k8s.io/client-go/tools/watch"
|
|
)
|
|
|
|
type Discovery interface {
|
|
Discover() ([]string, error)
|
|
Watch() (<-chan HostChange, error)
|
|
}
|
|
|
|
type K8sDiscovery struct {
|
|
ctx context.Context
|
|
client *kubernetes.Clientset
|
|
}
|
|
|
|
func (k *K8sDiscovery) Discover() ([]string, error) {
|
|
return k.DiscoverInNamespace("")
|
|
}
|
|
func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) {
|
|
pods, err := k.client.CoreV1().Pods(namespace).List(k.ctx, metav1.ListOptions{
|
|
LabelSelector: "actor-pool=cart",
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
hosts := make([]string, 0, len(pods.Items))
|
|
for _, pod := range pods.Items {
|
|
hosts = append(hosts, pod.Status.PodIP)
|
|
}
|
|
return hosts, nil
|
|
}
|
|
|
|
type HostChange struct {
|
|
Host string
|
|
Type watch.EventType
|
|
}
|
|
|
|
func (k *K8sDiscovery) Watch() (<-chan HostChange, error) {
|
|
timeout := int64(30)
|
|
watcherFn := func(options metav1.ListOptions) (watch.Interface, error) {
|
|
return k.client.CoreV1().Pods("").Watch(k.ctx, metav1.ListOptions{
|
|
LabelSelector: "actor-pool=cart",
|
|
TimeoutSeconds: &timeout,
|
|
})
|
|
}
|
|
watcher, err := toolsWatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watcherFn})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ch := make(chan HostChange)
|
|
go func() {
|
|
for event := range watcher.ResultChan() {
|
|
|
|
pod := event.Object.(*v1.Pod)
|
|
ch <- HostChange{
|
|
Host: pod.Status.PodIP,
|
|
Type: event.Type,
|
|
}
|
|
}
|
|
}()
|
|
return ch, nil
|
|
}
|
|
|
|
func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery {
|
|
return &K8sDiscovery{
|
|
ctx: context.Background(),
|
|
client: client,
|
|
}
|
|
}
|
|
|
|
// MockDiscovery is an in-memory Discovery implementation for tests.
|
|
// It allows deterministic injection of host additions/removals without
|
|
// depending on Kubernetes API machinery.
|
|
type MockDiscovery struct {
|
|
mu sync.RWMutex
|
|
hosts []string
|
|
events chan HostChange
|
|
closed bool
|
|
started bool
|
|
}
|
|
|
|
// NewMockDiscovery creates a mock discovery with an initial host list.
|
|
func NewMockDiscovery(initial []string) *MockDiscovery {
|
|
cp := make([]string, len(initial))
|
|
copy(cp, initial)
|
|
return &MockDiscovery{
|
|
hosts: cp,
|
|
events: make(chan HostChange, 32),
|
|
}
|
|
}
|
|
|
|
// Discover returns the current host snapshot.
|
|
func (m *MockDiscovery) Discover() ([]string, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
cp := make([]string, len(m.hosts))
|
|
copy(cp, m.hosts)
|
|
return cp, nil
|
|
}
|
|
|
|
// Watch returns a channel that will receive HostChange events.
|
|
// The channel is buffered; AddHost/RemoveHost push events non-blockingly.
|
|
func (m *MockDiscovery) Watch() (<-chan HostChange, error) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
if m.closed {
|
|
return nil, context.Canceled
|
|
}
|
|
m.started = true
|
|
return m.events, nil
|
|
}
|
|
|
|
// AddHost inserts a new host (if absent) and emits an Added event.
|
|
func (m *MockDiscovery) AddHost(host string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
if m.closed {
|
|
return
|
|
}
|
|
for _, h := range m.hosts {
|
|
if h == host {
|
|
return
|
|
}
|
|
}
|
|
m.hosts = append(m.hosts, host)
|
|
if m.started {
|
|
m.events <- HostChange{Host: host, Type: watch.Added}
|
|
}
|
|
}
|
|
|
|
// RemoveHost removes a host (if present) and emits a Deleted event.
|
|
func (m *MockDiscovery) RemoveHost(host string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
if m.closed {
|
|
return
|
|
}
|
|
idx := -1
|
|
for i, h := range m.hosts {
|
|
if h == host {
|
|
idx = i
|
|
break
|
|
}
|
|
}
|
|
if idx == -1 {
|
|
return
|
|
}
|
|
m.hosts = append(m.hosts[:idx], m.hosts[idx+1:]...)
|
|
if m.started {
|
|
m.events <- HostChange{Host: host, Type: watch.Deleted}
|
|
}
|
|
}
|
|
|
|
// Close closes the event channel (idempotent).
|
|
func (m *MockDiscovery) Close() {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
if m.closed {
|
|
return
|
|
}
|
|
m.closed = true
|
|
close(m.events)
|
|
}
|