From 893abe9ae2d044ebad71c68026d35d3fa6ae5f8a Mon Sep 17 00:00:00 2001 From: matst80 Date: Thu, 21 Nov 2024 18:54:18 +0100 Subject: [PATCH] more locks --- synced-pool.go | 30 ++++++++++++++++++++---------- tcp-connection.go | 20 +++++++++++--------- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/synced-pool.go b/synced-pool.go index aa91021..884d1ac 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -69,6 +69,8 @@ func (p *SyncedPool) PongHandler(data *FrameWithPayload, resultChan chan<- Frame func (p *SyncedPool) GetCartIdHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error { ids := make([]string, 0, len(p.local.grains)) + p.mu.RLock() + defer p.mu.RUnlock() for id := range p.local.grains { if p.local.grains[id] == nil { continue @@ -140,29 +142,37 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { log.Printf("Invalid grain id, %s", id) return } - if p.local.grains[id] != nil { + p.mu.RLock() + localGrain, ok := p.local.grains[id] + p.mu.RUnlock() + + if ok && localGrain != nil { log.Printf("Grain %s already exists locally, owner is (%s)", id, host) p.mu.Lock() delete(p.local.grains, id) p.mu.Unlock() } - remote, err := NewRemoteGrain(id, host) - if err != nil { - log.Printf("Error creating remote grain %v", err) - return - } + go func(i CartId, h string) { + remote, err := NewRemoteGrain(i, h) + if err != nil { + log.Printf("Error creating remote grain %v", err) + return + } - p.mu.Lock() - p.remoteIndex[id] = remote - p.mu.Unlock() + p.mu.Lock() + p.remoteIndex[i] = remote + p.mu.Unlock() + }(id, host) } func (p *SyncedPool) HandleHostError(host string) { + p.mu.RLock() + defer p.mu.RUnlock() for _, r := range p.remotes { if r.Host == host { if !r.IsHealthy() { - p.RemoveHost(r) + go p.RemoveHost(r) } return } diff --git a/tcp-connection.go b/tcp-connection.go index af91b0b..b0e5525 100644 --- a/tcp-connection.go +++ b/tcp-connection.go @@ -80,15 +80,14 @@ func (c *Connection) CallAsync(msg FrameType, payload []byte, ch chan<- FrameWit } go WaitForFrame(conn, ch) - toSend := MakeFrameWithPayload(msg, 1, payload) - - err = SendFrame(conn, &toSend) - if err != nil { - log.Printf("Error sending frame: %v\n", err) - close(ch) - conn.Close() - return nil, err - } + go func(toSend FrameWithPayload) { + err = SendFrame(conn, &toSend) + if err != nil { + log.Printf("Error sending frame: %v\n", err) + //close(ch) + //conn.Close() + } + }(MakeFrameWithPayload(msg, 1, payload)) c.count++ return conn, nil @@ -97,10 +96,13 @@ func (c *Connection) CallAsync(msg FrameType, payload []byte, ch chan<- FrameWit func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error) { ch := make(chan FrameWithPayload, 1) conn, err := c.CallAsync(msg, data, ch) + if err != nil { return nil, err } + defer conn.Close() + select { case ret := <-ch: return &ret, nil