diff --git a/deployment/roles.yaml b/deployment/roles.yaml
index e99d64f..1cbebed 100644
--- a/deployment/roles.yaml
+++ b/deployment/roles.yaml
@@ -4,7 +4,10 @@ metadata:
   name: cart-discovery
 rules:
 - apiGroups: [""] # "" indicates the core API group
-  resources: ["pods","services"]
+  # NOTE(review): scaling.yaml addresses the Deployment via apps/v1, i.e. the
+  # "apps" API group. Listing "deployments" under the core group "" most likely
+  # grants nothing; it needs its own rule with apiGroups: ["apps"] - confirm.
+  resources: ["pods","services", "deployments"]
   verbs: ["get", "watch", "list"]
 ---
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/deployment/scaling.yaml b/deployment/scaling.yaml
index 7c5b082..0691558 100644
--- a/deployment/scaling.yaml
+++ b/deployment/scaling.yaml
@@ -4,8 +4,9 @@ metadata:
   name: cart-scaler
 spec:
   scaleTargetRef:
+    apiVersion: apps/v1
     kind: Deployment
     name: cart-actor
-  minReplicas: 1
-  maxReplicas: 5
+  minReplicas: 2
+  maxReplicas: 8
   targetCPUUtilizationPercentage: 50
\ No newline at end of file
diff --git a/discovery.go b/discovery.go
index 9b3b4d5..26fed7e 100644
--- a/discovery.go
+++ b/discovery.go
@@ -3,12 +3,18 @@ package main
 import (
 	"context"
 
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	toolsWatch "k8s.io/client-go/tools/watch"
 )
 
 type Discovery interface {
 	Discover() ([]string, error)
+	// Watch returns a channel on which newly discovered hosts are delivered.
+	Watch() (<-chan string, error)
 }
 
 type K8sDiscovery struct {
@@ -33,6 +39,45 @@ func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) {
 	return hosts, nil
 }
 
+// Watch starts a retrying watch on pods labelled actor-pool=cart (empty
+// namespace argument) and returns a channel that receives the pod IP of each
+// pod reported with an Added event. The channel is closed once the watcher's
+// result channel closes, so consumers ranging over it terminate.
+func (k *K8sDiscovery) Watch() (<-chan string, error) {
+	timeout := int64(30)
+	// NOTE(review): the options handed in by the retry watcher are not
+	// forwarded; every (re)connect issues the same label-selected watch.
+	// Kept as in the original change - confirm this is intended.
+	watcherFn := func(options metav1.ListOptions) (watch.Interface, error) {
+		return k.client.CoreV1().Pods("").Watch(k.ctx, metav1.ListOptions{
+			LabelSelector:  "actor-pool=cart",
+			TimeoutSeconds: &timeout,
+		})
+	}
+	watcher, err := toolsWatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watcherFn})
+	if err != nil {
+		return nil, err
+	}
+	ch := make(chan string)
+	go func() {
+		// Close ch when the watcher finishes so receivers are not left blocked.
+		defer close(ch)
+		for event := range watcher.ResultChan() {
+			if event.Type != watch.Added {
+				continue
+			}
+			// Checked assertion: an unexpected object type must not panic here.
+			pod, ok := event.Object.(*v1.Pod)
+			if !ok || pod.Status.PodIP == "" {
+				// Nothing usable to report (wrong type or no IP assigned yet).
+				continue
+			}
+			ch <- pod.Status.PodIP
+		}
+	}()
+	return ch, nil
+}
+
 func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery {
 	return &K8sDiscovery{
 		ctx: context.Background(),
diff --git a/k8s-discovery_test.go b/discovery_test.go
similarity index 54%
rename from k8s-discovery_test.go
rename to discovery_test.go
index 167cffc..7010755 100644
--- a/k8s-discovery_test.go
+++ b/discovery_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"testing"
+	"time"
 
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/clientcmd"
@@ -25,3 +26,30 @@ func TestDiscovery(t *testing.T) {
 		t.Errorf("Expected at least one host, got none")
 	}
 }
+
+// TestWatch builds a client from a local kubeconfig, starts a watch and
+// expects a host on the channel within five seconds. Setup failures abort the
+// test immediately because every later step depends on the earlier result.
+func TestWatch(t *testing.T) {
+	config, err := clientcmd.BuildConfigFromFlags("", "/Users/mats/.kube/config")
+	if err != nil {
+		t.Fatalf("Error building config: %v", err)
+	}
+	client, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		t.Fatalf("Error creating client: %v", err)
+	}
+	d := NewK8sDiscovery(client)
+	ch, err := d.Watch()
+	if err != nil {
+		t.Fatalf("Error watching: %v", err)
+	}
+	select {
+	case _, ok := <-ch:
+		if !ok {
+			t.Errorf("Watch channel closed before any host was received")
+		}
+	case <-time.After(5 * time.Second):
+		t.Errorf("Timeout waiting for watch")
+	}
+}
diff --git a/packet.go b/packet.go
index 49e3921..2911d02 100644
--- a/packet.go
+++ b/packet.go
@@ -54,6 +54,7 @@ func SendPacket(conn io.Writer, messageType uint16, datafn func(w io.Writer) err
 		MessageType: messageType,
 		DataLength:  uint16(len(data)),
 	})
+	packetsSent.Inc()
 	_, err = conn.Write(data)
 	return err
 }
diff --git a/synced-pool.go b/synced-pool.go
index e085a4f..14cb433 100644
--- a/synced-pool.go
+++ b/synced-pool.go
@@ -78,20 +78,23 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, d Discovery) (*Synced
 
 	if d != nil {
 		go func() {
-			for range time.Tick(time.Second * 5) {
-				hosts, err := d.Discover()
-				if err != nil {
-					log.Printf("Error discovering hosts: %v", err)
+			ch, err := d.Watch()
+			if err != nil {
+				log.Printf("Error discovering hosts: %v", err)
+				return
+			}
+			for host := range ch {
+				if pool.IsKnown(host) {
+					continue
 				}
-				for _, h := range pool.ExcludeKnown(hosts) {
-
-					log.Printf("Discovered host %s", h)
-
+				go func(h string) {
+					log.Printf("Discovered host %s, waiting for startup", h)
+					time.Sleep(time.Second)
 					err := pool.AddRemote(h)
 					if err != nil {
 						log.Printf("Error adding remote %s: %v", h, err)
 					}
-				}
+				}(host)
 			}
 		}()
 	} else {
@@ -112,6 +115,18 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, d Discovery) (*Synced
 	return pool, nil
 }
 
+// IsKnown reports whether host is already part of the pool: either one of the
+// registered remotes or this node's own hostname.
+func (p *SyncedPool) IsKnown(host string) bool {
+	for _, r := range p.remotes {
+		if r.Host == host {
+			return true
+		}
+	}
+	// The local node counts as known so the pool never adds itself as a remote.
+	return host == p.Hostname
+}
+
 func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
 	ret := make([]string, 0, len(hosts))
 	for _, h := range hosts {
@@ -172,6 +187,14 @@ var (
 		Name: "cart_packet_queue_size",
 		Help: "The total number of packets in the queue",
 	})
+	packetsSent = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "cart_pool_packets_sent_total",
+		Help: "The total number of packets sent",
+	})
+	packetsReceived = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "cart_pool_packets_received_total",
+		Help: "The total number of packets received",
+	})
 )
 
 const (
@@ -216,7 +239,7 @@ func NewPacketQueue(connection net.Conn) *PacketQueue {
 				//return
 			}
 
-			packetQueue.Inc()
+
 			queue.mu.Lock()
 			for i, packet := range queue.Packets {
 				if time.Since(packet.Added) < time.Second*5 {
@@ -231,6 +254,8 @@ func NewPacketQueue(connection net.Conn) *PacketQueue {
 				Data: data,
 			})
 			queue.mu.Unlock()
+			packetsReceived.Inc()
+			packetQueue.Inc()
 		}
 	}()
 	return queue