Files
go-cart-actor/cart-packet-queue.go
matst80 0b290a32bf
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 2m2s
implement statuscode in packets
2024-11-11 23:24:03 +01:00

212 lines
4.4 KiB
Go

package main
import (
"io"
"log"
"net"
"sync"
)
type CartPacketQueue struct {
mu sync.RWMutex
expectedPackages map[uint32]*CartListener
}
const CurrentPacketVersion = 2
type CartListener map[CartId]Listener
func NewCartPacketQueue(connection net.Conn) *CartPacketQueue {
queue := &CartPacketQueue{
expectedPackages: make(map[uint32]*CartListener),
}
go queue.HandleConnection(connection)
return queue
}
func (p *CartPacketQueue) HandleConnection(connection net.Conn) error {
defer connection.Close()
var packet CartPacket
for {
err := ReadCartPacket(connection, &packet)
if err != nil {
if err == io.EOF {
return nil
}
log.Printf("Error receiving packet: %v\n", err)
return err
}
if packet.Version != CurrentPacketVersion {
log.Printf("Error receiving packet: %v\n", err)
continue
}
if packet.DataLength == 0 {
go p.HandleData(packet.MessageType, packet.Id, CallResult{
StatusCode: packet.StatusCode,
Data: []byte{},
})
continue
}
data, err := GetPacketData(connection, packet.DataLength)
if err != nil {
log.Printf("Error receiving packet data: %v\n", err)
return err
}
go p.HandleData(packet.MessageType, packet.Id, CallResult{
StatusCode: packet.StatusCode,
Data: data,
})
}
}
func (p *CartPacketQueue) HandleData(t uint32, id CartId, data CallResult) {
p.mu.Lock()
defer p.mu.Unlock()
pl, ok := p.expectedPackages[t]
if ok {
l, ok := (*pl)[id]
if ok {
l.Chan <- data
l.Count--
if l.Count == 0 {
close(l.Chan)
delete(*pl, id)
}
}
}
}
func (p *CartPacketQueue) Expect(messageType uint32, id CartId) <-chan CallResult {
p.mu.Lock()
defer p.mu.Unlock()
l, ok := p.expectedPackages[messageType]
if ok {
if idl, idOk := (*l)[id]; idOk {
idl.Count++
return idl.Chan
}
ch := make(chan CallResult)
(*l)[id] = Listener{
Chan: ch,
Count: 1,
}
return ch
}
ch := make(chan CallResult)
p.expectedPackages[messageType] = &CartListener{
id: Listener{
Chan: ch,
Count: 1,
},
}
return ch
}
// package main
// import (
// "fmt"
// "io"
// "log"
// "net"
// "sync"
// "time"
// )
// type CartPacketWithData struct {
// MessageType uint32
// Id CartId
// Added time.Time
// Consumed bool
// Data []byte
// }
// type CartPacketQueue struct {
// mu sync.RWMutex
// Packets []CartPacketWithData
// //connection net.Conn
// }
// const cartCap = 150
// func NewCartPacketQueue(connection net.Conn) *CartPacketQueue {
// queue := &CartPacketQueue{
// Packets: make([]CartPacketWithData, 0, cartCap),
// //connection: connection,
// }
// go func() {
// defer connection.Close()
// var packet CartPacket
// for {
// err := ReadCartPacket(connection, &packet)
// if err != nil {
// if err == io.EOF {
// return
// }
// log.Printf("Error receiving packet: %v\n", err)
// //return
// }
// data, err := GetPacketData(connection, packet.DataLength)
// if err != nil {
// log.Printf("Error receiving packet data: %v\n", err)
// return
// }
// go queue.HandleData(packet.MessageType, packet.Id, data)
// }
// }()
// return queue
// }
// func (p *CartPacketQueue) HandleData(t uint32, id CartId, data []byte) {
// ts := time.Now()
// l := make([]CartPacketWithData, 0, cartCap)
// p.mu.RLock()
// breakAt := ts.Add(-time.Millisecond * 250)
// for _, packet := range p.Packets {
// if !packet.Consumed && packet.Added.After(breakAt) {
// l = append(l, packet)
// if len(l) >= cartCap {
// break
// }
// }
// }
// p.mu.RUnlock()
// p.mu.Lock()
// p.Packets = append([]CartPacketWithData{
// {
// MessageType: t,
// Id: id,
// Added: ts,
// Data: data,
// },
// }, l...)
// p.mu.Unlock()
// }
// func (p *CartPacketQueue) Expect(messageType uint32, id CartId, timeToWait time.Duration) (*CartPacketWithData, error) {
// start := time.Now().Add(-time.Millisecond)
// for {
// if time.Since(start) > timeToWait {
// return nil, fmt.Errorf("timeout waiting for message type %d", messageType)
// }
// p.mu.RLock()
// for _, packet := range p.Packets {
// if !packet.Consumed && packet.MessageType == messageType && packet.Id == id && packet.Added.After(start) {
// packet.Consumed = true
// p.mu.RUnlock()
// return &packet, nil
// }
// }
// p.mu.RUnlock()
// time.Sleep(time.Millisecond * 2)
// }
// }