run on x86 for a while
All checks were successful
Build and Publish / BuildAndDeployAmd64 (push) Successful in 29s
Build and Publish / BuildAndDeploy (push) Successful in 2m28s

This commit is contained in:
matst80
2024-11-13 08:32:40 +01:00
parent ce5f19d287
commit c9a7113e12
7 changed files with 37 additions and 32 deletions

View File

@@ -92,7 +92,7 @@ metadata:
arch: arm64 arch: arm64
name: cart-actor-arm64 name: cart-actor-arm64
spec: spec:
replicas: 3 replicas: 0
selector: selector:
matchLabels: matchLabels:
app: cart-actor app: cart-actor

View File

@@ -82,6 +82,10 @@ func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) (*CallResul
return reply, err return reply, err
} }
func (g *RemoteGrain) Close() {
g.CartClient.PersistentConnection.Close()
}
func (g *RemoteGrain) GetId() CartId { func (g *RemoteGrain) GetId() CartId {
return g.Id return g.Id
} }

View File

@@ -13,11 +13,11 @@ type RemoteHost struct {
} }
func (h *RemoteHost) IsHealthy() bool { func (h *RemoteHost) IsHealthy() bool {
return !h.Dead && h.MissedPings < 3 return !h.PersistentConnection.Dead && h.MissedPings < 3
} }
func (h *RemoteHost) Initialize(p *SyncedPool) { func (h *RemoteHost) Initialize(p *SyncedPool) {
log.Printf("Initializing remote %s\n", h.Host)
ids, err := h.GetCartMappings() ids, err := h.GetCartMappings()
if err != nil { if err != nil {
log.Printf("Error getting remote mappings: %v\n", err) log.Printf("Error getting remote mappings: %v\n", err)
@@ -43,8 +43,9 @@ func (h *RemoteHost) Ping() error {
if err != nil { if err != nil {
h.MissedPings++ h.MissedPings++
log.Printf("Error pinging remote %s, missed pings: %d", h.Host, h.MissedPings) log.Printf("Error pinging remote %s, missed pings: %d", h.Host, h.MissedPings)
if h.MissedPings >= 3 { if !h.IsHealthy() {
h.Close() h.Close()
return fmt.Errorf("remote %s is dead", h.Host)
} }
} else { } else {
h.MissedPings = 0 h.MissedPings = 0

View File

@@ -143,7 +143,7 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
return return
} }
go func() { go func() {
<-remote.Died <-remote.PersistentConnection.Died
p.RemoveRemoteGrain(id) p.RemoveRemoteGrain(id)
p.HandleHostError(host) p.HandleHostError(host)
log.Printf("Remote grain %s died, host: %s\n", id.String(), host) log.Printf("Remote grain %s died, host: %s\n", id.String(), host)
@@ -371,7 +371,7 @@ func (p *SyncedPool) AddRemote(host string) error {
} }
p.remotes[host] = &remote p.remotes[host] = &remote
go func() { go func() {
<-remote.Died <-remote.PersistentConnection.Died
log.Printf("Removing host, remote died %s", host) log.Printf("Removing host, remote died %s", host)
p.RemoveHost(&remote) p.RemoveHost(&remote)
}() }()
@@ -381,12 +381,12 @@ func (p *SyncedPool) AddRemote(host string) error {
for err != nil { for err != nil {
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
err = remote.Ping()
if !remote.IsHealthy() { if !remote.IsHealthy() {
log.Printf("Removing host, unable to communicate with %s", host) log.Printf("Removing host, unable to communicate with %s", host)
p.RemoveHost(&remote) p.RemoveHost(&remote)
break return
} }
err = remote.Ping()
} }
} }
}() }()

View File

@@ -25,14 +25,14 @@ func CartDial(address string) (*CartClient, error) {
} }
func (c *Client) Close() { func (c *Client) Close() {
c.Conn.Close() c.PersistentConnection.Close()
} }
type CartTCPClient struct { type CartTCPClient struct {
*PersistentConnection PersistentConnection *PersistentConnection
sendMux sync.Mutex sendMux sync.Mutex
ErrorCount int ErrorCount int
address string address string
*CartPacketQueue *CartPacketQueue
} }
@@ -52,31 +52,31 @@ func NewCartTCPClient(address string) (*CartTCPClient, error) {
func (m *CartTCPClient) SendPacket(messageType CartMessage, id CartId, data []byte) error { func (m *CartTCPClient) SendPacket(messageType CartMessage, id CartId, data []byte) error {
m.sendMux.Lock() m.sendMux.Lock()
defer m.sendMux.Unlock() defer m.sendMux.Unlock()
m.Conn.Write(header[:]) m.PersistentConnection.Conn.Write(header[:])
err := binary.Write(m.Conn, binary.LittleEndian, CartPacket{ err := binary.Write(m.PersistentConnection, binary.LittleEndian, CartPacket{
Version: CurrentPacketVersion, Version: CurrentPacketVersion,
MessageType: messageType, MessageType: messageType,
DataLength: uint32(len(data)), DataLength: uint32(len(data)),
Id: id, Id: id,
}) })
if err != nil { if err != nil {
return m.HandleConnectionError(err) return m.PersistentConnection.HandleConnectionError(err)
} }
_, err = m.Conn.Write(data) _, err = m.PersistentConnection.Write(data)
return m.HandleConnectionError(err) return m.PersistentConnection.HandleConnectionError(err)
} }
func (m *CartTCPClient) Call(messageType CartMessage, id CartId, responseType CartMessage, data []byte) (*CallResult, error) { func (m *CartTCPClient) Call(messageType CartMessage, id CartId, responseType CartMessage, data []byte) (*CallResult, error) {
packetChan := m.Expect(responseType, id) packetChan := m.Expect(responseType, id)
err := m.SendPacket(messageType, id, data) err := m.SendPacket(messageType, id, data)
if err != nil { if err != nil {
return nil, m.HandleConnectionError(err) return nil, m.PersistentConnection.HandleConnectionError(err)
} }
select { select {
case ret := <-packetChan: case ret := <-packetChan:
return &ret, nil return &ret, nil
case <-time.After(time.Second): case <-time.After(time.Second):
log.Printf("Timeout waiting for cart response to message type %d\n", responseType) log.Printf("Timeout waiting for cart response to message type %d\n", responseType)
return nil, m.HandleConnectionError(fmt.Errorf("timeout")) return nil, m.PersistentConnection.HandleConnectionError(fmt.Errorf("timeout"))
} }
} }

View File

@@ -40,7 +40,7 @@ func TestCartTcpHelpers(t *testing.T) {
t.Errorf("Error calling: %v\n", err) t.Errorf("Error calling: %v\n", err)
} }
s, err := client.Call(666, id, 3, []byte("Hello, server!")) s, err := client.Call(666, id, 3, []byte("Hello, server!"))
client.Close() client.PersistentConnection.Close()
if err != nil { if err != nil {
t.Errorf("Error calling: %v\n", err) t.Errorf("Error calling: %v\n", err)
} }

View File

@@ -26,10 +26,10 @@ func Dial(address string) (*Client, error) {
} }
type TCPClient struct { type TCPClient struct {
*PersistentConnection PersistentConnection *PersistentConnection
sendMux sync.Mutex sendMux sync.Mutex
ErrorCount int ErrorCount int
address string address string
*PacketQueue *PacketQueue
} }
@@ -103,18 +103,18 @@ var (
func (m *TCPClient) SendPacket(messageType PoolMessage, data []byte) error { func (m *TCPClient) SendPacket(messageType PoolMessage, data []byte) error {
m.sendMux.Lock() m.sendMux.Lock()
defer m.sendMux.Unlock() defer m.sendMux.Unlock()
m.Conn.Write(header[:]) m.PersistentConnection.Write(header[:])
err := binary.Write(m.Conn, binary.LittleEndian, Packet{ err := binary.Write(m.PersistentConnection, binary.LittleEndian, Packet{
Version: CurrentPacketVersion, Version: CurrentPacketVersion,
MessageType: messageType, MessageType: messageType,
StatusCode: 0, StatusCode: 0,
DataLength: uint32(len(data)), DataLength: uint32(len(data)),
}) })
if err != nil { if err != nil {
return m.HandleConnectionError(err) return m.PersistentConnection.HandleConnectionError(err)
} }
_, err = m.Conn.Write(data) _, err = m.PersistentConnection.Write(data)
return m.HandleConnectionError(err) return m.PersistentConnection.HandleConnectionError(err)
} }
func (m *TCPClient) Call(messageType PoolMessage, responseType PoolMessage, data []byte) (*CallResult, error) { func (m *TCPClient) Call(messageType PoolMessage, responseType PoolMessage, data []byte) (*CallResult, error) {
@@ -122,7 +122,7 @@ func (m *TCPClient) Call(messageType PoolMessage, responseType PoolMessage, data
err := m.SendPacket(messageType, data) err := m.SendPacket(messageType, data)
if err != nil { if err != nil {
m.RemoveListeners() m.RemoveListeners()
return nil, m.HandleConnectionError(err) return nil, m.PersistentConnection.HandleConnectionError(err)
} }
select { select {
@@ -130,6 +130,6 @@ func (m *TCPClient) Call(messageType PoolMessage, responseType PoolMessage, data
return &ret, nil return &ret, nil
case <-time.After(time.Second): case <-time.After(time.Second):
log.Printf("Timeout waiting for cart response to message type %d\n", responseType) log.Printf("Timeout waiting for cart response to message type %d\n", responseType)
return nil, m.HandleConnectionError(fmt.Errorf("timeout")) return nil, m.PersistentConnection.HandleConnectionError(fmt.Errorf("timeout"))
} }
} }