package main import ( "fmt" "io" "log" "net" "sync" "time" ) type PacketWithData struct { MessageType uint16 Added time.Time Consumed bool Data []byte } type PacketQueue struct { mu sync.RWMutex Packets []PacketWithData connection net.Conn } func NewPacketQueue(connection net.Conn) *PacketQueue { queue := &PacketQueue{ Packets: make([]PacketWithData, 0), connection: connection, } go func() { defer connection.Close() for { messageType, data, err := ReceivePacket(queue.connection) ts := time.Now() if err != nil { log.Printf("Error receiving packet: %v\n", err) if err == io.EOF { return } //return } queue.mu.Lock() l := make([]PacketWithData, 0, len(queue.Packets)) for _, packet := range queue.Packets { if !packet.Consumed { l = append(l, packet) } } queue.Packets = append(l, PacketWithData{ MessageType: messageType, Added: ts, Data: data, }) queue.mu.Unlock() packetsReceived.Inc() packetQueue.Set(float64(len(queue.Packets))) } }() return queue } func (p *PacketQueue) Expect(messageType uint16, timeToWait time.Duration) (*PacketWithData, error) { start := time.Now().Add(-time.Millisecond) for { if time.Since(start) > timeToWait { return nil, fmt.Errorf("timeout waiting for message type %d", messageType) } p.mu.RLock() for _, packet := range p.Packets { if packet.MessageType == messageType && packet.Added.After(start) { packet.Consumed = true p.mu.RUnlock() return &packet, nil } } p.mu.RUnlock() time.Sleep(time.Millisecond * 2) } }