use persisted connections and handle died
This commit is contained in:
@@ -115,18 +115,9 @@ func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint32, []byte, error
|
|||||||
for _, r := range p.remotes {
|
for _, r := range p.remotes {
|
||||||
if r.Host == host {
|
if r.Host == host {
|
||||||
// log.Printf("Remote grain %s changed to %s\n", id, host)
|
// log.Printf("Remote grain %s changed to %s\n", id, host)
|
||||||
p.mu.Lock()
|
|
||||||
if p.local.grains[id] != nil {
|
p.SpawnRemoteGrain(id, host)
|
||||||
log.Printf("Grain %s already exists locally, deleting\n", id)
|
|
||||||
delete(p.local.grains, id)
|
|
||||||
}
|
|
||||||
grain, err := NewRemoteGrain(id, r.Host)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Owner change failed %s: %v\n", id, err)
|
|
||||||
return AckChange, []byte("error"), err
|
|
||||||
}
|
|
||||||
p.remoteIndex[id] = grain
|
|
||||||
p.mu.Unlock()
|
|
||||||
return AckChange, []byte("ok"), nil
|
return AckChange, []byte("ok"), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -135,6 +126,33 @@ func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint32, []byte, error
|
|||||||
return AckChange, []byte("ok"), nil
|
return AckChange, []byte("ok"), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
|
||||||
|
if p.local.grains[id] != nil {
|
||||||
|
log.Printf("Grain %s already exists locally, deleting\n", id)
|
||||||
|
p.mu.Lock()
|
||||||
|
delete(p.local.grains, id)
|
||||||
|
p.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
remote, err := NewRemoteGrain(id, host)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error creating remote grain %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
<-remote.Died
|
||||||
|
p.mu.Lock()
|
||||||
|
delete(p.remoteIndex, id)
|
||||||
|
p.mu.Unlock()
|
||||||
|
log.Printf("Remote grain %s died, host: %s\n", id.String(), host)
|
||||||
|
//p.RemoveHost(host)
|
||||||
|
}()
|
||||||
|
|
||||||
|
p.mu.Lock()
|
||||||
|
p.remoteIndex[id] = remote
|
||||||
|
p.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
|
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
|
||||||
listen := fmt.Sprintf("%s:1338", hostname)
|
listen := fmt.Sprintf("%s:1338", hostname)
|
||||||
|
|
||||||
@@ -242,6 +260,7 @@ func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) {
|
|||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
for id, r := range p.remoteIndex {
|
for id, r := range p.remoteIndex {
|
||||||
if r.Host == host.Host {
|
if r.Host == host.Host {
|
||||||
|
p.remoteIndex[id].Close()
|
||||||
delete(p.remoteIndex, id)
|
delete(p.remoteIndex, id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -385,17 +404,11 @@ func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) error {
|
|||||||
local := 0
|
local := 0
|
||||||
remoteNo := 0
|
remoteNo := 0
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
if p.local.grains[id] != nil {
|
|
||||||
local++
|
go p.SpawnRemoteGrain(id, remote.Host)
|
||||||
delete(p.local.grains, id)
|
|
||||||
}
|
|
||||||
grain, err := NewRemoteGrain(id, remote.Host)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Error creating remote grain %s: %v\n", id, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
remoteNo++
|
remoteNo++
|
||||||
p.remoteIndex[id] = grain
|
|
||||||
}
|
}
|
||||||
log.Printf("Removed %d local grains, added %d remote grains\n", local, remoteNo)
|
log.Printf("Removed %d local grains, added %d remote grains\n", local, remoteNo)
|
||||||
p.mu.Unlock()
|
p.mu.Unlock()
|
||||||
@@ -447,6 +460,11 @@ func (p *SyncedPool) AddRemote(host string) error {
|
|||||||
MissedPings: 0,
|
MissedPings: 0,
|
||||||
Host: host,
|
Host: host,
|
||||||
}
|
}
|
||||||
|
go func() {
|
||||||
|
<-remote.Died
|
||||||
|
log.Printf("Removing host, remote died %s", host)
|
||||||
|
p.RemoveHost(&remote)
|
||||||
|
}()
|
||||||
go func() {
|
go func() {
|
||||||
for range time.Tick(time.Second * 3) {
|
for range time.Tick(time.Second * 3) {
|
||||||
var err error
|
var err error
|
||||||
|
|||||||
@@ -33,6 +33,8 @@ type TCPClient struct {
|
|||||||
|
|
||||||
type PersistentConnection struct {
|
type PersistentConnection struct {
|
||||||
net.Conn
|
net.Conn
|
||||||
|
Died chan bool
|
||||||
|
Dead bool
|
||||||
address string
|
address string
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -43,21 +45,29 @@ func NewPersistentConnection(address string) (*PersistentConnection, error) {
|
|||||||
}
|
}
|
||||||
return &PersistentConnection{
|
return &PersistentConnection{
|
||||||
Conn: connection,
|
Conn: connection,
|
||||||
|
Died: make(chan bool, 1),
|
||||||
|
Dead: false,
|
||||||
address: address,
|
address: address,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *PersistentConnection) Connect() error {
|
func (m *PersistentConnection) Connect() error {
|
||||||
connection, err := net.Dial("tcp", m.address)
|
if !m.Dead {
|
||||||
if err != nil {
|
connection, err := net.Dial("tcp", m.address)
|
||||||
return err
|
if err != nil {
|
||||||
|
m.Died <- true
|
||||||
|
m.Dead = true
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m.Conn = connection
|
||||||
}
|
}
|
||||||
m.Conn = connection
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *PersistentConnection) Close() {
|
func (m *PersistentConnection) Close() {
|
||||||
m.Conn.Close()
|
m.Conn.Close()
|
||||||
|
m.Died <- true
|
||||||
|
m.Dead = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *PersistentConnection) HandleConnectionError(err error) error {
|
func (m *PersistentConnection) HandleConnectionError(err error) error {
|
||||||
|
|||||||
Reference in New Issue
Block a user