refactor everything again
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m51s
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m51s
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
@@ -25,12 +26,11 @@ type RemoteHost struct {
|
||||
|
||||
type SyncedPool struct {
|
||||
*Server
|
||||
mu sync.RWMutex
|
||||
//Discovery Discovery
|
||||
mu sync.RWMutex
|
||||
Hostname string
|
||||
local *GrainLocalPool
|
||||
remotes []*RemoteHost
|
||||
remoteIndex map[CartId]*RemoteGrainPool
|
||||
remoteIndex map[CartId]*RemoteGrain
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -97,16 +97,23 @@ func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint16, []byte, error
|
||||
log.Printf("Invalid remote grain change message\n")
|
||||
return AckChange, []byte("incorrect"), nil
|
||||
}
|
||||
id := ToCartId(idAndHostParts[0])
|
||||
host := idAndHostParts[1]
|
||||
|
||||
for _, r := range p.remotes {
|
||||
if r.Host == string(idAndHostParts[1]) {
|
||||
log.Printf("Remote grain %s changed to %s\n", idAndHostParts[0], idAndHostParts[1])
|
||||
if r.Host == host {
|
||||
log.Printf("Remote grain %s changed to %s\n", id, host)
|
||||
p.mu.Lock()
|
||||
if p.local.grains[ToCartId(idAndHostParts[0])] != nil {
|
||||
log.Printf("Grain %s already exists locally, deleting\n", idAndHostParts[0])
|
||||
delete(p.local.grains, ToCartId(idAndHostParts[0]))
|
||||
if p.local.grains[id] != nil {
|
||||
log.Printf("Grain %s already exists locally, deleting\n", id)
|
||||
delete(p.local.grains, id)
|
||||
}
|
||||
p.remoteIndex[ToCartId(idAndHostParts[0])] = r.Pool
|
||||
grain, err := NewRemoteGrain(id, r.Host)
|
||||
if err != nil {
|
||||
log.Printf("Error creating remote grain %s: %v\n", id, err)
|
||||
return AckChange, []byte("error"), nil
|
||||
}
|
||||
p.remoteIndex[id] = grain
|
||||
p.mu.Unlock()
|
||||
return AckChange, []byte("ok"), nil
|
||||
}
|
||||
@@ -131,7 +138,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery)
|
||||
local: local,
|
||||
|
||||
remotes: make([]*RemoteHost, 0),
|
||||
remoteIndex: make(map[CartId]*RemoteGrainPool),
|
||||
remoteIndex: make(map[CartId]*RemoteGrain),
|
||||
}
|
||||
|
||||
server.HandleCall(Ping, pool.PongHandler)
|
||||
@@ -227,8 +234,7 @@ func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
for id, r := range p.remoteIndex {
|
||||
if r == host.Pool {
|
||||
p.remoteIndex[id].Delete(id)
|
||||
if r.Host == host.Host {
|
||||
delete(p.remoteIndex, id)
|
||||
}
|
||||
}
|
||||
@@ -340,7 +346,12 @@ func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) error {
|
||||
log.Printf("Grain %s already exists locally, deleting\n", id)
|
||||
delete(p.local.grains, id)
|
||||
}
|
||||
p.remoteIndex[id] = remote.Pool
|
||||
grain, err := NewRemoteGrain(id, remote.Host)
|
||||
if err != nil {
|
||||
log.Printf("Error creating remote grain %s: %v\n", id, err)
|
||||
continue
|
||||
}
|
||||
p.remoteIndex[id] = grain
|
||||
}
|
||||
p.mu.Unlock()
|
||||
}()
|
||||
@@ -376,47 +387,59 @@ func (p *SyncedPool) AddRemote(address string) error {
|
||||
return p.addRemoteHost(address, &remote)
|
||||
}
|
||||
|
||||
func (p *SyncedPool) getGrainPool(id CartId) (GrainPool, error) {
|
||||
_, ok := p.local.grains[id]
|
||||
func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
|
||||
localGrain, ok := p.local.grains[id]
|
||||
if !ok {
|
||||
// check if remote grain exists
|
||||
p.mu.RLock()
|
||||
remotePool, ok := p.remoteIndex[id]
|
||||
remoteGrain, ok := p.remoteIndex[id]
|
||||
p.mu.RUnlock()
|
||||
if ok {
|
||||
if remotePool == nil {
|
||||
p.remoteIndex[id].Delete(id)
|
||||
if remoteGrain == nil {
|
||||
// p.remoteIndex[id].Delete(id)
|
||||
p.mu.Lock()
|
||||
delete(p.remoteIndex, id)
|
||||
p.mu.Unlock()
|
||||
return nil, fmt.Errorf("remote pool is nil for %v", id)
|
||||
}
|
||||
remoteLookupCount.Inc()
|
||||
return remotePool, nil
|
||||
}
|
||||
if !ok {
|
||||
err := p.RequestOwnership(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
log.Printf("Remote grain %s is nil\n", id)
|
||||
//return nil, fmt.Errorf("remote pool is nil for %v", id)
|
||||
} else {
|
||||
remoteLookupCount.Inc()
|
||||
return remoteGrain, nil
|
||||
}
|
||||
}
|
||||
|
||||
err := p.RequestOwnership(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
}
|
||||
return p.local, nil
|
||||
return localGrain, nil
|
||||
}
|
||||
|
||||
func (p *SyncedPool) Process(id CartId, messages ...Message) ([]byte, error) {
|
||||
pool, err := p.getGrainPool(id)
|
||||
pool, err := p.getGrain(id)
|
||||
var res []byte
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pool.Process(id, messages...)
|
||||
for _, m := range messages {
|
||||
res, err = pool.HandleMessage(&m, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (p *SyncedPool) Get(id CartId) ([]byte, error) {
|
||||
pool, err := p.getGrainPool(id)
|
||||
grain, err := p.getGrain(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pool.Get(id)
|
||||
if remoteGrain, ok := grain.(*RemoteGrain); ok {
|
||||
return remoteGrain.GetCurrentState()
|
||||
}
|
||||
|
||||
return json.Marshal(grain)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user