From c9a7113e124b1556a0a562001ee6b437cd11a523 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 13 Nov 2024 08:32:40 +0100 Subject: [PATCH] run on x86 for a while --- deployment/deployment.yaml | 2 +- remote-grain.go | 4 ++++ remote-host.go | 7 ++++--- synced-pool.go | 8 ++++---- tcp-cart-client.go | 24 ++++++++++++------------ tcp-cart_test.go | 2 +- tcp-client.go | 22 +++++++++++----------- 7 files changed, 37 insertions(+), 32 deletions(-) diff --git a/deployment/deployment.yaml b/deployment/deployment.yaml index ac3cdc7..61ecf17 100644 --- a/deployment/deployment.yaml +++ b/deployment/deployment.yaml @@ -92,7 +92,7 @@ metadata: arch: arm64 name: cart-actor-arm64 spec: - replicas: 3 + replicas: 0 selector: matchLabels: app: cart-actor diff --git a/remote-grain.go b/remote-grain.go index ed62981..c5ff0de 100644 --- a/remote-grain.go +++ b/remote-grain.go @@ -82,6 +82,10 @@ func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) (*CallResul return reply, err } +func (g *RemoteGrain) Close() { + g.CartClient.PersistentConnection.Close() +} + func (g *RemoteGrain) GetId() CartId { return g.Id } diff --git a/remote-host.go b/remote-host.go index 5bb33c4..bcd1ceb 100644 --- a/remote-host.go +++ b/remote-host.go @@ -13,11 +13,11 @@ type RemoteHost struct { } func (h *RemoteHost) IsHealthy() bool { - return !h.Dead && h.MissedPings < 3 + return !h.PersistentConnection.Dead && h.MissedPings < 3 } func (h *RemoteHost) Initialize(p *SyncedPool) { - + log.Printf("Initializing remote %s\n", h.Host) ids, err := h.GetCartMappings() if err != nil { log.Printf("Error getting remote mappings: %v\n", err) @@ -43,8 +43,9 @@ func (h *RemoteHost) Ping() error { if err != nil { h.MissedPings++ log.Printf("Error pinging remote %s, missed pings: %d", h.Host, h.MissedPings) - if h.MissedPings >= 3 { + if !h.IsHealthy() { h.Close() + return fmt.Errorf("remote %s is dead", h.Host) } } else { h.MissedPings = 0 diff --git a/synced-pool.go b/synced-pool.go index 0b1c8dd..24615d1 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -143,7 +143,7 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { return } go func() { - <-remote.Died + <-remote.PersistentConnection.Died p.RemoveRemoteGrain(id) p.HandleHostError(host) log.Printf("Remote grain %s died, host: %s\n", id.String(), host) @@ -371,7 +371,7 @@ func (p *SyncedPool) AddRemote(host string) error { } p.remotes[host] = &remote go func() { - <-remote.Died + <-remote.PersistentConnection.Died log.Printf("Removing host, remote died %s", host) p.RemoveHost(&remote) }() @@ -381,12 +381,12 @@ func (p *SyncedPool) AddRemote(host string) error { for err != nil { time.Sleep(time.Millisecond * 200) - err = remote.Ping() if !remote.IsHealthy() { log.Printf("Removing host, unable to communicate with %s", host) p.RemoveHost(&remote) - break + return } + err = remote.Ping() } } }() diff --git a/tcp-cart-client.go b/tcp-cart-client.go index a195522..99be9f6 100644 --- a/tcp-cart-client.go +++ b/tcp-cart-client.go @@ -25,14 +25,14 @@ func CartDial(address string) (*CartClient, error) { } func (c *Client) Close() { - c.Conn.Close() + c.PersistentConnection.Close() } type CartTCPClient struct { - *PersistentConnection - sendMux sync.Mutex - ErrorCount int - address string + PersistentConnection *PersistentConnection + sendMux sync.Mutex + ErrorCount int + address string *CartPacketQueue } @@ -52,31 +52,31 @@ func NewCartTCPClient(address string) (*CartTCPClient, error) { func (m *CartTCPClient) SendPacket(messageType CartMessage, id CartId, data []byte) error { m.sendMux.Lock() defer m.sendMux.Unlock() - m.Conn.Write(header[:]) - err := binary.Write(m.Conn, binary.LittleEndian, CartPacket{ + m.PersistentConnection.Conn.Write(header[:]) + err := binary.Write(m.PersistentConnection, binary.LittleEndian, CartPacket{ Version: CurrentPacketVersion, MessageType: messageType, DataLength: uint32(len(data)), Id: id, }) if err != nil { - return m.HandleConnectionError(err) + return m.PersistentConnection.HandleConnectionError(err) } - _, err = m.Conn.Write(data) - return m.HandleConnectionError(err) + _, err = m.PersistentConnection.Write(data) + return m.PersistentConnection.HandleConnectionError(err) } func (m *CartTCPClient) Call(messageType CartMessage, id CartId, responseType CartMessage, data []byte) (*CallResult, error) { packetChan := m.Expect(responseType, id) err := m.SendPacket(messageType, id, data) if err != nil { - return nil, m.HandleConnectionError(err) + return nil, m.PersistentConnection.HandleConnectionError(err) } select { case ret := <-packetChan: return &ret, nil case <-time.After(time.Second): log.Printf("Timeout waiting for cart response to message type %d\n", responseType) - return nil, m.HandleConnectionError(fmt.Errorf("timeout")) + return nil, m.PersistentConnection.HandleConnectionError(fmt.Errorf("timeout")) } } diff --git a/tcp-cart_test.go b/tcp-cart_test.go index 74a7433..0f11cf8 100644 --- a/tcp-cart_test.go +++ b/tcp-cart_test.go @@ -40,7 +40,7 @@ func TestCartTcpHelpers(t *testing.T) { t.Errorf("Error calling: %v\n", err) } s, err := client.Call(666, id, 3, []byte("Hello, server!")) - client.Close() + client.PersistentConnection.Close() if err != nil { t.Errorf("Error calling: %v\n", err) } diff --git a/tcp-client.go b/tcp-client.go index eb9f71a..1f6e699 100644 --- a/tcp-client.go +++ b/tcp-client.go @@ -26,10 +26,10 @@ func Dial(address string) (*Client, error) { } type TCPClient struct { - *PersistentConnection - sendMux sync.Mutex - ErrorCount int - address string + PersistentConnection *PersistentConnection + sendMux sync.Mutex + ErrorCount int + address string *PacketQueue } @@ -103,18 +103,18 @@ var ( func (m *TCPClient) SendPacket(messageType PoolMessage, data []byte) error { m.sendMux.Lock() defer m.sendMux.Unlock() - m.Conn.Write(header[:]) - err := binary.Write(m.Conn, binary.LittleEndian, Packet{ + m.PersistentConnection.Write(header[:]) + err := binary.Write(m.PersistentConnection, binary.LittleEndian, Packet{ Version: CurrentPacketVersion, MessageType: messageType, StatusCode: 0, DataLength: uint32(len(data)), }) if err != nil { - return m.HandleConnectionError(err) + return m.PersistentConnection.HandleConnectionError(err) } - _, err = m.Conn.Write(data) - return m.HandleConnectionError(err) + _, err = m.PersistentConnection.Write(data) + return m.PersistentConnection.HandleConnectionError(err) } func (m *TCPClient) Call(messageType PoolMessage, responseType PoolMessage, data []byte) (*CallResult, error) { @@ -122,7 +122,7 @@ func (m *TCPClient) Call(messageType PoolMessage, responseType PoolMessage, data err := m.SendPacket(messageType, data) if err != nil { m.RemoveListeners() - return nil, m.HandleConnectionError(err) + return nil, m.PersistentConnection.HandleConnectionError(err) } select { @@ -130,6 +130,6 @@ func (m *TCPClient) Call(messageType PoolMessage, responseType PoolMessage, data return &ret, nil case <-time.After(time.Second): log.Printf("Timeout waiting for cart response to message type %d\n", responseType) - return nil, m.HandleConnectionError(fmt.Errorf("timeout")) + return nil, m.PersistentConnection.HandleConnectionError(fmt.Errorf("timeout")) } }