package main

import (
	"context"
	"fmt"
	"log"
	"reflect"
	"sync"
	"time"

	proto "git.tornberg.me/go-cart-actor/proto"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"google.golang.org/grpc"
	"k8s.io/apimachinery/pkg/watch"
)

// SyncedPool coordinates cart grain ownership across nodes using gRPC control plane
// and cart actor services.
//
// Responsibilities:
// - Local grain access (delegates to GrainLocalPool)
// - Cluster membership (AddRemote via discovery + negotiation)
// - Health/ping monitoring & remote removal
// - (Legacy) ring-based ownership removed in first-touch model
//
// Thread-safety: public methods that mutate internal maps lock p.mu (RWMutex).
type SyncedPool struct {
	LocalHostname string
	local         *GrainLocalPool

	// New ownership tracking (first-touch / announcement model).
	// remoteOwners maps cart id -> owning host (excluding locally owned carts, which live in local.grains).
	remoteOwners map[CartId]string

	mu sync.RWMutex

	// Remote host state (gRPC only)
	remoteHosts map[string]*RemoteHostGRPC // host -> remote host

	// Discovery handler for re-adding hosts after failures
	discardedHostHandler *DiscardedHostHandler
}

// RemoteHostGRPC tracks a remote host's clients & health.
type RemoteHostGRPC struct {
	Host          string
	Conn          *grpc.ClientConn
	CartClient    proto.CartActorClient
	ControlClient proto.ControlPlaneClient
	MissedPings   int
}

func (r *RemoteHostGRPC) IsHealthy() bool {
	return r.MissedPings < 3
}

var (
	negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_negotiation_total",
		Help: "The total number of remote negotiations",
	})
	connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_connected_remotes",
		Help: "The number of connected remotes",
	})
	cartMutationsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_mutations_total",
		Help: "Total number of cart state mutations applied.",
	})
	cartMutationFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_mutation_failures_total",
		Help: "Total number of failed cart state mutations.",
	})
	cartMutationLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "cart_mutation_latency_seconds",
		Help:    "Latency of cart mutations in seconds.",
		Buckets: prometheus.DefBuckets,
	}, []string{"mutation"})
	cartActiveGrains = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_active_grains",
		Help: "Number of active (resident) local grains.",
	})
)

func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
	p := &SyncedPool{
		LocalHostname:        hostname,
		local:                local,
		remoteHosts:          make(map[string]*RemoteHostGRPC),
		remoteOwners:         make(map[CartId]string),
		discardedHostHandler: NewDiscardedHostHandler(1338),
	}
	p.discardedHostHandler.SetReconnectHandler(p.AddRemote)

	// rebuildRing is a retained no-op (ring removed in the first-touch model); kept for call-site compatibility.
	p.rebuildRing()

	if discovery != nil {
		go func() {
			time.Sleep(3 * time.Second) // allow gRPC server startup
			log.Printf("Starting discovery watcher")
			ch, err := discovery.Watch()
			if err != nil {
				log.Printf("Discovery error: %v", err)
				return
			}
			for evt := range ch {
				if evt.Host == "" {
					continue
				}
				switch evt.Type {
				case watch.Deleted:
					if p.IsKnown(evt.Host) {
						p.RemoveHost(evt.Host)
					}
				default:
					if !p.IsKnown(evt.Host) {
						log.Printf("Discovered host %s", evt.Host)
						p.AddRemote(evt.Host)
					}
				}
			}
		}()
	} else {
		log.Printf("No discovery configured; expecting manual AddRemote or static host injection")
	}
	return p, nil
}
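
// Construction sketch (illustrative only): NewGrainLocalPool and
// NewKubernetesDiscovery below are assumed constructor names from the
// surrounding package, not confirmed here; the point is simply how the local
// pool, discovery, and SyncedPool wire together.
//
//	local := NewGrainLocalPool()
//	disc, err := NewKubernetesDiscovery("cart-actor-headless") // hypothetical discovery impl
//	if err != nil {
//		log.Fatalf("discovery init: %v", err)
//	}
//	pool, err := NewSyncedPool(local, "cart-0", disc) // "cart-0" = this node's hostname
//	if err != nil {
//		log.Fatalf("pool init: %v", err)
//	}
//	defer pool.Close()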

// ------------------------- Remote Host Management -----------------------------

// AddRemote dials a remote host, verifies it responds to Ping, and registers its gRPC clients.
func (p *SyncedPool) AddRemote(host string) {
	if host == "" || host == p.LocalHostname {
		return
	}
	p.mu.Lock()
	if _, exists := p.remoteHosts[host]; exists {
		p.mu.Unlock()
		return
	}
	p.mu.Unlock()

	target := fmt.Sprintf("%s:1337", host)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	conn, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Printf("AddRemote: dial %s failed: %v", target, err)
		return
	}
	cartClient := proto.NewCartActorClient(conn)
	controlClient := proto.NewControlPlaneClient(conn)

	// Health check (Ping) with limited retries
	pings := 3
	for pings > 0 {
		ctxPing, cancelPing := context.WithTimeout(context.Background(), 1*time.Second)
		_, pingErr := controlClient.Ping(ctxPing, &proto.Empty{})
		cancelPing()
		if pingErr == nil {
			break
		}
		pings--
		time.Sleep(200 * time.Millisecond)
		if pings == 0 {
			log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr)
			conn.Close()
			return
		}
	}

	remote := &RemoteHostGRPC{
		Host:          host,
		Conn:          conn,
		CartClient:    cartClient,
		ControlClient: controlClient,
		MissedPings:   0,
	}
	p.mu.Lock()
	p.remoteHosts[host] = remote
	p.mu.Unlock()
	connectedRemotes.Set(float64(p.RemoteCount()))

	p.rebuildRing() // retained no-op (ring removed in first-touch model)

	log.Printf("Connected to remote host %s", host)
	go p.pingLoop(remote)
	go p.initializeRemote(remote)
	go p.Negotiate()
}

// initializeRemote fetches remote cart ids and caches their ownership (first-touch model).
func (p *SyncedPool) initializeRemote(remote *RemoteHostGRPC) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	reply, err := remote.ControlClient.GetCartIds(ctx, &proto.Empty{})
	if err != nil {
		log.Printf("Init remote %s: GetCartIds error: %v", remote.Host, err)
		return
	}
	count := 0
	// Record remote ownership (first-touch model) instead of spawning remote grain proxies.
	p.mu.Lock()
	for _, idStr := range reply.CartIds {
		if idStr == "" {
			continue
		}
		cid := ToCartId(idStr)
		// Only set if not already claimed (first claim wins)
		if _, exists := p.remoteOwners[cid]; !exists {
			p.remoteOwners[cid] = remote.Host
		}
		count++
	}
	p.mu.Unlock()
	log.Printf("Remote %s reported %d remote-owned carts (ownership cached)", remote.Host, count)
}

// RemoveHost removes a remote host and purges its cached ownership entries.
func (p *SyncedPool) RemoveHost(host string) {
	p.mu.Lock()
	remote, exists := p.remoteHosts[host]
	if exists {
		delete(p.remoteHosts, host)
	}
	// purge remote ownership entries for this host
	for id, h := range p.remoteOwners {
		if h == host {
			delete(p.remoteOwners, id)
		}
	}
	p.mu.Unlock()
	if exists {
		remote.Conn.Close()
	}
	connectedRemotes.Set(float64(p.RemoteCount()))
	p.rebuildRing() // retained no-op (ring removed in first-touch model)
}
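
// When no discovery is configured (see NewSyncedPool), hosts can be injected
// manually. A minimal sketch, assuming a hypothetical staticPeers list:
//
//	for _, h := range staticPeers {
//		if !pool.IsKnown(h) {
//			pool.AddRemote(h)
//		}
//	}
//
// Calling AddRemote repeatedly is safe: already-known hosts return early
// before dialing.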

// RemoteCount returns the number of tracked remote hosts.
func (p *SyncedPool) RemoteCount() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return len(p.remoteHosts)
}

func (p *SyncedPool) IsKnown(host string) bool {
	if host == p.LocalHostname {
		return true
	}
	p.mu.RLock()
	defer p.mu.RUnlock()
	_, ok := p.remoteHosts[host]
	return ok
}

func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
	ret := make([]string, 0, len(hosts))
	for _, h := range hosts {
		if !p.IsKnown(h) {
			ret = append(ret, h)
		}
	}
	return ret
}

// ------------------------- Health / Ping -------------------------------------

func (p *SyncedPool) pingLoop(remote *RemoteHostGRPC) {
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
		_, err := remote.ControlClient.Ping(ctx, &proto.Empty{})
		cancel()
		if err != nil {
			remote.MissedPings++
			log.Printf("Ping %s failed (%d)", remote.Host, remote.MissedPings)
			if !remote.IsHealthy() {
				log.Printf("Remote %s unhealthy, removing", remote.Host)
				p.RemoveHost(remote.Host)
				return
			}
			continue
		}
		remote.MissedPings = 0
	}
}

func (p *SyncedPool) IsHealthy() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, r := range p.remoteHosts {
		if !r.IsHealthy() {
			return false
		}
	}
	return true
}

// ------------------------- Negotiation ---------------------------------------

func (p *SyncedPool) Negotiate() {
	negotiationCount.Inc()
	p.mu.RLock()
	hosts := make([]string, 0, len(p.remoteHosts)+1)
	hosts = append(hosts, p.LocalHostname)
	for h := range p.remoteHosts {
		hosts = append(hosts, h)
	}
	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		remotes = append(remotes, r)
	}
	p.mu.RUnlock()

	for _, r := range remotes {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		reply, err := r.ControlClient.Negotiate(ctx, &proto.NegotiateRequest{KnownHosts: hosts})
		cancel()
		if err != nil {
			log.Printf("Negotiate with %s failed: %v", r.Host, err)
			continue
		}
		for _, h := range reply.Hosts {
			if !p.IsKnown(h) {
				p.AddRemote(h)
			}
		}
	}
	// Ring rebuild removed (first-touch ownership model no longer uses ring)
}
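
// Negotiation currently runs once per AddRemote. If periodic re-negotiation is
// ever wanted, a sketch could look like the following (the loop and the
// 30-second interval are assumptions, not an existing feature):
//
//	go func() {
//		t := time.NewTicker(30 * time.Second)
//		defer t.Stop()
//		for range t.C {
//			pool.Negotiate()
//		}
//	}()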

// ------------------------- Grain / Ring Ownership ----------------------------

// RemoveRemoteGrain obsolete in first-touch model (no remote grain proxies retained)

// SpawnRemoteGrain removed (remote grain proxies eliminated in first-touch model)

// GetHealthyRemotes retained (still useful for broadcasting ownership)
func (p *SyncedPool) GetHealthyRemotes() []*RemoteHostGRPC {
	p.mu.RLock()
	defer p.mu.RUnlock()
	ret := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		if r.IsHealthy() {
			ret = append(ret, r)
		}
	}
	return ret
}

// rebuildRing is a retained no-op (ring no longer used in first-touch ownership model)
func (p *SyncedPool) rebuildRing() {}

// (All ring construction & metrics removed)

// ForceRingRefresh kept as no-op for backward compatibility.
func (p *SyncedPool) ForceRingRefresh() {}

// ownersFor is a retained stub (ring-based ownership deprecated); always reports the local host.
func (p *SyncedPool) ownersFor(id CartId) []string {
	return []string{p.LocalHostname}
}

// ownerHostFor retained as wrapper to satisfy existing calls (always local)
func (p *SyncedPool) ownerHostFor(id CartId) string {
	return p.LocalHostname
}

// DebugOwnerHost exposes (for tests) the currently computed primary owner host.
func (p *SyncedPool) DebugOwnerHost(id CartId) string {
	return p.ownerHostFor(id)
}

// removeLocalGrain drops a grain from the local pool; p.local.grains is guarded
// by the local pool's mutex (consistent with the reads elsewhere in this file).
func (p *SyncedPool) removeLocalGrain(id CartId) {
	p.local.mu.Lock()
	delete(p.local.grains, LegacyToCartKey(id))
	p.local.mu.Unlock()
}

// ------------------------- First-Touch Ownership Resolution ------------------

// ErrNotOwner is returned when an operation is attempted on a cart that is
// owned by a different host (according to first-touch ownership mapping).
var ErrNotOwner = fmt.Errorf("not owner")

// resolveOwnerFirstTouch implements the new semantics:
//  1. If local grain exists -> local host owns it.
//  2. Else if remoteOwners has an entry -> return that host.
//  3. Else: claim locally (spawn), insert into remoteOwners map locally for
//     idempotency, and asynchronously announce ownership to all remotes.
//
// NOTE: This does NOT (yet) reconcile conflicting announcements; first claim
// wins. Later improvements can add tie-break via timestamp or host ordering.
func (p *SyncedPool) resolveOwnerFirstTouch(id CartId) (string, error) {
	// Fast local existence check
	p.local.mu.RLock()
	_, existsLocal := p.local.grains[LegacyToCartKey(id)]
	p.local.mu.RUnlock()
	if existsLocal {
		return p.LocalHostname, nil
	}

	// Remote ownership map lookup
	p.mu.RLock()
	remoteHost, foundRemote := p.remoteOwners[id]
	p.mu.RUnlock()
	if foundRemote && remoteHost != "" {
		return remoteHost, nil
	}

	// Claim: spawn locally
	_, err := p.local.GetGrain(id)
	if err != nil {
		return "", err
	}

	// Record (defensive) in remoteOwners pointing to self (not strictly needed
	// for local queries, but keeps a single lookup structure). If another
	// goroutine inserted a claim meanwhile, keep theirs (first claim wins).
	p.mu.Lock()
	if _, claimed := p.remoteOwners[id]; !claimed {
		p.remoteOwners[id] = p.LocalHostname
	}
	p.mu.Unlock()

	// Announce asynchronously
	go p.broadcastOwnership([]CartId{id})
	return p.LocalHostname, nil
}

// broadcastOwnership sends an AnnounceOwnership RPC to all healthy remotes.
// Best-effort: failures are logged and ignored.
func (p *SyncedPool) broadcastOwnership(ids []CartId) {
	if len(ids) == 0 {
		return
	}
	// Prepare payload (convert to string slice)
	payload := make([]string, 0, len(ids))
	for _, id := range ids {
		if id.String() != "" {
			payload = append(payload, id.String())
		}
	}
	if len(payload) == 0 {
		return
	}
	p.mu.RLock()
	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		if r.IsHealthy() {
			remotes = append(remotes, r)
		}
	}
	p.mu.RUnlock()
	for _, r := range remotes {
		go func(rh *RemoteHostGRPC) {
			// AnnounceOwnership RPC not yet available (proto regeneration pending); no-op broadcast for now.
			// Intended announcement: host=p.LocalHostname ids=payload
			_ = rh
		}(r)
	}
}

// AdoptRemoteOwnership processes an incoming ownership announcement for cart ids.
func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) {
	if host == "" || host == p.LocalHostname {
		return
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, s := range ids {
		if s == "" {
			continue
		}
		id := ToCartId(s)
		// Do not overwrite if already claimed by another host (first wins).
		if existing, ok := p.remoteOwners[id]; ok && existing != host {
			continue
		}
		// Skip if we own locally (local wins for our own process)
		p.local.mu.RLock()
		_, localHas := p.local.grains[LegacyToCartKey(id)]
		p.local.mu.RUnlock()
		if localHas {
			continue
		}
		p.remoteOwners[id] = host
	}
}
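
// Once the AnnounceOwnership RPC exists in the proto (noted above as pending
// regeneration), the control-plane server side could feed announcements into
// AdoptRemoteOwnership roughly like this sketch. controlPlaneServer,
// AnnounceOwnershipRequest, and its Host/CartIds fields are assumptions about
// the eventual generated code, not confirmed types:
//
//	func (s *controlPlaneServer) AnnounceOwnership(ctx context.Context, req *proto.AnnounceOwnershipRequest) (*proto.Empty, error) {
//		s.pool.AdoptRemoteOwnership(req.Host, req.CartIds)
//		return &proto.Empty{}, nil
//	}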

// getGrain returns a local grain if this host is (or becomes) the owner under
// the first-touch model. If another host owns the cart, ErrNotOwner is returned.
// Remote grain proxy logic and ring-based spawning have been removed.
func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
	owner, err := p.resolveOwnerFirstTouch(id)
	if err != nil {
		return nil, err
	}
	if owner != p.LocalHostname {
		// Another host owns it; signal caller to proxy / forward.
		return nil, ErrNotOwner
	}
	// Owner is local (either existing or just claimed), fetch/create grain.
	grain, err := p.local.GetGrain(id)
	if err != nil {
		return nil, err
	}
	return grain, nil
}

// Apply applies a single mutation to a grain (local or remote).
// Replication (RF>1) scaffolding: future enhancement will fan-out mutations
// to replica owners (best-effort) and reconcile quorum on read.
func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) {
	grain, err := p.getGrain(id)
	if err == ErrNotOwner {
		// Remote owner reported but either unreachable or failed earlier in stack.
		// Takeover strategy: remove remote mapping (first-touch override) and claim locally.
		p.mu.Lock()
		delete(p.remoteOwners, id)
		p.mu.Unlock()
		if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil {
			return nil, terr
		} else if owner == p.LocalHostname {
			// Fetch (now-local) grain
			grain, err = p.local.GetGrain(id)
			if err != nil {
				return nil, err
			}
		} else {
			// Another host reclaimed before us; treat as not owner.
			return nil, ErrNotOwner
		}
	} else if err != nil {
		return nil, err
	}

	start := time.Now()
	result, applyErr := grain.Apply(mutation, false)

	// Derive mutation type label (strip pointer)
	mutationType := "unknown"
	if mutation != nil {
		if t := reflect.TypeOf(mutation); t != nil {
			if t.Kind() == reflect.Ptr {
				t = t.Elem()
			}
			if t.Name() != "" {
				mutationType = t.Name()
			}
		}
	}
	cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds())

	if applyErr == nil && result != nil {
		cartMutationsTotal.Inc()
		if p.ownerHostFor(id) == p.LocalHostname {
			// Update active grains gauge only for local ownership
			cartActiveGrains.Set(float64(p.local.DebugGrainCount()))
		}
	} else if applyErr != nil {
		cartMutationFailuresTotal.Inc()
	}
	return result, applyErr
}

// Get returns current state of a grain (local or remote).
// Future replication hook: Read-repair or quorum read can be added here.
func (p *SyncedPool) Get(id CartId) (*CartGrain, error) {
	grain, err := p.getGrain(id)
	if err == ErrNotOwner {
		// Attempt takeover on read as well (e.g. owner dead).
		p.mu.Lock()
		delete(p.remoteOwners, id)
		p.mu.Unlock()
		if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil {
			return nil, terr
		} else if owner == p.LocalHostname {
			grain, err = p.local.GetGrain(id)
			if err != nil {
				return nil, err
			}
		} else {
			return nil, ErrNotOwner
		}
	} else if err != nil {
		return nil, err
	}
	return grain.GetCurrentState()
}
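
// Callers of Apply/Get should expect ErrNotOwner when another host reclaims a
// cart between resolution and execution. A caller-level sketch (AddItemMutation
// and its fields are hypothetical, standing in for any mutation type):
//
//	result, err := pool.Apply(id, &AddItemMutation{Sku: "abc", Quantity: 1})
//	if err == ErrNotOwner {
//		// Another node owns the cart; the calling layer can retry here or
//		// proxy the request to the owning node over its CartClient.
//	}
//	_ = result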

// Close notifies remotes this host is terminating.
func (p *SyncedPool) Close() {
	p.mu.RLock()
	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		remotes = append(remotes, r)
	}
	p.mu.RUnlock()
	for _, r := range remotes {
		go func(rh *RemoteHostGRPC) {
			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
			_, err := rh.ControlClient.Closing(ctx, &proto.ClosingNotice{Host: p.LocalHostname})
			cancel()
			if err != nil {
				log.Printf("Close notify to %s failed: %v", rh.Host, err)
			}
		}(r)
	}
}

// Hostname implements the GrainPool interface, returning this node's hostname.
func (p *SyncedPool) Hostname() string {
	return p.LocalHostname
}

// OwnerHost returns the primary owning host for a given cart id. With the ring
// removed, the retained wrapper always reports the local host.
func (p *SyncedPool) OwnerHost(id CartId) string {
	return p.ownerHostFor(id)
}