watch pods instead of polling
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m57s

This commit is contained in:
matst80
2024-11-10 10:26:21 +01:00
parent 1628c8ca31
commit 351280347b
6 changed files with 90 additions and 13 deletions

View File

@@ -4,7 +4,7 @@ metadata:
name: cart-discovery name: cart-discovery
rules: rules:
- apiGroups: [""] # "" indicates the core API group - apiGroups: [""] # "" indicates the core API group
resources: ["pods","services"] resources: ["pods","services", "deployments"]
verbs: ["get", "watch", "list"] verbs: ["get", "watch", "list"]
--- ---
apiVersion: rbac.authorization.k8s.io/v1 apiVersion: rbac.authorization.k8s.io/v1

View File

@@ -4,8 +4,9 @@ metadata:
name: cart-scaler name: cart-scaler
spec: spec:
scaleTargetRef: scaleTargetRef:
apiVersion: apps/v1
kind: Deployment kind: Deployment
name: cart-actor name: cart-actor
minReplicas: 1 minReplicas: 2
maxReplicas: 5 maxReplicas: 8
targetCPUUtilizationPercentage: 50 targetCPUUtilizationPercentage: 50

View File

@@ -3,12 +3,17 @@ package main
import ( import (
"context" "context"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
toolsWatch "k8s.io/client-go/tools/watch"
) )
type Discovery interface { type Discovery interface {
Discover() ([]string, error) Discover() ([]string, error)
Watch() (<-chan string, error)
} }
type K8sDiscovery struct { type K8sDiscovery struct {
@@ -33,6 +38,32 @@ func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) {
return hosts, nil return hosts, nil
} }
func (k *K8sDiscovery) Watch() (<-chan string, error) {
timeout := int64(30)
watcherFn := func(options metav1.ListOptions) (watch.Interface, error) {
return k.client.CoreV1().Pods("").Watch(k.ctx, metav1.ListOptions{
LabelSelector: "actor-pool=cart",
TimeoutSeconds: &timeout,
})
}
watcher, err := toolsWatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watcherFn})
if err != nil {
return nil, err
}
ch := make(chan string)
go func() {
for event := range watcher.ResultChan() {
if event.Type != watch.Added {
continue
}
pod := event.Object.(*v1.Pod)
ch <- pod.Status.PodIP
}
}()
return ch, nil
}
func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery { func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery {
return &K8sDiscovery{ return &K8sDiscovery{
ctx: context.Background(), ctx: context.Background(),

View File

@@ -2,6 +2,7 @@ package main
import ( import (
"testing" "testing"
"time"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
@@ -25,3 +26,24 @@ func TestDiscovery(t *testing.T) {
t.Errorf("Expected at least one host, got none") t.Errorf("Expected at least one host, got none")
} }
} }
func TestWatch(t *testing.T) {
config, err := clientcmd.BuildConfigFromFlags("", "/Users/mats/.kube/config")
if err != nil {
t.Errorf("Error building config: %v", err)
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
t.Errorf("Error creating client: %v", err)
}
d := NewK8sDiscovery(client)
ch, err := d.Watch()
if err != nil {
t.Errorf("Error watching: %v", err)
}
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Errorf("Timeout waiting for watch")
}
}

View File

@@ -54,6 +54,7 @@ func SendPacket(conn io.Writer, messageType uint16, datafn func(w io.Writer) err
MessageType: messageType, MessageType: messageType,
DataLength: uint16(len(data)), DataLength: uint16(len(data)),
}) })
packetsSent.Inc()
_, err = conn.Write(data) _, err = conn.Write(data)
return err return err
} }

View File

@@ -78,20 +78,23 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, d Discovery) (*Synced
if d != nil { if d != nil {
go func() { go func() {
for range time.Tick(time.Second * 5) { ch, err := d.Watch()
hosts, err := d.Discover() if err != nil {
if err != nil { log.Printf("Error discovering hosts: %v", err)
log.Printf("Error discovering hosts: %v", err) return
}
for host := range ch {
if pool.IsKnown(host) {
continue
} }
for _, h := range pool.ExcludeKnown(hosts) { go func(h string) {
log.Printf("Discovered host %s, waiting for startup", h)
log.Printf("Discovered host %s", h) time.Sleep(time.Second)
err := pool.AddRemote(h) err := pool.AddRemote(h)
if err != nil { if err != nil {
log.Printf("Error adding remote %s: %v", h, err) log.Printf("Error adding remote %s: %v", h, err)
} }
} }(host)
} }
}() }()
} else { } else {
@@ -112,6 +115,15 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, d Discovery) (*Synced
return pool, nil return pool, nil
} }
func (p *SyncedPool) IsKnown(host string) bool {
for _, r := range p.remotes {
if r.Host == host {
return true
}
}
return host != p.Hostname
}
func (p *SyncedPool) ExcludeKnown(hosts []string) []string { func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
ret := make([]string, 0, len(hosts)) ret := make([]string, 0, len(hosts))
for _, h := range hosts { for _, h := range hosts {
@@ -172,6 +184,14 @@ var (
Name: "cart_packet_queue_size", Name: "cart_packet_queue_size",
Help: "The total number of packets in the queue", Help: "The total number of packets in the queue",
}) })
packetsSent = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_pool_packets_sent_total",
Help: "The total number of packets sent",
})
packetsReceived = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_pool_packets_received_total",
Help: "The total number of packets received",
})
) )
const ( const (
@@ -216,7 +236,7 @@ func NewPacketQueue(connection net.Conn) *PacketQueue {
//return //return
} }
packetQueue.Inc()
queue.mu.Lock() queue.mu.Lock()
for i, packet := range queue.Packets { for i, packet := range queue.Packets {
if time.Since(packet.Added) < time.Second*5 { if time.Since(packet.Added) < time.Second*5 {
@@ -231,6 +251,8 @@ func NewPacketQueue(connection net.Conn) *PacketQueue {
Data: data, Data: data,
}) })
queue.mu.Unlock() queue.mu.Unlock()
packetsReceived.Inc()
packetQueue.Inc()
} }
}() }()
return queue return queue