From 5c0ba4c9a4658d0b6267abf85cb495e6f8432cb1 Mon Sep 17 00:00:00 2001 From: matst80 Date: Sat, 9 Nov 2024 21:28:11 +0100 Subject: [PATCH] check nodes --- deployment/deployment.yaml | 8 +-- discovery.go | 41 +++++++++++++++ synced-pool.go | 103 +++++++++++++------------------------ 3 files changed, 81 insertions(+), 71 deletions(-) create mode 100644 discovery.go diff --git a/deployment/deployment.yaml b/deployment/deployment.yaml index 3ee353a..b1000f8 100644 --- a/deployment/deployment.yaml +++ b/deployment/deployment.yaml @@ -84,10 +84,10 @@ metadata: name: cart-ingress annotations: cert-manager.io/cluster-issuer: letsencrypt-prod - nginx.ingress.kubernetes.io/affinity: "cookie" - nginx.ingress.kubernetes.io/session-cookie-name: "cart-session" - nginx.ingress.kubernetes.io/session-cookie-expires: "172800" - nginx.ingress.kubernetes.io/session-cookie-max-age: "172800" + # nginx.ingress.kubernetes.io/affinity: "cookie" + # nginx.ingress.kubernetes.io/session-cookie-name: "cart-session" + # nginx.ingress.kubernetes.io/session-cookie-expires: "172800" + # nginx.ingress.kubernetes.io/session-cookie-max-age: "172800" nginx.ingress.kubernetes.io/proxy-body-size: 4m spec: ingressClassName: nginx diff --git a/discovery.go b/discovery.go new file mode 100644 index 0000000..9b3b4d5 --- /dev/null +++ b/discovery.go @@ -0,0 +1,41 @@ +package main + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +type Discovery interface { + Discover() ([]string, error) +} + +type K8sDiscovery struct { + ctx context.Context + client *kubernetes.Clientset +} + +func (k *K8sDiscovery) Discover() ([]string, error) { + return k.DiscoverInNamespace("") +} +func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) { + pods, err := k.client.CoreV1().Pods(namespace).List(k.ctx, metav1.ListOptions{ + LabelSelector: "actor-pool=cart", + }) + if err != nil { + return nil, err + } + hosts := make([]string, 0, len(pods.Items)) + for _, pod := range pods.Items { + hosts = append(hosts, pod.Status.PodIP) + } + return hosts, nil +} + +func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery { + return &K8sDiscovery{ + ctx: context.Background(), + client: client, + } +} diff --git a/synced-pool.go b/synced-pool.go index 66e2e78..8ee8703 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -1,7 +1,6 @@ package main import ( - "context" "encoding/binary" "fmt" "io" @@ -13,43 +12,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" ) -type Discovery interface { - Discover() ([]string, error) -} - -type K8sDiscovery struct { - ctx context.Context - client *kubernetes.Clientset -} - -func (k *K8sDiscovery) Discover() ([]string, error) { - return k.DiscoverInNamespace("") -} -func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) { - pods, err := k.client.CoreV1().Pods(namespace).List(k.ctx, metav1.ListOptions{ - LabelSelector: "actor-pool=cart", - }) - if err != nil { - return nil, err - } - hosts := make([]string, 0, len(pods.Items)) - for _, pod := range pods.Items { - hosts = append(hosts, pod.Status.PodIP) - } - return hosts, nil -} - -func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery { - return &K8sDiscovery{ - ctx: context.Background(), - client: client, - } -} - type Quorum interface { Negotiate(knownHosts []string) ([]string, error) OwnerChanged(CartId, host string) error @@ -106,12 +70,14 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, d Discovery) (*Synced r.MissedPings = 0 } } + connectedRemotes.Set(float64(len(pool.remotes))) } }() if d != nil { discoveryTimer := time.NewTicker(time.Second * 5) go func() { <-discoveryTimer.C + log.Printf("Looking for new nodes\n") hosts, err := d.Discover() if err != nil { log.Printf("Error discovering hosts: %v\n", err) @@ -296,37 +262,37 @@ func (p *SyncedPool) handleConnection(conn net.Conn) { // remote grain changed grainSyncCount.Inc() log.Printf("Remote grain changed\n") - for err == nil { - idAndHost := make([]byte, packet.DataLength) - _, err = conn.Read(idAndHost) - log.Printf("Remote grain %s changed\n", idAndHost) - if err != nil { - break - } - idAndHostParts := strings.Split(string(idAndHost), ";") - if len(idAndHostParts) != 2 { - log.Printf("Invalid remote grain change message\n") - break - } - found := false - for _, r := range p.remotes { - if r.Host == string(idAndHostParts[1]) { - found = true - log.Printf("Remote grain %s changed to %s\n", idAndHostParts[0], idAndHostParts[1]) - p.remoteIndex[ToCartId(idAndHostParts[0])] = r.Pool - } - } - if !found { - log.Printf("Remote host %s not found\n", idAndHostParts[1]) - log.Printf("Remotes %v\n", p.remotes) - } else { - err = SendPacket(conn, AckChange, func(w io.Writer) error { - _, err := w.Write([]byte("ok")) - return err - }) + idAndHost := make([]byte, packet.DataLength) + _, err = conn.Read(idAndHost) + log.Printf("Remote grain %s changed\n", idAndHost) + if err != nil { + break + } + idAndHostParts := strings.Split(string(idAndHost), ";") + if len(idAndHostParts) != 2 { + log.Printf("Invalid remote grain change message\n") + break + } + found := false + for _, r := range p.remotes { + if r.Host == string(idAndHostParts[1]) { + found = true + log.Printf("Remote grain %s changed to %s\n", idAndHostParts[0], idAndHostParts[1]) + p.remoteIndex[ToCartId(idAndHostParts[0])] = r.Pool } } + + if !found { + log.Printf("Remote host %s not found\n", idAndHostParts[1]) + log.Printf("Remotes %v\n", p.remotes) + } else { + SendPacket(conn, AckChange, func(w io.Writer) error { + _, err := w.Write([]byte("ok")) + return err + }) + } + } } } @@ -364,11 +330,14 @@ func (p *SyncedPool) Negotiate(knownHosts []string) ([]string, error) { } func (r *RemoteHost) ConfirmChange(id CartId, host string) error { - SendPacket(r.connection, RemoteGrainChanged, func(w io.Writer) error { + err := SendPacket(r.connection, RemoteGrainChanged, func(w io.Writer) error { _, err := w.Write([]byte(fmt.Sprintf("%s;%s", id, host))) return err }) - _, err := r.queue.Expect(AckChange, time.Second) + if err != nil { + return err + } + _, err = r.queue.Expect(AckChange, time.Second) if err != nil { return err @@ -382,7 +351,7 @@ func (p *SyncedPool) OwnerChanged(id CartId, host string) error { err := r.ConfirmChange(id, host) if err != nil { - log.Printf("Error confirming change: %v\n", err) + log.Printf("Error confirming change: %v from %s\n", err, host) return err } }