craps
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type CartPacketQueue struct {
|
type CartPacketQueue struct {
|
||||||
@@ -70,14 +71,34 @@ func (p *CartPacketQueue) HandleConnection(connection *PersistentConnection) err
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *CartPacketQueue) HandleData(t CartMessage, id CartId, data CallResult) {
|
func (p *CartPacketQueue) HandleData(t CartMessage, id CartId, data CallResult) {
|
||||||
|
p.getListener(t, id, func(l *Listener) {
|
||||||
|
l.Chan <- data
|
||||||
|
l.Count--
|
||||||
|
})
|
||||||
|
// p.mu.Lock()
|
||||||
|
// defer p.mu.Unlock()
|
||||||
|
// pl, ok := p.expectedPackages[t]
|
||||||
|
// if ok {
|
||||||
|
// l, ok := (*pl)[id]
|
||||||
|
// if ok {
|
||||||
|
// l.Chan <- data
|
||||||
|
// l.Count--
|
||||||
|
// if l.Count == 0 {
|
||||||
|
// close(l.Chan)
|
||||||
|
// delete(*pl, id)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *CartPacketQueue) getListener(t CartMessage, id CartId, fn func(*Listener)) {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
pl, ok := p.expectedPackages[t]
|
pl, ok := p.expectedPackages[t]
|
||||||
if ok {
|
if ok {
|
||||||
l, ok := (*pl)[id]
|
l, ok := (*pl)[id]
|
||||||
if ok {
|
if ok {
|
||||||
l.Chan <- data
|
fn(&l)
|
||||||
l.Count--
|
|
||||||
if l.Count == 0 {
|
if l.Count == 0 {
|
||||||
close(l.Chan)
|
close(l.Chan)
|
||||||
delete(*pl, id)
|
delete(*pl, id)
|
||||||
@@ -86,6 +107,30 @@ func (p *CartPacketQueue) HandleData(t CartMessage, id CartId, data CallResult)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CallResultWithTimeout(onTimeout func() CallResult) chan CallResult {
|
||||||
|
ch := make(chan CallResult, 1)
|
||||||
|
resultCh := make(chan CallResult, 1)
|
||||||
|
select {
|
||||||
|
case ret := <-resultCh:
|
||||||
|
ch <- ret
|
||||||
|
case <-time.After(300 * time.Millisecond):
|
||||||
|
ch <- onTimeout()
|
||||||
|
}
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *CartPacketQueue) MakeChannel(messageType CartMessage, id CartId) chan CallResult {
|
||||||
|
return CallResultWithTimeout(func() CallResult {
|
||||||
|
p.getListener(messageType, id, func(l *Listener) {
|
||||||
|
l.Count--
|
||||||
|
})
|
||||||
|
return CallResult{
|
||||||
|
StatusCode: 504,
|
||||||
|
Data: []byte("timeout cart call"),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (p *CartPacketQueue) Expect(messageType CartMessage, id CartId) <-chan CallResult {
|
func (p *CartPacketQueue) Expect(messageType CartMessage, id CartId) <-chan CallResult {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
@@ -95,15 +140,17 @@ func (p *CartPacketQueue) Expect(messageType CartMessage, id CartId) <-chan Call
|
|||||||
idl.Count++
|
idl.Count++
|
||||||
return idl.Chan
|
return idl.Chan
|
||||||
}
|
}
|
||||||
ch := make(chan CallResult)
|
ch := p.MakeChannel(messageType, id)
|
||||||
|
|
||||||
(*l)[id] = Listener{
|
(*l)[id] = Listener{
|
||||||
Chan: ch,
|
Chan: ch,
|
||||||
Count: 1,
|
Count: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
ch := make(chan CallResult)
|
ch := p.MakeChannel(messageType, id)
|
||||||
p.expectedPackages[messageType] = &CartListener{
|
p.expectedPackages[messageType] = &CartListener{
|
||||||
id: Listener{
|
id: Listener{
|
||||||
Chan: ch,
|
Chan: ch,
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PacketQueue struct {
|
type PacketQueue struct {
|
||||||
@@ -98,7 +99,19 @@ func (p *PacketQueue) Expect(messageType PoolMessage) <-chan CallResult {
|
|||||||
return l.Chan
|
return l.Chan
|
||||||
}
|
}
|
||||||
|
|
||||||
ch := make(chan CallResult)
|
ch := make(chan CallResult, 1)
|
||||||
|
go func() {
|
||||||
|
time.Sleep(time.Millisecond * 300)
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
|
ch <- CallResult{
|
||||||
|
StatusCode: 504,
|
||||||
|
Data: []byte("timeout cart call"),
|
||||||
|
}
|
||||||
|
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
p.expectedPackages[messageType] = &Listener{
|
p.expectedPackages[messageType] = &Listener{
|
||||||
Count: 1,
|
Count: 1,
|
||||||
Chan: ch,
|
Chan: ch,
|
||||||
|
|||||||
@@ -2,10 +2,8 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type CartClient struct {
|
type CartClient struct {
|
||||||
@@ -73,13 +71,9 @@ func (m *CartTCPClient) call(messageType CartMessage, id CartId, responseType Ca
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, m.PersistentConnection.HandleConnectionError(err)
|
return nil, m.PersistentConnection.HandleConnectionError(err)
|
||||||
}
|
}
|
||||||
select {
|
|
||||||
case ret := <-packetChan:
|
ret := <-packetChan
|
||||||
return &ret, nil
|
return &ret, nil
|
||||||
case <-time.After(time.Millisecond * 300):
|
|
||||||
log.Printf("Timeout waiting for cart response to message type %d\n", responseType)
|
|
||||||
return nil, m.PersistentConnection.HandleConnectionError(fmt.Errorf("timeout"))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func isRetirableError(err error) bool {
|
func isRetirableError(err error) bool {
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -139,11 +138,7 @@ func (m *TCPClient) Call(messageType PoolMessage, responseType PoolMessage, data
|
|||||||
return nil, m.PersistentConnection.HandleConnectionError(err)
|
return nil, m.PersistentConnection.HandleConnectionError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
ret := <-packetChan
|
||||||
case ret := <-packetChan:
|
return &ret, nil
|
||||||
return &ret, nil
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
log.Printf("Timeout waiting for cart response to message type %d\n", responseType)
|
|
||||||
return nil, m.PersistentConnection.HandleConnectionError(fmt.Errorf("timeout"))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
172
tcp-connection.go
Normal file
172
tcp-connection.go
Normal file
@@ -0,0 +1,172 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Connection struct {
|
||||||
|
address string
|
||||||
|
count uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type FrameType uint32
|
||||||
|
|
||||||
|
type Frame struct {
|
||||||
|
Id uint64
|
||||||
|
Type FrameType
|
||||||
|
StatusCode uint32
|
||||||
|
Length uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
type FrameWithPayload struct {
|
||||||
|
Frame
|
||||||
|
Payload []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type FrameData interface {
|
||||||
|
ToBytes() []byte
|
||||||
|
FromBytes([]byte) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConnection(address string) *Connection {
|
||||||
|
return &Connection{
|
||||||
|
count: 0,
|
||||||
|
address: address,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func SendFrame(conn net.Conn, data *FrameWithPayload) error {
|
||||||
|
_, err := conn.Write(header[:])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = binary.Write(conn, binary.LittleEndian, data.Frame)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = conn.Write(data.Payload)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Connection) CallAsync(msg FrameType, data FrameData, ch chan<- *FrameWithPayload) error {
|
||||||
|
conn, err := net.Dial("tcp", c.address)
|
||||||
|
go WaitForFrame(conn, ch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
payload := data.ToBytes()
|
||||||
|
toSend := &FrameWithPayload{
|
||||||
|
Frame: Frame{
|
||||||
|
Id: c.count,
|
||||||
|
Type: msg,
|
||||||
|
StatusCode: 0,
|
||||||
|
Length: uint32(len(payload)),
|
||||||
|
},
|
||||||
|
Payload: payload,
|
||||||
|
}
|
||||||
|
|
||||||
|
err = SendFrame(conn, toSend)
|
||||||
|
if err != nil {
|
||||||
|
close(ch)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.count++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Connection) Call(msg FrameType, data FrameData) (*FrameWithPayload, error) {
|
||||||
|
ch := make(chan *FrameWithPayload, 1)
|
||||||
|
c.CallAsync(msg, data, ch)
|
||||||
|
select {
|
||||||
|
case ret := <-ch:
|
||||||
|
return ret, nil
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
return nil, fmt.Errorf("timeout")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WaitForFrame(conn net.Conn, resultChan chan<- *FrameWithPayload) error {
|
||||||
|
defer conn.Close()
|
||||||
|
var err error
|
||||||
|
r := bufio.NewReader(conn)
|
||||||
|
h := make([]byte, 4)
|
||||||
|
r.Read(h)
|
||||||
|
if h[0] == header[0] && h[1] == header[1] && h[2] == header[2] && h[3] == header[3] {
|
||||||
|
frame := Frame{}
|
||||||
|
err = binary.Read(r, binary.LittleEndian, &frame)
|
||||||
|
payload := make([]byte, frame.Length)
|
||||||
|
_, err = r.Read(payload)
|
||||||
|
resultChan <- &FrameWithPayload{
|
||||||
|
Frame: frame,
|
||||||
|
Payload: payload,
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resultChan <- nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
type GenericListener struct {
|
||||||
|
Closed bool
|
||||||
|
handlers map[FrameType]func(*FrameWithPayload, chan<- *FrameWithPayload) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Connection) Listen() (*GenericListener, error) {
|
||||||
|
l, err := net.Listen("tcp", c.address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ret := &GenericListener{
|
||||||
|
handlers: make(map[FrameType]func(*FrameWithPayload, chan<- *FrameWithPayload) error),
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
for !ret.Closed {
|
||||||
|
connection, err := l.Accept()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error accepting connection: %v\n", err)
|
||||||
|
}
|
||||||
|
go ret.HandleConnection(connection)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return ret, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *GenericListener) HandleConnection(conn net.Conn) {
|
||||||
|
ch := make(chan *FrameWithPayload, 1)
|
||||||
|
go WaitForFrame(conn, ch)
|
||||||
|
select {
|
||||||
|
case frame := <-ch:
|
||||||
|
go l.HandleFrame(conn, frame)
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
close(ch)
|
||||||
|
log.Printf("Timeout waiting for frame\n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *GenericListener) AddHandler(msg FrameType, handler func(*FrameWithPayload, chan<- *FrameWithPayload) error) {
|
||||||
|
l.handlers[msg] = handler
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *GenericListener) HandleFrame(conn net.Conn, frame *FrameWithPayload) {
|
||||||
|
handler, ok := l.handlers[frame.Type]
|
||||||
|
defer conn.Close()
|
||||||
|
if ok {
|
||||||
|
go func() {
|
||||||
|
resultChan := make(chan *FrameWithPayload, 1)
|
||||||
|
defer close(resultChan)
|
||||||
|
err := handler(frame, resultChan)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error handling frame: %v\n", err)
|
||||||
|
}
|
||||||
|
SendFrame(conn, <-resultChan)
|
||||||
|
}()
|
||||||
|
} else {
|
||||||
|
log.Fatalf("No handler for frame type %d\n", frame.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
56
tcp-connection_test.go
Normal file
56
tcp-connection_test.go
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
type StringData string
|
||||||
|
|
||||||
|
func (s StringData) ToBytes() []byte {
|
||||||
|
return []byte(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s StringData) FromBytes(data []byte) error {
|
||||||
|
s = StringData(data)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGenericConnection(t *testing.T) {
|
||||||
|
conn := NewConnection("localhost:51337")
|
||||||
|
listener, err := conn.Listen()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error listening: %v\n", err)
|
||||||
|
}
|
||||||
|
listener.AddHandler(1, func(input *FrameWithPayload, resultChan chan<- *FrameWithPayload) error {
|
||||||
|
payload := []byte("Hello, world!")
|
||||||
|
resultChan <- &FrameWithPayload{
|
||||||
|
Frame: Frame{
|
||||||
|
Type: 2,
|
||||||
|
Id: input.Id,
|
||||||
|
StatusCode: 200,
|
||||||
|
Length: uint32(len("Hello, world!")),
|
||||||
|
},
|
||||||
|
Payload: payload,
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
r, err := conn.Call(1, StringData("Hello, world!"))
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error calling: %v\n", err)
|
||||||
|
}
|
||||||
|
if r.Type != 2 {
|
||||||
|
t.Errorf("Expected type 2, got %d\n", r.Type)
|
||||||
|
}
|
||||||
|
i := 100
|
||||||
|
results := make(chan *FrameWithPayload, i)
|
||||||
|
for i > 0 {
|
||||||
|
conn.CallAsync(1, StringData("Hello, world!"), results)
|
||||||
|
i--
|
||||||
|
}
|
||||||
|
for i < 100 {
|
||||||
|
r := <-results
|
||||||
|
if r.Type != 2 {
|
||||||
|
t.Errorf("Expected type 2, got %d\n", r.Type)
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user