diff --git a/cart-packet-queue.go b/cart-packet-queue.go index 47043c8..e420a31 100644 --- a/cart-packet-queue.go +++ b/cart-packet-queue.go @@ -23,10 +23,12 @@ type CartPacketQueue struct { //connection net.Conn } +const cartCap = 150 + func NewCartPacketQueue(connection net.Conn) *CartPacketQueue { queue := &CartPacketQueue{ - Packets: make([]CartPacketWithData, 0), + Packets: make([]CartPacketWithData, 0, cartCap), //connection: connection, } go func() { @@ -55,14 +57,13 @@ func NewCartPacketQueue(connection net.Conn) *CartPacketQueue { func (p *CartPacketQueue) HandleData(t uint32, id CartId, data []byte) { ts := time.Now() - cap := 150 - l := make([]CartPacketWithData, 0, cap) + l := make([]CartPacketWithData, 0, cartCap) p.mu.RLock() breakAt := ts.Add(-time.Millisecond * 250) for _, packet := range p.Packets { if !packet.Consumed && packet.Added.After(breakAt) { l = append(l, packet) - if len(l) >= cap { + if len(l) >= cartCap { break } } @@ -89,7 +90,7 @@ func (p *CartPacketQueue) Expect(messageType uint32, id CartId, timeToWait time. } p.mu.RLock() for _, packet := range p.Packets { - if packet.MessageType == messageType && packet.Id == id && packet.Added.After(start) { + if !packet.Consumed && packet.MessageType == messageType && packet.Id == id && packet.Added.After(start) { packet.Consumed = true p.mu.RUnlock() return &packet, nil diff --git a/packet-queue.go b/packet-queue.go index 8b27d1c..c1b591b 100644 --- a/packet-queue.go +++ b/packet-queue.go @@ -22,10 +22,12 @@ type PacketQueue struct { //connection net.Conn } +const cap = 150 + func NewPacketQueue(connection net.Conn) *PacketQueue { queue := &PacketQueue{ - Packets: make([]PacketWithData, 0), + Packets: make([]PacketWithData, 0, cap), //connection: connection, } go func() { @@ -53,7 +55,6 @@ func NewPacketQueue(connection net.Conn) *PacketQueue { func (p *PacketQueue) HandleData(t uint32, data []byte) { ts := time.Now() - cap := 150 l := make([]PacketWithData, 0, cap) p.mu.RLock() breakAt := ts.Add(-time.Millisecond * 250) @@ -85,14 +86,13 @@ func (p *PacketQueue) Expect(messageType uint32, timeToWait time.Duration) (*Pac return nil, fmt.Errorf("timeout waiting for message type %d", messageType) } p.mu.RLock() + defer p.mu.RUnlock() for _, packet := range p.Packets { - if packet.MessageType == messageType && packet.Added.After(start) { + if !packet.Consumed && packet.MessageType == messageType && packet.Added.After(start) { packet.Consumed = true - p.mu.RUnlock() return &packet, nil } } - p.mu.RUnlock() - time.Sleep(time.Millisecond * 2) + time.Sleep(time.Millisecond * 4) } } diff --git a/packet.go b/packet.go index 9a4b194..29a2f55 100644 --- a/packet.go +++ b/packet.go @@ -43,16 +43,16 @@ func GetPacketData(conn io.Reader, len uint64) ([]byte, error) { return data, err } -func ReceivePacket(conn io.Reader) (uint32, []byte, error) { - var packet Packet - err := ReadPacket(conn, &packet) - if err != nil { - return 0, nil, err - } +// func ReceivePacket(conn io.Reader) (uint32, []byte, error) { +// var packet Packet +// err := ReadPacket(conn, &packet) +// if err != nil { +// return 0, nil, err +// } - data, err := GetPacketData(conn, packet.DataLength) - if err != nil { - return 0, nil, err - } - return packet.MessageType, data, nil -} +// data, err := GetPacketData(conn, packet.DataLength) +// if err != nil { +// return 0, nil, err +// } +// return packet.MessageType, data, nil +// } diff --git a/tcp-mux-server.go b/tcp-mux-server.go index 0130034..dc7ce0b 100644 --- a/tcp-mux-server.go +++ b/tcp-mux-server.go @@ -90,8 +90,9 @@ func (m *TCPServerMux) handleFunction(connection net.Conn, messageType uint32, d func (m *TCPServerMux) HandleConnection(connection net.Conn) error { defer connection.Close() + var packet Packet for { - messageType, data, err := ReceivePacket(connection) + err := ReadPacket(connection, &packet) if err != nil { if err == io.EOF { return nil @@ -99,18 +100,21 @@ func (m *TCPServerMux) HandleConnection(connection net.Conn) error { log.Printf("Error receiving packet: %v\n", err) return err } - - status, err := m.handleListener(messageType, data) + data, err := GetPacketData(connection, packet.DataLength) + if err != nil { + log.Printf("Error receiving packet data: %v\n", err) + } + status, err := m.handleListener(packet.MessageType, data) if err != nil { log.Printf("Error handling listener: %v\n", err) } if !status { - status, err = m.handleFunction(connection, messageType, data) + status, err = m.handleFunction(connection, packet.MessageType, data) if err != nil { log.Printf("Error handling function: %v\n", err) } if !status { - log.Printf("Unknown message type: %d\n", messageType) + log.Printf("Unknown message type: %d\n", packet.MessageType) } } }