diff --git a/main.go b/main.go index f8fb35d..ef33359 100644 --- a/main.go +++ b/main.go @@ -233,6 +233,7 @@ func main() { go func() { sig := <-sigs fmt.Println("Shutting down due to signal:", sig) + go syncedPool.Close() app.Save() done <- true }() diff --git a/synced-pool.go b/synced-pool.go index 3a4761b..2d6bf76 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -213,6 +213,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) server.AddHandler(GetCartIds, pool.GetCartIdHandler) server.AddHandler(RemoteNegotiate, pool.NegotiateHandler) server.AddHandler(RemoteGrainChanged, pool.GrainOwnerChangeHandler) + server.AddHandler(Closing, pool.HostTerminatingHandler) if discovery != nil { go func() { @@ -252,6 +253,21 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) return pool, nil } +func (p *SyncedPool) HostTerminatingHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error { + log.Printf("Remote host terminating") + host := string(data.Payload) + p.mu.RLock() + defer p.mu.RUnlock() + for _, r := range p.remotes { + if r.Host == host { + go p.RemoveHost(r) + break + } + } + resultChan <- MakeFrameWithPayload(Pong, 200, []byte("ok")) + return nil +} + func (p *SyncedPool) IsHealthy() bool { for _, r := range p.remotes { if !r.IsHealthy() { @@ -310,6 +326,7 @@ const ( GetCartIds = FrameType(9) CartIdsResponse = FrameType(10) RemoteNegotiateResponse = FrameType(11) + Closing = FrameType(12) ) func (p *SyncedPool) Negotiate() { @@ -479,6 +496,15 @@ func (p *SyncedPool) getGrain(id CartId) (Grain, error) { return localGrain, nil } +func (p *SyncedPool) Close() { + p.mu.Lock() + defer p.mu.Unlock() + payload := []byte(p.Hostname) + for _, r := range p.remotes { + go r.Call(Closing, payload) + } +} + func (p *SyncedPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) { pool, err := p.getGrain(id) var res *FrameWithPayload