package main

import (
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"sync"
	"time"
)

const (
	// packetRetention is how long an unconsumed packet stays in the queue,
	// measured against the arrival time of the newest packet.
	packetRetention = time.Second

	// expectGrace backdates the start of an Expect call so that a packet
	// which arrived just before the call is still accepted.
	expectGrace = time.Millisecond

	// expectPollInterval is the pause between two scans of the queue while
	// Expect is waiting.
	expectPollInterval = 2 * time.Millisecond
)

// PacketWithData is one received packet together with its payload and the
// bookkeeping the queue needs.
type PacketWithData struct {
	MessageType uint16    // copied from the packet header
	Added       time.Time // taken right after the header read returned
	Consumed    bool      // set once Expect has handed the packet out
	Data        []byte    // payload as returned by GetPacketData
}

// PacketQueue holds recently received packets. The receive goroutine started
// by NewPacketQueue appends to it and Expect takes packets out of it; mu
// guards Packets.
type PacketQueue struct {
	mu      sync.RWMutex
	Packets []PacketWithData
}

// NewPacketQueue returns an empty queue and starts a goroutine that reads
// packets from connection into it. The goroutine closes connection when it
// exits, which happens on the first error from ReadPacket or GetPacketData.
func NewPacketQueue(connection net.Conn) *PacketQueue {
	queue := &PacketQueue{
		Packets: make([]PacketWithData, 0),
	}
	go queue.receive(connection)
	return queue
}

// receive reads header/payload pairs from connection until a read fails.
func (q *PacketQueue) receive(connection net.Conn) {
	defer connection.Close()

	var header Packet
	for {
		err := ReadPacket(connection, &header)
		received := time.Now()
		if err != nil {
			// A failed header read leaves header holding the previous
			// packet's values, so carrying on would enqueue a bogus packet
			// (and spin forever on a persistent error). Stop instead.
			if !errors.Is(err, io.EOF) {
				log.Printf("Error receiving packet: %v\n", err)
			}
			return
		}

		data, err := GetPacketData(connection, header.DataLength)
		if err != nil {
			// NOTE(review): assumes a failed payload read leaves the stream
			// position unknown, so reading the next header is not attempted.
			log.Printf("Error receiving packet data: %v\n", err)
			return
		}

		queued := q.push(header.MessageType, received, data)
		packetsReceived.Inc()
		packetQueue.Set(float64(queued))
	}
}

// push drops consumed and expired entries, appends the new packet and returns
// the resulting queue length.
func (q *PacketQueue) push(messageType uint16, received time.Time, data []byte) int {
	cutoff := received.Add(-packetRetention)

	q.mu.Lock()
	defer q.mu.Unlock()

	kept := make([]PacketWithData, 0, len(q.Packets)+1)
	for _, queued := range q.Packets {
		if !queued.Consumed && queued.Added.After(cutoff) {
			kept = append(kept, queued)
		}
	}
	q.Packets = append(kept, PacketWithData{
		MessageType: messageType,
		Added:       received,
		Data:        data,
	})
	return len(q.Packets)
}

// Expect waits for a not-yet-consumed packet of the given message type that
// arrived no earlier than one millisecond before the call. It polls the queue
// every two milliseconds. On success the queued packet is marked consumed, so
// it is handed out only once, and a copy of it is returned. When no packet
// shows up in time an error is returned.
//
// The timeout is measured from the same backdated instant, so the effective
// wait is roughly one millisecond shorter than timeToWait.
func (q *PacketQueue) Expect(messageType uint16, timeToWait time.Duration) (*PacketWithData, error) {
	start := time.Now().Add(-expectGrace)
	for {
		if time.Since(start) > timeToWait {
			return nil, fmt.Errorf("timeout waiting for message type %d", messageType)
		}
		if found := q.take(messageType, start); found != nil {
			return found, nil
		}
		time.Sleep(expectPollInterval)
	}
}

// take marks the first matching unconsumed packet added after notBefore as
// consumed and returns a copy of it, or nil when nothing matches.
func (q *PacketQueue) take(messageType uint16, notBefore time.Time) *PacketWithData {
	// The write lock is required because the queued element is modified.
	q.mu.Lock()
	defer q.mu.Unlock()

	for i := range q.Packets {
		// Index into the slice: a range value would be a copy and the
		// Consumed flag would never reach the queue.
		queued := &q.Packets[i]
		if queued.Consumed || queued.MessageType != messageType || !queued.Added.After(notBefore) {
			continue
		}
		queued.Consumed = true
		found := *queued
		return &found
	}
	return nil
}