package main import ( "io" "log" "net" "sync" ) // type PacketWithData struct { // MessageType uint32 // Added time.Time // Consumed bool // Data []byte // } type PacketQueue struct { mu sync.RWMutex expectedPackages map[uint32]*Listener //Packets []PacketWithData //connection net.Conn } //const cap = 150 type Listener struct { Count int Chan chan []byte } func NewPacketQueue(connection net.Conn) *PacketQueue { queue := &PacketQueue{ expectedPackages: make(map[uint32]*Listener), //Packets: make([]PacketWithData, 0, cap+1), //connection: connection, } go queue.HandleConnection(connection) return queue } func (p *PacketQueue) HandleConnection(connection net.Conn) error { defer connection.Close() var packet Packet for { err := ReadPacket(connection, &packet) if err != nil { if err == io.EOF { return nil } log.Printf("Error receiving packet: %v\n", err) return err } data, err := GetPacketData(connection, packet.DataLength) if err != nil { log.Printf("Error receiving packet data: %v\n", err) return err } go p.HandleData(packet.MessageType, data) } } func (p *PacketQueue) HandleData(t uint32, data []byte) { p.mu.Lock() defer p.mu.Unlock() l, ok := p.expectedPackages[t] if ok { l.Chan <- data l.Count-- if l.Count == 0 { close(l.Chan) delete(p.expectedPackages, t) } return } data = nil } func (p *PacketQueue) Expect(messageType uint32) <-chan []byte { p.mu.Lock() defer p.mu.Unlock() l, ok := p.expectedPackages[messageType] if ok { l.Count++ return l.Chan } ch := make(chan []byte) p.expectedPackages[messageType] = &Listener{ Count: 1, Chan: ch, } return ch }