From 3615d2d7d142201b5ff49dec80435f67c4602bc8 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 13 Nov 2024 08:58:40 +0100 Subject: [PATCH] slaskit --- discovery_test.go | 4 +++- synced-pool.go | 5 +++++ synced-pool_test.go | 12 +++++++++--- tcp-client.go | 2 ++ 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/discovery_test.go b/discovery_test.go index 7010755..a3b4c40 100644 --- a/discovery_test.go +++ b/discovery_test.go @@ -41,8 +41,10 @@ func TestWatch(t *testing.T) { if err != nil { t.Errorf("Error watching: %v", err) } + select { - case <-ch: + case m := <-ch: + t.Logf("Received watch %v", m) case <-time.After(5 * time.Second): t.Errorf("Timeout waiting for watch") } diff --git a/synced-pool.go b/synced-pool.go index 24615d1..75faee8 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -244,6 +244,7 @@ func (p *SyncedPool) IsKnown(host string) bool { return true } } + return host == p.Hostname } @@ -321,6 +322,7 @@ func (p *SyncedPool) Negotiate() { func (p *SyncedPool) RequestOwnership(id CartId) error { ok := 0 all := 0 + p.mu.RLock() for _, r := range p.remotes { log.Printf("Asking for confirmation change of %s to %s (me) with %s\n", id, p.Hostname, r.Host) err := r.ConfirmChange(id, p.Hostname) @@ -336,6 +338,7 @@ func (p *SyncedPool) RequestOwnership(id CartId) error { } ok++ } + p.mu.RUnlock() if ok == 0 && all > 0 { p.removeLocalGrain(id) return fmt.Errorf("no remotes confirmed change") @@ -354,6 +357,8 @@ func (p *SyncedPool) removeLocalGrain(id CartId) { } func (p *SyncedPool) AddRemote(host string) error { + p.mu.Lock() + defer p.mu.Unlock() _, hasHost := p.remotes[host] if host == "" || p.IsKnown(host) || hasHost { return nil diff --git a/synced-pool_test.go b/synced-pool_test.go index 5937f0f..df37e90 100644 --- a/synced-pool_test.go +++ b/synced-pool_test.go @@ -16,12 +16,12 @@ func TestConnection(t *testing.T) { TotalPrice: 0, }, nil }) - pool, err := NewSyncedPool(localPool, "localhost", nil) + pool, err := NewSyncedPool(localPool, "127.0.0.1", nil) if err != nil { t.Errorf("Error creating pool: %v", err) } - err = pool.AddRemote("localhost") + err = pool.AddRemote("127.0.0.1") if err != nil { t.Errorf("Error adding remote: %v", err) } @@ -35,5 +35,11 @@ func TestConnection(t *testing.T) { t.Errorf("Expected data, got nil") } time.Sleep(2 * time.Millisecond) - + data, err = pool.Get(ToCartId("kalle")) + if err != nil { + t.Errorf("Error getting data: %v", err) + } + if data == nil { + t.Errorf("Expected data, got nil") + } } diff --git a/tcp-client.go b/tcp-client.go index 1f6e699..e05e214 100644 --- a/tcp-client.go +++ b/tcp-client.go @@ -57,6 +57,7 @@ func (m *PersistentConnection) Connect() error { if !m.Dead { connection, err := net.Dial("tcp", m.address) if err != nil { + log.Printf("Error connecting to %s: %v\n", m.address, err) m.Died <- true m.Dead = true return err @@ -74,6 +75,7 @@ func (m *PersistentConnection) Close() { func (m *PersistentConnection) HandleConnectionError(err error) error { if err != nil { + log.Printf("Error from to %s: %v\n", m.address, err) m.Conn.Close() m.Connect() }