more locks
This commit is contained in:
@@ -69,6 +69,8 @@ func (p *SyncedPool) PongHandler(data *FrameWithPayload, resultChan chan<- Frame
|
|||||||
|
|
||||||
func (p *SyncedPool) GetCartIdHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
|
func (p *SyncedPool) GetCartIdHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
|
||||||
ids := make([]string, 0, len(p.local.grains))
|
ids := make([]string, 0, len(p.local.grains))
|
||||||
|
p.mu.RLock()
|
||||||
|
defer p.mu.RUnlock()
|
||||||
for id := range p.local.grains {
|
for id := range p.local.grains {
|
||||||
if p.local.grains[id] == nil {
|
if p.local.grains[id] == nil {
|
||||||
continue
|
continue
|
||||||
@@ -140,29 +142,37 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
|
|||||||
log.Printf("Invalid grain id, %s", id)
|
log.Printf("Invalid grain id, %s", id)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if p.local.grains[id] != nil {
|
p.mu.RLock()
|
||||||
|
localGrain, ok := p.local.grains[id]
|
||||||
|
p.mu.RUnlock()
|
||||||
|
|
||||||
|
if ok && localGrain != nil {
|
||||||
log.Printf("Grain %s already exists locally, owner is (%s)", id, host)
|
log.Printf("Grain %s already exists locally, owner is (%s)", id, host)
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
delete(p.local.grains, id)
|
delete(p.local.grains, id)
|
||||||
p.mu.Unlock()
|
p.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
remote, err := NewRemoteGrain(id, host)
|
go func(i CartId, h string) {
|
||||||
if err != nil {
|
remote, err := NewRemoteGrain(i, h)
|
||||||
log.Printf("Error creating remote grain %v", err)
|
if err != nil {
|
||||||
return
|
log.Printf("Error creating remote grain %v", err)
|
||||||
}
|
return
|
||||||
|
}
|
||||||
|
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
p.remoteIndex[id] = remote
|
p.remoteIndex[i] = remote
|
||||||
p.mu.Unlock()
|
p.mu.Unlock()
|
||||||
|
}(id, host)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SyncedPool) HandleHostError(host string) {
|
func (p *SyncedPool) HandleHostError(host string) {
|
||||||
|
p.mu.RLock()
|
||||||
|
defer p.mu.RUnlock()
|
||||||
for _, r := range p.remotes {
|
for _, r := range p.remotes {
|
||||||
if r.Host == host {
|
if r.Host == host {
|
||||||
if !r.IsHealthy() {
|
if !r.IsHealthy() {
|
||||||
p.RemoveHost(r)
|
go p.RemoveHost(r)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -80,15 +80,14 @@ func (c *Connection) CallAsync(msg FrameType, payload []byte, ch chan<- FrameWit
|
|||||||
}
|
}
|
||||||
go WaitForFrame(conn, ch)
|
go WaitForFrame(conn, ch)
|
||||||
|
|
||||||
toSend := MakeFrameWithPayload(msg, 1, payload)
|
go func(toSend FrameWithPayload) {
|
||||||
|
err = SendFrame(conn, &toSend)
|
||||||
err = SendFrame(conn, &toSend)
|
if err != nil {
|
||||||
if err != nil {
|
log.Printf("Error sending frame: %v\n", err)
|
||||||
log.Printf("Error sending frame: %v\n", err)
|
//close(ch)
|
||||||
close(ch)
|
//conn.Close()
|
||||||
conn.Close()
|
}
|
||||||
return nil, err
|
}(MakeFrameWithPayload(msg, 1, payload))
|
||||||
}
|
|
||||||
|
|
||||||
c.count++
|
c.count++
|
||||||
return conn, nil
|
return conn, nil
|
||||||
@@ -97,10 +96,13 @@ func (c *Connection) CallAsync(msg FrameType, payload []byte, ch chan<- FrameWit
|
|||||||
func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error) {
|
func (c *Connection) Call(msg FrameType, data []byte) (*FrameWithPayload, error) {
|
||||||
ch := make(chan FrameWithPayload, 1)
|
ch := make(chan FrameWithPayload, 1)
|
||||||
conn, err := c.CallAsync(msg, data, ch)
|
conn, err := c.CallAsync(msg, data, ch)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case ret := <-ch:
|
case ret := <-ch:
|
||||||
return &ret, nil
|
return &ret, nil
|
||||||
|
|||||||
Reference in New Issue
Block a user