From cfbb2e29c224f2c647e09ec9111c856432a5b0d3 Mon Sep 17 00:00:00 2001 From: matst80 Date: Sat, 9 Nov 2024 11:46:00 +0100 Subject: [PATCH] add prometheus --- deployment.yaml | 28 +++++++- go.mod | 9 +++ go.sum | 2 + main.go | 28 ++++++-- packet.go | 8 +-- proto/messages.proto | 3 + synced-pool.go | 163 +++++++++++++++++++++++++++++++++++++++++-- synced-pool_test.go | 35 ++++++++++ 8 files changed, 260 insertions(+), 16 deletions(-) create mode 100644 synced-pool_test.go diff --git a/deployment.yaml b/deployment.yaml index ac7edcf..4085407 100644 --- a/deployment.yaml +++ b/deployment.yaml @@ -6,7 +6,7 @@ metadata: app: cart-actor name: cart-actor spec: - replicas: 3 + replicas: 2 selector: matchLabels: app: cart-actor @@ -50,11 +50,23 @@ spec: env: - name: TZ value: "Europe/Stockholm" + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName --- kind: Service apiVersion: v1 metadata: name: cart-actor + annotations: + prometheus.io/port: "8080" + prometheus.io/scrape: "true" + prometheus.io/path: "/metrics" spec: selector: app: cart-actor @@ -89,4 +101,16 @@ spec: service: name: cart-actor port: - number: 8080 \ No newline at end of file + number: 8080 +--- +apiVersion: autoscaling/v1 +kind: HorizontalPodAutoscaler +metadata: + name: cart-scaler +spec: + scaleTargetRef: + kind: Deployment + name: cart-actor + minReplicas: 1 + maxReplicas: 5 + targetCPUUtilizationPercentage: 50 \ No newline at end of file diff --git a/go.mod b/go.mod index 441defa..4f7dd90 100644 --- a/go.mod +++ b/go.mod @@ -5,5 +5,14 @@ go 1.23.0 toolchain go1.23.2 require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + golang.org/x/sys v0.22.0 // indirect google.golang.org/protobuf v1.34.2 // indirect ) diff --git a/go.sum b/go.sum index 005d338..212b049 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/prometheus/client_golang v1.20.4 h1:Tgh3Yr67PaOv/uTqloMsCEdeuFTatm5zIq5+qNN23vI= github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= +github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= diff --git a/main.go b/main.go index 437a403..391648b 100644 --- a/main.go +++ b/main.go @@ -8,9 +8,20 @@ import ( "time" messages "git.tornberg.me/go-cart-actor/proto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var ( + opsProcessed = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cart_grain_spawned_total", + Help: "The total number of spawned grains", + }) ) func spawn(id CartId) (*CartGrain, error) { + opsProcessed.Inc() ret := &CartGrain{ Id: id, Items: []CartItem{}, @@ -100,9 +111,11 @@ func (s *PoolServer) Serve() *http.ServeMux { return mux } +var clientName = os.Getenv("NODE_NAME") + func main() { // Create a new instance of the server - storage, err := NewDiskStorage("data/state.gob") + storage, err := NewDiskStorage(fmt.Sprintf("data/%s_state.gob", clientName)) if err != nil { log.Printf("Error loading state: %v\n", err) } @@ -111,7 +124,13 @@ func main() { storage: storage, } - syncedPool := NewSyncedPool(app.pool) + syncedPool, err := NewSyncedPool(app.pool, clientName) + if err != nil { + log.Fatalf("Error creating synced pool: %v\n", err) + } + + // if local + syncedPool.AddRemote("localhost") rpcHandler, err := NewGrainHandler(app.pool, ":1337") if err != nil { @@ -123,10 +142,11 @@ func main() { mux := http.NewServeMux() mux.Handle("/api/", http.StripPrefix("/api", syncedServer.Serve())) mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) { - remotePool := NewRemoteGrainPool(fmt.Sprintf("%s:1337", r.PathValue("host"))) - syncedPool.AddRemote(remotePool) + syncedPool.AddRemote(r.PathValue("host")) }) mux.HandleFunc("GET /save", app.HandleSave) + mux.Handle("/metrics", promhttp.Handler()) + http.ListenAndServe(":8080", mux) } diff --git a/packet.go b/packet.go index fbb78ea..bdac0d7 100644 --- a/packet.go +++ b/packet.go @@ -19,7 +19,7 @@ type CartPacket struct { DataLength uint16 } -type ResponsePacket struct { +type Packet struct { Version uint16 MessageType uint16 DataLength uint16 @@ -45,7 +45,7 @@ func SendPacket(conn io.Writer, messageType uint16, datafn func(w io.Writer) err if err != nil { return err } - binary.Write(conn, binary.LittleEndian, ResponsePacket{ + binary.Write(conn, binary.LittleEndian, Packet{ Version: 1, MessageType: messageType, DataLength: uint16(len(data)), @@ -55,7 +55,7 @@ func SendPacket(conn io.Writer, messageType uint16, datafn func(w io.Writer) err } func SendRawResponse(conn io.Writer, data []byte) error { - binary.Write(conn, binary.LittleEndian, ResponsePacket{ + binary.Write(conn, binary.LittleEndian, Packet{ Version: 1, MessageType: ResponseBody, DataLength: uint16(len(data)), @@ -76,7 +76,7 @@ func SendProxyResponse(conn io.Writer, data any) error { } func ReceivePacket(conn io.Reader) (uint16, []byte, error) { - var packet ResponsePacket + var packet Packet err := binary.Read(conn, binary.LittleEndian, &packet) if err != nil { return packet.MessageType, nil, err diff --git a/proto/messages.proto b/proto/messages.proto index 49a899d..8389536 100644 --- a/proto/messages.proto +++ b/proto/messages.proto @@ -14,5 +14,8 @@ message AddItem { string Image = 6; } +message Negotiate { + repeated Host host = 1; +} diff --git a/synced-pool.go b/synced-pool.go index 7fe7e2d..190c959 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -1,22 +1,173 @@ package main +import ( + "encoding/binary" + "fmt" + "io" + "log" + "net" + "strings" +) + +type Quorum interface { + Negotiate(knownHosts []string) ([]string, error) + ListChanged([]CartId) error +} + +type RemoteHost struct { + Host string + Pool *RemoteGrainPool + connection net.Conn +} + type SyncedPool struct { + listener net.Listener + Hostname string local *GrainLocalPool - remotes []*RemoteGrainPool + remotes []RemoteHost remoteIndex map[CartId]*RemoteGrainPool } -func NewSyncedPool(local *GrainLocalPool) *SyncedPool { - return &SyncedPool{ +func NewSyncedPool(local *GrainLocalPool, hostname string) (*SyncedPool, error) { + listen := fmt.Sprintf("%s:1338", hostname) + l, err := net.Listen("tcp", listen) + if err != nil { + return nil, err + } + pool := &SyncedPool{ + Hostname: hostname, local: local, - remotes: make([]*RemoteGrainPool, 0), + listener: l, + remotes: make([]RemoteHost, 0), remoteIndex: make(map[CartId]*RemoteGrainPool), } + go func() { + for { + conn, err := l.Accept() + if err != nil { + log.Printf("Error accepting connection: %v\n", err) + continue + } + log.Printf("Got connection from %s", conn.RemoteAddr().String()) + go pool.handleConnection(conn) + } + }() + return pool, nil +} + +const ( + RemoteNegotiate = uint16(3) + RemoteGrainChanged = uint16(4) +) + +func (p *SyncedPool) handleConnection(conn net.Conn) { + defer conn.Close() + var packet Packet + for { + err := binary.Read(conn, binary.LittleEndian, &packet) + if err != nil { + if err == io.EOF { + break + } + log.Printf("Error in connection: %v\n", err) + } + // if packet.Version != 1 { + // log.Printf("Invalid version %d\n", packet.Version) + // return + // } + switch packet.MessageType { + case RemoteNegotiate: + data := make([]byte, packet.DataLength) + conn.Read(data) + knownHosts := strings.Split(string(data), ";") + log.Printf("Negotiated with remote, found %v hosts\n", knownHosts) + + for _, h := range knownHosts { + err = p.AddRemote(h) + if err != nil { + log.Printf("Error adding remote %s: %v\n", h, err) + } + } + + SendPacket(conn, RemoteNegotiate, func(w io.Writer) error { + hostnames := make([]string, 0, len(p.remotes)) + for _, r := range p.remotes { + hostnames = append(hostnames, r.Host) + } + w.Write([]byte(strings.Join(hostnames, ";"))) + return nil + }) + case RemoteGrainChanged: + // remote grain changed + log.Printf("Remote grain changed\n") + for err == nil { + id := make([]byte, 16) + _, err = conn.Read(id) + log.Printf("Remote grain %s changed\n", id) + } + } + } } -func (p *SyncedPool) AddRemote(remote *RemoteGrainPool) { +func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) { + SendPacket(h.connection, RemoteNegotiate, func(w io.Writer) error { + w.Write([]byte(strings.Join(knownHosts, ";"))) + return nil + }) + t, data, err := ReceivePacket(h.connection) + if err != nil { + return nil, err + } + if t != RemoteNegotiate { + return nil, fmt.Errorf("unexpected message type %d", t) + } + return strings.Split(string(data), ";"), nil +} + +func (p *SyncedPool) Negotiate(knownHosts []string) ([]string, error) { + allHosts := make(map[string]struct{}, 0) + for _, r := range p.remotes { + hosts, err := r.Negotiate(knownHosts) + if err != nil { + return nil, err + } + for _, h := range hosts { + allHosts[h] = struct{}{} + } + } + ret := make([]string, 0, len(allHosts)) + for h := range allHosts { + ret = append(ret, h) + } + return ret, nil +} + +func (p *SyncedPool) ListChanged(ids []CartId) error { + return nil +} + +func (p *SyncedPool) AddRemote(address string) error { + for _, r := range p.remotes { + if r.Host == address { + log.Printf("Remote %s already exists\n", address) + return fmt.Errorf("remote %s already exists", address) + } + } + connection, err := net.Dial("tcp", fmt.Sprintf("%s:1338", address)) + if err != nil { + return err + } + + pool := NewRemoteGrainPool(fmt.Sprintf(address, 1337)) + remote := RemoteHost{ + connection: connection, + Pool: pool, + Host: address, + } p.remotes = append(p.remotes, remote) - // get all available grains from remote, and start syncing + log.Printf("Added remote %s\n", remote.Host) + + return nil } func (p *SyncedPool) Process(id CartId, messages ...Message) ([]byte, error) { diff --git a/synced-pool_test.go b/synced-pool_test.go new file mode 100644 index 0000000..fc930a6 --- /dev/null +++ b/synced-pool_test.go @@ -0,0 +1,35 @@ +package main + +import ( + "testing" + "time" +) + +func TestConnection(t *testing.T) { + // TestConnection tests the connection to the server + t.Log("Testing connection to server") + localPool := NewGrainLocalPool(100, time.Minute, func(id CartId) (*CartGrain, error) { + return &CartGrain{ + Id: id, + storageMessages: []Message{}, + Items: []CartItem{}, + TotalPrice: 0, + }, nil + }) + pool, err := NewSyncedPool(localPool, "localhost") + if err != nil { + t.Errorf("Error creating pool: %v", err) + } + + err = pool.AddRemote("localhost") + if err != nil { + t.Errorf("Error adding remote: %v", err) + } + allHosts, err := pool.Negotiate([]string{"kalle", "pelle"}) + if err != nil { + t.Errorf("Error negotiating: %v", err) + } + if len(allHosts) != 1 { + t.Errorf("Expected 1 host, got %d", len(allHosts)) + } +}