package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	proto "git.tornberg.me/go-cart-actor/proto"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"google.golang.org/grpc"
	"k8s.io/apimachinery/pkg/watch"
)

// SyncedPool coordinates cart grain ownership across nodes using the gRPC control plane
// and cart actor services. Legacy frame / TCP code has been removed.
//
// Responsibilities:
//   - Local grain access (delegates to GrainLocalPool)
//   - Remote grain proxy management (RemoteGrainGRPC)
//   - Cluster membership (AddRemote via discovery + negotiation)
//   - Ownership acquisition (quorum via ConfirmOwner RPC)
//   - Health/ping monitoring & remote removal
//
// Thread-safety: public methods that mutate internal maps lock p.mu (RWMutex).
type SyncedPool struct {
	Hostname string
	local    *GrainLocalPool
	mu       sync.RWMutex

	// Remote host state (gRPC only)
	remoteHosts map[string]*RemoteHostGRPC // host -> remote host

	// Remote grain proxies (by cart id)
	remoteIndex map[CartId]Grain

	// Discovery handler for re-adding hosts after failures
	discardedHostHandler *DiscardedHostHandler

	// Metrics / instrumentation dependencies are declared globally below.
}

// RemoteHostGRPC tracks a remote host's clients & health.
type RemoteHostGRPC struct {
	Host          string
	Conn          *grpc.ClientConn
	CartClient    proto.CartActorClient
	ControlClient proto.ControlPlaneClient
	MissedPings   int // consecutive failed pings; updated by SyncedPool.pingLoop under p.mu
}

// IsHealthy reports whether the host has missed fewer than three consecutive pings.
func (r *RemoteHostGRPC) IsHealthy() bool {
	return r.MissedPings < 3
}

var (
	negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_negotiation_total",
		Help: "The total number of remote negotiations",
	})
	grainSyncCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_sync_total",
		Help: "The total number of grain owner changes",
	})
	connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_connected_remotes",
		Help: "The number of connected remotes",
	})
	remoteLookupCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_lookup_total",
		Help: "The total number of remote lookups",
	})
)

// NewSyncedPool creates a SyncedPool around the given local grain pool and, when a
// discovery source is provided, starts a watcher that adds and removes remote hosts
// as they appear or disappear.
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
	p := &SyncedPool{
		Hostname:             hostname,
		local:                local,
		remoteHosts:          make(map[string]*RemoteHostGRPC),
		remoteIndex:          make(map[CartId]Grain),
		discardedHostHandler: NewDiscardedHostHandler(1338),
	}
	p.discardedHostHandler.SetReconnectHandler(p.AddRemote)

	if discovery != nil {
		go func() {
			time.Sleep(3 * time.Second) // allow gRPC server startup
			log.Printf("Starting discovery watcher")
			ch, err := discovery.Watch()
			if err != nil {
				log.Printf("Discovery error: %v", err)
				return
			}
			for evt := range ch {
				if evt.Host == "" {
					continue
				}
				switch evt.Type {
				case watch.Deleted:
					if p.IsKnown(evt.Host) {
						p.RemoveHost(evt.Host)
					}
				default:
					if !p.IsKnown(evt.Host) {
						log.Printf("Discovered host %s", evt.Host)
						p.AddRemote(evt.Host)
					}
				}
			}
		}()
	} else {
		log.Printf("No discovery configured; expecting manual AddRemote or static host injection")
	}

	return p, nil
}

// ------------------------- Remote Host Management -----------------------------
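
// The sketch below illustrates how a SyncedPool might be bootstrapped when no
// discovery source is available (the "manual AddRemote or static host injection"
// path logged above). It is illustrative only and not referenced elsewhere; the
// local pool is assumed to be constructed by the caller, and the hostnames are
// placeholders.
func exampleBootstrapWithoutDiscovery(local *GrainLocalPool) (*SyncedPool, error) {
	pool, err := NewSyncedPool(local, "cart-0", nil) // nil discovery: membership is managed manually
	if err != nil {
		return nil, err
	}
	// Seed a known peer by hand; a discovery watcher would normally do this for us.
	pool.AddRemote("cart-1")
	return pool, nil
}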

// AddRemote dials a remote host and initializes grain proxies.
func (p *SyncedPool) AddRemote(host string) {
	if host == "" || host == p.Hostname {
		return
	}
	p.mu.Lock()
	if _, exists := p.remoteHosts[host]; exists {
		p.mu.Unlock()
		return
	}
	p.mu.Unlock()

	target := fmt.Sprintf("%s:1337", host)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	conn, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Printf("AddRemote: dial %s failed: %v", target, err)
		return
	}

	cartClient := proto.NewCartActorClient(conn)
	controlClient := proto.NewControlPlaneClient(conn)

	// Health check (Ping) with limited retries before accepting the host.
	var pingErr error
	for attempt := 0; attempt < 3; attempt++ {
		ctxPing, cancelPing := context.WithTimeout(context.Background(), 1*time.Second)
		_, pingErr = controlClient.Ping(ctxPing, &proto.Empty{})
		cancelPing()
		if pingErr == nil {
			break
		}
		time.Sleep(200 * time.Millisecond)
	}
	if pingErr != nil {
		log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr)
		conn.Close()
		return
	}

	remote := &RemoteHostGRPC{
		Host:          host,
		Conn:          conn,
		CartClient:    cartClient,
		ControlClient: controlClient,
		MissedPings:   0,
	}

	p.mu.Lock()
	if _, exists := p.remoteHosts[host]; exists {
		// Another goroutine connected to this host while we were dialing; keep
		// the existing connection and discard ours.
		p.mu.Unlock()
		conn.Close()
		return
	}
	p.remoteHosts[host] = remote
	p.mu.Unlock()

	connectedRemotes.Set(float64(p.RemoteCount()))
	log.Printf("Connected to remote host %s", host)

	go p.pingLoop(remote)
	go p.initializeRemote(remote)
	go p.Negotiate()
}

// initializeRemote fetches remote cart ids and sets up remote grain proxies.
func (p *SyncedPool) initializeRemote(remote *RemoteHostGRPC) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	reply, err := remote.ControlClient.GetCartIds(ctx, &proto.Empty{})
	if err != nil {
		log.Printf("Init remote %s: GetCartIds error: %v", remote.Host, err)
		return
	}
	count := 0
	for _, idStr := range reply.CartIds {
		if idStr == "" {
			continue
		}
		p.SpawnRemoteGrain(ToCartId(idStr), remote.Host)
		count++
	}
	log.Printf("Remote %s reported %d grains", remote.Host, count)
}

// RemoveHost removes a remote host and the remote grain proxies that point to it.
func (p *SyncedPool) RemoveHost(host string) {
	p.mu.Lock()
	remote, exists := p.remoteHosts[host]
	if exists {
		delete(p.remoteHosts, host)
	}
	// Drop grains owned by the removed host.
	for id, g := range p.remoteIndex {
		if rg, ok := g.(*RemoteGrainGRPC); ok && rg.Host == host {
			delete(p.remoteIndex, id)
		}
	}
	p.mu.Unlock()

	if exists {
		remote.Conn.Close()
	}
	connectedRemotes.Set(float64(p.RemoteCount()))
}
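
// The sketch below shows the "static host injection" path: seeding membership from a
// fixed peer list instead of a discovery watcher. It is illustrative only and not
// referenced elsewhere; the peer list is assumed to come from configuration.
func exampleSeedStaticHosts(p *SyncedPool, peers []string) {
	// ExcludeKnown filters out this host and already-connected remotes,
	// so repeated seeding is harmless.
	for _, host := range p.ExcludeKnown(peers) {
		p.AddRemote(host)
	}
}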

// RemoteCount returns the number of tracked remote hosts.
func (p *SyncedPool) RemoteCount() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return len(p.remoteHosts)
}

// IsKnown reports whether host is this node or an already-tracked remote.
func (p *SyncedPool) IsKnown(host string) bool {
	if host == p.Hostname {
		return true
	}
	p.mu.RLock()
	defer p.mu.RUnlock()
	_, ok := p.remoteHosts[host]
	return ok
}

// ExcludeKnown returns the subset of hosts that are not yet known to this pool.
func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
	ret := make([]string, 0, len(hosts))
	for _, h := range hosts {
		if !p.IsKnown(h) {
			ret = append(ret, h)
		}
	}
	return ret
}

// ------------------------- Health / Ping -------------------------------------

func (p *SyncedPool) pingLoop(remote *RemoteHostGRPC) {
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
		_, err := remote.ControlClient.Ping(ctx, &proto.Empty{})
		cancel()
		if err != nil {
			// MissedPings is read under p.mu elsewhere, so mutate it under the lock.
			p.mu.Lock()
			remote.MissedPings++
			missed := remote.MissedPings
			p.mu.Unlock()
			log.Printf("Ping %s failed (%d)", remote.Host, missed)
			if missed >= 3 {
				log.Printf("Remote %s unhealthy, removing", remote.Host)
				p.RemoveHost(remote.Host)
				return
			}
			continue
		}
		p.mu.Lock()
		remote.MissedPings = 0
		p.mu.Unlock()
	}
}

// IsHealthy reports whether every tracked remote host is currently healthy.
func (p *SyncedPool) IsHealthy() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, r := range p.remoteHosts {
		if !r.IsHealthy() {
			return false
		}
	}
	return true
}

// ------------------------- Negotiation ---------------------------------------

// Negotiate exchanges known-host lists with every tracked remote and dials any
// hosts reported by peers that we have not seen yet.
func (p *SyncedPool) Negotiate() {
	negotiationCount.Inc()
	p.mu.RLock()
	hosts := make([]string, 0, len(p.remoteHosts)+1)
	hosts = append(hosts, p.Hostname)
	for h := range p.remoteHosts {
		hosts = append(hosts, h)
	}
	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		remotes = append(remotes, r)
	}
	p.mu.RUnlock()

	for _, r := range remotes {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		reply, err := r.ControlClient.Negotiate(ctx, &proto.NegotiateRequest{KnownHosts: hosts})
		cancel()
		if err != nil {
			log.Printf("Negotiate with %s failed: %v", r.Host, err)
			continue
		}
		for _, h := range reply.Hosts {
			if !p.IsKnown(h) {
				p.AddRemote(h)
			}
		}
	}
}

// ------------------------- Grain Management ----------------------------------

// RemoveRemoteGrain removes a remote grain mapping.
func (p *SyncedPool) RemoveRemoteGrain(id CartId) {
	p.mu.Lock()
	delete(p.remoteIndex, id)
	p.mu.Unlock()
}

// SpawnRemoteGrain creates or updates a remote grain proxy for the given host.
func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
	if id.String() == "" {
		return
	}
	p.mu.Lock()
	// If a local grain exists, remove it (ownership changed).
	if g, ok := p.local.grains[id]; ok && g != nil {
		delete(p.local.grains, id)
	}
	remoteHost, ok := p.remoteHosts[host]
	if !ok {
		p.mu.Unlock()
		log.Printf("SpawnRemoteGrain: host %s unknown (id=%s), attempting AddRemote", host, id)
		// AddRemote will run initializeRemote, which re-reports this grain.
		go p.AddRemote(host)
		return
	}
	rg := NewRemoteGrainGRPC(id, host, remoteHost.CartClient)
	p.remoteIndex[id] = rg
	p.mu.Unlock()
}

// GetHealthyRemotes returns a copied slice of the currently healthy remote hosts.
func (p *SyncedPool) GetHealthyRemotes() []*RemoteHostGRPC {
	p.mu.RLock()
	defer p.mu.RUnlock()
	ret := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		if r.IsHealthy() {
			ret = append(ret, r)
		}
	}
	return ret
}
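
// The sketch below shows one way the gossip step could be driven on a timer rather
// than only after AddRemote, so membership eventually converges even if a dial-time
// negotiation was missed. It is illustrative only and not wired in; the interval is
// an arbitrary placeholder and the stop channel is assumed to be closed by the caller.
func exampleNegotiateLoop(p *SyncedPool, stop <-chan struct{}) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			p.Negotiate()
		case <-stop:
			return
		}
	}
}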

// RequestOwnership attempts to become owner of a cart, requiring quorum.
// On success local grain is (or will be) created; peers spawn remote proxies.
func (p *SyncedPool) RequestOwnership(id CartId) error {
	ok := 0
	all := 0
	remotes := p.GetHealthyRemotes()
	log.Printf("RequestOwnership start id=%s host=%s healthyRemotes=%d", id, p.Hostname, len(remotes))
	for _, r := range remotes {
		log.Printf("RequestOwnership sending ConfirmOwner to host=%s id=%s", r.Host, id)
		ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
		reply, err := r.ControlClient.ConfirmOwner(ctx, &proto.OwnerChangeRequest{
			CartId:  id.String(),
			NewHost: p.Hostname,
		})
		cancel()
		all++
		if err != nil || reply == nil || !reply.Accepted {
			log.Printf("RequestOwnership negative/failed response from host=%s id=%s err=%v reply=%v", r.Host, id, err, reply)
			continue
		}
		ok++
		log.Printf("RequestOwnership accept from host=%s id=%s (ok=%d all=%d)", r.Host, id, ok, all)
	}

	// Quorum rule (majority semantics):
	//   - Let N = all remotes + 1 (self).
	//   - We require ok + 1 (implicit self vote) >= floor(N/2)+1, i.e. ok >= floor(N/2).
	//   - Examples:
	//       N=2 (all=1): threshold=1 (need 1 remote)
	//       N=3 (all=2): threshold=1 (need 1 remote; previously required 2)
	//       N=4 (all=3): threshold=2
	//       N=5 (all=4): threshold=2
	//   - This change allows faster ownership under partial remote availability in small clusters.
	log.Printf("RequestOwnership quorum evaluation id=%s host=%s ok=%d all=%d", id, p.Hostname, ok, all)
	threshold := (all + 1) / 2 // floor(N/2)
	if ok < threshold {
		p.removeLocalGrain(id)
		log.Printf("RequestOwnership failed quorum id=%s host=%s ok=%d all=%d threshold=%d", id, p.Hostname, ok, all, threshold)
		return fmt.Errorf("quorum not reached (ok=%d all=%d threshold=%d)", ok, all, threshold)
	}
	grainSyncCount.Inc()
	log.Printf("RequestOwnership success id=%s host=%s ok=%d all=%d threshold=%d", id, p.Hostname, ok, all, threshold)
	return nil
}

func (p *SyncedPool) removeLocalGrain(id CartId) {
	p.mu.Lock()
	delete(p.local.grains, id)
	p.mu.Unlock()
}

// getGrain returns a local or remote grain. If absent, it synchronously attempts
// to acquire ownership before spawning a local grain to eliminate the race where
// the first mutation applies before peers have installed remote proxies.
func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
	p.mu.RLock()
	localGrain, isLocal := p.local.grains[id]
	remoteGrain, isRemote := p.remoteIndex[id]
	p.mu.RUnlock()

	if isLocal && localGrain != nil {
		return localGrain, nil
	}
	if isRemote {
		remoteLookupCount.Inc()
		return remoteGrain, nil
	}

	// Synchronously attempt to claim ownership. If this fails (quorum not reached)
	// we re-check for a newly appeared remote proxy (another node became owner).
	if err := p.RequestOwnership(id); err != nil {
		p.mu.RLock()
		if rg, ok := p.remoteIndex[id]; ok {
			p.mu.RUnlock()
			remoteLookupCount.Inc()
			return rg, nil
		}
		p.mu.RUnlock()
		return nil, err
	}

	// Ownership acquired; now lazily spawn the local grain.
	grain, err := p.local.GetGrain(id)
	if err != nil {
		return nil, err
	}
	return grain, nil
}

// Apply applies a single mutation to a grain (local or remote).
func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) {
	grain, err := p.getGrain(id)
	if err != nil {
		return nil, err
	}
	return grain.Apply(mutation, false)
}

// Get returns current state of a grain (local or remote).
func (p *SyncedPool) Get(id CartId) (*CartGrain, error) {
	grain, err := p.getGrain(id)
	if err != nil {
		return nil, err
	}
	return grain.GetCurrentState()
}
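
// The helper below restates the quorum rule used in RequestOwnership as a standalone
// function so the arithmetic in its comment can be checked in isolation. It is
// illustrative only and not called by the pool itself.
func exampleQuorumThreshold(healthyRemotes int) int {
	// N = healthyRemotes + 1 (this node). A majority needs floor(N/2)+1 votes,
	// one of which is the implicit self vote, so floor(N/2) remote accepts suffice.
	// healthyRemotes=1 -> 1, 2 -> 1, 3 -> 2, 4 -> 2, matching the table in RequestOwnership.
	return (healthyRemotes + 1) / 2
}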

// Close notifies remotes that this host is terminating.
func (p *SyncedPool) Close() {
	p.mu.RLock()
	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		remotes = append(remotes, r)
	}
	p.mu.RUnlock()

	// Notify peers concurrently, but wait for the notifications (each bounded by a
	// one-second timeout) so they are not lost if the process exits right after Close.
	var wg sync.WaitGroup
	for _, r := range remotes {
		wg.Add(1)
		go func(rh *RemoteHostGRPC) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
			_, err := rh.ControlClient.Closing(ctx, &proto.ClosingNotice{Host: p.Hostname})
			cancel()
			if err != nil {
				log.Printf("Close notify to %s failed: %v", rh.Host, err)
			}
		}(r)
	}
	wg.Wait()
}
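
// The sketch below shows the pool's primary read/write path from a caller's point of
// view: apply a mutation (routed to the local grain if this node owns the cart, or to
// a remote proxy otherwise) and then read back the resulting state. It is illustrative
// only; the shape of the mutation value is whatever the grain implementation expects.
func exampleApplyThenGet(p *SyncedPool, id CartId, mutation interface{}) (*CartGrain, error) {
	if _, err := p.Apply(id, mutation); err != nil {
		return nil, err
	}
	return p.Get(id)
}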