91 lines
2.1 KiB
Go
91 lines
2.1 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
)
|
|
|
|
type RemoteHost struct {
|
|
*Connection
|
|
Host string
|
|
MissedPings int
|
|
}
|
|
|
|
func (h *RemoteHost) IsHealthy() bool {
|
|
return h.MissedPings < 3
|
|
}
|
|
|
|
func (h *RemoteHost) Initialize(p *SyncedPool) {
|
|
log.Printf("Initializing remote %s\n", h.Host)
|
|
ids, err := h.GetCartMappings()
|
|
if err != nil {
|
|
log.Printf("Error getting remote mappings: %v\n", err)
|
|
return
|
|
}
|
|
log.Printf("Remote %s has %d grains\n", h.Host, len(ids))
|
|
p.mu.Lock()
|
|
local := 0
|
|
remoteNo := 0
|
|
for _, id := range ids {
|
|
go p.SpawnRemoteGrain(id, h.Host)
|
|
remoteNo++
|
|
}
|
|
log.Printf("Removed %d local grains, added %d remote grains\n", local, remoteNo)
|
|
p.mu.Unlock()
|
|
go p.Negotiate()
|
|
|
|
}
|
|
|
|
func (h *RemoteHost) Ping() error {
|
|
result, err := h.Call(Ping, nil)
|
|
|
|
if err != nil || result.StatusCode != 200 || result.Type != Pong {
|
|
h.MissedPings++
|
|
log.Printf("Error pinging remote %s, missed pings: %d", h.Host, h.MissedPings)
|
|
} else {
|
|
h.MissedPings = 0
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) {
|
|
reply, err := h.Call(RemoteNegotiate, []byte(strings.Join(knownHosts, ";")))
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if reply.StatusCode != 200 {
|
|
return nil, fmt.Errorf("remote returned error on negotiate: %s", string(reply.Payload))
|
|
}
|
|
|
|
return strings.Split(string(reply.Payload), ";"), nil
|
|
}
|
|
|
|
func (g *RemoteHost) GetCartMappings() ([]CartId, error) {
|
|
reply, err := g.Call(GetCartIds, []byte{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if reply.StatusCode != 200 || reply.Type != CartIdsResponse {
|
|
log.Printf("Remote returned error on get cart mappings: %s", string(reply.Payload))
|
|
return nil, fmt.Errorf("remote returned incorrect data")
|
|
}
|
|
parts := strings.Split(string(reply.Payload), ";")
|
|
ids := make([]CartId, 0, len(parts))
|
|
for _, p := range parts {
|
|
ids = append(ids, ToCartId(p))
|
|
}
|
|
return ids, nil
|
|
}
|
|
|
|
func (r *RemoteHost) ConfirmChange(id CartId, host string) error {
|
|
reply, err := r.Call(RemoteGrainChanged, []byte(fmt.Sprintf("%s;%s", id, host)))
|
|
|
|
if err != nil || reply.StatusCode != 200 || reply.Type != AckChange {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|