From eea165a6299dac2d1797ae02561490b3b3e4a826 Mon Sep 17 00:00:00 2001 From: matst80 Date: Sat, 9 Nov 2024 21:59:01 +0100 Subject: [PATCH] should work better --- synced-pool.go | 49 ++++++++++++++++++++++++++++++++------------- synced-pool_test.go | 2 +- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/synced-pool.go b/synced-pool.go index b1e74c4..ff69246 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -163,6 +163,7 @@ const ( type PacketWithData struct { MessageType uint16 + Added time.Time Data []byte } @@ -182,34 +183,52 @@ func NewPacketQueue(connection net.Conn) *PacketQueue { messageType, data, err := ReceivePacket(queue.connection) if err != nil { log.Printf("Error receiving packet: %v\n", err) - return + //return } + log.Printf("Received packet %d\n", messageType) queue.mu.Lock() queue.Packets = append(queue.Packets, PacketWithData{ MessageType: messageType, + Added: time.Now(), Data: data, }) queue.mu.Unlock() } }() + go func(queueTimer *time.Ticker) { + for { + <-queueTimer.C + queue.mu.Lock() + for i, packet := range queue.Packets { + if time.Since(packet.Added) > time.Second*5 { + queue.Packets = append(queue.Packets[:i], queue.Packets[i+1:]...) + } + } + queue.mu.Unlock() + } + }(time.NewTicker(time.Second)) return queue } -func (p *PacketQueue) Expect(messageType uint16, timeToWait time.Duration) (PacketWithData, error) { +func (p *PacketQueue) Expect(messageType uint16, timeToWait time.Duration) (*PacketWithData, error) { start := time.Now() for { if time.Since(start) > timeToWait { - return PacketWithData{}, fmt.Errorf("timeout waiting for message type %d", messageType) + return nil, fmt.Errorf("timeout waiting for message type %d", messageType) } for i, packet := range p.Packets { - if packet.MessageType == messageType { + if packet.MessageType == messageType && packet.Added.After(start) { + toReturn := PacketWithData{ + MessageType: packet.MessageType, + Data: packet.Data, + } p.mu.Lock() p.Packets = append(p.Packets[:i], p.Packets[i+1:]...) p.mu.Unlock() - return packet, nil + return &toReturn, nil } } - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Millisecond * 5) } } @@ -243,13 +262,6 @@ func (p *SyncedPool) handleConnection(conn net.Conn) { knownHosts := strings.Split(string(data), ";") log.Printf("Negotiated with remote, found %v hosts\n", knownHosts) - for _, h := range knownHosts { - err = p.AddRemoteWithConnection(h, conn) - if err != nil { - log.Printf("Error adding remote %s: %v\n", h, err) - } - } - SendPacket(conn, RemoteNegotiate, func(w io.Writer) error { hostnames := make([]string, 0, len(p.remotes)) for _, r := range p.remotes { @@ -258,6 +270,12 @@ func (p *SyncedPool) handleConnection(conn net.Conn) { w.Write([]byte(strings.Join(hostnames, ";"))) return nil }) + for _, h := range knownHosts { + err = p.AddRemote(h) + if err != nil { + log.Printf("Error adding remote %s: %v\n", h, err) + } + } case RemoteGrainChanged: // remote grain changed grainSyncCount.Inc() @@ -298,10 +316,13 @@ func (p *SyncedPool) handleConnection(conn net.Conn) { } func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) { - SendPacket(h.connection, RemoteNegotiate, func(w io.Writer) error { + err := SendPacket(h.connection, RemoteNegotiate, func(w io.Writer) error { w.Write([]byte(strings.Join(knownHosts, ";"))) return nil }) + if err != nil { + return nil, err + } packet, err := h.queue.Expect(RemoteNegotiate, time.Second) if err != nil { diff --git a/synced-pool_test.go b/synced-pool_test.go index 00ce388..89d1ec2 100644 --- a/synced-pool_test.go +++ b/synced-pool_test.go @@ -7,7 +7,7 @@ import ( func TestConnection(t *testing.T) { // TestConnection tests the connection to the server - t.Log("Testing connection to server") + localPool := NewGrainLocalPool(100, time.Minute, func(id CartId) (*CartGrain, error) { return &CartGrain{ Id: id,