package main import ( "bufio" "fmt" "log" "sync" ) type CartPacketQueue struct { mu sync.RWMutex expectedPackages map[CartMessage]*CartListener } const CurrentPacketVersion = 2 type CartListener map[CartId]Listener func NewCartPacketQueue(connection *PersistentConnection) *CartPacketQueue { queue := &CartPacketQueue{ expectedPackages: make(map[CartMessage]*CartListener), } go queue.HandleConnection(connection) return queue } func (p *CartPacketQueue) RemoveListeners() { p.mu.Lock() defer p.mu.Unlock() for _, l := range p.expectedPackages { for _, l := range *l { close(l.Chan) } } p.expectedPackages = make(map[CartMessage]*CartListener) } func (p *CartPacketQueue) HandleConnection(connection *PersistentConnection) error { defer p.RemoveListeners() defer connection.Close() var packet CartPacket reader := bufio.NewReader(connection) for { err := ReadCartPacket(reader, &packet) if err != nil { log.Printf("Error receiving packet: %v\n", err) return connection.HandleConnectionError(err) } if packet.Version != CurrentPacketVersion { log.Printf("Incorrect version: %v\n", packet.Version) return connection.HandleConnectionError(fmt.Errorf("incorrect version: %d", packet.Version)) } if packet.DataLength == 0 { go p.HandleData(packet.MessageType, packet.Id, CallResult{ StatusCode: packet.StatusCode, Data: []byte{}, }) continue } data, err := GetPacketData(reader, packet.DataLength) if err != nil { log.Printf("Error receiving packet data: %v\n", err) return connection.HandleConnectionError(err) } go p.HandleData(packet.MessageType, packet.Id, CallResult{ StatusCode: packet.StatusCode, Data: data, }) } } func (p *CartPacketQueue) HandleData(t CartMessage, id CartId, data CallResult) { p.mu.Lock() defer p.mu.Unlock() pl, ok := p.expectedPackages[t] if ok { l, ok := (*pl)[id] if ok { l.Chan <- data l.Count-- if l.Count == 0 { close(l.Chan) delete(*pl, id) } } } } func (p *CartPacketQueue) Expect(messageType CartMessage, id CartId) <-chan CallResult { p.mu.Lock() defer p.mu.Unlock() l, ok := p.expectedPackages[messageType] if ok { if idl, idOk := (*l)[id]; idOk { idl.Count++ return idl.Chan } ch := make(chan CallResult) (*l)[id] = Listener{ Chan: ch, Count: 1, } return ch } ch := make(chan CallResult) p.expectedPackages[messageType] = &CartListener{ id: Listener{ Chan: ch, Count: 1, }, } return ch }