package main import ( "bufio" "io" "log" "net" "sync" "time" ) type CartPacketQueue struct { mu sync.RWMutex expectedPackages map[uint32]*CartListener } const CurrentPacketVersion = 2 type CartListener map[CartId]Listener func NewCartPacketQueue(connection net.Conn) *CartPacketQueue { queue := &CartPacketQueue{ expectedPackages: make(map[uint32]*CartListener), } go queue.HandleConnection(connection) return queue } func (p *CartPacketQueue) RemoveListeners() { p.mu.Lock() defer p.mu.Unlock() for _, l := range p.expectedPackages { for _, l := range *l { close(l.Chan) } } p.expectedPackages = make(map[uint32]*CartListener) } func (p *CartPacketQueue) HandleConnection(connection net.Conn) error { defer p.RemoveListeners() defer connection.Close() var packet CartPacket connection.SetReadDeadline(time.Now().Add(time.Millisecond * 200)) reader := bufio.NewReader(connection) for { err := ReadCartPacket(reader, &packet) if err != nil { if err == io.EOF { return nil } log.Printf("Error receiving packet: %v\n", err) return err } if packet.Version != CurrentPacketVersion { log.Printf("Incorrect version: %v\n", packet.Version) return nil } if packet.DataLength == 0 { go p.HandleData(packet.MessageType, packet.Id, CallResult{ StatusCode: packet.StatusCode, Data: []byte{}, }) continue } data, err := GetPacketData(reader, packet.DataLength) if err != nil { log.Printf("Error receiving packet data: %v\n", err) return err } go p.HandleData(packet.MessageType, packet.Id, CallResult{ StatusCode: packet.StatusCode, Data: data, }) } } func (p *CartPacketQueue) HandleData(t uint32, id CartId, data CallResult) { p.mu.Lock() defer p.mu.Unlock() pl, ok := p.expectedPackages[t] if ok { l, ok := (*pl)[id] if ok { l.Chan <- data l.Count-- if l.Count == 0 { close(l.Chan) delete(*pl, id) } } } } func (p *CartPacketQueue) Expect(messageType uint32, id CartId) <-chan CallResult { p.mu.Lock() defer p.mu.Unlock() l, ok := p.expectedPackages[messageType] if ok { if idl, idOk := (*l)[id]; idOk { idl.Count++ return idl.Chan } ch := make(chan CallResult) (*l)[id] = Listener{ Chan: ch, Count: 1, } return ch } ch := make(chan CallResult) p.expectedPackages[messageType] = &CartListener{ id: Listener{ Chan: ch, Count: 1, }, } return ch } // package main // import ( // "fmt" // "io" // "log" // "net" // "sync" // "time" // ) // type CartPacketWithData struct { // MessageType uint32 // Id CartId // Added time.Time // Consumed bool // Data []byte // } // type CartPacketQueue struct { // mu sync.RWMutex // Packets []CartPacketWithData // //connection net.Conn // } // const cartCap = 150 // func NewCartPacketQueue(connection net.Conn) *CartPacketQueue { // queue := &CartPacketQueue{ // Packets: make([]CartPacketWithData, 0, cartCap), // //connection: connection, // } // go func() { // defer connection.Close() // var packet CartPacket // for { // err := ReadCartPacket(connection, &packet) // if err != nil { // if err == io.EOF { // return // } // log.Printf("Error receiving packet: %v\n", err) // //return // } // data, err := GetPacketData(connection, packet.DataLength) // if err != nil { // log.Printf("Error receiving packet data: %v\n", err) // return // } // go queue.HandleData(packet.MessageType, packet.Id, data) // } // }() // return queue // } // func (p *CartPacketQueue) HandleData(t uint32, id CartId, data []byte) { // ts := time.Now() // l := make([]CartPacketWithData, 0, cartCap) // p.mu.RLock() // breakAt := ts.Add(-time.Millisecond * 250) // for _, packet := range p.Packets { // if !packet.Consumed && packet.Added.After(breakAt) { // l = append(l, packet) // if len(l) >= cartCap { // break // } // } // } // p.mu.RUnlock() // p.mu.Lock() // p.Packets = append([]CartPacketWithData{ // { // MessageType: t, // Id: id, // Added: ts, // Data: data, // }, // }, l...) // p.mu.Unlock() // } // func (p *CartPacketQueue) Expect(messageType uint32, id CartId, timeToWait time.Duration) (*CartPacketWithData, error) { // start := time.Now().Add(-time.Millisecond) // for { // if time.Since(start) > timeToWait { // return nil, fmt.Errorf("timeout waiting for message type %d", messageType) // } // p.mu.RLock() // for _, packet := range p.Packets { // if !packet.Consumed && packet.MessageType == messageType && packet.Id == id && packet.Added.After(start) { // packet.Consumed = true // p.mu.RUnlock() // return &packet, nil // } // } // p.mu.RUnlock() // time.Sleep(time.Millisecond * 2) // } // }