mux sending
All checks were successful
Build and Publish / BuildAndDeployAmd64 (push) Successful in 27s
Build and Publish / BuildAndDeploy (push) Successful in 2m19s

This commit is contained in:
matst80
2024-11-12 22:35:03 +01:00
parent e1b254668f
commit addec8788f
4 changed files with 14 additions and 0 deletions

View File

@@ -4,6 +4,7 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"log" "log"
"sync"
"time" "time"
) )
@@ -29,6 +30,7 @@ func (c *Client) Close() {
type CartTCPClient struct { type CartTCPClient struct {
*PersistentConnection *PersistentConnection
sendMux sync.Mutex
ErrorCount int ErrorCount int
address string address string
*CartPacketQueue *CartPacketQueue
@@ -48,6 +50,8 @@ func NewCartTCPClient(address string) (*CartTCPClient, error) {
} }
func (m *CartTCPClient) SendPacket(messageType CartMessage, id CartId, data []byte) error { func (m *CartTCPClient) SendPacket(messageType CartMessage, id CartId, data []byte) error {
m.sendMux.Lock()
defer m.sendMux.Unlock()
m.Conn.Write(header[:]) m.Conn.Write(header[:])
err := binary.Write(m.Conn, binary.LittleEndian, CartPacket{ err := binary.Write(m.Conn, binary.LittleEndian, CartPacket{
Version: CurrentPacketVersion, Version: CurrentPacketVersion,

View File

@@ -37,6 +37,7 @@ func CartListen(address string) (*CartServer, error) {
type TCPCartServerMux struct { type TCPCartServerMux struct {
mu sync.RWMutex mu sync.RWMutex
sendMux sync.Mutex
listeners map[CartMessage]func(CartId, []byte) error listeners map[CartMessage]func(CartId, []byte) error
functions map[CartMessage]func(CartId, []byte) (CartMessage, []byte, error) functions map[CartMessage]func(CartId, []byte) (CartMessage, []byte, error)
} }
@@ -68,6 +69,8 @@ func (m *TCPCartServerMux) handleFunction(connection net.Conn, messageType CartM
m.mu.RLock() m.mu.RLock()
fn, ok := m.functions[messageType] fn, ok := m.functions[messageType]
m.mu.RUnlock() m.mu.RUnlock()
m.sendMux.Lock()
defer m.sendMux.Unlock()
if ok { if ok {
responseType, responseData, err := fn(id, data) responseType, responseData, err := fn(id, data)
connection.Write(header[:]) connection.Write(header[:])

View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"log" "log"
"net" "net"
"sync"
"time" "time"
) )
@@ -26,6 +27,7 @@ func Dial(address string) (*Client, error) {
type TCPClient struct { type TCPClient struct {
*PersistentConnection *PersistentConnection
sendMux sync.Mutex
ErrorCount int ErrorCount int
address string address string
*PacketQueue *PacketQueue
@@ -99,6 +101,8 @@ var (
) )
func (m *TCPClient) SendPacket(messageType PoolMessage, data []byte) error { func (m *TCPClient) SendPacket(messageType PoolMessage, data []byte) error {
m.sendMux.Lock()
defer m.sendMux.Unlock()
m.Conn.Write(header[:]) m.Conn.Write(header[:])
err := binary.Write(m.Conn, binary.LittleEndian, Packet{ err := binary.Write(m.Conn, binary.LittleEndian, Packet{
Version: CurrentPacketVersion, Version: CurrentPacketVersion,

View File

@@ -37,6 +37,7 @@ func Listen(address string) (*Server, error) {
type TCPServerMux struct { type TCPServerMux struct {
mu sync.RWMutex mu sync.RWMutex
sendMux sync.Mutex
listeners map[PoolMessage]func(data []byte) error listeners map[PoolMessage]func(data []byte) error
functions map[PoolMessage]func(data []byte) (PoolMessage, []byte, error) functions map[PoolMessage]func(data []byte) (PoolMessage, []byte, error)
} }
@@ -68,6 +69,8 @@ func (m *TCPServerMux) handleFunction(connection net.Conn, messageType PoolMessa
m.mu.RLock() m.mu.RLock()
function, ok := m.functions[messageType] function, ok := m.functions[messageType]
m.mu.RUnlock() m.mu.RUnlock()
m.sendMux.Lock()
defer m.sendMux.Unlock()
if ok { if ok {
connection.Write(header[:]) connection.Write(header[:])
responseType, responseData, err := function(data) responseType, responseData, err := function(data)