check nodes
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m55s

This commit is contained in:
matst80
2024-11-09 21:28:11 +01:00
parent 60ac1e5792
commit 5c0ba4c9a4
3 changed files with 81 additions and 71 deletions

View File

@@ -84,10 +84,10 @@ metadata:
name: cart-ingress name: cart-ingress
annotations: annotations:
cert-manager.io/cluster-issuer: letsencrypt-prod cert-manager.io/cluster-issuer: letsencrypt-prod
nginx.ingress.kubernetes.io/affinity: "cookie" # nginx.ingress.kubernetes.io/affinity: "cookie"
nginx.ingress.kubernetes.io/session-cookie-name: "cart-session" # nginx.ingress.kubernetes.io/session-cookie-name: "cart-session"
nginx.ingress.kubernetes.io/session-cookie-expires: "172800" # nginx.ingress.kubernetes.io/session-cookie-expires: "172800"
nginx.ingress.kubernetes.io/session-cookie-max-age: "172800" # nginx.ingress.kubernetes.io/session-cookie-max-age: "172800"
nginx.ingress.kubernetes.io/proxy-body-size: 4m nginx.ingress.kubernetes.io/proxy-body-size: 4m
spec: spec:
ingressClassName: nginx ingressClassName: nginx

41
discovery.go Normal file
View File

@@ -0,0 +1,41 @@
package main
import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
type Discovery interface {
Discover() ([]string, error)
}
type K8sDiscovery struct {
ctx context.Context
client *kubernetes.Clientset
}
func (k *K8sDiscovery) Discover() ([]string, error) {
return k.DiscoverInNamespace("")
}
func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) {
pods, err := k.client.CoreV1().Pods(namespace).List(k.ctx, metav1.ListOptions{
LabelSelector: "actor-pool=cart",
})
if err != nil {
return nil, err
}
hosts := make([]string, 0, len(pods.Items))
for _, pod := range pods.Items {
hosts = append(hosts, pod.Status.PodIP)
}
return hosts, nil
}
func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery {
return &K8sDiscovery{
ctx: context.Background(),
client: client,
}
}

View File

@@ -1,7 +1,6 @@
package main package main
import ( import (
"context"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"io" "io"
@@ -13,43 +12,8 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
) )
type Discovery interface {
Discover() ([]string, error)
}
type K8sDiscovery struct {
ctx context.Context
client *kubernetes.Clientset
}
func (k *K8sDiscovery) Discover() ([]string, error) {
return k.DiscoverInNamespace("")
}
func (k *K8sDiscovery) DiscoverInNamespace(namespace string) ([]string, error) {
pods, err := k.client.CoreV1().Pods(namespace).List(k.ctx, metav1.ListOptions{
LabelSelector: "actor-pool=cart",
})
if err != nil {
return nil, err
}
hosts := make([]string, 0, len(pods.Items))
for _, pod := range pods.Items {
hosts = append(hosts, pod.Status.PodIP)
}
return hosts, nil
}
func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery {
return &K8sDiscovery{
ctx: context.Background(),
client: client,
}
}
type Quorum interface { type Quorum interface {
Negotiate(knownHosts []string) ([]string, error) Negotiate(knownHosts []string) ([]string, error)
OwnerChanged(CartId, host string) error OwnerChanged(CartId, host string) error
@@ -106,12 +70,14 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, d Discovery) (*Synced
r.MissedPings = 0 r.MissedPings = 0
} }
} }
connectedRemotes.Set(float64(len(pool.remotes)))
} }
}() }()
if d != nil { if d != nil {
discoveryTimer := time.NewTicker(time.Second * 5) discoveryTimer := time.NewTicker(time.Second * 5)
go func() { go func() {
<-discoveryTimer.C <-discoveryTimer.C
log.Printf("Looking for new nodes\n")
hosts, err := d.Discover() hosts, err := d.Discover()
if err != nil { if err != nil {
log.Printf("Error discovering hosts: %v\n", err) log.Printf("Error discovering hosts: %v\n", err)
@@ -296,37 +262,37 @@ func (p *SyncedPool) handleConnection(conn net.Conn) {
// remote grain changed // remote grain changed
grainSyncCount.Inc() grainSyncCount.Inc()
log.Printf("Remote grain changed\n") log.Printf("Remote grain changed\n")
for err == nil {
idAndHost := make([]byte, packet.DataLength)
_, err = conn.Read(idAndHost)
log.Printf("Remote grain %s changed\n", idAndHost)
if err != nil {
break
}
idAndHostParts := strings.Split(string(idAndHost), ";")
if len(idAndHostParts) != 2 {
log.Printf("Invalid remote grain change message\n")
break
}
found := false
for _, r := range p.remotes {
if r.Host == string(idAndHostParts[1]) {
found = true
log.Printf("Remote grain %s changed to %s\n", idAndHostParts[0], idAndHostParts[1])
p.remoteIndex[ToCartId(idAndHostParts[0])] = r.Pool
}
}
if !found { idAndHost := make([]byte, packet.DataLength)
log.Printf("Remote host %s not found\n", idAndHostParts[1]) _, err = conn.Read(idAndHost)
log.Printf("Remotes %v\n", p.remotes) log.Printf("Remote grain %s changed\n", idAndHost)
} else { if err != nil {
err = SendPacket(conn, AckChange, func(w io.Writer) error { break
_, err := w.Write([]byte("ok")) }
return err idAndHostParts := strings.Split(string(idAndHost), ";")
}) if len(idAndHostParts) != 2 {
log.Printf("Invalid remote grain change message\n")
break
}
found := false
for _, r := range p.remotes {
if r.Host == string(idAndHostParts[1]) {
found = true
log.Printf("Remote grain %s changed to %s\n", idAndHostParts[0], idAndHostParts[1])
p.remoteIndex[ToCartId(idAndHostParts[0])] = r.Pool
} }
} }
if !found {
log.Printf("Remote host %s not found\n", idAndHostParts[1])
log.Printf("Remotes %v\n", p.remotes)
} else {
SendPacket(conn, AckChange, func(w io.Writer) error {
_, err := w.Write([]byte("ok"))
return err
})
}
} }
} }
} }
@@ -364,11 +330,14 @@ func (p *SyncedPool) Negotiate(knownHosts []string) ([]string, error) {
} }
func (r *RemoteHost) ConfirmChange(id CartId, host string) error { func (r *RemoteHost) ConfirmChange(id CartId, host string) error {
SendPacket(r.connection, RemoteGrainChanged, func(w io.Writer) error { err := SendPacket(r.connection, RemoteGrainChanged, func(w io.Writer) error {
_, err := w.Write([]byte(fmt.Sprintf("%s;%s", id, host))) _, err := w.Write([]byte(fmt.Sprintf("%s;%s", id, host)))
return err return err
}) })
_, err := r.queue.Expect(AckChange, time.Second) if err != nil {
return err
}
_, err = r.queue.Expect(AckChange, time.Second)
if err != nil { if err != nil {
return err return err
@@ -382,7 +351,7 @@ func (p *SyncedPool) OwnerChanged(id CartId, host string) error {
err := r.ConfirmChange(id, host) err := r.ConfirmChange(id, host)
if err != nil { if err != nil {
log.Printf("Error confirming change: %v\n", err) log.Printf("Error confirming change: %v from %s\n", err, host)
return err return err
} }
} }