package main import ( "encoding/binary" "fmt" "log" "net" "sync" "time" ) type Client struct { *TCPClient } func Dial(address string) (*Client, error) { mux, err := NewTCPClient(address) if err != nil { return nil, err } client := &Client{ TCPClient: mux, } return client, nil } type TCPClient struct { PersistentConnection *PersistentConnection sendMux sync.Mutex ErrorCount int address string *PacketQueue } type PersistentConnection struct { net.Conn Died chan bool Dead bool address string } func NewPersistentConnection(address string) (*PersistentConnection, error) { connection, err := net.Dial("tcp", address) if err != nil { return nil, err } return &PersistentConnection{ Conn: connection, Died: make(chan bool, 1), Dead: false, address: address, }, nil } func (m *PersistentConnection) Connect() error { if !m.Dead { connection, err := net.Dial("tcp", m.address) if err != nil { log.Printf("Error connecting to %s: %v\n", m.address, err) m.Died <- true m.Dead = true return err } m.Conn = connection } return nil } func (m *PersistentConnection) Close() { m.Conn.Close() m.Died <- true m.Dead = true } func (m *PersistentConnection) HandleConnectionError(err error) error { if err != nil { log.Printf("Error from to %s: %v\n", m.address, err) m.Conn.Close() m.Connect() } return err } func NewTCPClient(address string) (*TCPClient, error) { connection, err := NewPersistentConnection(address) if err != nil { return nil, err } return &TCPClient{ ErrorCount: 0, PersistentConnection: connection, address: address, PacketQueue: NewPacketQueue(connection), }, nil } type PacketHeader [4]byte var ( header = PacketHeader([4]byte{0x01, 0x02, 0x03, 0x04}) ) func (m *TCPClient) SendPacket(messageType PoolMessage, data []byte) error { m.sendMux.Lock() defer m.sendMux.Unlock() m.PersistentConnection.Write(header[:]) err := binary.Write(m.PersistentConnection, binary.LittleEndian, Packet{ Version: CurrentPacketVersion, MessageType: messageType, StatusCode: 0, DataLength: uint32(len(data)), }) if err != nil { return m.PersistentConnection.HandleConnectionError(err) } _, err = m.PersistentConnection.Write(data) return m.PersistentConnection.HandleConnectionError(err) } func (m *TCPClient) Call(messageType PoolMessage, responseType PoolMessage, data []byte) (*CallResult, error) { packetChan := m.Expect(responseType) err := m.SendPacket(messageType, data) if err != nil { m.RemoveListeners() return nil, m.PersistentConnection.HandleConnectionError(err) } select { case ret := <-packetChan: return &ret, nil case <-time.After(time.Second): log.Printf("Timeout waiting for cart response to message type %d\n", responseType) return nil, m.PersistentConnection.HandleConnectionError(fmt.Errorf("timeout")) } }