package main import ( "fmt" "io" "log" "net" "sync" "time" ) type PacketWithData struct { MessageType uint32 Added time.Time Consumed bool Data []byte } type PacketQueue struct { mu sync.RWMutex Packets []PacketWithData //connection net.Conn } func NewPacketQueue(connection net.Conn) *PacketQueue { queue := &PacketQueue{ Packets: make([]PacketWithData, 0), //connection: connection, } go func() { defer connection.Close() var packet Packet for { err := ReadPacket(connection, &packet) if err != nil { if err == io.EOF { return } log.Printf("Error receiving packet: %v\n", err) //return } data, err := GetPacketData(connection, packet.DataLength) if err != nil { log.Printf("Error receiving packet data: %v\n", err) } go queue.HandleData(packet.MessageType, data) } }() return queue } func (p *PacketQueue) HandleData(t uint32, data []byte) { ts := time.Now() cap := 150 l := make([]PacketWithData, 0, cap) p.mu.RLock() breakAt := ts.Add(-time.Millisecond * 250) for _, packet := range p.Packets { if !packet.Consumed && packet.Added.After(breakAt) { l = append(l, packet) if len(l) >= cap { break } } } p.mu.RUnlock() p.mu.Lock() p.Packets = append([]PacketWithData{ { MessageType: t, Added: ts, Data: data, }, }, l...) p.mu.Unlock() } func (p *PacketQueue) Expect(messageType uint32, timeToWait time.Duration) (*PacketWithData, error) { start := time.Now().Add(-time.Millisecond) for { if time.Since(start) > timeToWait { return nil, fmt.Errorf("timeout waiting for message type %d", messageType) } p.mu.RLock() for _, packet := range p.Packets { if packet.MessageType == messageType && packet.Added.After(start) { packet.Consumed = true p.mu.RUnlock() return &packet, nil } } p.mu.RUnlock() time.Sleep(time.Millisecond * 2) } }