From 110b5f00b8a6622afeb827b4dfd4ca7529590bd7 Mon Sep 17 00:00:00 2001 From: matst80 Date: Sun, 10 Nov 2024 11:21:24 +0100 Subject: [PATCH] change check --- packet-queue.go | 84 +++++++++++++++++++++++++++++++++++++++++++++++++ synced-pool.go | 74 ------------------------------------------- 2 files changed, 84 insertions(+), 74 deletions(-) create mode 100644 packet-queue.go diff --git a/packet-queue.go b/packet-queue.go new file mode 100644 index 0000000..55d121d --- /dev/null +++ b/packet-queue.go @@ -0,0 +1,84 @@ +package main + +import ( + "fmt" + "io" + "log" + "net" + "sync" + "time" +) + +type PacketWithData struct { + MessageType uint16 + Added time.Time + Data []byte +} + +type PacketQueue struct { + mu sync.RWMutex + Packets []PacketWithData + connection net.Conn +} + +func NewPacketQueue(connection net.Conn) *PacketQueue { + + queue := &PacketQueue{ + Packets: make([]PacketWithData, 0), + connection: connection, + } + go func() { + defer connection.Close() + for { + messageType, data, err := ReceivePacket(queue.connection) + ts := time.Now() + if err != nil { + log.Printf("Error receiving packet: %v\n", err) + if err == io.EOF { + return + } + + //return + } + + queue.mu.Lock() + for i, packet := range queue.Packets { + if time.Since(packet.Added) > time.Second { + stillInQueue := queue.Packets[i:] + log.Printf("DEBUG: Requeueing %v packets\n", stillInQueue) + queue.Packets = stillInQueue + packetQueue.Set(float64(len(queue.Packets))) + break + } + } + queue.Packets = append(queue.Packets, PacketWithData{ + MessageType: messageType, + Added: ts, + Data: data, + }) + queue.mu.Unlock() + packetsReceived.Inc() + packetQueue.Inc() + } + }() + return queue +} + +func (p *PacketQueue) Expect(messageType uint16, timeToWait time.Duration) (*PacketWithData, error) { + start := time.Now().Add(-time.Millisecond) + + for { + if time.Since(start) > timeToWait { + return nil, fmt.Errorf("timeout waiting for message type %d", messageType) + } + p.mu.RLock() + for _, packet := range p.Packets { + if packet.MessageType == messageType && packet.Added.After(start) { + p.mu.RUnlock() + return &packet, nil + } + } + p.mu.RUnlock() + time.Sleep(time.Millisecond * 5) + } +} diff --git a/synced-pool.go b/synced-pool.go index 08f45fc..77ceaa4 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -205,80 +205,6 @@ const ( CartIdsResponse = uint16(10) ) -type PacketWithData struct { - MessageType uint16 - Added time.Time - Data []byte -} - -type PacketQueue struct { - mu sync.RWMutex - Packets []PacketWithData - connection net.Conn -} - -func NewPacketQueue(connection net.Conn) *PacketQueue { - - queue := &PacketQueue{ - Packets: make([]PacketWithData, 0), - connection: connection, - } - go func() { - defer connection.Close() - for { - messageType, data, err := ReceivePacket(queue.connection) - ts := time.Now() - if err != nil { - log.Printf("Error receiving packet: %v\n", err) - if err == io.EOF { - return - } - - //return - } - - queue.mu.Lock() - for i, packet := range queue.Packets { - if time.Since(packet.Added) < time.Second { - stillInQueue := queue.Packets[i:] - log.Printf("DEBUG: Requeueing %v packets\n", stillInQueue) - queue.Packets = stillInQueue - packetQueue.Set(float64(len(queue.Packets))) - break - } - } - queue.Packets = append(queue.Packets, PacketWithData{ - MessageType: messageType, - Added: ts, - Data: data, - }) - queue.mu.Unlock() - packetsReceived.Inc() - packetQueue.Inc() - } - }() - return queue -} - -func (p *PacketQueue) Expect(messageType uint16, timeToWait time.Duration) (*PacketWithData, error) { - start := time.Now().Add(-time.Millisecond) - - for { - if time.Since(start) > timeToWait { - return nil, fmt.Errorf("timeout waiting for message type %d", messageType) - } - p.mu.RLock() - for _, packet := range p.Packets { - if packet.MessageType == messageType && packet.Added.After(start) { - p.mu.RUnlock() - return &packet, nil - } - } - p.mu.RUnlock() - time.Sleep(time.Millisecond * 5) - } -} - func (p *SyncedPool) handleConnection(conn net.Conn) { defer conn.Close() var packet Packet