From addec8788f3e374448fd53a4d0a6a21d7a5228b6 Mon Sep 17 00:00:00 2001 From: matst80 Date: Tue, 12 Nov 2024 22:35:03 +0100 Subject: [PATCH] mux sending --- tcp-cart-client.go | 4 ++++ tcp-cart-mux-server.go | 3 +++ tcp-client.go | 4 ++++ tcp-mux-server.go | 3 +++ 4 files changed, 14 insertions(+) diff --git a/tcp-cart-client.go b/tcp-cart-client.go index 52adb02..a195522 100644 --- a/tcp-cart-client.go +++ b/tcp-cart-client.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "fmt" "log" + "sync" "time" ) @@ -29,6 +30,7 @@ func (c *Client) Close() { type CartTCPClient struct { *PersistentConnection + sendMux sync.Mutex ErrorCount int address string *CartPacketQueue @@ -48,6 +50,8 @@ func NewCartTCPClient(address string) (*CartTCPClient, error) { } func (m *CartTCPClient) SendPacket(messageType CartMessage, id CartId, data []byte) error { + m.sendMux.Lock() + defer m.sendMux.Unlock() m.Conn.Write(header[:]) err := binary.Write(m.Conn, binary.LittleEndian, CartPacket{ Version: CurrentPacketVersion, diff --git a/tcp-cart-mux-server.go b/tcp-cart-mux-server.go index 51433e2..0ca4100 100644 --- a/tcp-cart-mux-server.go +++ b/tcp-cart-mux-server.go @@ -37,6 +37,7 @@ func CartListen(address string) (*CartServer, error) { type TCPCartServerMux struct { mu sync.RWMutex + sendMux sync.Mutex listeners map[CartMessage]func(CartId, []byte) error functions map[CartMessage]func(CartId, []byte) (CartMessage, []byte, error) } @@ -68,6 +69,8 @@ func (m *TCPCartServerMux) handleFunction(connection net.Conn, messageType CartM m.mu.RLock() fn, ok := m.functions[messageType] m.mu.RUnlock() + m.sendMux.Lock() + defer m.sendMux.Unlock() if ok { responseType, responseData, err := fn(id, data) connection.Write(header[:]) diff --git a/tcp-client.go b/tcp-client.go index 5bb6d33..eb9f71a 100644 --- a/tcp-client.go +++ b/tcp-client.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "net" + "sync" "time" ) @@ -26,6 +27,7 @@ func Dial(address string) (*Client, error) { type TCPClient struct { *PersistentConnection + sendMux sync.Mutex ErrorCount int address string *PacketQueue @@ -99,6 +101,8 @@ var ( ) func (m *TCPClient) SendPacket(messageType PoolMessage, data []byte) error { + m.sendMux.Lock() + defer m.sendMux.Unlock() m.Conn.Write(header[:]) err := binary.Write(m.Conn, binary.LittleEndian, Packet{ Version: CurrentPacketVersion, diff --git a/tcp-mux-server.go b/tcp-mux-server.go index ff2f0d3..a5c9c28 100644 --- a/tcp-mux-server.go +++ b/tcp-mux-server.go @@ -37,6 +37,7 @@ func Listen(address string) (*Server, error) { type TCPServerMux struct { mu sync.RWMutex + sendMux sync.Mutex listeners map[PoolMessage]func(data []byte) error functions map[PoolMessage]func(data []byte) (PoolMessage, []byte, error) } @@ -68,6 +69,8 @@ func (m *TCPServerMux) handleFunction(connection net.Conn, messageType PoolMessa m.mu.RLock() function, ok := m.functions[messageType] m.mu.RUnlock() + m.sendMux.Lock() + defer m.sendMux.Unlock() if ok { connection.Write(header[:]) responseType, responseData, err := function(data)