package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	proto "git.tornberg.me/go-cart-actor/proto"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"google.golang.org/grpc"
	"k8s.io/apimachinery/pkg/watch"
)

// SyncedPool coordinates cart grain ownership across nodes using the gRPC
// control plane and cart actor services.
//
// Responsibilities:
// - Local grain access (delegates to GrainLocalPool)
// - Remote grain proxy management (RemoteGrainGRPC)
// - Cluster membership (AddRemote via discovery + negotiation)
// - Health/ping monitoring & remote removal
// - Ring-based deterministic ownership (no runtime negotiation)
// - (Scaffolding) replication factor awareness via ring.LookupN
//
// Thread-safety: public methods that mutate internal maps lock p.mu (RWMutex).
type SyncedPool struct {
	Hostname string
	local    *GrainLocalPool

	mu sync.RWMutex

	// Remote host state (gRPC only)
	remoteHosts map[string]*RemoteHostGRPC // host -> remote host

	// Remote grain proxies (by cart id)
	remoteIndex map[CartId]Grain

	// Discovery handler for re-adding hosts after failures
	discardedHostHandler *DiscardedHostHandler

	// Consistent hashing ring (immutable snapshot reference)
	ringRef *RingRef

	// Configuration
	vnodesPerHost     int
	replicationFactor int // RF (>=1). Currently only the primary is active; replicas are scaffolding.
}

// RemoteHostGRPC tracks a remote host's clients & health.
type RemoteHostGRPC struct {
	Host          string
	Conn          *grpc.ClientConn
	CartClient    proto.CartActorClient
	ControlClient proto.ControlPlaneClient
	MissedPings   int
}

func (r *RemoteHostGRPC) IsHealthy() bool {
	return r.MissedPings < 3
}

var (
	negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_negotiation_total",
		Help: "The total number of remote negotiations",
	})
	grainSyncCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_sync_total",
		Help: "The total number of grain owner changes",
	})
	connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_connected_remotes",
		Help: "The number of connected remotes",
	})
	remoteLookupCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_lookup_total",
		Help: "The total number of remote lookups (legacy counter)",
	})

	// Ring / ownership metrics
	ringEpoch = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_ring_epoch",
		Help: "Current consistent hashing ring epoch (fingerprint-based pseudo-epoch)",
	})
	ringHosts = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_ring_hosts",
		Help: "Number of hosts currently in the ring",
	})
	ringVnodes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_ring_vnodes",
		Help: "Number of virtual nodes in the ring",
	})
	ringLookupLocal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_ring_lookup_local_total",
		Help: "Ring ownership lookups resolved to the local host",
	})
	ringLookupRemote = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_ring_lookup_remote_total",
		Help: "Ring ownership lookups resolved to a remote host",
	})
	ringHostShare = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "cart_ring_host_share",
		Help: "Fractional share of ring vnodes per host",
	}, []string{"host"})
)

func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
	p := &SyncedPool{
		Hostname:             hostname,
		local:                local,
		remoteHosts:          make(map[string]*RemoteHostGRPC),
		remoteIndex:          make(map[CartId]Grain),
		discardedHostHandler: NewDiscardedHostHandler(1338),
		vnodesPerHost:        64, // default smoothing factor; adjust if needed
		replicationFactor:    1,  // RF scaffold; values >1 do not activate replicas yet
	}
	p.discardedHostHandler.SetReconnectHandler(p.AddRemote)
	// Initialize the ring with just the local host (rebuilt after AddRemote / discovery events).
	p.rebuildRing()

	if discovery != nil {
		go func() {
			time.Sleep(3 * time.Second) // allow gRPC server startup
			log.Printf("Starting discovery watcher")
			ch, err := discovery.Watch()
			if err != nil {
				log.Printf("Discovery error: %v", err)
				return
			}
			for evt := range ch {
				if evt.Host == "" {
					continue
				}
				switch evt.Type {
				case watch.Deleted:
					if p.IsKnown(evt.Host) {
						p.RemoveHost(evt.Host)
					}
				default:
					if !p.IsKnown(evt.Host) {
						log.Printf("Discovered host %s", evt.Host)
						p.AddRemote(evt.Host)
					}
				}
			}
		}()
	} else {
		log.Printf("No discovery configured; expecting manual AddRemote or static host injection")
	}

	return p, nil
}
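
// Usage sketch (illustrative only): how a caller might wire the pool and route
// cart operations through it. The local pool / discovery construction and the
// mutation value are assumptions for illustration; NewSyncedPool, Apply, Get,
// ToCartId and Close are the names defined in this package.
//
//	local := newLocalPool()                                // hypothetical helper
//	pool, err := NewSyncedPool(local, hostname, discovery) // hostname/discovery from app wiring
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer pool.Close()
//
//	id := ToCartId("cart-123")
//	if _, err := pool.Apply(id, addItemMutation); err != nil { // mutation value is app-defined
//		log.Printf("apply failed: %v", err)
//	}
//	state, err := pool.Get(id) // read back the latest cart state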

// ------------------------- Remote Host Management -----------------------------

// AddRemote dials a remote host and initializes grain proxies.
func (p *SyncedPool) AddRemote(host string) {
	if host == "" || host == p.Hostname {
		return
	}

	p.mu.Lock()
	if _, exists := p.remoteHosts[host]; exists {
		p.mu.Unlock()
		return
	}
	p.mu.Unlock()

	target := fmt.Sprintf("%s:1337", host)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	conn, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Printf("AddRemote: dial %s failed: %v", target, err)
		return
	}

	cartClient := proto.NewCartActorClient(conn)
	controlClient := proto.NewControlPlaneClient(conn)

	// Health check (Ping) with limited retries
	pings := 3
	for pings > 0 {
		ctxPing, cancelPing := context.WithTimeout(context.Background(), 1*time.Second)
		_, pingErr := controlClient.Ping(ctxPing, &proto.Empty{})
		cancelPing()
		if pingErr == nil {
			break
		}
		pings--
		time.Sleep(200 * time.Millisecond)
		if pings == 0 {
			log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr)
			conn.Close()
			return
		}
	}

	remote := &RemoteHostGRPC{
		Host:          host,
		Conn:          conn,
		CartClient:    cartClient,
		ControlClient: controlClient,
		MissedPings:   0,
	}

	p.mu.Lock()
	if _, exists := p.remoteHosts[host]; exists {
		// Lost a race with a concurrent AddRemote for the same host; keep the
		// existing connection and drop this one.
		p.mu.Unlock()
		conn.Close()
		return
	}
	p.remoteHosts[host] = remote
	p.mu.Unlock()
	connectedRemotes.Set(float64(p.RemoteCount()))
	// Rebuild consistent hashing ring including this new host
	p.rebuildRing()

	log.Printf("Connected to remote host %s", host)

	go p.pingLoop(remote)
	go p.initializeRemote(remote)
	go p.Negotiate()
}

// initializeRemote fetches remote cart ids and sets up remote grain proxies.
func (p *SyncedPool) initializeRemote(remote *RemoteHostGRPC) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	reply, err := remote.ControlClient.GetCartIds(ctx, &proto.Empty{})
	if err != nil {
		log.Printf("Init remote %s: GetCartIds error: %v", remote.Host, err)
		return
	}
	count := 0
	for _, idStr := range reply.CartIds {
		if idStr == "" {
			continue
		}
		p.SpawnRemoteGrain(ToCartId(idStr), remote.Host)
		count++
	}
	log.Printf("Remote %s reported %d grains", remote.Host, count)
}

// RemoveHost removes a remote host and its grain proxies.
func (p *SyncedPool) RemoveHost(host string) {
	p.mu.Lock()
	remote, exists := p.remoteHosts[host]
	if exists {
		delete(p.remoteHosts, host)
	}
	// remove grains pointing to host
	for id, g := range p.remoteIndex {
		if rg, ok := g.(*RemoteGrainGRPC); ok && rg.Host == host {
			delete(p.remoteIndex, id)
		}
	}
	p.mu.Unlock()

	if exists {
		remote.Conn.Close()
	}
	connectedRemotes.Set(float64(p.RemoteCount()))
	// Rebuild ring after host removal
	p.rebuildRing()
}

// RemoteCount returns the number of tracked remote hosts.
func (p *SyncedPool) RemoteCount() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return len(p.remoteHosts)
}

// IsKnown reports whether host is the local host or an already tracked remote.
func (p *SyncedPool) IsKnown(host string) bool {
	if host == p.Hostname {
		return true
	}
	p.mu.RLock()
	defer p.mu.RUnlock()
	_, ok := p.remoteHosts[host]
	return ok
}

// ExcludeKnown returns the subset of hosts that are not yet known to the pool.
func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
	ret := make([]string, 0, len(hosts))
	for _, h := range hosts {
		if !p.IsKnown(h) {
			ret = append(ret, h)
		}
	}
	return ret
}

// ------------------------- Health / Ping -------------------------------------

// pingLoop periodically pings a remote host and removes it once it has missed
// too many consecutive pings.
func (p *SyncedPool) pingLoop(remote *RemoteHostGRPC) {
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
		_, err := remote.ControlClient.Ping(ctx, &proto.Empty{})
		cancel()
		if err != nil {
			remote.MissedPings++
			log.Printf("Ping %s failed (%d)", remote.Host, remote.MissedPings)
			if !remote.IsHealthy() {
				log.Printf("Remote %s unhealthy, removing", remote.Host)
				p.RemoveHost(remote.Host)
				return
			}
			continue
		}
		remote.MissedPings = 0
	}
}

// IsHealthy reports whether every tracked remote host is currently healthy.
func (p *SyncedPool) IsHealthy() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, r := range p.remoteHosts {
		if !r.IsHealthy() {
			return false
		}
	}
	return true
}

// ------------------------- Negotiation ---------------------------------------

// Negotiate exchanges known-host lists with every connected remote and dials
// any hosts this node has not seen yet; the ring is rebuilt once at the end if
// membership grew.
func (p *SyncedPool) Negotiate() {
	negotiationCount.Inc()

	p.mu.RLock()
	hosts := make([]string, 0, len(p.remoteHosts)+1)
	hosts = append(hosts, p.Hostname)
	for h := range p.remoteHosts {
		hosts = append(hosts, h)
	}
	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		remotes = append(remotes, r)
	}
	p.mu.RUnlock()

	changed := false

	for _, r := range remotes {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		reply, err := r.ControlClient.Negotiate(ctx, &proto.NegotiateRequest{KnownHosts: hosts})
		cancel()
		if err != nil {
			log.Printf("Negotiate with %s failed: %v", r.Host, err)
			continue
		}
		for _, h := range reply.Hosts {
			if !p.IsKnown(h) {
				p.AddRemote(h)
				changed = true
			}
		}
	}

	// If new hosts were discovered during negotiation, rebuild the ring once at the end.
	if changed {
		p.rebuildRing()
	}
}

// ------------------------- Grain / Ring Ownership ----------------------------

// RemoveRemoteGrain removes a remote grain mapping.
func (p *SyncedPool) RemoveRemoteGrain(id CartId) {
	p.mu.Lock()
	delete(p.remoteIndex, id)
	p.mu.Unlock()
}

// SpawnRemoteGrain creates/updates a remote grain proxy for a given host.
func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
	if id.String() == "" {
		return
	}
	p.mu.Lock()
	// If a local grain exists (legacy key), remove it from the local map (ownership moved).
	if g, ok := p.local.grains[LegacyToCartKey(id)]; ok && g != nil {
		delete(p.local.grains, LegacyToCartKey(id))
	}
	remoteHost, ok := p.remoteHosts[host]
	if !ok {
		p.mu.Unlock()
		log.Printf("SpawnRemoteGrain: host %s unknown (id=%s), attempting AddRemote", host, id)
		go p.AddRemote(host)
		return
	}
	rg := NewRemoteGrainGRPC(id, host, remoteHost.CartClient)
	p.remoteIndex[id] = rg
	p.mu.Unlock()
}

// GetHealthyRemotes returns a copied slice of the currently healthy remote hosts.
func (p *SyncedPool) GetHealthyRemotes() []*RemoteHostGRPC {
	p.mu.RLock()
	defer p.mu.RUnlock()
	ret := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		if r.IsHealthy() {
			ret = append(ret, r)
		}
	}
	return ret
}

// rebuildRing reconstructs the consistent hashing ring from the current host set
// and updates ring-related metrics.
func (p *SyncedPool) rebuildRing() {
	p.mu.RLock()
	hosts := make([]string, 0, len(p.remoteHosts)+1)
	hosts = append(hosts, p.Hostname)
	for h := range p.remoteHosts {
		hosts = append(hosts, h)
	}
	p.mu.RUnlock()

	epochSeed := fingerprintHosts(hosts)
	builder := NewRingBuilder().
		WithHosts(hosts).
		WithEpoch(epochSeed).
		WithVnodesPerHost(p.vnodesPerHost)
	r := builder.Build()
	if p.ringRef == nil {
		p.ringRef = NewRingRef(r)
	} else {
		p.ringRef.Set(r)
	}

	// Metrics
	ringEpoch.Set(float64(r.Epoch))
	ringHosts.Set(float64(len(r.Hosts())))
	ringVnodes.Set(float64(len(r.Vnodes)))
	ringHostShare.Reset()
	if len(r.Vnodes) > 0 {
		perHost := make(map[string]int)
		for _, v := range r.Vnodes {
			perHost[v.Host]++
		}
		total := float64(len(r.Vnodes))
		for h, c := range perHost {
			ringHostShare.WithLabelValues(h).Set(float64(c) / total)
		}
	}
}

// ForceRingRefresh exposes a manual ring rebuild hook (primarily for tests).
func (p *SyncedPool) ForceRingRefresh() {
	p.rebuildRing()
}

// ownersFor returns the ordered list of primary + replica owners for a cart id
// (length min(replicationFactor, #hosts)). Currently only the first entry (the
// primary) is used; the rest scaffold future replication work.
func (p *SyncedPool) ownersFor(id CartId) []string {
	if p.ringRef == nil || p.replicationFactor <= 0 {
		return []string{p.Hostname}
	}
	r := p.ringRef.Get()
	if r == nil || r.Empty() {
		return []string{p.Hostname}
	}
	vnodes := r.LookupN(hashKeyString(id.String()), p.replicationFactor)
	out := make([]string, 0, len(vnodes))
	seen := make(map[string]struct{}, len(vnodes))
	for _, v := range vnodes {
		if _, ok := seen[v.Host]; ok {
			continue
		}
		seen[v.Host] = struct{}{}
		out = append(out, v.Host)
	}
	if len(out) == 0 {
		out = append(out, p.Hostname)
	}
	return out
}
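
// Example (hypothetical hosts, not taken from this repo): with
// replicationFactor = 3 and a ring over {"cart-0", "cart-1", "cart-2"}, a call
//
//	owners := p.ownersFor(id) // e.g. []string{"cart-1", "cart-2", "cart-0"}
//	primary, replicas := owners[0], owners[1:]
//
// yields the primary consulted by ownerHostFor today plus the replica owners
// the replication scaffolding would eventually target.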

// ownerHostFor returns the primary owner host for a given id.
func (p *SyncedPool) ownerHostFor(id CartId) string {
	return p.ownersFor(id)[0]
}

// DebugOwnerHost exposes (for tests) the currently computed primary owner host.
func (p *SyncedPool) DebugOwnerHost(id CartId) string {
	return p.ownerHostFor(id)
}
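
// Sketch of the intended test usage (hypothetical host name and test wiring,
// typically in a _test.go file): ForceRingRefresh pins the ring to the current
// membership before asserting on ownership.
//
//	pool.ForceRingRefresh()
//	if owner := pool.DebugOwnerHost(ToCartId("cart-123")); owner != "cart-0" {
//		t.Fatalf("unexpected owner: %s", owner)
//	}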

// removeLocalGrain drops the grain with the given id from the local pool.
func (p *SyncedPool) removeLocalGrain(id CartId) {
	p.mu.Lock()
	delete(p.local.grains, LegacyToCartKey(id))
	p.mu.Unlock()
}

// getGrain returns a local or remote grain. For remote ownership it performs a
// bounded readiness wait (small retries) to reduce first-call failures while
// the remote connection & proxy are initializing.
func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
	owner := p.ownerHostFor(id)
	if owner == p.Hostname {
		ringLookupLocal.Inc()
		grain, err := p.local.GetGrain(id)
		if err != nil {
			return nil, err
		}
		return grain, nil
	}
	ringLookupRemote.Inc()

	// Kick off a remote dial if we don't yet know the owner.
	if !p.IsKnown(owner) {
		go p.AddRemote(owner)
	}

	// Fast path: an existing proxy.
	p.mu.RLock()
	if rg, ok := p.remoteIndex[id]; ok {
		p.mu.RUnlock()
		remoteLookupCount.Inc()
		return rg, nil
	}
	p.mu.RUnlock()

	const (
		attempts    = 5
		sleepPerTry = 40 * time.Millisecond
	)

	for attempt := 0; attempt < attempts; attempt++ {
		// Try to spawn (idempotent if the host is already known).
		if p.IsKnown(owner) {
			p.SpawnRemoteGrain(id, owner)
		}
		// Check again.
		p.mu.RLock()
		if rg, ok := p.remoteIndex[id]; ok {
			p.mu.RUnlock()
			remoteLookupCount.Inc()
			return rg, nil
		}
		p.mu.RUnlock()

		// Last attempt? Break to return the error.
		if attempt == attempts-1 {
			break
		}
		time.Sleep(sleepPerTry)
	}

	return nil, fmt.Errorf("remote owner %s not yet available for cart %s (after %d attempts)", owner, id.String(), attempts)
}

// Apply applies a single mutation to a grain (local or remote).
// Replication (RF>1) scaffolding: a future enhancement will fan out mutations
// to replica owners (best-effort) and reconcile quorum on read.
func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) {
	grain, err := p.getGrain(id)
	if err != nil {
		return nil, err
	}
	return grain.Apply(mutation, false)
}
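
// applyWithReplicas is a hypothetical helper sketching the RF>1 fan-out
// described above. It is NOT wired into the call path, and the best-effort
// policy (fire-and-forget, replica errors only logged) is an assumption rather
// than decided semantics. The primary write reuses Apply; replica writes use a
// throwaway RemoteGrainGRPC proxy built from the already-connected host clients.
func (p *SyncedPool) applyWithReplicas(id CartId, mutation interface{}) (*CartGrain, error) {
	owners := p.ownersFor(id)

	// Primary write via the existing local/remote routing.
	state, err := p.Apply(id, mutation)
	if err != nil {
		return nil, err
	}

	// Best-effort fan-out to the remaining (replica) owners.
	for _, host := range owners[1:] {
		if host == p.Hostname {
			continue // storing a local replica copy is future work
		}
		p.mu.RLock()
		remote, ok := p.remoteHosts[host]
		p.mu.RUnlock()
		if !ok {
			continue
		}
		go func(h string, r *RemoteHostGRPC) {
			rg := NewRemoteGrainGRPC(id, h, r.CartClient)
			if _, err := rg.Apply(mutation, false); err != nil {
				log.Printf("replica apply to %s failed (ignored): %v", h, err)
			}
		}(host, remote)
	}
	return state, nil
}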

// Get returns current state of a grain (local or remote).
// Future replication hook: Read-repair or quorum read can be added here.
func (p *SyncedPool) Get(id CartId) (*CartGrain, error) {
	grain, err := p.getGrain(id)
	if err != nil {
		return nil, err
	}
	return grain.GetCurrentState()
}

// Close notifies remotes this host is terminating.
func (p *SyncedPool) Close() {
	p.mu.RLock()
	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		remotes = append(remotes, r)
	}
	p.mu.RUnlock()

	for _, r := range remotes {
		go func(rh *RemoteHostGRPC) {
			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
			_, err := rh.ControlClient.Closing(ctx, &proto.ClosingNotice{Host: p.Hostname})
			cancel()
			if err != nil {
				log.Printf("Close notify to %s failed: %v", rh.Host, err)
			}
		}(r)
	}
}