more mutex
Some checks failed
Build and Publish / BuildAndDeploy (push) Has been cancelled

This commit is contained in:
matst80
2024-11-09 22:06:30 +01:00
parent 14dfabc652
commit b1f6d8b071
2 changed files with 13 additions and 0 deletions

View File

@@ -5,10 +5,12 @@ import (
"io"
"net"
"strings"
"sync"
"time"
)
type RemoteGrainPool struct {
mu sync.RWMutex
Host string
grains map[CartId]RemoteGrain
}
@@ -87,7 +89,9 @@ func NewRemoteGrainPool(addr string) *RemoteGrainPool {
}
func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain {
p.mu.RLock()
grain, ok := p.grains[id]
p.mu.RUnlock()
if !ok {
return nil
}
@@ -99,7 +103,9 @@ func (p *RemoteGrainPool) findOrCreateGrain(id CartId) *RemoteGrain {
grain := p.findRemoteGrain(id)
if grain == nil {
grain = NewRemoteGrain(id, p.Host)
p.mu.Lock()
p.grains[id] = *grain
p.mu.Unlock()
grain.Connect()
}
return grain

View File

@@ -28,6 +28,7 @@ type RemoteHost struct {
}
type SyncedPool struct {
mu sync.RWMutex
Discovery Discovery
listener net.Listener
Hostname string
@@ -296,7 +297,9 @@ func (p *SyncedPool) handleConnection(conn net.Conn) {
if r.Host == string(idAndHostParts[1]) {
found = true
log.Printf("Remote grain %s changed to %s\n", idAndHostParts[0], idAndHostParts[1])
p.mu.Lock()
p.remoteIndex[ToCartId(idAndHostParts[0])] = r.Pool
p.mu.Unlock()
}
}
@@ -446,7 +449,9 @@ func (p *SyncedPool) Process(id CartId, messages ...Message) ([]byte, error) {
_, ok := p.local.grains[id]
if !ok {
// check if remote grain exists
p.mu.RLock()
remoteGrain, ok := p.remoteIndex[id]
p.mu.RUnlock()
if ok {
remoteLookupCount.Inc()
return remoteGrain.Process(id, messages...)
@@ -465,7 +470,9 @@ func (p *SyncedPool) Get(id CartId) ([]byte, error) {
_, ok := p.local.grains[id]
if !ok {
// check if remote grain exists
p.mu.RLock()
remoteGrain, ok := p.remoteIndex[id]
p.mu.RUnlock()
if ok {
remoteLookupCount.Inc()
return remoteGrain.Get(id)