test/net-pool #2

Merged
mats merged 11 commits from test/net-pool into main 2024-11-23 15:30:38 +01:00
2 changed files with 27 additions and 0 deletions
Showing only changes of commit 354964040a - Show all commits

View File

@@ -233,6 +233,7 @@ func main() {
go func() { go func() {
sig := <-sigs sig := <-sigs
fmt.Println("Shutting down due to signal:", sig) fmt.Println("Shutting down due to signal:", sig)
go syncedPool.Close()
app.Save() app.Save()
done <- true done <- true
}() }()

View File

@@ -213,6 +213,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery)
server.AddHandler(GetCartIds, pool.GetCartIdHandler) server.AddHandler(GetCartIds, pool.GetCartIdHandler)
server.AddHandler(RemoteNegotiate, pool.NegotiateHandler) server.AddHandler(RemoteNegotiate, pool.NegotiateHandler)
server.AddHandler(RemoteGrainChanged, pool.GrainOwnerChangeHandler) server.AddHandler(RemoteGrainChanged, pool.GrainOwnerChangeHandler)
server.AddHandler(Closing, pool.HostTerminatingHandler)
if discovery != nil { if discovery != nil {
go func() { go func() {
@@ -252,6 +253,21 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery)
return pool, nil return pool, nil
} }
func (p *SyncedPool) HostTerminatingHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
log.Printf("Remote host terminating")
host := string(data.Payload)
p.mu.RLock()
defer p.mu.RUnlock()
for _, r := range p.remotes {
if r.Host == host {
go p.RemoveHost(r)
break
}
}
resultChan <- MakeFrameWithPayload(Pong, 200, []byte("ok"))
return nil
}
func (p *SyncedPool) IsHealthy() bool { func (p *SyncedPool) IsHealthy() bool {
for _, r := range p.remotes { for _, r := range p.remotes {
if !r.IsHealthy() { if !r.IsHealthy() {
@@ -310,6 +326,7 @@ const (
GetCartIds = FrameType(9) GetCartIds = FrameType(9)
CartIdsResponse = FrameType(10) CartIdsResponse = FrameType(10)
RemoteNegotiateResponse = FrameType(11) RemoteNegotiateResponse = FrameType(11)
Closing = FrameType(12)
) )
func (p *SyncedPool) Negotiate() { func (p *SyncedPool) Negotiate() {
@@ -479,6 +496,15 @@ func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
return localGrain, nil return localGrain, nil
} }
func (p *SyncedPool) Close() {
p.mu.Lock()
defer p.mu.Unlock()
payload := []byte(p.Hostname)
for _, r := range p.remotes {
go r.Call(Closing, payload)
}
}
func (p *SyncedPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) { func (p *SyncedPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) {
pool, err := p.getGrain(id) pool, err := p.getGrain(id)
var res *FrameWithPayload var res *FrameWithPayload