complete rewrite to grpc
synced-pool.go
@@ -1,36 +1,59 @@
 package main

 import (
+    "context"
     "fmt"
     "log"
     "net"
     "strings"
     "sync"
     "time"

+    proto "git.tornberg.me/go-cart-actor/proto"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
     "github.com/yudhasubki/netpool"
+    "google.golang.org/grpc"
     "k8s.io/apimachinery/pkg/watch"
 )

 type Quorum interface {
     Negotiate(knownHosts []string) ([]string, error)
     OwnerChanged(id CartId, host string) error
 }

 type HealthHandler interface {
     IsHealthy() bool
 }

+// SyncedPool coordinates cart grain ownership across nodes using the gRPC
+// control plane and cart actor services. Legacy frame / TCP code has been removed.
+//
+// Responsibilities:
+//   - Local grain access (delegates to GrainLocalPool)
+//   - Remote grain proxy management (RemoteGrainGRPC)
+//   - Cluster membership (AddRemote via discovery + negotiation)
+//   - Ownership acquisition (quorum via ConfirmOwner RPC)
+//   - Health/ping monitoring & remote removal
+//
+// Thread-safety: public methods that mutate internal maps lock p.mu (RWMutex).
 type SyncedPool struct {
-    Server      *GenericListener
-    mu          sync.RWMutex
-    Hostname    string
-    local       *GrainLocalPool
-    remotes     map[string]*RemoteHost
-    remoteIndex map[CartId]*RemoteGrain
+    Hostname string
+    local    *GrainLocalPool
+
+    mu sync.RWMutex
+
+    // Remote host state (gRPC only)
+    remoteHosts map[string]*RemoteHostGRPC // host -> remote host
+
+    // Remote grain proxies (by cart id)
+    remoteIndex map[CartId]Grain
+
+    // Discovery handler for re-adding hosts after failures
     discardedHostHandler *DiscardedHostHandler
+
+    // Metrics / instrumentation dependencies already declared globally
 }

+// RemoteHostGRPC tracks a remote host's clients & health.
+type RemoteHostGRPC struct {
+    Host          string
+    Conn          *grpc.ClientConn
+    CartClient    proto.CartActorClient
+    ControlClient proto.ControlPlaneClient
+    MissedPings   int
+}
+
+func (r *RemoteHostGRPC) IsHealthy() bool {
+    return r.MissedPings < 3
+}
+
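For orientation, the struct above only stores the generated gRPC clients; the surface this file actually relies on is roughly the following. This is a sketch inferred from the call sites further down (Ping, GetCartIds, Negotiate, ConfirmOwner, Closing), not the real generated code in git.tornberg.me/go-cart-actor/proto, and the reply type names are assumptions.

// Assumed shape of the generated control-plane client (illustrative only;
// reply message names are guesses, the request/field names come from the
// call sites in this file).
type assumedControlPlaneClient interface {
    Ping(ctx context.Context, in *proto.Empty, opts ...grpc.CallOption) (*proto.Empty, error)
    GetCartIds(ctx context.Context, in *proto.Empty, opts ...grpc.CallOption) (*proto.CartIdsReply, error)           // reply carries CartIds []string
    Negotiate(ctx context.Context, in *proto.NegotiateRequest, opts ...grpc.CallOption) (*proto.NegotiateReply, error) // reply carries Hosts []string
    ConfirmOwner(ctx context.Context, in *proto.OwnerChangeRequest, opts ...grpc.CallOption) (*proto.OwnerChangeReply, error) // reply carries Accepted bool
    Closing(ctx context.Context, in *proto.ClosingNotice, opts ...grpc.CallOption) (*proto.Empty, error)
}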
 var (
@@ -50,241 +73,172 @@ var (
         Name: "cart_remote_lookup_total",
         Help: "The total number of remote lookups",
     })
     packetQueue = promauto.NewGauge(prometheus.GaugeOpts{
         Name: "cart_packet_queue_size",
         Help: "The total number of packets in the queue",
     })
     packetsSent = promauto.NewCounter(prometheus.CounterOpts{
         Name: "cart_pool_packets_sent_total",
         Help: "The total number of packets sent",
     })
     packetsReceived = promauto.NewCounter(prometheus.CounterOpts{
         Name: "cart_pool_packets_received_total",
         Help: "The total number of packets received",
     })
 )

-func (p *SyncedPool) PongHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
-    resultChan <- MakeFrameWithPayload(Pong, 200, []byte{})
-    return nil
-}
-
-func (p *SyncedPool) GetCartIdHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
-    ids := make([]string, 0, len(p.local.grains))
-    p.mu.RLock()
-    defer p.mu.RUnlock()
-    for id := range p.local.grains {
-        if p.local.grains[id] == nil {
-            continue
-        }
-        s := id.String()
-        if s == "" {
-            continue
-        }
-        ids = append(ids, s)
-    }
-    log.Printf("Returning %d cart ids\n", len(ids))
-    resultChan <- MakeFrameWithPayload(CartIdsResponse, 200, []byte(strings.Join(ids, ";")))
-    return nil
-}
-
-func (p *SyncedPool) NegotiateHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
-    negotiationCount.Inc()
-    log.Printf("Handling negotiation\n")
-    for _, host := range p.ExcludeKnown(strings.Split(string(data.Payload), ";")) {
-        if host == "" {
-            continue
-        }
-        go p.AddRemote(host)
-    }
-    p.mu.RLock()
-    defer p.mu.RUnlock()
-    hosts := make([]string, 0, len(p.remotes))
-    for _, r := range p.remotes {
-        if r.IsHealthy() {
-            hosts = append(hosts, r.Host)
-        }
-    }
-    resultChan <- MakeFrameWithPayload(RemoteNegotiateResponse, 200, []byte(strings.Join(hosts, ";")))
-    return nil
-}
-
-func (p *SyncedPool) GrainOwnerChangeHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
-    grainSyncCount.Inc()
-    idAndHostParts := strings.Split(string(data.Payload), ";")
-    if len(idAndHostParts) != 2 {
-        log.Printf("Invalid remote grain change message")
-        resultChan <- MakeFrameWithPayload(AckError, 500, []byte("invalid"))
-        return nil
-    }
-    id := ToCartId(idAndHostParts[0])
-    host := idAndHostParts[1]
-    log.Printf("Handling remote grain owner change to %s for id %s", host, id)
-    for _, r := range p.remotes {
-        if r.Host == host && r.IsHealthy() {
-            go p.SpawnRemoteGrain(id, host)
-            break
-        }
-    }
-    go p.AddRemote(host)
-    resultChan <- MakeFrameWithPayload(AckChange, 200, []byte("ok"))
-    return nil
-}
-
-func (p *SyncedPool) RemoveRemoteGrain(id CartId) {
-    p.mu.Lock()
-    defer p.mu.Unlock()
-    delete(p.remoteIndex, id)
-}
-
-func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
-    if id.String() == "" {
-        log.Printf("Invalid grain id, %s", id)
-        return
-    }
-    p.mu.RLock()
-    localGrain, ok := p.local.grains[id]
-    p.mu.RUnlock()
-
-    if ok && localGrain != nil {
-        log.Printf("Grain %s already exists locally, owner is (%s)", id, host)
-        p.mu.Lock()
-        delete(p.local.grains, id)
-        p.mu.Unlock()
-    }
-
-    go func(i CartId, h string) {
-        var pool netpool.Netpooler
-        p.mu.RLock()
-        for _, r := range p.remotes {
-            if r.Host == h {
-                pool = r.HostPool
-                break
-            }
-        }
-        p.mu.RUnlock()
-        if pool == nil {
-            log.Printf("Error spawning remote grain, no pool for %s", h)
-            return
-        }
-        remoteGrain := NewRemoteGrain(i, h, pool)
-
-        p.mu.Lock()
-        p.remoteIndex[i] = remoteGrain
-        p.mu.Unlock()
-    }(id, host)
-}
-
-func (p *SyncedPool) HandleHostError(host string) {
-    p.mu.RLock()
-    defer p.mu.RUnlock()
-    for _, r := range p.remotes {
-        if r.Host == host {
-            if !r.IsHealthy() {
-                go p.RemoveHost(r)
-            }
-            return
-        }
-    }
-}

 func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
-    listen := fmt.Sprintf("%s:1338", hostname)
-    conn := NewConnection(listen, nil)
-    server, err := conn.Listen()
-    if err != nil {
-        return nil, err
-    }
-
-    log.Printf("Listening on %s", listen)
-    dh := NewDiscardedHostHandler(1338)
-    pool := &SyncedPool{
-        Server:               server,
-        discardedHostHandler: dh,
-        remotes:              make(map[string]*RemoteHost),
-        remoteIndex:          make(map[CartId]*RemoteGrain),
+    p := &SyncedPool{
         Hostname:             hostname,
         local:                local,
+        remoteHosts:          make(map[string]*RemoteHostGRPC),
+        remoteIndex:          make(map[CartId]Grain),
+        discardedHostHandler: NewDiscardedHostHandler(1338),
     }
-    dh.SetReconnectHandler(pool.AddRemote)
-    server.AddHandler(Ping, pool.PongHandler)
-    server.AddHandler(GetCartIds, pool.GetCartIdHandler)
-    server.AddHandler(RemoteNegotiate, pool.NegotiateHandler)
-    server.AddHandler(RemoteGrainChanged, pool.GrainOwnerChangeHandler)
-    server.AddHandler(Closing, pool.HostTerminatingHandler)
+    p.discardedHostHandler.SetReconnectHandler(p.AddRemote)

     if discovery != nil {
         go func() {
-            time.Sleep(time.Second * 5)
-            log.Printf("Starting discovery")
+            time.Sleep(3 * time.Second) // allow gRPC server startup
+            log.Printf("Starting discovery watcher")
             ch, err := discovery.Watch()
             if err != nil {
-                log.Printf("Error discovering hosts: %v", err)
+                log.Printf("Discovery error: %v", err)
                 return
             }
-            for chng := range ch {
-                if chng.Host == "" {
+            for evt := range ch {
+                if evt.Host == "" {
                     continue
                 }
-                known := pool.IsKnown(chng.Host)
-                if chng.Type != watch.Deleted && !known {
-                    log.Printf("Discovered host %s, waiting for startup", chng.Host)
-                    time.Sleep(3 * time.Second)
-                    pool.AddRemote(chng.Host)
-                } else if chng.Type == watch.Deleted && known {
-                    log.Printf("Host removed %s, removing from index", chng.Host)
-                    for _, r := range pool.remotes {
-                        if r.Host == chng.Host {
-                            pool.RemoveHost(r)
-                            break
-                        }
-                    }
-                }
+                switch evt.Type {
+                case watch.Deleted:
+                    if p.IsKnown(evt.Host) {
+                        p.RemoveHost(evt.Host)
+                    }
+                default:
+                    if !p.IsKnown(evt.Host) {
+                        log.Printf("Discovered host %s", evt.Host)
+                        p.AddRemote(evt.Host)
+                    }
+                }
             }
         }()
     } else {
-        log.Printf("No discovery, waiting for remotes to connect")
+        log.Printf("No discovery configured; expecting manual AddRemote or static host injection")
     }

-    return pool, nil
+    return p, nil
 }
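If no discovery is wired in, the watcher above never runs and hosts must be injected by hand. A minimal static source can feed the same loop for local testing; this sketch assumes a Discovery interface of the form Watch() (<-chan HostEvent, error) with an event carrying a k8s watch.EventType and a Host string. The HostEvent and staticDiscovery names are illustrative, the real types in this repo may differ.

// Hypothetical static discovery source (illustrative only).
type HostEvent struct {
    Type watch.EventType
    Host string
}

type staticDiscovery struct{ hosts []string }

func (d staticDiscovery) Watch() (<-chan HostEvent, error) {
    ch := make(chan HostEvent, len(d.hosts))
    for _, h := range d.hosts {
        ch <- HostEvent{Type: watch.Added, Host: h}
    }
    close(ch) // the watcher goroutine drains the buffered events and exits
    return ch, nil
}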

-func (p *SyncedPool) HostTerminatingHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
-    log.Printf("Remote host terminating")
-    host := string(data.Payload)
-    p.mu.RLock()
-    defer p.mu.RUnlock()
-    for _, r := range p.remotes {
-        if r.Host == host {
-            go p.RemoveHost(r)
-        }
-    }
-    resultChan <- MakeFrameWithPayload(Pong, 200, []byte("ok"))
-    return nil
-}
-
-func (p *SyncedPool) IsHealthy() bool {
-    for _, r := range p.remotes {
-        if !r.IsHealthy() {
-            return false
-        }
-    }
-    return true
-}
+// ------------------------- Remote Host Management -----------------------------
+
+// AddRemote dials a remote host and initializes grain proxies.
+func (p *SyncedPool) AddRemote(host string) {
+    if host == "" || host == p.Hostname {
+        return
+    }
+
+    p.mu.Lock()
+    if _, exists := p.remoteHosts[host]; exists {
+        p.mu.Unlock()
+        return
+    }
+    p.mu.Unlock()
+
+    target := fmt.Sprintf("%s:1337", host)
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+    conn, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithBlock())
+    if err != nil {
+        log.Printf("AddRemote: dial %s failed: %v", target, err)
+        return
+    }
+
+    cartClient := proto.NewCartActorClient(conn)
+    controlClient := proto.NewControlPlaneClient(conn)
+
+    // Health check (Ping) with limited retries
+    pings := 3
+    for pings > 0 {
+        ctxPing, cancelPing := context.WithTimeout(context.Background(), 1*time.Second)
+        _, pingErr := controlClient.Ping(ctxPing, &proto.Empty{})
+        cancelPing()
+        if pingErr == nil {
+            break
+        }
+        pings--
+        time.Sleep(200 * time.Millisecond)
+        if pings == 0 {
+            log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr)
+            conn.Close()
+            return
+        }
+    }
+
+    remote := &RemoteHostGRPC{
+        Host:          host,
+        Conn:          conn,
+        CartClient:    cartClient,
+        ControlClient: controlClient,
+        MissedPings:   0,
+    }
+
+    p.mu.Lock()
+    p.remoteHosts[host] = remote
+    p.mu.Unlock()
+    connectedRemotes.Set(float64(p.RemoteCount()))
+
+    log.Printf("Connected to remote host %s", host)
+
+    go p.pingLoop(remote)
+    go p.initializeRemote(remote)
+    go p.Negotiate()
+}
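Note that grpc.DialContext, grpc.WithInsecure and grpc.WithBlock are deprecated in recent google.golang.org/grpc releases. A sketch of an equivalent dial under that assumption (grpc.NewClient connects lazily, so the explicit Ping retry above still provides the readiness check; requires the google.golang.org/grpc/credentials/insecure import):

// dialRemote is a sketch using the non-deprecated grpc-go API; it is not
// part of this commit and assumes a recent grpc-go version.
func dialRemote(target string) (*grpc.ClientConn, error) {
    return grpc.NewClient(target,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
}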
+
+// initializeRemote fetches remote cart ids and sets up remote grain proxies.
+func (p *SyncedPool) initializeRemote(remote *RemoteHostGRPC) {
+    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+    defer cancel()
+    reply, err := remote.ControlClient.GetCartIds(ctx, &proto.Empty{})
+    if err != nil {
+        log.Printf("Init remote %s: GetCartIds error: %v", remote.Host, err)
+        return
+    }
+    count := 0
+    for _, idStr := range reply.CartIds {
+        if idStr == "" {
+            continue
+        }
+        p.SpawnRemoteGrain(ToCartId(idStr), remote.Host)
+        count++
+    }
+    log.Printf("Remote %s reported %d grains", remote.Host, count)
+}
+
+// RemoveHost removes a remote host and its grains.
+func (p *SyncedPool) RemoveHost(host string) {
+    p.mu.Lock()
+    remote, exists := p.remoteHosts[host]
+    if exists {
+        delete(p.remoteHosts, host)
+    }
+    // remove grains pointing to host
+    for id, g := range p.remoteIndex {
+        if rg, ok := g.(*RemoteGrainGRPC); ok && rg.Host == host {
+            delete(p.remoteIndex, id)
+        }
+    }
+    p.mu.Unlock()
+
+    if exists {
+        remote.Conn.Close()
+    }
+    connectedRemotes.Set(float64(p.RemoteCount()))
+}
+
+// RemoteCount returns the number of tracked remote hosts.
+func (p *SyncedPool) RemoteCount() int {
+    p.mu.RLock()
+    defer p.mu.RUnlock()
+    return len(p.remoteHosts)
+}

 func (p *SyncedPool) IsKnown(host string) bool {
-    for _, r := range p.remotes {
-        if r.Host == host {
-            return true
-        }
-    }
-    return host == p.Hostname
+    if host == p.Hostname {
+        return true
+    }
+    p.mu.RLock()
+    defer p.mu.RUnlock()
+    _, ok := p.remoteHosts[host]
+    return ok
 }

 func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
@@ -297,227 +251,190 @@ func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
     return ret
 }

-func (p *SyncedPool) RemoveHost(host *RemoteHost) {
-    p.mu.Lock()
-    delete(p.remotes, host.Host)
-    p.mu.Unlock()
-    p.RemoveHostMappedCarts(host)
-    p.discardedHostHandler.AppendHost(host.Host)
-    connectedRemotes.Set(float64(len(p.remotes)))
-}
-
-func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) {
-    p.mu.Lock()
-    defer p.mu.Unlock()
-    for id, r := range p.remoteIndex {
-        if r.Host == host.Host {
-            delete(p.remoteIndex, id)
-        }
-    }
-}
+// ------------------------- Health / Ping -------------------------------------
+
+func (p *SyncedPool) pingLoop(remote *RemoteHostGRPC) {
+    ticker := time.NewTicker(3 * time.Second)
+    defer ticker.Stop()
+    for range ticker.C {
+        ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+        _, err := remote.ControlClient.Ping(ctx, &proto.Empty{})
+        cancel()
+        if err != nil {
+            remote.MissedPings++
+            log.Printf("Ping %s failed (%d)", remote.Host, remote.MissedPings)
+            if !remote.IsHealthy() {
+                log.Printf("Remote %s unhealthy, removing", remote.Host)
+                p.RemoveHost(remote.Host)
+                return
+            }
+            continue
+        }
+        remote.MissedPings = 0
+    }
+}
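The loop above only requires that the peer's ControlPlane service answers Ping; three consecutive failures mark the host unhealthy and remove it. The server side is not part of this file; a hypothetical minimal handler (the server type name and Empty reply type are assumptions) would be:

// Hypothetical ControlPlane server-side Ping counterpart, illustrative only.
func (s *controlPlaneServer) Ping(ctx context.Context, _ *proto.Empty) (*proto.Empty, error) {
    return &proto.Empty{}, nil
}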

-const (
-    RemoteNegotiate         = FrameType(3)
-    RemoteGrainChanged      = FrameType(4)
-    AckChange               = FrameType(5)
-    AckError                = FrameType(6)
-    Ping                    = FrameType(7)
-    Pong                    = FrameType(8)
-    GetCartIds              = FrameType(9)
-    CartIdsResponse         = FrameType(10)
-    RemoteNegotiateResponse = FrameType(11)
-    Closing                 = FrameType(12)
-)
+func (p *SyncedPool) IsHealthy() bool {
+    p.mu.RLock()
+    defer p.mu.RUnlock()
+    for _, r := range p.remoteHosts {
+        if !r.IsHealthy() {
+            return false
+        }
+    }
+    return true
+}
+
+// ------------------------- Negotiation ---------------------------------------

 func (p *SyncedPool) Negotiate() {
     negotiationCount.Inc()
-    knownHosts := make([]string, 0, len(p.remotes)+1)
-    for _, r := range p.remotes {
-        knownHosts = append(knownHosts, r.Host)
-    }
-    knownHosts = append([]string{p.Hostname}, knownHosts...)
-
-    for _, r := range p.remotes {
-        hosts, err := r.Negotiate(knownHosts)
+
+    p.mu.RLock()
+    hosts := make([]string, 0, len(p.remoteHosts)+1)
+    hosts = append(hosts, p.Hostname)
+    for h := range p.remoteHosts {
+        hosts = append(hosts, h)
+    }
+    remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
+    for _, r := range p.remoteHosts {
+        remotes = append(remotes, r)
+    }
+    p.mu.RUnlock()
+
+    for _, r := range remotes {
+        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+        reply, err := r.ControlClient.Negotiate(ctx, &proto.NegotiateRequest{KnownHosts: hosts})
+        cancel()
         if err != nil {
-            log.Printf("Error negotiating with %s: %v\n", r.Host, err)
-            return
+            log.Printf("Negotiate with %s failed: %v", r.Host, err)
+            continue
         }
-        for _, h := range hosts {
+        for _, h := range reply.Hosts {
             if !p.IsKnown(h) {
                 p.AddRemote(h)
             }
         }
     }
 }

-func (p *SyncedPool) GetHealthyRemotes() []*RemoteHost {
+// ------------------------- Grain Management ----------------------------------
+
+// RemoveRemoteGrain removes a remote grain mapping.
+func (p *SyncedPool) RemoveRemoteGrain(id CartId) {
+    p.mu.Lock()
+    delete(p.remoteIndex, id)
+    p.mu.Unlock()
+}
+
+// SpawnRemoteGrain creates/updates a remote grain proxy for a given host.
+func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
+    if id.String() == "" {
+        return
+    }
+    p.mu.Lock()
+    // If local grain exists, remove it (ownership changed)
+    if g, ok := p.local.grains[id]; ok && g != nil {
+        delete(p.local.grains, id)
+    }
+    remoteHost, ok := p.remoteHosts[host]
+    if !ok {
+        p.mu.Unlock()
+        log.Printf("SpawnRemoteGrain: host %s unknown (id=%s), attempting AddRemote", host, id)
+        go p.AddRemote(host)
+        return
+    }
+    rg := NewRemoteGrainGRPC(id, host, remoteHost.CartClient)
+    p.remoteIndex[id] = rg
+    p.mu.Unlock()
+}
+
+// GetHealthyRemotes returns a copy slice of healthy remote hosts.
+func (p *SyncedPool) GetHealthyRemotes() []*RemoteHostGRPC {
     p.mu.RLock()
     defer p.mu.RUnlock()
-    remotes := make([]*RemoteHost, 0, len(p.remotes))
-    for _, r := range p.remotes {
+    ret := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
+    for _, r := range p.remoteHosts {
         if r.IsHealthy() {
-            remotes = append(remotes, r)
+            ret = append(ret, r)
         }
     }
-    return remotes
+    return ret
 }

+// RequestOwnership attempts to become owner of a cart, requiring quorum.
+// On success local grain is (or will be) created; peers spawn remote proxies.
 func (p *SyncedPool) RequestOwnership(id CartId) error {
     ok := 0
     all := 0

-    for _, r := range p.GetHealthyRemotes() {
-        err := r.ConfirmChange(id, p.Hostname)
+    remotes := p.GetHealthyRemotes()
+    for _, r := range remotes {
+        ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
+        reply, err := r.ControlClient.ConfirmOwner(ctx, &proto.OwnerChangeRequest{
+            CartId:  id.String(),
+            NewHost: p.Hostname,
+        })
+        cancel()
         all++
-        if err != nil {
-            if !r.IsHealthy() {
-                log.Printf("Ownership: Removing host, unable to communicate with %s", r.Host)
-                p.RemoveHost(r)
-                all--
-            } else {
-                log.Printf("Error confirming change: %v from %s\n", err, p.Hostname)
-            }
+        if err != nil || reply == nil || !reply.Accepted {
+            log.Printf("ConfirmOwner failure from %s for %s: %v (reply=%v)", r.Host, id, err, reply)
             continue
         }
-        //log.Printf("Remote confirmed change %s\n", r.Host)
         ok++
     }

+    // Quorum rule mirrors legacy:
+    //   - If fewer than 3 total, require all.
+    //   - Else require majority (ok >= all/2).
     if (all < 3 && ok < all) || ok < (all/2) {
         p.removeLocalGrain(id)
-        return fmt.Errorf("quorum not reached")
+        return fmt.Errorf("quorum not reached (ok=%d all=%d)", ok, all)
     }
     grainSyncCount.Inc()
     return nil
 }
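The inline condition is easier to read extracted into a predicate. This is an illustrative rewrite of the same rule, not a function in this commit, with a few worked values:

// quorumReached mirrors the condition above: fewer than three polled remotes
// require unanimity, otherwise ok >= all/2 (integer division) suffices.
func quorumReached(ok, all int) bool {
    if all < 3 {
        return ok == all
    }
    return ok >= all/2
}

// Examples: quorumReached(1, 2) == false (unanimity required),
// quorumReached(2, 4) == true, quorumReached(2, 5) == true (5/2 == 2),
// and quorumReached(0, 0) == true, so a lone node can claim ownership.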

 func (p *SyncedPool) removeLocalGrain(id CartId) {
     p.mu.Lock()
-    defer p.mu.Unlock()
     delete(p.local.grains, id)
+    p.mu.Unlock()
 }

-func (p *SyncedPool) AddRemote(host string) {
-    p.mu.Lock()
-    defer p.mu.Unlock()
-    _, hasHost := p.remotes[host]
-    if host == "" || hasHost || host == p.Hostname {
-        return
-    }
-
-    host_pool, err := netpool.New(func() (net.Conn, error) {
-        return net.Dial("tcp", fmt.Sprintf("%s:1338", host))
-    }, netpool.WithMaxPool(128), netpool.WithMinPool(0))
-
-    if err != nil {
-        log.Printf("Error creating host pool: %v\n", err)
-        return
-    }
-
-    client := NewConnection(fmt.Sprintf("%s:1338", host), host_pool)
-
-    pings := 3
-    for pings >= 0 {
-        _, err = client.Call(Ping, nil)
-        if err != nil {
-            log.Printf("Ping failed when adding %s, trying %d more times\n", host, pings)
-            pings--
-            time.Sleep(time.Millisecond * 300)
-            continue
-        }
-        break
-    }
-    log.Printf("Connected to remote %s", host)
-
-    cart_pool, err := netpool.New(func() (net.Conn, error) {
-        return net.Dial("tcp", fmt.Sprintf("%s:1337", host))
-    }, netpool.WithMaxPool(128), netpool.WithMinPool(0))
-
-    if err != nil {
-        log.Printf("Error creating grain pool: %v\n", err)
-        return
-    }
-
-    remote := RemoteHost{
-        HostPool:    cart_pool,
-        Connection:  client,
-        MissedPings: 0,
-        Host:        host,
-    }
-
-    p.remotes[host] = &remote
-
-    connectedRemotes.Set(float64(len(p.remotes)))
-
-    go p.HandlePing(&remote)
-    go remote.Initialize(p)
-}
-
-func (p *SyncedPool) HandlePing(remote *RemoteHost) {
-    for range time.Tick(time.Second * 3) {
-        err := remote.Ping()
-
-        for err != nil {
-            time.Sleep(time.Millisecond * 200)
-            if !remote.IsHealthy() {
-                log.Printf("Removing host, unable to communicate with %s", remote.Host)
-                p.RemoveHost(remote)
-                return
-            }
-            err = remote.Ping()
-        }
-    }
-}

+// getGrain returns a local or remote grain; if absent, attempts ownership.
 func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
-    var err error
     p.mu.RLock()
-    defer p.mu.RUnlock()
-    localGrain, ok := p.local.grains[id]
-    if !ok {
-        // check if remote grain exists
-        remoteGrain, ok := p.remoteIndex[id]
-        if ok {
-            remoteLookupCount.Inc()
-            return remoteGrain, nil
-        }
-
-        go p.RequestOwnership(id)
-        // if err != nil {
-        //     log.Printf("Error requesting ownership: %v\n", err)
-        //     return nil, err
-        // }
-
-        localGrain, err = p.local.GetGrain(id)
-        if err != nil {
-            return nil, err
-        }
-    }
-    return localGrain, nil
-}
-
-func (p *SyncedPool) Close() {
-    p.mu.Lock()
-    defer p.mu.Unlock()
-    payload := []byte(p.Hostname)
-    for _, r := range p.remotes {
-        go r.Call(Closing, payload)
-    }
-}
-
-func (p *SyncedPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) {
-    pool, err := p.getGrain(id)
-    var res *FrameWithPayload
+    localGrain, isLocal := p.local.grains[id]
+    remoteGrain, isRemote := p.remoteIndex[id]
+    p.mu.RUnlock()
+
+    if isLocal && localGrain != nil {
+        return localGrain, nil
+    }
+    if isRemote {
+        remoteLookupCount.Inc()
+        return remoteGrain, nil
+    }
+
+    // Attempt to claim ownership (async semantics preserved)
+    go p.RequestOwnership(id)
+
+    // Create local grain (lazy spawn) - may be rolled back by quorum failure
+    grain, err := p.local.GetGrain(id)
+    if err != nil {
+        return nil, err
+    }
+    return grain, nil
+}
+
+// Process applies mutation(s) to a grain (local or remote).
+func (p *SyncedPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) {
+    grain, err := p.getGrain(id)
+    if err != nil {
+        return nil, err
+    }
+    var res *FrameWithPayload
     for _, m := range messages {
-        res, err = pool.HandleMessage(&m, false)
+        res, err = grain.HandleMessage(&m, false)
         if err != nil {
             return nil, err
         }
@@ -525,11 +442,32 @@ func (p *SyncedPool) Process(id CartId, messages ...Message) (*FrameWithPayload,
     return res, nil
 }

+// Get returns current state of a grain (local or remote).
 func (p *SyncedPool) Get(id CartId) (*FrameWithPayload, error) {
     grain, err := p.getGrain(id)
     if err != nil {
         return nil, err
     }

     return grain.GetCurrentState()
 }

+// Close notifies remotes this host is terminating.
+func (p *SyncedPool) Close() {
+    p.mu.RLock()
+    remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
+    for _, r := range p.remoteHosts {
+        remotes = append(remotes, r)
+    }
+    p.mu.RUnlock()
+
+    for _, r := range remotes {
+        go func(rh *RemoteHostGRPC) {
+            ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+            _, err := rh.ControlClient.Closing(ctx, &proto.ClosingNotice{Host: p.Hostname})
+            cancel()
+            if err != nil {
+                log.Printf("Close notify to %s failed: %v", rh.Host, err)
+            }
+        }(r)
+    }
+}
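Taken together, Process and Get are the public surface a request handler needs from the pool. A hypothetical call site, using only names that appear in this file (the wrapping function itself is illustrative and not part of the commit):

// Illustrative only: applies one mutation, then reads the cart back.
func handleCartRequest(pool *SyncedPool, rawId string, msg Message) (*FrameWithPayload, error) {
    id := ToCartId(rawId)
    if _, err := pool.Process(id, msg); err != nil {
        return nil, err
    }
    return pool.Get(id) // returns the grain's current state
}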