package main

import (
	"fmt"
	"io"
	"log"
	"net"
	"sync"
	"time"
)

// CartPacketWithData is a received packet header plus its payload, together
// with the bookkeeping the queue needs.
type CartPacketWithData struct {
	MessageType uint32
	Id          CartId
	Added       time.Time // set by HandleData when the packet is queued
	Consumed    bool      // set by Expect once the packet has been handed out
	Data        []byte
}

// CartPacketQueue holds recently received packets, newest first.
// All access to Packets must go through mu.
type CartPacketQueue struct {
	mu      sync.RWMutex
	Packets []CartPacketWithData
}

const cartCap = 150

// NewCartPacketQueue creates a queue and starts a goroutine that reads
// packets from connection and feeds them into the queue via HandleData.
// The goroutine closes connection when it exits (on io.EOF from the header
// read, or on any error from the payload read).
func NewCartPacketQueue(connection net.Conn) *CartPacketQueue {
	queue := &CartPacketQueue{
		Packets: make([]CartPacketWithData, 0, cartCap),
	}
	go func() {
		defer connection.Close()
		var packet CartPacket
		for {
			err := ReadCartPacket(connection, &packet)
			if err != nil {
				if err == io.EOF {
					return
				}
				log.Printf("Error receiving packet: %v\n", err)
				// NOTE(review): execution deliberately continues after a
				// non-EOF header error, so the payload read below uses
				// whatever is in packet at that point. Confirm this
				// best-effort behaviour is still wanted.
			}
			data, err := GetPacketData(connection, packet.DataLength)
			if err != nil {
				log.Printf("Error receiving packet data: %v\n", err)
				return
			}
			// Each packet is queued on its own goroutine, so insertion
			// order is not guaranteed to match arrival order.
			go queue.HandleData(packet.MessageType, packet.Id, data)
		}
	}()
	return queue
}

// HandleData inserts a new packet at the front of the queue and drops every
// packet that is already consumed or older than 250ms. At most cartCap of the
// previous packets are retained behind the new one.
//
// The whole read-modify-write runs under the write lock: HandleData is invoked
// from many goroutines, and releasing the lock between the snapshot and the
// store would let one call overwrite a packet another call had just inserted.
func (p *CartPacketQueue) HandleData(t uint32, id CartId, data []byte) {
	const maxAge = 250 * time.Millisecond

	ts := time.Now()
	breakAt := ts.Add(-maxAge)

	p.mu.Lock()
	defer p.mu.Unlock()

	kept := make([]CartPacketWithData, 0, cartCap+1)
	kept = append(kept, CartPacketWithData{
		MessageType: t,
		Id:          id,
		Added:       ts,
		Data:        data,
	})
	for _, packet := range p.Packets {
		if packet.Consumed || !packet.Added.After(breakAt) {
			continue
		}
		kept = append(kept, packet)
		// The new packet plus cartCap survivors is the upper bound.
		if len(kept) > cartCap {
			break
		}
	}
	p.Packets = kept
}

// Expect polls the queue every 2ms until an unconsumed packet with the given
// message type and id shows up, marks it consumed and returns a copy of it.
// It returns an error once timeToWait has elapsed without a match.
func (p *CartPacketQueue) Expect(messageType uint32, id CartId, timeToWait time.Duration) (*CartPacketWithData, error) {
	// The reference time is backdated by one millisecond; it is used both as
	// the lower bound for packet.Added and as the start of the timeout.
	start := time.Now().Add(-time.Millisecond)
	for {
		if time.Since(start) > timeToWait {
			return nil, fmt.Errorf("timeout waiting for message type %d", messageType)
		}
		if packet := p.takeMatching(messageType, id, start); packet != nil {
			return packet, nil
		}
		time.Sleep(time.Millisecond * 2)
	}
}

// takeMatching finds the first unconsumed packet matching messageType and id
// that was added after the given time, marks the stored element as consumed
// and returns a copy of it. It returns nil when nothing matches.
func (p *CartPacketQueue) takeMatching(messageType uint32, id CartId, after time.Time) *CartPacketWithData {
	// Write lock: the Consumed flag of the stored element is mutated.
	p.mu.Lock()
	defer p.mu.Unlock()

	for i := range p.Packets {
		candidate := &p.Packets[i]
		if candidate.Consumed ||
			candidate.MessageType != messageType ||
			candidate.Id != id ||
			!candidate.Added.After(after) {
			continue
		}
		candidate.Consumed = true
		// Return a copy so the caller never aliases the queue's backing array.
		found := *candidate
		return &found
	}
	return nil
}