474 lines
11 KiB
Go
474 lines
11 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
)
|
|
|
|
type Quorum interface {
|
|
Negotiate(knownHosts []string) ([]string, error)
|
|
OwnerChanged(CartId, host string) error
|
|
}
|
|
|
|
// HealthHandler is implemented by components that can report their own
// liveness (e.g. a remote host whose connection may have gone stale).
type HealthHandler interface {
	// IsHealthy reports whether the component is currently usable.
	IsHealthy() bool
}
|
|
|
|
// SyncedPool is a grain pool that spans multiple hosts: a grain lives either
// in the local pool or behind a proxy (RemoteGrain) pointing at the remote
// host that owns it. It embeds *Server so the pool itself answers the
// pool-protocol RPCs registered in NewSyncedPool.
type SyncedPool struct {
	*Server
	// mu guards remotes, remoteIndex and local.grains (see removeLocalGrain,
	// SpawnRemoteGrain, RemoveHost).
	mu sync.RWMutex
	// Hostname is this node's own address; it counts as "known" and is never
	// dialed as a remote.
	Hostname string
	// local holds grains owned by this host.
	local *GrainLocalPool
	// remotes maps host address -> connected remote host.
	remotes map[string]*RemoteHost
	// remoteIndex maps cart id -> proxy for the grain on its owning host.
	remoteIndex map[CartId]*RemoteGrain
}
|
|
|
|
// Prometheus metrics for the synced pool, registered at package init via
// promauto.
var (
	// negotiationCount counts handled remote negotiation requests.
	negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_negotiation_total",
		Help: "The total number of remote negotiations",
	})
	// grainSyncCount counts handled grain owner-change messages.
	grainSyncCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_sync_total",
		Help: "The total number of grain owner changes",
	})
	// connectedRemotes tracks the current number of connected remote hosts.
	connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_connected_remotes",
		Help: "The number of connected remotes",
	})
	// remoteLookupCount counts grain lookups resolved to a remote proxy.
	remoteLookupCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_lookup_total",
		Help: "The total number of remote lookups",
	})
	// packetQueue tracks queued packets (updated elsewhere in the package).
	packetQueue = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_packet_queue_size",
		Help: "The total number of packets in the queue",
	})
	// packetsSent counts pool packets sent (updated elsewhere in the package).
	packetsSent = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_pool_packets_sent_total",
		Help: "The total number of packets sent",
	})
	// packetsReceived counts pool packets received (updated elsewhere in the
	// package).
	packetsReceived = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_pool_packets_received_total",
		Help: "The total number of packets received",
	})
)
|
|
|
|
func (p *SyncedPool) PongHandler(data []byte) (PoolMessage, []byte, error) {
|
|
return Pong, data, nil
|
|
}
|
|
|
|
func (p *SyncedPool) GetCartIdHandler(data []byte) (PoolMessage, []byte, error) {
|
|
ids := make([]string, 0, len(p.local.grains))
|
|
for id := range p.local.grains {
|
|
if p.local.grains[id] == nil {
|
|
continue
|
|
}
|
|
s := id.String()
|
|
if s == "" {
|
|
continue
|
|
}
|
|
ids = append(ids, s)
|
|
}
|
|
log.Printf("Returning %d cart ids\n", len(ids))
|
|
return CartIdsResponse, []byte(strings.Join(ids, ";")), nil
|
|
}
|
|
|
|
func (p *SyncedPool) NegotiateHandler(data []byte) (PoolMessage, []byte, error) {
|
|
negotiationCount.Inc()
|
|
log.Printf("Handling negotiation\n")
|
|
for _, host := range p.ExcludeKnown(strings.Split(string(data), ";")) {
|
|
if host == "" {
|
|
continue
|
|
}
|
|
go p.AddRemote(host)
|
|
|
|
}
|
|
|
|
return RemoteNegotiateResponse, []byte("ok"), nil
|
|
}
|
|
|
|
func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (PoolMessage, []byte, error) {
|
|
grainSyncCount.Inc()
|
|
|
|
idAndHostParts := strings.Split(string(data), ";")
|
|
if len(idAndHostParts) != 2 {
|
|
log.Printf("Invalid remote grain change message\n")
|
|
return AckChange, []byte("incorrect"), fmt.Errorf("invalid remote grain change message")
|
|
}
|
|
id := ToCartId(idAndHostParts[0])
|
|
host := idAndHostParts[1]
|
|
log.Printf("Handling remote grain owner change to %s for id %s\n", host, id)
|
|
for _, r := range p.remotes {
|
|
if r.Host == host && r.IsHealthy() {
|
|
// log.Printf("Remote grain %s changed to %s\n", id, host)
|
|
|
|
go p.SpawnRemoteGrain(id, host)
|
|
|
|
return AckChange, []byte("ok"), nil
|
|
}
|
|
}
|
|
go p.AddRemote(host)
|
|
return AckChange, []byte("ok"), nil
|
|
}
|
|
|
|
func (p *SyncedPool) RemoveRemoteGrain(id CartId) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
delete(p.remoteIndex, id)
|
|
}
|
|
|
|
func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
|
|
if id.String() == "" {
|
|
log.Printf("Invalid grain id, %s\n", id)
|
|
return
|
|
}
|
|
if p.local.grains[id] != nil {
|
|
log.Printf("Grain %s already exists locally, owner is (%s)\n", id, host)
|
|
p.mu.Lock()
|
|
delete(p.local.grains, id)
|
|
p.mu.Unlock()
|
|
}
|
|
|
|
remote, err := NewRemoteGrain(id, host)
|
|
if err != nil {
|
|
log.Printf("Error creating remote grain %v\n", err)
|
|
return
|
|
}
|
|
go func() {
|
|
<-remote.PersistentConnection.Died
|
|
p.RemoveRemoteGrain(id)
|
|
p.HandleHostError(host)
|
|
log.Printf("Remote grain %s died, host: %s\n", id.String(), host)
|
|
}()
|
|
|
|
p.mu.Lock()
|
|
p.remoteIndex[id] = remote
|
|
p.mu.Unlock()
|
|
}
|
|
|
|
func (p *SyncedPool) HandleHostError(host string) {
|
|
for _, r := range p.remotes {
|
|
if r.Host == host {
|
|
if !r.IsHealthy() {
|
|
p.RemoveHost(r)
|
|
} else {
|
|
r.ErrorCount++
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// NewSyncedPool starts the pool RPC server on <hostname>:1338, registers the
// pool-protocol handlers and, when discovery is non-nil, begins watching the
// discovery channel for peers joining or leaving the cluster.
//
// Returns an error only if the listener cannot be created; discovery errors
// are logged from the background goroutine.
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
	listen := fmt.Sprintf("%s:1338", hostname)

	server, err := Listen(listen)
	if err != nil {
		return nil, err
	}

	log.Printf("Listening on %s", listen)

	pool := &SyncedPool{
		Server:   server,
		Hostname: hostname,
		local:    local,

		remotes:     make(map[string]*RemoteHost),
		remoteIndex: make(map[CartId]*RemoteGrain),
	}

	// Wire up the pool-protocol request handlers.
	server.HandleCall(Ping, pool.PongHandler)
	server.HandleCall(GetCartIds, pool.GetCartIdHandler)
	server.HandleCall(RemoteNegotiate, pool.NegotiateHandler)
	server.HandleCall(RemoteGrainChanged, pool.GrainOwnerChangeHandler)

	if discovery != nil {
		go func() {
			// Give the rest of the process time to come up before dialing out.
			time.Sleep(time.Second * 5)
			log.Printf("Starting discovery")
			ch, err := discovery.Watch()
			if err != nil {
				log.Printf("Error discovering hosts: %v", err)
				return
			}
			for chng := range ch {
				if chng.Host == "" {
					continue
				}
				known := pool.IsKnown(chng.Host)
				if chng.Type != watch.Deleted && !known {
					// New host: connect after a short grace period so its
					// listener has a chance to start.
					go func(h string) {
						log.Printf("Discovered host %s, waiting for startup", h)
						time.Sleep(time.Second)
						pool.AddRemote(h)
					}(chng.Host)
				} else if chng.Type == watch.Deleted && known {
					// Host left the cluster: drop its connection and entry.
					log.Printf("Host removed %s, removing from index", chng.Host)
					for _, r := range pool.remotes {
						if r.Host == chng.Host {
							pool.RemoveHost(r)
							break
						}
					}
				}
			}
		}()
	} else {
		log.Printf("No discovery, waiting for remotes to connect")
	}

	return pool, nil
}
|
|
|
|
func (p *SyncedPool) IsHealthy() bool {
|
|
for _, r := range p.remotes {
|
|
if !r.IsHealthy() {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (p *SyncedPool) IsKnown(host string) bool {
|
|
for _, r := range p.remotes {
|
|
if r.Host == host {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return host == p.Hostname
|
|
}
|
|
|
|
func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
|
|
ret := make([]string, 0, len(hosts))
|
|
for _, h := range hosts {
|
|
if !p.IsKnown(h) {
|
|
ret = append(ret, h)
|
|
}
|
|
}
|
|
return ret
|
|
}
|
|
|
|
func (p *SyncedPool) RemoveHost(host *RemoteHost) {
|
|
if p.remotes[host.Host] == nil {
|
|
return
|
|
}
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
h := p.remotes[host.Host]
|
|
h.Close()
|
|
delete(p.remotes, host.Host)
|
|
|
|
connectedRemotes.Set(float64(len(p.remotes)))
|
|
}
|
|
|
|
func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
for id, r := range p.remoteIndex {
|
|
if r.Host == host.Host {
|
|
p.remoteIndex[id].Close()
|
|
delete(p.remoteIndex, id)
|
|
}
|
|
}
|
|
}
|
|
|
|
// PoolMessage identifies the type of a message on the pool protocol.
type PoolMessage uint32

// Pool protocol message identifiers. Values are part of the wire protocol
// and must not be renumbered; 6 (AckError) is retired.
const (
	RemoteNegotiate         PoolMessage = 3
	RemoteGrainChanged      PoolMessage = 4
	AckChange               PoolMessage = 5
	// AckError             PoolMessage = 6 (retired)
	Ping                    PoolMessage = 7
	Pong                    PoolMessage = 8
	GetCartIds              PoolMessage = 9
	CartIdsResponse         PoolMessage = 10
	RemoteNegotiateResponse PoolMessage = 11
)
|
|
|
|
func (p *SyncedPool) Negotiate() {
|
|
knownHosts := make([]string, 0, len(p.remotes)+1)
|
|
for _, r := range p.remotes {
|
|
knownHosts = append(knownHosts, r.Host)
|
|
}
|
|
knownHosts = append([]string{p.Hostname}, knownHosts...)
|
|
|
|
for _, r := range p.remotes {
|
|
hosts, err := r.Negotiate(knownHosts)
|
|
if err != nil {
|
|
log.Printf("Error negotiating with %s: %v\n", r.Host, err)
|
|
return
|
|
}
|
|
for _, h := range hosts {
|
|
if !p.IsKnown(h) {
|
|
p.AddRemote(h)
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func (p *SyncedPool) GetHealthyRemotes() []*RemoteHost {
|
|
p.mu.RLock()
|
|
defer p.mu.RUnlock()
|
|
remotes := make([]*RemoteHost, 0, len(p.remotes))
|
|
for _, r := range p.remotes {
|
|
if r.IsHealthy() {
|
|
remotes = append(remotes, r)
|
|
}
|
|
}
|
|
return remotes
|
|
}
|
|
|
|
// RequestOwnership asks every healthy remote to confirm that this host may
// take ownership of the grain identified by id. Ownership is granted when
// enough reachable remotes acknowledge the change; otherwise the grain is
// evicted from the local pool and an error is returned. With zero reachable
// remotes the request trivially succeeds.
func (p *SyncedPool) RequestOwnership(id CartId) error {
	ok := 0  // remotes that confirmed the change
	all := 0 // remotes actually asked (the quorum denominator)

	for _, r := range p.GetHealthyRemotes() {
		// Health may have changed since the snapshot was taken.
		if !r.IsHealthy() {
			continue
		}
		log.Printf("Asking for confirmation change of %s to %s (me) with %s\n", id, p.Hostname, r.Host)
		err := r.ConfirmChange(id, p.Hostname)
		all++
		if err != nil {
			if !r.IsHealthy() {
				// The remote went bad during the call: drop it and do not
				// count it toward the quorum denominator.
				log.Printf("Ownership: Removing host, unable to communicate with %s", r.Host)
				p.RemoveHost(r)
				all--
			} else {
				log.Printf("Error confirming change: %v from %s\n", err, p.Hostname)
			}
			continue
		}
		ok++
	}

	// At least one remote was asked but none confirmed.
	if ok == 0 && all > 0 {
		p.removeLocalGrain(id)
		return fmt.Errorf("no remotes confirmed change")
	}
	// Fewer than 3 asked: require unanimity. Otherwise require at least half
	// (integer division).
	if (all < 3 && ok < all) || ok < (all/2) {
		p.removeLocalGrain(id)
		return fmt.Errorf("quorum not reached")
	}
	return nil
}
|
|
|
|
func (p *SyncedPool) removeLocalGrain(id CartId) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
delete(p.local.grains, id)
|
|
}
|
|
|
|
func (p *SyncedPool) AddRemote(host string) error {
|
|
|
|
_, hasHost := p.remotes[host]
|
|
if host == "" || p.IsKnown(host) || hasHost {
|
|
return nil
|
|
}
|
|
client, err := Dial(fmt.Sprintf("%s:1338", host))
|
|
if err != nil {
|
|
log.Printf("Error connecting to remote %s: %v\n", host, err)
|
|
return err
|
|
}
|
|
|
|
remote := RemoteHost{
|
|
Client: client,
|
|
MissedPings: 0,
|
|
Host: host,
|
|
}
|
|
p.mu.Lock()
|
|
p.remotes[host] = &remote
|
|
p.mu.Unlock()
|
|
go func() {
|
|
<-remote.PersistentConnection.Died
|
|
log.Printf("Removing host, remote died %s", host)
|
|
p.RemoveHost(&remote)
|
|
}()
|
|
go func() {
|
|
for range time.Tick(time.Second * 3) {
|
|
err := remote.Ping()
|
|
|
|
for err != nil {
|
|
time.Sleep(time.Millisecond * 200)
|
|
if !remote.IsHealthy() {
|
|
log.Printf("Removing host, unable to communicate with %s", host)
|
|
p.RemoveHost(&remote)
|
|
return
|
|
}
|
|
err = remote.Ping()
|
|
}
|
|
}
|
|
}()
|
|
|
|
connectedRemotes.Set(float64(len(p.remotes)))
|
|
log.Printf("Added remote %s\n", remote.Host)
|
|
|
|
go remote.Initialize(p)
|
|
return nil
|
|
}
|
|
|
|
func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
|
|
var err error
|
|
localGrain, ok := p.local.grains[id]
|
|
if !ok {
|
|
// check if remote grain exists
|
|
p.mu.RLock()
|
|
remoteGrain, ok := p.remoteIndex[id]
|
|
p.mu.RUnlock()
|
|
if ok {
|
|
remoteLookupCount.Inc()
|
|
return remoteGrain, nil
|
|
}
|
|
|
|
go p.RequestOwnership(id)
|
|
// if err != nil {
|
|
// log.Printf("Error requesting ownership: %v\n", err)
|
|
// return nil, err
|
|
// }
|
|
|
|
localGrain, err = p.local.GetGrain(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
}
|
|
return localGrain, nil
|
|
}
|
|
|
|
func (p *SyncedPool) Process(id CartId, messages ...Message) (*CallResult, error) {
|
|
pool, err := p.getGrain(id)
|
|
var res *CallResult
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, m := range messages {
|
|
res, err = pool.HandleMessage(&m, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func (p *SyncedPool) Get(id CartId) (*CallResult, error) {
|
|
grain, err := p.getGrain(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return grain.GetCurrentState()
|
|
}
|