maybe
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m58s

This commit is contained in:
matst80
2024-11-10 21:43:40 +01:00
parent 0f3b22e8da
commit c70c5cd930
12 changed files with 84 additions and 153 deletions

View File

@@ -10,7 +10,7 @@ import (
)
type CartPacketWithData struct {
MessageType uint16
MessageType uint32
Id CartId
Added time.Time
Consumed bool
@@ -33,7 +33,7 @@ func NewCartPacketQueue(connection net.Conn) *CartPacketQueue {
defer connection.Close()
var packet CartPacket
for {
err := ReadPacket(connection, &packet)
err := ReadCartPacket(connection, &packet)
ts := time.Now()
if err != nil {
@@ -73,7 +73,7 @@ func NewCartPacketQueue(connection net.Conn) *CartPacketQueue {
return queue
}
func (p *CartPacketQueue) Expect(messageType uint16, id CartId, timeToWait time.Duration) (*CartPacketWithData, error) {
func (p *CartPacketQueue) Expect(messageType uint32, id CartId, timeToWait time.Duration) (*CartPacketWithData, error) {
start := time.Now().Add(-time.Millisecond)
for {

12
main.go
View File

@@ -101,7 +101,7 @@ func main() {
var config *rest.Config
var kerr error
if podIp == "" {
config, kerr = clientcmd.BuildConfigFromFlags("", "/Users/mats/.kube/config")
config, kerr = clientcmd.BuildConfigFromFlags("", "/home/mats/.kube/config")
} else {
config, kerr = rest.InClusterConfig()
}
@@ -146,11 +146,11 @@ func main() {
// })
// mux.HandleFunc("GET /save", app.HandleSave)
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.HandleFunc("/pprof/", pprof.Index)
mux.HandleFunc("/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/pprof/profile", pprof.Profile)
mux.HandleFunc("/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/pprof/trace", pprof.Trace)
mux.Handle("/metrics", promhttp.Handler())
sigs := make(chan os.Signal, 1)

View File

@@ -10,7 +10,7 @@ import (
)
type PacketWithData struct {
MessageType uint16
MessageType uint32
Added time.Time
Consumed bool
Data []byte
@@ -69,7 +69,7 @@ func NewPacketQueue(connection net.Conn) *PacketQueue {
return queue
}
func (p *PacketQueue) Expect(messageType uint16, timeToWait time.Duration) (*PacketWithData, error) {
func (p *PacketQueue) Expect(messageType uint32, timeToWait time.Duration) (*PacketWithData, error) {
start := time.Now().Add(-time.Millisecond)
for {

View File

@@ -2,112 +2,54 @@ package main
import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
)
const (
RemoteGetState = uint16(0x01)
RemoteHandleMessage = uint16(0x02)
ResponseBody = uint16(0x03)
RemoteGetStateReply = uint16(0x04)
RemoteHandleMessageReply = uint16(0x05)
RemoteGetState = uint32(0x01)
RemoteHandleMutation = uint32(0x02)
ResponseBody = uint32(0x03)
RemoteGetStateReply = uint32(0x04)
RemoteHandleMutationReply = uint32(0x05)
)
type CartPacket struct {
Version uint16
MessageType uint16
DataLength uint16
Version uint32
MessageType uint32
DataLength uint64
Id CartId
}
type Packet struct {
Version uint16
MessageType uint16
DataLength uint16
Version uint32
MessageType uint32
DataLength uint64
}
func SendCartPacket(conn io.Writer, id CartId, messageType uint16, datafn func(w io.Writer) error) error {
data, err := GetData(datafn)
if err != nil {
return err
}
if conn == nil {
return fmt.Errorf("no connection to send to")
}
binary.Write(conn, binary.LittleEndian, CartPacket{
Version: 2,
MessageType: messageType,
DataLength: uint16(len(data)),
Id: id,
})
_, err = conn.Write(data)
return err
}
func SendPacket(conn io.Writer, messageType uint16, datafn func(w io.Writer) error) error {
data, err := GetData(datafn)
if err != nil {
return err
}
err = binary.Write(conn, binary.LittleEndian, Packet{
Version: 1,
MessageType: messageType,
DataLength: uint16(len(data)),
})
if err != nil {
return err
}
packetsSent.Inc()
_, err = conn.Write(data)
return err
}
func SendRawResponse(conn io.Writer, data []byte) error {
err := binary.Write(conn, binary.LittleEndian, Packet{
Version: 1,
MessageType: ResponseBody,
DataLength: uint16(len(data)),
})
if err != nil {
return err
}
_, err = conn.Write(data)
return err
}
func SendProxyResponse(conn io.Writer, data any) error {
return SendPacket(conn, ResponseBody, func(w io.Writer) error {
data, err := json.Marshal(data)
if err != nil {
return err
}
w.Write(data)
return nil
})
}
func ReadPacket[V Packet | CartPacket](conn io.Reader, packet *V) error {
func ReadPacket(conn io.Reader, packet *Packet) error {
return binary.Read(conn, binary.LittleEndian, packet)
}
func GetPacketData(conn io.Reader, len uint16) ([]byte, error) {
func ReadCartPacket(conn io.Reader, packet *CartPacket) error {
return binary.Read(conn, binary.LittleEndian, packet)
}
func GetPacketData(conn io.Reader, len uint64) ([]byte, error) {
data := make([]byte, len)
_, err := conn.Read(data)
return data, err
}
func ReceivePacket(conn io.Reader) (uint16, []byte, error) {
func ReceivePacket(conn io.Reader) (uint32, []byte, error) {
var packet Packet
err := ReadPacket(conn, &packet)
if err != nil {
return packet.MessageType, nil, err
return 0, nil, err
}
data, err := GetPacketData(conn, packet.DataLength)
if err != nil {
return packet.MessageType, nil, err
return 0, nil, err
}
return packet.MessageType, data, nil
}

View File

@@ -47,7 +47,7 @@ func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, er
if err != nil {
return nil, err
}
reply, err := g.Call(RemoteHandleMessage, g.Id, RemoteHandleMessageReply, data)
reply, err := g.Call(RemoteHandleMutation, g.Id, RemoteHandleMutationReply, data)
if err != nil {
return nil, err

View File

@@ -25,29 +25,29 @@ func NewGrainHandler(pool *GrainLocalPool, listen string) (*GrainHandler, error)
CartServer: server,
pool: pool,
}
server.HandleCall(RemoteHandleMessage, handler.RemoteHandleMessageHandler)
server.HandleCall(RemoteHandleMutation, handler.RemoteHandleMessageHandler)
server.HandleCall(RemoteGetState, handler.RemoteGetStateHandler)
return handler, err
}
func (h *GrainHandler) RemoteHandleMessageHandler(id CartId, data []byte) (uint16, []byte, error) {
func (h *GrainHandler) RemoteHandleMessageHandler(id CartId, data []byte) (uint32, []byte, error) {
var msg Message
err := ReadMessage(bytes.NewReader(data), &msg)
if err != nil {
fmt.Println("Error reading message:", err)
return RemoteHandleMessageReply, nil, err
return RemoteHandleMutationReply, nil, err
}
replyData, err := h.pool.Process(id, msg)
if err != nil {
fmt.Println("Error handling message:", err)
}
if err != nil {
return RemoteHandleMessageReply, nil, err
return RemoteHandleMutationReply, nil, err
}
return RemoteHandleMessageReply, replyData, nil
return RemoteHandleMutationReply, replyData, nil
}
func (h *GrainHandler) RemoteGetStateHandler(id CartId, data []byte) (uint16, []byte, error) {
func (h *GrainHandler) RemoteGetStateHandler(id CartId, data []byte) (uint32, []byte, error) {
reply, err := h.pool.Get(id)
if err != nil {
return RemoteGetStateReply, nil, err

View File

@@ -64,11 +64,11 @@ var (
})
)
func (p *SyncedPool) PongHandler(data []byte) (uint16, []byte, error) {
func (p *SyncedPool) PongHandler(data []byte) (uint32, []byte, error) {
return Pong, data, nil
}
func (p *SyncedPool) GetCartIdHandler(data []byte) (uint16, []byte, error) {
func (p *SyncedPool) GetCartIdHandler(data []byte) (uint32, []byte, error) {
ids := make([]string, 0, len(p.local.grains))
for id := range p.local.grains {
ids = append(ids, id.String())
@@ -76,7 +76,7 @@ func (p *SyncedPool) GetCartIdHandler(data []byte) (uint16, []byte, error) {
return CartIdsResponse, []byte(strings.Join(ids, ";")), nil
}
func (p *SyncedPool) NegotiateHandler(data []byte) (uint16, []byte, error) {
func (p *SyncedPool) NegotiateHandler(data []byte) (uint32, []byte, error) {
negotiationCount.Inc()
log.Printf("Handling negotiation\n")
for _, host := range p.ExcludeKnown(strings.Split(string(data), ";")) {
@@ -89,7 +89,7 @@ func (p *SyncedPool) NegotiateHandler(data []byte) (uint16, []byte, error) {
return RemoteNegotiateResponse, []byte("ok"), nil
}
func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint16, []byte, error) {
func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint32, []byte, error) {
grainSyncCount.Inc()
idAndHostParts := strings.Split(string(data), ";")
@@ -241,15 +241,15 @@ func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) {
}
const (
RemoteNegotiate = uint16(3)
RemoteGrainChanged = uint16(4)
AckChange = uint16(5)
//AckError = uint16(6)
Ping = uint16(7)
Pong = uint16(8)
GetCartIds = uint16(9)
CartIdsResponse = uint16(10)
RemoteNegotiateResponse = uint16(11)
RemoteNegotiate = uint32(3)
RemoteGrainChanged = uint32(4)
AckChange = uint32(5)
//AckError = uint32(6)
Ping = uint32(7)
Pong = uint32(8)
GetCartIds = uint32(9)
CartIdsResponse = uint32(10)
RemoteNegotiateResponse = uint32(11)
)
func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) {
@@ -430,17 +430,8 @@ func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
remoteGrain, ok := p.remoteIndex[id]
p.mu.RUnlock()
if ok {
if remoteGrain == nil {
// p.remoteIndex[id].Delete(id)
p.mu.Lock()
delete(p.remoteIndex, id)
p.mu.Unlock()
log.Printf("Remote grain %s is nil\n", id)
//return nil, fmt.Errorf("remote pool is nil for %v", id)
} else {
remoteLookupCount.Inc()
return remoteGrain, nil
}
remoteLookupCount.Inc()
return remoteGrain, nil
}
err := p.RequestOwnership(id)

View File

@@ -64,7 +64,7 @@ func (m *CartTCPClient) Connect() error {
return nil
}
func (m *CartTCPClient) SendPacket(messageType uint16, id CartId, data []byte) error {
func (m *CartTCPClient) SendPacket(messageType uint32, id CartId, data []byte) error {
err := m.Connect()
if err != nil {
return err
@@ -72,7 +72,7 @@ func (m *CartTCPClient) SendPacket(messageType uint16, id CartId, data []byte) e
err = binary.Write(m.Conn, binary.LittleEndian, CartPacket{
Version: 1,
MessageType: messageType,
DataLength: uint16(len(data)),
DataLength: uint64(len(data)),
Id: id,
})
if err != nil {
@@ -90,7 +90,7 @@ func (m *CartTCPClient) SendPacket(messageType uint16, id CartId, data []byte) e
// return m.SendPacket(messageType, id, data)
// }
func (m *CartTCPClient) Call(messageType uint16, id CartId, responseType uint16, data []byte) ([]byte, error) {
func (m *CartTCPClient) Call(messageType uint32, id CartId, responseType uint32, data []byte) ([]byte, error) {
err := m.SendPacket(messageType, id, data)
if err != nil {
return nil, err

View File

@@ -36,21 +36,21 @@ func CartListen(address string) (*CartServer, error) {
type TCPCartServerMux struct {
mu sync.RWMutex
listeners map[uint16]func(CartId, []byte) error
functions map[uint16]func(CartId, []byte) (uint16, []byte, error)
listeners map[uint32]func(CartId, []byte) error
functions map[uint32]func(CartId, []byte) (uint32, []byte, error)
}
func NewCartTCPServerMux(maxClients int) *TCPCartServerMux {
m := &TCPCartServerMux{
mu: sync.RWMutex{},
listeners: make(map[uint16]func(CartId, []byte) error),
functions: make(map[uint16]func(CartId, []byte) (uint16, []byte, error)),
listeners: make(map[uint32]func(CartId, []byte) error),
functions: make(map[uint32]func(CartId, []byte) (uint32, []byte, error)),
}
return m
}
func (m *TCPCartServerMux) handleListener(messageType uint16, id CartId, data []byte) (bool, error) {
func (m *TCPCartServerMux) handleListener(messageType uint32, id CartId, data []byte) (bool, error) {
m.mu.RLock()
handler, ok := m.listeners[messageType]
m.mu.RUnlock()
@@ -63,19 +63,19 @@ func (m *TCPCartServerMux) handleListener(messageType uint16, id CartId, data []
return false, nil
}
func (m *TCPCartServerMux) handleFunction(connection net.Conn, messageType uint16, id CartId, data []byte) (bool, error) {
func (m *TCPCartServerMux) handleFunction(connection net.Conn, messageType uint32, id CartId, data []byte) (bool, error) {
m.mu.RLock()
function, ok := m.functions[messageType]
fn, ok := m.functions[messageType]
m.mu.RUnlock()
if ok {
responseType, responseData, err := function(id, data)
responseType, responseData, err := fn(id, data)
if err != nil {
return true, err
}
err = binary.Write(connection, binary.LittleEndian, CartPacket{
Version: 1,
MessageType: responseType,
DataLength: uint16(len(responseData)),
DataLength: uint64(len(responseData)),
Id: id,
})
if err != nil {
@@ -93,7 +93,7 @@ func (m *TCPCartServerMux) HandleConnection(connection net.Conn) error {
var err error
defer connection.Close()
for {
err = ReadPacket(connection, &packet)
err = ReadCartPacket(connection, &packet)
if err != nil {
if err == io.EOF {
return nil
@@ -122,13 +122,13 @@ func (m *TCPCartServerMux) HandleConnection(connection net.Conn) error {
}
}
func (m *TCPCartServerMux) ListenFor(messageType uint16, handler func(CartId, []byte) error) {
func (m *TCPCartServerMux) ListenFor(messageType uint32, handler func(CartId, []byte) error) {
m.mu.Lock()
m.listeners[messageType] = handler
m.mu.Unlock()
}
func (m *TCPCartServerMux) HandleCall(messageType uint16, handler func(CartId, []byte) (uint16, []byte, error)) {
func (m *TCPCartServerMux) HandleCall(messageType uint32, handler func(CartId, []byte) (uint32, []byte, error)) {
m.mu.Lock()
m.functions[messageType] = handler
m.mu.Unlock()

View File

@@ -58,10 +58,8 @@ func (m *TCPClient) Connect() error {
func (m *TCPClient) HandleConnectionError(err error) error {
if err != nil {
m.Errors <- err
m.ErrorCount++
}
return err
}
@@ -70,7 +68,7 @@ func (m *TCPClient) Close() {
m.Conn.Close()
}
func (m *TCPClient) SendPacket(messageType uint16, data []byte) error {
func (m *TCPClient) SendPacket(messageType uint32, data []byte) error {
err := m.Connect()
if err != nil {
return err
@@ -78,7 +76,7 @@ func (m *TCPClient) SendPacket(messageType uint16, data []byte) error {
err = binary.Write(m.Conn, binary.LittleEndian, Packet{
Version: 1,
MessageType: messageType,
DataLength: uint16(len(data)),
DataLength: uint64(len(data)),
})
if err != nil {
return m.HandleConnectionError(err)
@@ -87,7 +85,7 @@ func (m *TCPClient) SendPacket(messageType uint16, data []byte) error {
return m.HandleConnectionError(err)
}
// func (m *TCPClient) SendPacketFn(messageType uint16, datafn func(w io.Writer) error) error {
// func (m *TCPClient) SendPacketFn(messageType uint32, datafn func(w io.Writer) error) error {
// data, err := GetData(datafn)
// if err != nil {
// return err
@@ -95,7 +93,7 @@ func (m *TCPClient) SendPacket(messageType uint16, data []byte) error {
// return m.SendPacket(messageType, data)
// }
func (m *TCPClient) Call(messageType uint16, responseType uint16, data []byte) ([]byte, error) {
func (m *TCPClient) Call(messageType uint32, responseType uint32, data []byte) ([]byte, error) {
err := m.SendPacket(messageType, data)
if err != nil {
return nil, err

View File

@@ -36,21 +36,21 @@ func Listen(address string) (*Server, error) {
type TCPServerMux struct {
mu sync.RWMutex
listeners map[uint16]func(data []byte) error
functions map[uint16]func(data []byte) (uint16, []byte, error)
listeners map[uint32]func(data []byte) error
functions map[uint32]func(data []byte) (uint32, []byte, error)
}
func NewTCPServerMux(maxClients int) *TCPServerMux {
m := &TCPServerMux{
mu: sync.RWMutex{},
listeners: make(map[uint16]func(data []byte) error),
functions: make(map[uint16]func(data []byte) (uint16, []byte, error)),
listeners: make(map[uint32]func(data []byte) error),
functions: make(map[uint32]func(data []byte) (uint32, []byte, error)),
}
return m
}
func (m *TCPServerMux) handleListener(messageType uint16, data []byte) (bool, error) {
func (m *TCPServerMux) handleListener(messageType uint32, data []byte) (bool, error) {
m.mu.RLock()
handler, ok := m.listeners[messageType]
m.mu.RUnlock()
@@ -63,7 +63,7 @@ func (m *TCPServerMux) handleListener(messageType uint16, data []byte) (bool, er
return false, nil
}
func (m *TCPServerMux) handleFunction(connection net.Conn, messageType uint16, data []byte) (bool, error) {
func (m *TCPServerMux) handleFunction(connection net.Conn, messageType uint32, data []byte) (bool, error) {
m.mu.RLock()
function, ok := m.functions[messageType]
m.mu.RUnlock()
@@ -75,7 +75,7 @@ func (m *TCPServerMux) handleFunction(connection net.Conn, messageType uint16, d
err = binary.Write(connection, binary.LittleEndian, Packet{
Version: 1,
MessageType: responseType,
DataLength: uint16(len(responseData)),
DataLength: uint64(len(responseData)),
})
if err != nil {
return true, err
@@ -116,13 +116,13 @@ func (m *TCPServerMux) HandleConnection(connection net.Conn) error {
}
}
func (m *TCPServerMux) ListenFor(messageType uint16, handler func(data []byte) error) {
func (m *TCPServerMux) ListenFor(messageType uint32, handler func(data []byte) error) {
m.mu.Lock()
m.listeners[messageType] = handler
m.mu.Unlock()
}
func (m *TCPServerMux) HandleCall(messageType uint16, handler func(data []byte) (uint16, []byte, error)) {
func (m *TCPServerMux) HandleCall(messageType uint32, handler func(data []byte) (uint32, []byte, error)) {
m.mu.Lock()
m.functions[messageType] = handler
m.mu.Unlock()

View File

@@ -7,11 +7,11 @@ import (
func TestTcpHelpers(t *testing.T) {
server, err := Listen(":1337")
server, err := Listen("localhost:51337")
if err != nil {
t.Errorf("Error listening: %v\n", err)
}
client, err := Dial("localhost:1337")
client, err := Dial("localhost:51337")
if err != nil {
t.Errorf("Error dialing: %v\n", err)
}
@@ -21,7 +21,7 @@ func TestTcpHelpers(t *testing.T) {
messageData = string(data)
return nil
})
server.HandleCall(2, func(data []byte) (uint16, []byte, error) {
server.HandleCall(2, func(data []byte) (uint32, []byte, error) {
log.Printf("Received call: %s\n", string(data))
return 3, []byte("Hello, client!"), nil
})