package main
import (
"context"
"fmt"
"log"
"reflect"
"sync"
"time"
proto "git.tornberg.me/go-cart-actor/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/watch"
)
// SyncedPool coordinates cart grain ownership across nodes using the gRPC
// control-plane and cart actor services.
//
// Responsibilities:
// - Local grain access (delegates to GrainLocalPool)
// - Cluster membership (AddRemote via discovery + negotiation)
// - Health/ping monitoring and remote removal
// - Legacy ring-based ownership: removed (replaced by the first-touch model)
//
// Thread-safety: public methods that mutate internal maps lock p.mu (RWMutex).
type SyncedPool struct {
LocalHostname string
local *GrainLocalPool
// Ownership tracking (first-touch / announcement model).
// remoteOwners maps cart id -> owning host. Locally owned carts live in
// local.grains; a defensive self entry may also be recorded at claim time
// (see resolveOwnerFirstTouch).
remoteOwners map[CartId]string
mu sync.RWMutex
// Remote host state (gRPC only)
remoteHosts map[string]*RemoteHostGRPC // host -> remote host
// Discovery handler for re-adding hosts after failures
discardedHostHandler *DiscardedHostHandler
}
// RemoteHostGRPC tracks a remote host's gRPC connection, control-plane client, and ping health.
type RemoteHostGRPC struct {
Host string
Conn *grpc.ClientConn
ControlClient proto.ControlPlaneClient
MissedPings int
}
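// IsHealthy reports whether the remote has missed fewer than three consecutive
// pings, i.e. roughly nine seconds of silence given the 3-second ping interval.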
func (r *RemoteHostGRPC) IsHealthy() bool {
return r.MissedPings < 3
}
var (
negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_remote_negotiation_total",
Help: "The total number of remote negotiations",
})
connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_connected_remotes",
Help: "The number of connected remotes",
})
cartMutationsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_mutations_total",
Help: "Total number of cart state mutations applied.",
})
cartMutationFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_mutation_failures_total",
Help: "Total number of failed cart state mutations.",
})
cartMutationLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "cart_mutation_latency_seconds",
Help: "Latency of cart mutations in seconds.",
Buckets: prometheus.DefBuckets,
}, []string{"mutation"})
cartActiveGrains = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_active_grains",
Help: "Number of active (resident) local grains.",
})
)
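// Illustrative PromQL built on the metrics above (not part of this service);
// for example, alert when more than 1% of cart mutations fail over 5 minutes:
//
//	rate(cart_mutation_failures_total[5m]) / rate(cart_mutations_total[5m]) > 0.01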
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
p := &SyncedPool{
LocalHostname: hostname,
local: local,
remoteHosts: make(map[string]*RemoteHostGRPC),
remoteOwners: make(map[CartId]string),
discardedHostHandler: NewDiscardedHostHandler(1338),
}
p.discardedHostHandler.SetReconnectHandler(p.AddRemote)
// rebuildRing is a retained no-op (ring-based ownership was removed); the call
// is kept so legacy call sites remain intact.
p.rebuildRing()
if discovery != nil {
go func() {
time.Sleep(3 * time.Second) // allow gRPC server startup
log.Printf("Starting discovery watcher")
ch, err := discovery.Watch()
if err != nil {
log.Printf("Discovery error: %v", err)
return
}
for evt := range ch {
if evt.Host == "" {
continue
}
switch evt.Type {
case watch.Deleted:
if p.IsKnown(evt.Host) {
p.RemoveHost(evt.Host)
}
default:
if !p.IsKnown(evt.Host) {
log.Printf("Discovered host %s", evt.Host)
p.AddRemote(evt.Host)
}
}
}
}()
} else {
log.Printf("No discovery configured; expecting manual AddRemote or static host injection")
}
return p, nil
}
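// Usage sketch. NewGrainLocalPool and the discovery value are stand-ins for the
// real constructors defined elsewhere in this repository:
//
//	local := NewGrainLocalPool()              // hypothetical constructor
//	pool, err := NewSyncedPool(local, hostname, k8sDiscovery)
//	if err != nil {
//		log.Fatal(err)
//	}
//	pool.AddRemote("cart-actor-1")            // optional static peer when discovery is nil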
// ------------------------- Remote Host Management -----------------------------
// AddRemote dials a remote host and initializes grain proxies.
func (p *SyncedPool) AddRemote(host string) {
if host == "" || host == p.LocalHostname {
return
}
p.mu.Lock()
if _, exists := p.remoteHosts[host]; exists {
p.mu.Unlock()
return
}
p.mu.Unlock()
target := fmt.Sprintf("%s:1337", host)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Printf("AddRemote: dial %s failed: %v", target, err)
return
}
controlClient := proto.NewControlPlaneClient(conn)
// Health check (Ping) with limited retries
pings := 3
for pings > 0 {
ctxPing, cancelPing := context.WithTimeout(context.Background(), 1*time.Second)
_, pingErr := controlClient.Ping(ctxPing, &proto.Empty{})
cancelPing()
if pingErr == nil {
break
}
pings--
time.Sleep(200 * time.Millisecond)
if pings == 0 {
log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr)
conn.Close()
return
}
}
remote := &RemoteHostGRPC{
Host: host,
Conn: conn,
ControlClient: controlClient,
MissedPings: 0,
}
p.mu.Lock()
if _, exists := p.remoteHosts[host]; exists {
// Another goroutine finished connecting this host while we were dialing;
// keep the existing connection and drop ours.
p.mu.Unlock()
conn.Close()
return
}
p.remoteHosts[host] = remote
p.mu.Unlock()
connectedRemotes.Set(float64(p.RemoteCount()))
p.rebuildRing() // retained no-op (ring removed)
log.Printf("Connected to remote host %s", host)
go p.pingLoop(remote)
go p.initializeRemote(remote)
go p.Negotiate()
}
// initializeRemote fetches remote cart ids and sets up remote grain proxies.
func (p *SyncedPool) initializeRemote(remote *RemoteHostGRPC) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
reply, err := remote.ControlClient.GetCartIds(ctx, &proto.Empty{})
if err != nil {
log.Printf("Init remote %s: GetCartIds error: %v", remote.Host, err)
return
}
count := 0
// Record remote ownership (first-touch model) instead of spawning remote grain proxies.
p.mu.Lock()
for _, cid := range reply.CartIds {
// Only set if not already claimed (first claim wins)
if _, exists := p.remoteOwners[CartId(cid)]; !exists {
p.remoteOwners[CartId(cid)] = remote.Host
}
count++
}
p.mu.Unlock()
log.Printf("Remote %s reported %d remote-owned carts (ownership cached)", remote.Host, count)
}
// RemoveHost removes remote host and its grains.
func (p *SyncedPool) RemoveHost(host string) {
p.mu.Lock()
remote, exists := p.remoteHosts[host]
if exists {
delete(p.remoteHosts, host)
}
// purge remote ownership entries for this host
for id, h := range p.remoteOwners {
if h == host {
delete(p.remoteOwners, id)
}
}
p.mu.Unlock()
if exists {
remote.Conn.Close()
}
connectedRemotes.Set(float64(p.RemoteCount()))
p.rebuildRing() // retained no-op (ring removed)
}
// RemoteCount returns number of tracked remote hosts.
func (p *SyncedPool) RemoteCount() int {
p.mu.RLock()
defer p.mu.RUnlock()
return len(p.remoteHosts)
}
func (p *SyncedPool) IsKnown(host string) bool {
if host == p.LocalHostname {
return true
}
p.mu.RLock()
defer p.mu.RUnlock()
_, ok := p.remoteHosts[host]
return ok
}
func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
ret := make([]string, 0, len(hosts))
for _, h := range hosts {
if !p.IsKnown(h) {
ret = append(ret, h)
}
}
return ret
}
// ------------------------- Health / Ping -------------------------------------
func (p *SyncedPool) pingLoop(remote *RemoteHostGRPC) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for range ticker.C {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
_, err := remote.ControlClient.Ping(ctx, &proto.Empty{})
cancel()
if err != nil {
remote.MissedPings++
log.Printf("Ping %s failed (%d)", remote.Host, remote.MissedPings)
if !remote.IsHealthy() {
log.Printf("Remote %s unhealthy, removing", remote.Host)
p.RemoveHost(remote.Host)
return
}
continue
}
remote.MissedPings = 0
}
}
func (p *SyncedPool) IsHealthy() bool {
p.mu.RLock()
defer p.mu.RUnlock()
for _, r := range p.remoteHosts {
if !r.IsHealthy() {
return false
}
}
return true
}
// ------------------------- Negotiation ---------------------------------------
func (p *SyncedPool) Negotiate() {
negotiationCount.Inc()
p.mu.RLock()
hosts := make([]string, 0, len(p.remoteHosts)+1)
hosts = append(hosts, p.LocalHostname)
for h := range p.remoteHosts {
hosts = append(hosts, h)
}
remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
for _, r := range p.remoteHosts {
remotes = append(remotes, r)
}
p.mu.RUnlock()
for _, r := range remotes {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
reply, err := r.ControlClient.Negotiate(ctx, &proto.NegotiateRequest{KnownHosts: hosts})
cancel()
if err != nil {
log.Printf("Negotiate with %s failed: %v", r.Host, err)
continue
}
for _, h := range reply.Hosts {
if !p.IsKnown(h) {
p.AddRemote(h)
}
}
}
// Ring rebuild removed (first-touch ownership model no longer uses ring)
}
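// Convergence example: if the local node knows {local, B} and B's reply lists
// {local, B, C}, the loop above dials C; AddRemote then kicks off another
// Negotiate, so membership information keeps spreading until all nodes agree.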
// ------------------------- Grain Ownership Helpers ----------------------------
// RemoveRemoteGrain obsolete in first-touch model (no remote grain proxies retained)
// SpawnRemoteGrain removed (remote grain proxies eliminated in first-touch model)
// GetHealthyRemotes retained (still useful for broadcasting ownership)
func (p *SyncedPool) GetHealthyRemotes() []*RemoteHostGRPC {
p.mu.RLock()
defer p.mu.RUnlock()
ret := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
for _, r := range p.remoteHosts {
if r.IsHealthy() {
ret = append(ret, r)
}
}
return ret
}
// rebuildRing is now a no-op (the ring is no longer used in the first-touch ownership model).
func (p *SyncedPool) rebuildRing() {}
// (All ring construction & metrics removed)
// ForceRingRefresh kept as no-op for backward compatibility.
func (p *SyncedPool) ForceRingRefresh() {}
// ownersFor previously consulted the ring; under the first-touch model it
// always reports the local host.
func (p *SyncedPool) ownersFor(id CartId) []string {
return []string{p.LocalHostname}
}
// ownerHostFor retained as wrapper to satisfy existing calls (always local)
func (p *SyncedPool) ownerHostFor(id CartId) string {
return p.LocalHostname
}
// DebugOwnerHost exposes (for tests) the currently computed primary owner host.
func (p *SyncedPool) DebugOwnerHost(id CartId) string {
return p.ownerHostFor(id)
}
func (p *SyncedPool) removeLocalGrain(id CartId) {
// local.grains is guarded by the local pool's mutex (see resolveOwnerFirstTouch),
// not by p.mu.
p.local.mu.Lock()
delete(p.local.grains, uint64(id))
p.local.mu.Unlock()
}
// ------------------------- First-Touch Ownership Resolution ------------------
// ErrNotOwner is returned when an operation is attempted on a cart that is
// owned by a different host (according to first-touch ownership mapping).
var ErrNotOwner = fmt.Errorf("not owner")
// resolveOwnerFirstTouch implements the new semantics:
// 1. If local grain exists -> local host owns it.
// 2. Else if remoteOwners has an entry -> return that host.
// 3. Else: claim locally (spawn), insert into remoteOwners map locally for
// idempotency, and asynchronously announce ownership to all remotes.
//
// NOTE: This does NOT (yet) reconcile conflicting announcements; first claim
// wins. Later improvements can add tie-break via timestamp or host ordering.
func (p *SyncedPool) resolveOwnerFirstTouch(id CartId) (string, error) {
// Fast local existence check
p.local.mu.RLock()
_, existsLocal := p.local.grains[uint64(id)]
p.local.mu.RUnlock()
if existsLocal {
return p.LocalHostname, nil
}
// Remote ownership map lookup
p.mu.RLock()
remoteHost, foundRemote := p.remoteOwners[id]
p.mu.RUnlock()
if foundRemote && remoteHost != "" {
return remoteHost, nil
}
// Claim: spawn locally
_, err := p.local.GetGrain(id)
if err != nil {
return "", err
}
// Record (defensive) in remoteOwners pointing to self (not strictly needed
// for local queries, but keeps a single lookup structure). If another goroutine
// inserted an entry meanwhile, keep theirs (first claim wins).
p.mu.Lock()
if _, claimed := p.remoteOwners[id]; !claimed {
p.remoteOwners[id] = p.LocalHostname
}
p.mu.Unlock()
// Announce asynchronously
go p.broadcastOwnership([]CartId{id})
return p.LocalHostname, nil
}
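// Worked example: the first Apply for cart 42 on node A finds neither a local
// grain nor a remoteOwners entry, so A spawns the grain, records itself as the
// owner and broadcasts the claim. A later lookup for cart 42 on node B then
// finds remoteOwners[42] == "A" and returns "A" instead of claiming locally.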
// broadcastOwnership sends an AnnounceOwnership RPC to all healthy remotes.
// Best-effort: failures are logged and ignored.
func (p *SyncedPool) broadcastOwnership(ids []CartId) {
if len(ids) == 0 {
return
}
// Prepare payload (convert to string slice)
payload := make([]string, 0, len(ids))
for _, id := range ids {
if id.String() != "" {
payload = append(payload, id.String())
}
}
if len(payload) == 0 {
return
}
p.mu.RLock()
remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
for _, r := range p.remoteHosts {
if r.IsHealthy() {
remotes = append(remotes, r)
}
}
p.mu.RUnlock()
for _, r := range remotes {
// AnnounceOwnership RPC not yet available (proto regeneration pending);
// the broadcast is a no-op for now. Intended announcement:
// host=p.LocalHostname, ids=payload.
_ = r
}
}
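// Sketch of the intended fan-out once AnnounceOwnership exists in the generated
// proto package (the message and field names below are assumptions, not the
// current generated API):
//
//	for _, r := range remotes {
//		go func(rh *RemoteHostGRPC) {
//			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
//			defer cancel()
//			if _, err := rh.ControlClient.AnnounceOwnership(ctx, &proto.OwnershipAnnouncement{
//				Host:    p.LocalHostname,
//				CartIds: payload,
//			}); err != nil {
//				log.Printf("AnnounceOwnership to %s failed: %v", rh.Host, err)
//			}
//		}(r)
//	}
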
// AdoptRemoteOwnership processes an incoming ownership announcement for cart ids.
func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) {
if host == "" || host == p.LocalHostname {
return
}
p.mu.Lock()
defer p.mu.Unlock()
for _, s := range ids {
if s == "" {
continue
}
parsed, ok := ParseCartId(s)
if !ok {
continue // skip invalid cart id strings
}
id := parsed
// Do not overwrite if already claimed by another host (first wins).
if existing, ok := p.remoteOwners[id]; ok && existing != host {
continue
}
// Skip if we own locally (local wins for our own process)
p.local.mu.RLock()
_, localHas := p.local.grains[uint64(id)]
p.local.mu.RUnlock()
if localHas {
continue
}
p.remoteOwners[id] = host
}
}
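// The receiving side of the announcement (the gRPC control-plane server, which
// lives outside this file) is expected to call AdoptRemoteOwnership with the
// announcing host and its cart ids once the AnnounceOwnership RPC is generated.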
// getGrain returns a local grain if this host is (or becomes) the owner under
// the first-touch model. If another host owns the cart, ErrNotOwner is returned.
// Remote grain proxy logic and ring-based spawning have been removed.
func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
owner, err := p.resolveOwnerFirstTouch(id)
if err != nil {
return nil, err
}
if owner != p.LocalHostname {
// Another host owns it; signal caller to proxy / forward.
return nil, ErrNotOwner
}
// Owner is local (either existing or just claimed), fetch/create grain.
grain, err := p.local.GetGrain(id)
if err != nil {
return nil, err
}
return grain, nil
}
// Apply applies a single mutation to a grain (local or remote).
// Replication (RF>1) scaffolding: future enhancement will fan-out mutations
// to replica owners (best-effort) and reconcile quorum on read.
func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) {
grain, err := p.getGrain(id)
if err == ErrNotOwner {
// The ownership map points at a remote host, yet the request reached this node
// (typically because forwarding failed or the owner is unreachable).
// Takeover strategy: drop the remote mapping (first-touch override) and claim locally.
p.mu.Lock()
delete(p.remoteOwners, id)
p.mu.Unlock()
if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil {
return nil, terr
} else if owner == p.LocalHostname {
// Fetch (now-local) grain
grain, err = p.local.GetGrain(id)
if err != nil {
return nil, err
}
} else {
// Another host reclaimed before us; treat as not owner.
return nil, ErrNotOwner
}
} else if err != nil {
return nil, err
}
start := time.Now()
result, applyErr := grain.Apply(mutation, false)
// Derive mutation type label (strip pointer)
mutationType := "unknown"
if mutation != nil {
if t := reflect.TypeOf(mutation); t != nil {
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
if t.Name() != "" {
mutationType = t.Name()
}
}
}
cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds())
if applyErr == nil && result != nil {
cartMutationsTotal.Inc()
if p.ownerHostFor(id) == p.LocalHostname {
// Update active grains gauge only for local ownership
cartActiveGrains.Set(float64(p.local.DebugGrainCount()))
}
} else if applyErr != nil {
cartMutationFailuresTotal.Inc()
}
return result, applyErr
}
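// Usage sketch. AddItemMutation is a hypothetical mutation type; the real
// mutation types live elsewhere in this package:
//
//	state, err := pool.Apply(cartId, &AddItemMutation{Sku: "123", Quantity: 1})
//	if err == ErrNotOwner {
//		// another host reclaimed the cart during takeover; forward or retry
//	}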
// Get returns current state of a grain (local or remote).
// Future replication hook: Read-repair or quorum read can be added here.
func (p *SyncedPool) Get(id CartId) (*CartGrain, error) {
grain, err := p.getGrain(id)
if err == ErrNotOwner {
// Attempt takeover on read as well (e.g. owner dead).
p.mu.Lock()
delete(p.remoteOwners, id)
p.mu.Unlock()
if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil {
return nil, terr
} else if owner == p.LocalHostname {
grain, err = p.local.GetGrain(id)
if err != nil {
return nil, err
}
} else {
return nil, ErrNotOwner
}
} else if err != nil {
return nil, err
}
return grain.GetCurrentState()
}
// Close notifies remotes this host is terminating.
func (p *SyncedPool) Close() {
p.mu.RLock()
remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
for _, r := range p.remoteHosts {
remotes = append(remotes, r)
}
p.mu.RUnlock()
for _, r := range remotes {
go func(rh *RemoteHostGRPC) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
_, err := rh.ControlClient.Closing(ctx, &proto.ClosingNotice{Host: p.LocalHostname})
cancel()
if err != nil {
log.Printf("Close notify to %s failed: %v", rh.Host, err)
}
}(r)
}
}
// Hostname implements the GrainPool interface, returning this node's hostname.
func (p *SyncedPool) Hostname() string {
return p.LocalHostname
}
// OwnerHost returns the primary owning host for a given cart id (always the local host under the first-touch model).
func (p *SyncedPool) OwnerHost(id CartId) string {
return p.ownerHostFor(id)
}