From e63a92213b645312d0559a17e7c2f7ba4cbaae31 Mon Sep 17 00:00:00 2001 From: matst80 Date: Mon, 11 Nov 2024 19:40:29 +0100 Subject: [PATCH] more efficient read --- cart-packet-queue.go | 49 +++++++++++++++++++++++++------------------- packet-queue.go | 45 ++++++++++++++++++++++++---------------- 2 files changed, 55 insertions(+), 39 deletions(-) diff --git a/cart-packet-queue.go b/cart-packet-queue.go index 3e3f061..47043c8 100644 --- a/cart-packet-queue.go +++ b/cart-packet-queue.go @@ -34,9 +34,7 @@ func NewCartPacketQueue(connection net.Conn) *CartPacketQueue { var packet CartPacket for { err := ReadCartPacket(connection, &packet) - ts := time.Now() if err != nil { - if err == io.EOF { return } @@ -49,30 +47,39 @@ func NewCartPacketQueue(connection net.Conn) *CartPacketQueue { log.Printf("Error receiving packet data: %v\n", err) return } - - queue.mu.Lock() - - l := make([]CartPacketWithData, 0, len(queue.Packets)) - - for _, packet := range queue.Packets { - if !packet.Consumed && packet.Added.After(ts.Add(-time.Second)) { - l = append(l, packet) - } - } - - queue.Packets = append(l, CartPacketWithData{ - MessageType: packet.MessageType, - Id: packet.Id, - Added: ts, - Data: data, - }) - queue.mu.Unlock() - + go queue.HandleData(packet.MessageType, packet.Id, data) } }() return queue } +func (p *CartPacketQueue) HandleData(t uint32, id CartId, data []byte) { + ts := time.Now() + cap := 150 + l := make([]CartPacketWithData, 0, cap) + p.mu.RLock() + breakAt := ts.Add(-time.Millisecond * 250) + for _, packet := range p.Packets { + if !packet.Consumed && packet.Added.After(breakAt) { + l = append(l, packet) + if len(l) >= cap { + break + } + } + } + p.mu.RUnlock() + p.mu.Lock() + p.Packets = append([]CartPacketWithData{ + { + MessageType: t, + Id: id, + Added: ts, + Data: data, + }, + }, l...) + p.mu.Unlock() +} + func (p *CartPacketQueue) Expect(messageType uint32, id CartId, timeToWait time.Duration) (*CartPacketWithData, error) { start := time.Now().Add(-time.Millisecond) diff --git a/packet-queue.go b/packet-queue.go index 109cb1b..8b27d1c 100644 --- a/packet-queue.go +++ b/packet-queue.go @@ -33,9 +33,7 @@ func NewPacketQueue(connection net.Conn) *PacketQueue { var packet Packet for { err := ReadPacket(connection, &packet) - ts := time.Now() if err != nil { - if err == io.EOF { return } @@ -46,28 +44,39 @@ func NewPacketQueue(connection net.Conn) *PacketQueue { if err != nil { log.Printf("Error receiving packet data: %v\n", err) } - queue.mu.Lock() - l := make([]PacketWithData, 0, len(queue.Packets)) - - for _, packet := range queue.Packets { - if !packet.Consumed && packet.Added.After(ts.Add(-time.Second)) { - l = append(l, packet) - } - } - packetQueue.Set(float64(len(queue.Packets))) - queue.Packets = append(l, PacketWithData{ - MessageType: packet.MessageType, - Added: ts, - Data: data, - }) - queue.mu.Unlock() - packetsReceived.Inc() + go queue.HandleData(packet.MessageType, data) } }() return queue } +func (p *PacketQueue) HandleData(t uint32, data []byte) { + ts := time.Now() + cap := 150 + l := make([]PacketWithData, 0, cap) + p.mu.RLock() + breakAt := ts.Add(-time.Millisecond * 250) + for _, packet := range p.Packets { + if !packet.Consumed && packet.Added.After(breakAt) { + l = append(l, packet) + if len(l) >= cap { + break + } + } + } + p.mu.RUnlock() + p.mu.Lock() + p.Packets = append([]PacketWithData{ + { + MessageType: t, + Added: ts, + Data: data, + }, + }, l...) + p.mu.Unlock() +} + func (p *PacketQueue) Expect(messageType uint32, timeToWait time.Duration) (*PacketWithData, error) { start := time.Now().Add(-time.Millisecond)