package main import ( "fmt" "io" "log" "net" "sync" "time" ) type CartPacketWithData struct { MessageType uint16 Id CartId Added time.Time Consumed bool Data []byte } type CartPacketQueue struct { mu sync.RWMutex Packets []CartPacketWithData //connection net.Conn } func NewCartPacketQueue(connection net.Conn) *CartPacketQueue { queue := &CartPacketQueue{ Packets: make([]CartPacketWithData, 0), //connection: connection, } go func() { defer connection.Close() var packet CartPacket for { err := ReadPacket(connection, &packet) ts := time.Now() if err != nil { if err == io.EOF { return } log.Printf("Error receiving packet: %v\n", err) //return } data, err := GetPacketData(connection, int(packet.DataLength)) if err != nil { log.Printf("Error receiving packet data: %v\n", err) return } queue.mu.Lock() l := make([]CartPacketWithData, 0, len(queue.Packets)) for _, packet := range queue.Packets { if !packet.Consumed && packet.Added.After(ts.Add(-time.Second)) { l = append(l, packet) } } queue.Packets = append(l, CartPacketWithData{ MessageType: packet.MessageType, Id: packet.Id, Added: ts, Data: data, }) queue.mu.Unlock() } }() return queue } func (p *CartPacketQueue) Expect(messageType uint16, id CartId, timeToWait time.Duration) (*CartPacketWithData, error) { start := time.Now().Add(-time.Millisecond) for { if time.Since(start) > timeToWait { return nil, fmt.Errorf("timeout waiting for message type %d", messageType) } p.mu.RLock() for _, packet := range p.Packets { if packet.MessageType == messageType && packet.Id == id && packet.Added.After(start) { packet.Consumed = true p.mu.RUnlock() return &packet, nil } } p.mu.RUnlock() time.Sleep(time.Millisecond * 2) } }