From c70c5cd930a28b75f88394ce324a9f606069df0b Mon Sep 17 00:00:00 2001 From: matst80 Date: Sun, 10 Nov 2024 21:43:40 +0100 Subject: [PATCH] maybe --- cart-packet-queue.go | 6 +-- main.go | 12 +++--- packet-queue.go | 4 +- packet.go | 98 +++++++++--------------------------------- rpc-pool.go | 2 +- rpc-server.go | 12 +++--- synced-pool.go | 39 +++++++---------- tcp-cart-client.go | 6 +-- tcp-cart-mux-server.go | 24 +++++------ tcp-client.go | 10 ++--- tcp-mux-server.go | 18 ++++---- tcp-mux_test.go | 6 +-- 12 files changed, 84 insertions(+), 153 deletions(-) diff --git a/cart-packet-queue.go b/cart-packet-queue.go index 592b70e..3e3f061 100644 --- a/cart-packet-queue.go +++ b/cart-packet-queue.go @@ -10,7 +10,7 @@ import ( ) type CartPacketWithData struct { - MessageType uint16 + MessageType uint32 Id CartId Added time.Time Consumed bool @@ -33,7 +33,7 @@ func NewCartPacketQueue(connection net.Conn) *CartPacketQueue { defer connection.Close() var packet CartPacket for { - err := ReadPacket(connection, &packet) + err := ReadCartPacket(connection, &packet) ts := time.Now() if err != nil { @@ -73,7 +73,7 @@ func NewCartPacketQueue(connection net.Conn) *CartPacketQueue { return queue } -func (p *CartPacketQueue) Expect(messageType uint16, id CartId, timeToWait time.Duration) (*CartPacketWithData, error) { +func (p *CartPacketQueue) Expect(messageType uint32, id CartId, timeToWait time.Duration) (*CartPacketWithData, error) { start := time.Now().Add(-time.Millisecond) for { diff --git a/main.go b/main.go index 09e8803..677b8d1 100644 --- a/main.go +++ b/main.go @@ -101,7 +101,7 @@ func main() { var config *rest.Config var kerr error if podIp == "" { - config, kerr = clientcmd.BuildConfigFromFlags("", "/Users/mats/.kube/config") + config, kerr = clientcmd.BuildConfigFromFlags("", "/home/mats/.kube/config") } else { config, kerr = rest.InClusterConfig() } @@ -146,11 +146,11 @@ func main() { // }) // mux.HandleFunc("GET /save", app.HandleSave) - mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + mux.HandleFunc("/pprof/", pprof.Index) + mux.HandleFunc("/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/pprof/profile", pprof.Profile) + mux.HandleFunc("/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/pprof/trace", pprof.Trace) mux.Handle("/metrics", promhttp.Handler()) sigs := make(chan os.Signal, 1) diff --git a/packet-queue.go b/packet-queue.go index 2086def..2faad1d 100644 --- a/packet-queue.go +++ b/packet-queue.go @@ -10,7 +10,7 @@ import ( ) type PacketWithData struct { - MessageType uint16 + MessageType uint32 Added time.Time Consumed bool Data []byte @@ -69,7 +69,7 @@ func NewPacketQueue(connection net.Conn) *PacketQueue { return queue } -func (p *PacketQueue) Expect(messageType uint16, timeToWait time.Duration) (*PacketWithData, error) { +func (p *PacketQueue) Expect(messageType uint32, timeToWait time.Duration) (*PacketWithData, error) { start := time.Now().Add(-time.Millisecond) for { diff --git a/packet.go b/packet.go index d80625d..b860db2 100644 --- a/packet.go +++ b/packet.go @@ -2,112 +2,54 @@ package main import ( "encoding/binary" - "encoding/json" - "fmt" "io" ) const ( - RemoteGetState = uint16(0x01) - RemoteHandleMessage = uint16(0x02) - ResponseBody = uint16(0x03) - RemoteGetStateReply = uint16(0x04) - RemoteHandleMessageReply = uint16(0x05) + RemoteGetState = uint32(0x01) + RemoteHandleMutation = uint32(0x02) + ResponseBody = uint32(0x03) + RemoteGetStateReply = uint32(0x04) + RemoteHandleMutationReply = uint32(0x05) ) type CartPacket struct { - Version uint16 - MessageType uint16 - DataLength uint16 + Version uint32 + MessageType uint32 + DataLength uint64 Id CartId } type Packet struct { - Version uint16 - MessageType uint16 - DataLength uint16 + Version uint32 + MessageType uint32 + DataLength uint64 } -func SendCartPacket(conn io.Writer, id CartId, messageType uint16, datafn func(w io.Writer) error) error { - data, err := GetData(datafn) - if err != nil { - return err - } - if conn == nil { - return fmt.Errorf("no connection to send to") - } - binary.Write(conn, binary.LittleEndian, CartPacket{ - Version: 2, - MessageType: messageType, - DataLength: uint16(len(data)), - Id: id, - }) - _, err = conn.Write(data) - return err -} - -func SendPacket(conn io.Writer, messageType uint16, datafn func(w io.Writer) error) error { - data, err := GetData(datafn) - if err != nil { - return err - } - err = binary.Write(conn, binary.LittleEndian, Packet{ - Version: 1, - MessageType: messageType, - DataLength: uint16(len(data)), - }) - if err != nil { - return err - } - packetsSent.Inc() - _, err = conn.Write(data) - return err -} - -func SendRawResponse(conn io.Writer, data []byte) error { - err := binary.Write(conn, binary.LittleEndian, Packet{ - Version: 1, - MessageType: ResponseBody, - DataLength: uint16(len(data)), - }) - if err != nil { - return err - } - _, err = conn.Write(data) - return err -} - -func SendProxyResponse(conn io.Writer, data any) error { - return SendPacket(conn, ResponseBody, func(w io.Writer) error { - data, err := json.Marshal(data) - if err != nil { - return err - } - w.Write(data) - return nil - }) -} - -func ReadPacket[V Packet | CartPacket](conn io.Reader, packet *V) error { +func ReadPacket(conn io.Reader, packet *Packet) error { return binary.Read(conn, binary.LittleEndian, packet) } -func GetPacketData(conn io.Reader, len uint16) ([]byte, error) { +func ReadCartPacket(conn io.Reader, packet *CartPacket) error { + return binary.Read(conn, binary.LittleEndian, packet) +} + +func GetPacketData(conn io.Reader, len uint64) ([]byte, error) { data := make([]byte, len) _, err := conn.Read(data) return data, err } -func ReceivePacket(conn io.Reader) (uint16, []byte, error) { +func ReceivePacket(conn io.Reader) (uint32, []byte, error) { var packet Packet err := ReadPacket(conn, &packet) if err != nil { - return packet.MessageType, nil, err + return 0, nil, err } data, err := GetPacketData(conn, packet.DataLength) if err != nil { - return packet.MessageType, nil, err + return 0, nil, err } return packet.MessageType, data, nil } diff --git a/rpc-pool.go b/rpc-pool.go index f01764b..bd7bc46 100644 --- a/rpc-pool.go +++ b/rpc-pool.go @@ -47,7 +47,7 @@ func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, er if err != nil { return nil, err } - reply, err := g.Call(RemoteHandleMessage, g.Id, RemoteHandleMessageReply, data) + reply, err := g.Call(RemoteHandleMutation, g.Id, RemoteHandleMutationReply, data) if err != nil { return nil, err diff --git a/rpc-server.go b/rpc-server.go index 8b881c6..4e87f43 100644 --- a/rpc-server.go +++ b/rpc-server.go @@ -25,29 +25,29 @@ func NewGrainHandler(pool *GrainLocalPool, listen string) (*GrainHandler, error) CartServer: server, pool: pool, } - server.HandleCall(RemoteHandleMessage, handler.RemoteHandleMessageHandler) + server.HandleCall(RemoteHandleMutation, handler.RemoteHandleMessageHandler) server.HandleCall(RemoteGetState, handler.RemoteGetStateHandler) return handler, err } -func (h *GrainHandler) RemoteHandleMessageHandler(id CartId, data []byte) (uint16, []byte, error) { +func (h *GrainHandler) RemoteHandleMessageHandler(id CartId, data []byte) (uint32, []byte, error) { var msg Message err := ReadMessage(bytes.NewReader(data), &msg) if err != nil { fmt.Println("Error reading message:", err) - return RemoteHandleMessageReply, nil, err + return RemoteHandleMutationReply, nil, err } replyData, err := h.pool.Process(id, msg) if err != nil { fmt.Println("Error handling message:", err) } if err != nil { - return RemoteHandleMessageReply, nil, err + return RemoteHandleMutationReply, nil, err } - return RemoteHandleMessageReply, replyData, nil + return RemoteHandleMutationReply, replyData, nil } -func (h *GrainHandler) RemoteGetStateHandler(id CartId, data []byte) (uint16, []byte, error) { +func (h *GrainHandler) RemoteGetStateHandler(id CartId, data []byte) (uint32, []byte, error) { reply, err := h.pool.Get(id) if err != nil { return RemoteGetStateReply, nil, err diff --git a/synced-pool.go b/synced-pool.go index cdbffde..b87dc92 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -64,11 +64,11 @@ var ( }) ) -func (p *SyncedPool) PongHandler(data []byte) (uint16, []byte, error) { +func (p *SyncedPool) PongHandler(data []byte) (uint32, []byte, error) { return Pong, data, nil } -func (p *SyncedPool) GetCartIdHandler(data []byte) (uint16, []byte, error) { +func (p *SyncedPool) GetCartIdHandler(data []byte) (uint32, []byte, error) { ids := make([]string, 0, len(p.local.grains)) for id := range p.local.grains { ids = append(ids, id.String()) @@ -76,7 +76,7 @@ func (p *SyncedPool) GetCartIdHandler(data []byte) (uint16, []byte, error) { return CartIdsResponse, []byte(strings.Join(ids, ";")), nil } -func (p *SyncedPool) NegotiateHandler(data []byte) (uint16, []byte, error) { +func (p *SyncedPool) NegotiateHandler(data []byte) (uint32, []byte, error) { negotiationCount.Inc() log.Printf("Handling negotiation\n") for _, host := range p.ExcludeKnown(strings.Split(string(data), ";")) { @@ -89,7 +89,7 @@ func (p *SyncedPool) NegotiateHandler(data []byte) (uint16, []byte, error) { return RemoteNegotiateResponse, []byte("ok"), nil } -func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint16, []byte, error) { +func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint32, []byte, error) { grainSyncCount.Inc() idAndHostParts := strings.Split(string(data), ";") @@ -241,15 +241,15 @@ func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) { } const ( - RemoteNegotiate = uint16(3) - RemoteGrainChanged = uint16(4) - AckChange = uint16(5) - //AckError = uint16(6) - Ping = uint16(7) - Pong = uint16(8) - GetCartIds = uint16(9) - CartIdsResponse = uint16(10) - RemoteNegotiateResponse = uint16(11) + RemoteNegotiate = uint32(3) + RemoteGrainChanged = uint32(4) + AckChange = uint32(5) + //AckError = uint32(6) + Ping = uint32(7) + Pong = uint32(8) + GetCartIds = uint32(9) + CartIdsResponse = uint32(10) + RemoteNegotiateResponse = uint32(11) ) func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) { @@ -430,17 +430,8 @@ func (p *SyncedPool) getGrain(id CartId) (Grain, error) { remoteGrain, ok := p.remoteIndex[id] p.mu.RUnlock() if ok { - if remoteGrain == nil { - // p.remoteIndex[id].Delete(id) - p.mu.Lock() - delete(p.remoteIndex, id) - p.mu.Unlock() - log.Printf("Remote grain %s is nil\n", id) - //return nil, fmt.Errorf("remote pool is nil for %v", id) - } else { - remoteLookupCount.Inc() - return remoteGrain, nil - } + remoteLookupCount.Inc() + return remoteGrain, nil } err := p.RequestOwnership(id) diff --git a/tcp-cart-client.go b/tcp-cart-client.go index c2f13a2..246d2cd 100644 --- a/tcp-cart-client.go +++ b/tcp-cart-client.go @@ -64,7 +64,7 @@ func (m *CartTCPClient) Connect() error { return nil } -func (m *CartTCPClient) SendPacket(messageType uint16, id CartId, data []byte) error { +func (m *CartTCPClient) SendPacket(messageType uint32, id CartId, data []byte) error { err := m.Connect() if err != nil { return err @@ -72,7 +72,7 @@ func (m *CartTCPClient) SendPacket(messageType uint16, id CartId, data []byte) e err = binary.Write(m.Conn, binary.LittleEndian, CartPacket{ Version: 1, MessageType: messageType, - DataLength: uint16(len(data)), + DataLength: uint64(len(data)), Id: id, }) if err != nil { @@ -90,7 +90,7 @@ func (m *CartTCPClient) SendPacket(messageType uint16, id CartId, data []byte) e // return m.SendPacket(messageType, id, data) // } -func (m *CartTCPClient) Call(messageType uint16, id CartId, responseType uint16, data []byte) ([]byte, error) { +func (m *CartTCPClient) Call(messageType uint32, id CartId, responseType uint32, data []byte) ([]byte, error) { err := m.SendPacket(messageType, id, data) if err != nil { return nil, err diff --git a/tcp-cart-mux-server.go b/tcp-cart-mux-server.go index c84774c..b1ab0da 100644 --- a/tcp-cart-mux-server.go +++ b/tcp-cart-mux-server.go @@ -36,21 +36,21 @@ func CartListen(address string) (*CartServer, error) { type TCPCartServerMux struct { mu sync.RWMutex - listeners map[uint16]func(CartId, []byte) error - functions map[uint16]func(CartId, []byte) (uint16, []byte, error) + listeners map[uint32]func(CartId, []byte) error + functions map[uint32]func(CartId, []byte) (uint32, []byte, error) } func NewCartTCPServerMux(maxClients int) *TCPCartServerMux { m := &TCPCartServerMux{ mu: sync.RWMutex{}, - listeners: make(map[uint16]func(CartId, []byte) error), - functions: make(map[uint16]func(CartId, []byte) (uint16, []byte, error)), + listeners: make(map[uint32]func(CartId, []byte) error), + functions: make(map[uint32]func(CartId, []byte) (uint32, []byte, error)), } return m } -func (m *TCPCartServerMux) handleListener(messageType uint16, id CartId, data []byte) (bool, error) { +func (m *TCPCartServerMux) handleListener(messageType uint32, id CartId, data []byte) (bool, error) { m.mu.RLock() handler, ok := m.listeners[messageType] m.mu.RUnlock() @@ -63,19 +63,19 @@ func (m *TCPCartServerMux) handleListener(messageType uint16, id CartId, data [] return false, nil } -func (m *TCPCartServerMux) handleFunction(connection net.Conn, messageType uint16, id CartId, data []byte) (bool, error) { +func (m *TCPCartServerMux) handleFunction(connection net.Conn, messageType uint32, id CartId, data []byte) (bool, error) { m.mu.RLock() - function, ok := m.functions[messageType] + fn, ok := m.functions[messageType] m.mu.RUnlock() if ok { - responseType, responseData, err := function(id, data) + responseType, responseData, err := fn(id, data) if err != nil { return true, err } err = binary.Write(connection, binary.LittleEndian, CartPacket{ Version: 1, MessageType: responseType, - DataLength: uint16(len(responseData)), + DataLength: uint64(len(responseData)), Id: id, }) if err != nil { @@ -93,7 +93,7 @@ func (m *TCPCartServerMux) HandleConnection(connection net.Conn) error { var err error defer connection.Close() for { - err = ReadPacket(connection, &packet) + err = ReadCartPacket(connection, &packet) if err != nil { if err == io.EOF { return nil @@ -122,13 +122,13 @@ func (m *TCPCartServerMux) HandleConnection(connection net.Conn) error { } } -func (m *TCPCartServerMux) ListenFor(messageType uint16, handler func(CartId, []byte) error) { +func (m *TCPCartServerMux) ListenFor(messageType uint32, handler func(CartId, []byte) error) { m.mu.Lock() m.listeners[messageType] = handler m.mu.Unlock() } -func (m *TCPCartServerMux) HandleCall(messageType uint16, handler func(CartId, []byte) (uint16, []byte, error)) { +func (m *TCPCartServerMux) HandleCall(messageType uint32, handler func(CartId, []byte) (uint32, []byte, error)) { m.mu.Lock() m.functions[messageType] = handler m.mu.Unlock() diff --git a/tcp-client.go b/tcp-client.go index 4ab01c6..8b253f2 100644 --- a/tcp-client.go +++ b/tcp-client.go @@ -58,10 +58,8 @@ func (m *TCPClient) Connect() error { func (m *TCPClient) HandleConnectionError(err error) error { if err != nil { - m.Errors <- err m.ErrorCount++ - } return err } @@ -70,7 +68,7 @@ func (m *TCPClient) Close() { m.Conn.Close() } -func (m *TCPClient) SendPacket(messageType uint16, data []byte) error { +func (m *TCPClient) SendPacket(messageType uint32, data []byte) error { err := m.Connect() if err != nil { return err @@ -78,7 +76,7 @@ func (m *TCPClient) SendPacket(messageType uint16, data []byte) error { err = binary.Write(m.Conn, binary.LittleEndian, Packet{ Version: 1, MessageType: messageType, - DataLength: uint16(len(data)), + DataLength: uint64(len(data)), }) if err != nil { return m.HandleConnectionError(err) @@ -87,7 +85,7 @@ func (m *TCPClient) SendPacket(messageType uint16, data []byte) error { return m.HandleConnectionError(err) } -// func (m *TCPClient) SendPacketFn(messageType uint16, datafn func(w io.Writer) error) error { +// func (m *TCPClient) SendPacketFn(messageType uint32, datafn func(w io.Writer) error) error { // data, err := GetData(datafn) // if err != nil { // return err @@ -95,7 +93,7 @@ func (m *TCPClient) SendPacket(messageType uint16, data []byte) error { // return m.SendPacket(messageType, data) // } -func (m *TCPClient) Call(messageType uint16, responseType uint16, data []byte) ([]byte, error) { +func (m *TCPClient) Call(messageType uint32, responseType uint32, data []byte) ([]byte, error) { err := m.SendPacket(messageType, data) if err != nil { return nil, err diff --git a/tcp-mux-server.go b/tcp-mux-server.go index ac8ccbc..0130034 100644 --- a/tcp-mux-server.go +++ b/tcp-mux-server.go @@ -36,21 +36,21 @@ func Listen(address string) (*Server, error) { type TCPServerMux struct { mu sync.RWMutex - listeners map[uint16]func(data []byte) error - functions map[uint16]func(data []byte) (uint16, []byte, error) + listeners map[uint32]func(data []byte) error + functions map[uint32]func(data []byte) (uint32, []byte, error) } func NewTCPServerMux(maxClients int) *TCPServerMux { m := &TCPServerMux{ mu: sync.RWMutex{}, - listeners: make(map[uint16]func(data []byte) error), - functions: make(map[uint16]func(data []byte) (uint16, []byte, error)), + listeners: make(map[uint32]func(data []byte) error), + functions: make(map[uint32]func(data []byte) (uint32, []byte, error)), } return m } -func (m *TCPServerMux) handleListener(messageType uint16, data []byte) (bool, error) { +func (m *TCPServerMux) handleListener(messageType uint32, data []byte) (bool, error) { m.mu.RLock() handler, ok := m.listeners[messageType] m.mu.RUnlock() @@ -63,7 +63,7 @@ func (m *TCPServerMux) handleListener(messageType uint16, data []byte) (bool, er return false, nil } -func (m *TCPServerMux) handleFunction(connection net.Conn, messageType uint16, data []byte) (bool, error) { +func (m *TCPServerMux) handleFunction(connection net.Conn, messageType uint32, data []byte) (bool, error) { m.mu.RLock() function, ok := m.functions[messageType] m.mu.RUnlock() @@ -75,7 +75,7 @@ func (m *TCPServerMux) handleFunction(connection net.Conn, messageType uint16, d err = binary.Write(connection, binary.LittleEndian, Packet{ Version: 1, MessageType: responseType, - DataLength: uint16(len(responseData)), + DataLength: uint64(len(responseData)), }) if err != nil { return true, err @@ -116,13 +116,13 @@ func (m *TCPServerMux) HandleConnection(connection net.Conn) error { } } -func (m *TCPServerMux) ListenFor(messageType uint16, handler func(data []byte) error) { +func (m *TCPServerMux) ListenFor(messageType uint32, handler func(data []byte) error) { m.mu.Lock() m.listeners[messageType] = handler m.mu.Unlock() } -func (m *TCPServerMux) HandleCall(messageType uint16, handler func(data []byte) (uint16, []byte, error)) { +func (m *TCPServerMux) HandleCall(messageType uint32, handler func(data []byte) (uint32, []byte, error)) { m.mu.Lock() m.functions[messageType] = handler m.mu.Unlock() diff --git a/tcp-mux_test.go b/tcp-mux_test.go index e58a600..98da91f 100644 --- a/tcp-mux_test.go +++ b/tcp-mux_test.go @@ -7,11 +7,11 @@ import ( func TestTcpHelpers(t *testing.T) { - server, err := Listen(":1337") + server, err := Listen("localhost:51337") if err != nil { t.Errorf("Error listening: %v\n", err) } - client, err := Dial("localhost:1337") + client, err := Dial("localhost:51337") if err != nil { t.Errorf("Error dialing: %v\n", err) } @@ -21,7 +21,7 @@ func TestTcpHelpers(t *testing.T) { messageData = string(data) return nil }) - server.HandleCall(2, func(data []byte) (uint16, []byte, error) { + server.HandleCall(2, func(data []byte) (uint32, []byte, error) { log.Printf("Received call: %s\n", string(data)) return 3, []byte("Hello, client!"), nil })