Files
go-cart-actor/cart-packet-queue.go
matst80 98d9b2451e
Some checks failed
Build and Publish / BuildAndDeployAmd64 (push) Failing after 27s
Build and Publish / BuildAndDeploy (push) Successful in 2m19s
use persistent connections
2024-11-12 17:09:46 +01:00

121 lines
2.5 KiB
Go

package main
import (
"bufio"
"fmt"
"log"
"sync"
"time"
)
type CartPacketQueue struct {
mu sync.RWMutex
expectedPackages map[uint32]*CartListener
}
const CurrentPacketVersion = 2
type CartListener map[CartId]Listener
func NewCartPacketQueue(connection *PersistentConnection) *CartPacketQueue {
queue := &CartPacketQueue{
expectedPackages: make(map[uint32]*CartListener),
}
go queue.HandleConnection(connection)
return queue
}
func (p *CartPacketQueue) RemoveListeners() {
p.mu.Lock()
defer p.mu.Unlock()
for _, l := range p.expectedPackages {
for _, l := range *l {
close(l.Chan)
}
}
p.expectedPackages = make(map[uint32]*CartListener)
}
func (p *CartPacketQueue) HandleConnection(connection *PersistentConnection) error {
defer p.RemoveListeners()
defer connection.Close()
var packet CartPacket
connection.SetReadDeadline(time.Now().Add(time.Millisecond * 200))
reader := bufio.NewReader(connection)
for {
err := ReadCartPacket(reader, &packet)
if err != nil {
log.Printf("Error receiving packet: %v\n", err)
return connection.HandleConnectionError(err)
}
if packet.Version != CurrentPacketVersion {
log.Printf("Incorrect version: %v\n", packet.Version)
return connection.HandleConnectionError(fmt.Errorf("incorrect version: %d", packet.Version))
}
if packet.DataLength == 0 {
go p.HandleData(packet.MessageType, packet.Id, CallResult{
StatusCode: packet.StatusCode,
Data: []byte{},
})
continue
}
data, err := GetPacketData(reader, packet.DataLength)
if err != nil {
log.Printf("Error receiving packet data: %v\n", err)
return connection.HandleConnectionError(err)
}
go p.HandleData(packet.MessageType, packet.Id, CallResult{
StatusCode: packet.StatusCode,
Data: data,
})
}
}
func (p *CartPacketQueue) HandleData(t uint32, id CartId, data CallResult) {
p.mu.Lock()
defer p.mu.Unlock()
pl, ok := p.expectedPackages[t]
if ok {
l, ok := (*pl)[id]
if ok {
l.Chan <- data
l.Count--
if l.Count == 0 {
close(l.Chan)
delete(*pl, id)
}
}
}
}
func (p *CartPacketQueue) Expect(messageType uint32, id CartId) <-chan CallResult {
p.mu.Lock()
defer p.mu.Unlock()
l, ok := p.expectedPackages[messageType]
if ok {
if idl, idOk := (*l)[id]; idOk {
idl.Count++
return idl.Chan
}
ch := make(chan CallResult)
(*l)[id] = Listener{
Chan: ch,
Count: 1,
}
return ch
}
ch := make(chan CallResult)
p.expectedPackages[messageType] = &CartListener{
id: Listener{
Chan: ch,
Count: 1,
},
}
return ch
}