Files
go-cart-actor/packet-queue.go
matst80 110b5f00b8
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m46s
change check
2024-11-10 11:21:24 +01:00

85 lines
1.7 KiB
Go

package main
import (
"fmt"
"io"
"log"
"net"
"sync"
"time"
)
type PacketWithData struct {
MessageType uint16
Added time.Time
Data []byte
}
type PacketQueue struct {
mu sync.RWMutex
Packets []PacketWithData
connection net.Conn
}
func NewPacketQueue(connection net.Conn) *PacketQueue {
queue := &PacketQueue{
Packets: make([]PacketWithData, 0),
connection: connection,
}
go func() {
defer connection.Close()
for {
messageType, data, err := ReceivePacket(queue.connection)
ts := time.Now()
if err != nil {
log.Printf("Error receiving packet: %v\n", err)
if err == io.EOF {
return
}
//return
}
queue.mu.Lock()
for i, packet := range queue.Packets {
if time.Since(packet.Added) > time.Second {
stillInQueue := queue.Packets[i:]
log.Printf("DEBUG: Requeueing %v packets\n", stillInQueue)
queue.Packets = stillInQueue
packetQueue.Set(float64(len(queue.Packets)))
break
}
}
queue.Packets = append(queue.Packets, PacketWithData{
MessageType: messageType,
Added: ts,
Data: data,
})
queue.mu.Unlock()
packetsReceived.Inc()
packetQueue.Inc()
}
}()
return queue
}
func (p *PacketQueue) Expect(messageType uint16, timeToWait time.Duration) (*PacketWithData, error) {
start := time.Now().Add(-time.Millisecond)
for {
if time.Since(start) > timeToWait {
return nil, fmt.Errorf("timeout waiting for message type %d", messageType)
}
p.mu.RLock()
for _, packet := range p.Packets {
if packet.MessageType == messageType && packet.Added.After(start) {
p.mu.RUnlock()
return &packet, nil
}
}
p.mu.RUnlock()
time.Sleep(time.Millisecond * 5)
}
}