From 9df2f3362ab6816a7fb87f2e960462018d3d2021 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Mats=20T=C3=B6rnberg?=
Date: Sat, 11 Oct 2025 17:42:37 +0200
Subject: [PATCH] some cleanup and announce expiry

---
 grain-pool.go                  | 907 ++++++++++++++++++++++++++-------
 grpc_server.go                 |  43 +-
 main.go                        |  68 +--
 pool-server.go                 |  92 +---
 proto/control_plane.pb.go      | 180 ++++---
 proto/control_plane.proto      |   9 +
 proto/control_plane_grpc.pb.go |  46 +-
 synced-pool.go                 | 642 -----------------------
 8 files changed, 935 insertions(+), 1052 deletions(-)
 delete mode 100644 synced-pool.go

diff --git a/grain-pool.go b/grain-pool.go
index f0d35d1..e4bddff 100644
--- a/grain-pool.go
+++ b/grain-pool.go
@@ -1,186 +1,367 @@
 package main
 
 import (
+	"bytes"
+	"context"
 	"fmt"
+	"io"
 	"log"
 	"net/http"
+	"reflect"
 	"sync"
 	"time"
 
+	messages "git.tornberg.me/go-cart-actor/proto"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"k8s.io/apimachinery/pkg/watch"
 )
 
-// grain-pool.go
-//
-// Migration Note:
-// This file has been migrated to use uint64 cart keys internally (derived
-// from the new CartID base62 representation). For backward compatibility,
-// a deprecated legacy map keyed by CartId is maintained so existing code
-// that directly indexes pool.grains with a CartId continues to compile
-// until the full refactor across SyncedPool is completed.
-//
-// Authoritative storage: grains (map[uint64]*CartGrain)
-// Legacy compatibility: grainsLegacy (map[CartId]*CartGrain) - kept in sync.
-//
-// Once all external usages are updated to rely on helper accessors,
-// grainsLegacy can be removed.
-//
+// ---------------------------------------------------------------------------
+// Metrics shared by the cart pool implementation.
 // ---------------------------------------------------------------------------
 
 var (
 	poolGrains = promauto.NewGauge(prometheus.GaugeOpts{
 		Name: "cart_grains_in_pool",
-		Help: "The total number of grains in the pool",
+		Help: "The total number of grains in the local pool",
 	})
 	poolSize = promauto.NewGauge(prometheus.GaugeOpts{
 		Name: "cart_pool_size",
-		Help: "The total number of mutations",
+		Help: "Configured capacity of the cart pool",
 	})
 	poolUsage = promauto.NewGauge(prometheus.GaugeOpts{
 		Name: "cart_grain_pool_usage",
-		Help: "The current usage of the grain pool",
+		Help: "Current utilisation of the cart pool",
+	})
+	negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "cart_remote_negotiation_total",
+		Help: "The total number of remote host negotiations",
+	})
+	connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "cart_connected_remotes",
+		Help: "Number of connected remote hosts",
+	})
+	cartMutationsTotal = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "cart_mutations_total",
+		Help: "Total number of cart state mutations applied",
+	})
+	cartMutationFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "cart_mutation_failures_total",
+		Help: "Total number of failed cart state mutations",
+	})
+	cartMutationLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "cart_mutation_latency_seconds",
+		Help:    "Latency of cart mutations in seconds",
+		Buckets: prometheus.DefBuckets,
+	}, []string{"mutation"})
+	cartActiveGrains = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "cart_active_grains",
+		Help: "Number of active (resident) local grains",
 	})
 )
 
-// GrainPool interface remains legacy-compatible.
+// GrainPool is the interface exposed to HTTP handlers and other subsystems. type GrainPool interface { Apply(id CartId, mutation interface{}) (*CartGrain, error) Get(id CartId) (*CartGrain, error) - // OwnerHost returns the primary owner host for a given cart id. OwnerHost(id CartId) (Host, bool) - // Hostname returns the hostname of the local pool implementation. Hostname() string + TakeOwnership(id CartId) + IsHealthy() bool + Close() } +// Host abstracts a remote node capable of proxying cart requests. type Host interface { Name() string Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error) } -// Ttl keeps expiry info +// Ttl tracks the expiry deadline for an in-memory grain. type Ttl struct { Expires time.Time Grain *CartGrain } -// GrainLocalPool now stores grains keyed by uint64 (CartKey). -type GrainLocalPool struct { - mu sync.RWMutex - grains map[uint64]*CartGrain // authoritative only +// CartPool merges the responsibilities that previously belonged to +// GrainLocalPool and SyncedPool. It provides local grain storage together +// with cluster coordination, ownership negotiation and expiry signalling. +type CartPool struct { + // Local grain state ----------------------------------------------------- + localMu sync.RWMutex + grains map[uint64]*CartGrain expiry []Ttl spawn func(id CartId) (*CartGrain, error) - Ttl time.Duration - PoolSize int + ttl time.Duration + poolSize int + + // Cluster coordination -------------------------------------------------- + hostname string + remoteMu sync.RWMutex + remoteOwners map[CartId]*RemoteHostGRPC + remoteHosts map[string]*RemoteHostGRPC + discardedHostHandler *DiscardedHostHandler + + // House-keeping --------------------------------------------------------- + purgeTicker *time.Ticker } -// NewGrainLocalPool constructs a new pool. -func NewGrainLocalPool(size int, ttl time.Duration, spawn func(id CartId) (*CartGrain, error)) *GrainLocalPool { - ret := &GrainLocalPool{ - spawn: spawn, - grains: make(map[uint64]*CartGrain), - expiry: make([]Ttl, 0), - Ttl: ttl, - PoolSize: size, +// RemoteHostGRPC mirrors the lightweight controller used for remote node +// interaction. 
+type RemoteHostGRPC struct { + Host string + HTTPBase string + Conn *grpc.ClientConn + Transport *http.Transport + Client *http.Client + ControlClient messages.ControlPlaneClient + MissedPings int +} + +func (h *RemoteHostGRPC) Name() string { + return h.Host +} + +func (h *RemoteHostGRPC) Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error) { + target := fmt.Sprintf("%s%s", h.HTTPBase, r.URL.RequestURI()) + var bodyCopy []byte + if r.Body != nil && r.Body != http.NoBody { + var err error + bodyCopy, err = io.ReadAll(r.Body) + if err != nil { + http.Error(w, "proxy read error", http.StatusBadGateway) + return false, err + } } - cartPurge := time.NewTicker(time.Minute) + if r.Body != nil { + r.Body.Close() + } + var reqBody io.Reader + if len(bodyCopy) > 0 { + reqBody = bytes.NewReader(bodyCopy) + } + req, err := http.NewRequestWithContext(r.Context(), r.Method, target, reqBody) + if err != nil { + http.Error(w, "proxy build error", http.StatusBadGateway) + return false, err + } + r.Body = io.NopCloser(bytes.NewReader(bodyCopy)) + req.Header.Set("X-Forwarded-Host", r.Host) + if idStr := id.String(); idStr != "" { + req.Header.Set("X-Cart-Id", idStr) + } + for k, v := range r.Header { + for _, vv := range v { + req.Header.Add(k, vv) + } + } + res, err := h.Client.Do(req) + if err != nil { + http.Error(w, "proxy request error", http.StatusBadGateway) + return false, err + } + defer res.Body.Close() + for k, v := range res.Header { + for _, vv := range v { + w.Header().Add(k, vv) + } + } + w.Header().Set("X-Cart-Owner-Routed", "true") + if res.StatusCode >= 200 && res.StatusCode <= 299 { + w.WriteHeader(res.StatusCode) + _, copyErr := io.Copy(w, res.Body) + if copyErr != nil { + return true, copyErr + } + return true, nil + } + return false, fmt.Errorf("proxy response status %d", res.StatusCode) +} + +func (r *RemoteHostGRPC) IsHealthy() bool { + return r.MissedPings < 3 +} + +// NewCartPool constructs a unified pool. Discovery may be nil for standalone +// deployments. +func NewCartPool(size int, ttl time.Duration, hostname string, spawn func(id CartId) (*CartGrain, error), discovery Discovery) (*CartPool, error) { + p := &CartPool{ + grains: make(map[uint64]*CartGrain), + expiry: make([]Ttl, 0), + spawn: spawn, + ttl: ttl, + poolSize: size, + hostname: hostname, + remoteOwners: make(map[CartId]*RemoteHostGRPC), + remoteHosts: make(map[string]*RemoteHostGRPC), + } + + p.discardedHostHandler = NewDiscardedHostHandler(1338) + p.discardedHostHandler.SetReconnectHandler(p.AddRemote) + + p.purgeTicker = time.NewTicker(time.Minute) go func() { - for range cartPurge.C { - ret.Purge() + for range p.purgeTicker.C { + p.Purge() } }() - return ret -} -// keyFromCartId derives the uint64 key from a legacy CartId deterministically. -func keyFromCartId(id CartId) uint64 { - return uint64(id) -} - -// storeGrain indexes a grain in both maps. -func (p *GrainLocalPool) storeGrain(id CartId, g *CartGrain) { - k := keyFromCartId(id) - p.grains[k] = g -} - -// deleteGrain removes a grain from both maps. -func (p *GrainLocalPool) deleteGrain(id CartId) { - k := keyFromCartId(id) - delete(p.grains, k) -} - -// SetAvailable pre-populates placeholder entries (legacy signature). 
-func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) { - p.mu.Lock() - defer p.mu.Unlock() - for id := range availableWithLastChangeUnix { - k := keyFromCartId(id) - if _, ok := p.grains[k]; !ok { - p.grains[k] = nil - p.expiry = append(p.expiry, Ttl{ - Expires: time.Now().Add(p.Ttl), - Grain: nil, - }) - } + if discovery != nil { + go p.startDiscovery(discovery) + } else { + log.Printf("No discovery configured; expecting manual AddRemote or static host injection") } + + return p, nil } -// Purge removes expired grains. -func (p *GrainLocalPool) Purge() { - lastChangeTime := time.Now().Add(-p.Ttl) - keepChanged := lastChangeTime.Unix() - - p.mu.Lock() - defer p.mu.Unlock() - - for i := 0; i < len(p.expiry); i++ { - item := p.expiry[i] - if item.Grain == nil { +// startDiscovery subscribes to cluster events and adds/removes hosts. +func (p *CartPool) startDiscovery(discovery Discovery) { + time.Sleep(3 * time.Second) // allow gRPC server startup + log.Printf("Starting discovery watcher") + ch, err := discovery.Watch() + if err != nil { + log.Printf("Discovery error: %v", err) + return + } + for evt := range ch { + if evt.Host == "" { continue } - if item.Expires.Before(time.Now()) { - if item.Grain.GetLastChange() > keepChanged { - log.Printf("Expired item %s changed, keeping", item.Grain.GetId()) - if i < len(p.expiry)-1 { - p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) - p.expiry = append(p.expiry, item) - } else { - // move last to end (noop) - p.expiry = append(p.expiry[:i], item) - } - } else { - log.Printf("Item %s expired", item.Grain.GetId()) - p.deleteGrain(item.Grain.GetId()) - if i < len(p.expiry)-1 { - p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) - } else { - p.expiry = p.expiry[:i] - } + switch evt.Type { + case watch.Deleted: + if p.IsKnown(evt.Host) { + p.RemoveHost(evt.Host) + } + default: + if !p.IsKnown(evt.Host) { + log.Printf("Discovered host %s", evt.Host) + p.AddRemote(evt.Host) } - } else { - break } } } -// RefreshExpiry resets the expiry timestamp for a living grain to now + TTL. -// Called after successful mutations to implement a sliding inactivity window. -func (p *GrainLocalPool) RefreshExpiry(id CartId) { - p.mu.Lock() - defer p.mu.Unlock() +// --------------------------------------------------------------------------- +// Local grain management +// --------------------------------------------------------------------------- + +func (p *CartPool) statsUpdate() { + p.localMu.RLock() + size := len(p.grains) + cap := p.poolSize + p.localMu.RUnlock() + poolGrains.Set(float64(size)) + poolSize.Set(float64(cap)) + if cap > 0 { + poolUsage.Set(float64(size) / float64(cap)) + } +} + +// LocalUsage returns the number of resident grains and configured capacity. +func (p *CartPool) LocalUsage() (int, int) { + p.localMu.RLock() + defer p.localMu.RUnlock() + return len(p.grains), p.poolSize +} + +// SetAvailable pre-populates placeholder entries. +func (p *CartPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) { + p.localMu.Lock() + defer p.localMu.Unlock() + for id := range availableWithLastChangeUnix { + k := uint64(id) + if _, ok := p.grains[k]; !ok { + p.grains[k] = nil + p.expiry = append(p.expiry, Ttl{Expires: time.Now().Add(p.ttl)}) + } + } + p.statsUpdate() +} + +// Purge removes expired grains and broadcasts expiry announcements so that +// other hosts drop stale ownership hints. 
+func (p *CartPool) Purge() {
+	now := time.Now()
+	keepChanged := now.Add(-p.ttl).Unix()
+	var expired []CartId
+
+	p.localMu.Lock()
+	for i := 0; i < len(p.expiry); {
+		entry := p.expiry[i]
+		if entry.Grain == nil {
+			i++
+			continue
+		}
+		if entry.Expires.After(now) {
+			break
+		}
+		if entry.Grain.GetLastChange() > keepChanged {
+			// Recently mutated: slide the deadline and move to back, so the
+			// scan cannot revisit the same expired entry indefinitely.
+			entry.Expires = now.Add(p.ttl)
+			p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
+			p.expiry = append(p.expiry, entry)
+			continue
+		}
+		id := entry.Grain.GetId()
+		delete(p.grains, uint64(id))
+		expired = append(expired, id)
+		if i < len(p.expiry)-1 {
+			p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
+		} else {
+			p.expiry = p.expiry[:i]
+		}
+	}
+	p.localMu.Unlock()
+
+	if len(expired) > 0 {
+		p.statsUpdate()
+		go p.broadcastExpiry(expired)
+	}
+}
+
+// RefreshExpiry updates the TTL entry for a given grain.
+func (p *CartPool) RefreshExpiry(id CartId) {
+	p.localMu.Lock()
+	defer p.localMu.Unlock()
 	for i := range p.expiry {
 		g := p.expiry[i].Grain
 		if g != nil && g.Id == id {
-			p.expiry[i].Expires = time.Now().Add(p.Ttl)
-			break
+			p.expiry[i].Expires = time.Now().Add(p.ttl)
+			return
 		}
 	}
+	// If no entry existed, append one (safeguard for newly spawned grains).
+	p.expiry = append(p.expiry, Ttl{Expires: time.Now().Add(p.ttl), Grain: p.grains[uint64(id)]})
 }
 
-// GetGrains returns a legacy view of grains (copy) for compatibility.
-func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain {
-	p.mu.RLock()
-	defer p.mu.RUnlock()
+// DebugGrainCount returns the number of locally resident grains.
+func (p *CartPool) DebugGrainCount() int {
+	p.localMu.RLock()
+	defer p.localMu.RUnlock()
+	return len(p.grains)
+}
+
+// LocalCartIDs returns the currently owned cart ids (for control-plane RPCs).
+func (p *CartPool) LocalCartIDs() []uint64 {
+	p.localMu.RLock()
+	defer p.localMu.RUnlock()
+	ids := make([]uint64, 0, len(p.grains))
+	for _, g := range p.grains {
+		if g == nil {
+			continue
+		}
+		ids = append(ids, uint64(g.GetId()))
+	}
+	return ids
+}
+
+// SnapshotGrains returns a copy of the currently resident grains keyed by id.
+func (p *CartPool) SnapshotGrains() map[CartId]*CartGrain {
+	p.localMu.RLock()
+	defer p.localMu.RUnlock()
 	out := make(map[CartId]*CartGrain, len(p.grains))
 	for _, g := range p.grains {
 		if g != nil {
@@ -190,97 +371,457 @@ func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain {
 	return out
 }
 
-// statsUpdate updates Prometheus gauges asynchronously.
-func (p *GrainLocalPool) statsUpdate() {
-	go func(size int) {
-		l := float64(size)
-		ps := float64(p.PoolSize)
-		poolUsage.Set(l / ps)
-		poolGrains.Set(l)
-		poolSize.Set(ps)
-	}(len(p.grains))
+func (p *CartPool) removeLocalGrain(id CartId) {
+	p.localMu.Lock()
+	delete(p.grains, uint64(id))
+	for i := range p.expiry {
+		if p.expiry[i].Grain != nil && p.expiry[i].Grain.GetId() == id {
+			p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
+			break
+		}
+	}
+	p.localMu.Unlock()
+	p.statsUpdate()
 }
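The Purge scan above implements a sliding inactivity window: expired entries are evicted from the head of the deadline-ordered queue, while grains mutated within the TTL get a fresh deadline and move to the back. A standalone sketch of that policy with a toy entry type (illustrative only, not the real Ttl/CartGrain types):

```go
package main

import (
	"fmt"
	"time"
)

// entry is a toy stand-in for Ttl: an eviction deadline plus the last mutation time.
type entry struct {
	id         string
	expires    time.Time
	lastChange time.Time
}

// purge mirrors CartPool.Purge: stop at the first live deadline, re-queue
// recently mutated entries with a fresh deadline, and drop the rest.
func purge(q []entry, ttl time.Duration, now time.Time) (kept []entry, dropped []string) {
	keepChanged := now.Add(-ttl)
	for i := 0; i < len(q); {
		e := q[i]
		if e.expires.After(now) {
			break // queue is deadline-ordered; later entries are still live
		}
		q = append(q[:i], q[i+1:]...) // unlink the expired entry
		if e.lastChange.After(keepChanged) {
			e.expires = now.Add(ttl) // recently mutated: slide the window
			q = append(q, e)
			continue
		}
		dropped = append(dropped, e.id) // would be broadcast via AnnounceExpiry
	}
	return q, dropped
}

func main() {
	now := time.Now()
	q := []entry{
		{id: "stale", expires: now.Add(-time.Minute), lastChange: now.Add(-20 * time.Minute)},
		{id: "busy", expires: now.Add(-time.Minute), lastChange: now.Add(-time.Minute)},
		{id: "live", expires: now.Add(10 * time.Minute), lastChange: now},
	}
	q, dropped := purge(q, 15*time.Minute, now)
	fmt.Println(len(q), dropped) // 2 [stale]
}
```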
-// GetGrain retrieves or spawns a grain (legacy id signature).
-func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) {
+func (p *CartPool) getLocalGrain(id CartId) (*CartGrain, error) {
+	key := uint64(id)
 	grainLookups.Inc()
-	k := keyFromCartId(id)
-	p.mu.RLock()
-	grain, ok := p.grains[k]
-	p.mu.RUnlock()
+	p.localMu.RLock()
+	grain, ok := p.grains[key]
+	p.localMu.RUnlock()
+	if grain != nil && ok {
+		return grain, nil
+	}
 
-	var err error
+	p.localMu.Lock()
+	defer p.localMu.Unlock()
+	grain, ok = p.grains[key]
 	if grain == nil || !ok {
-		p.mu.Lock()
-		// Re-check under write lock
-		grain, ok = p.grains[k]
-		if grain == nil || !ok {
-			// Capacity check
-			if len(p.grains) >= p.PoolSize && len(p.expiry) > 0 {
-				if p.expiry[0].Expires.Before(time.Now()) && p.expiry[0].Grain != nil {
-					oldId := p.expiry[0].Grain.GetId()
-					p.deleteGrain(oldId)
-					p.expiry = p.expiry[1:]
-				} else {
-					p.mu.Unlock()
-					return nil, fmt.Errorf("pool is full")
-				}
-			}
-			grain, err = p.spawn(id)
-			if err == nil {
-				p.storeGrain(id, grain)
+		if len(p.grains) >= p.poolSize && len(p.expiry) > 0 {
+			if p.expiry[0].Expires.Before(time.Now()) && p.expiry[0].Grain != nil {
+				oldID := p.expiry[0].Grain.GetId()
+				delete(p.grains, uint64(oldID))
+				p.expiry = p.expiry[1:]
+				go p.broadcastExpiry([]CartId{oldID})
+			} else {
+				return nil, fmt.Errorf("pool is full")
 			}
 		}
-		p.mu.Unlock()
-		p.statsUpdate()
+		spawned, err := p.spawn(id)
+		if err != nil {
+			return nil, err
+		}
+		p.grains[key] = spawned
+		p.expiry = append(p.expiry, Ttl{Expires: time.Now().Add(p.ttl), Grain: spawned})
+		grain = spawned
 	}
-
-	return grain, err
+	go p.statsUpdate()
+	return grain, nil
 }
 
-// Apply applies a mutation (legacy compatibility).
-func (p *GrainLocalPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) {
-	grain, err := p.GetGrain(id)
-	if err != nil || grain == nil {
+// ---------------------------------------------------------------------------
+// Cluster ownership and coordination
+// ---------------------------------------------------------------------------
+
+func (p *CartPool) TakeOwnership(id CartId) {
+	// Clear any stale remote-owner hint before announcing the claim, so a
+	// subsequent local Apply/Get does not bounce off ErrNotOwner.
+	p.remoteMu.Lock()
+	delete(p.remoteOwners, id)
+	p.remoteMu.Unlock()
+	p.broadcastOwnership([]CartId{id})
+}
+
+func (p *CartPool) AddRemote(host string) {
+	if host == "" || host == p.hostname {
+		return
+	}
+
+	p.remoteMu.Lock()
+	if _, exists := p.remoteHosts[host]; exists {
+		p.remoteMu.Unlock()
+		return
+	}
+	p.remoteMu.Unlock()
+
+	target := fmt.Sprintf("%s:1337", host)
+	dialCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	conn, err := grpc.DialContext(dialCtx, target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
+	cancel()
+	if err != nil {
+		log.Printf("AddRemote: dial %s failed: %v", target, err)
+		return
+	}
+
+	controlClient := messages.NewControlPlaneClient(conn)
+	for retries := 0; retries < 3; retries++ {
+		ctx, pingCancel := context.WithTimeout(context.Background(), time.Second)
+		_, pingErr := controlClient.Ping(ctx, &messages.Empty{})
+		pingCancel()
+		if pingErr == nil {
+			break
+		}
+		if retries == 2 {
+			log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr)
+			conn.Close()
+			return
+		}
+		time.Sleep(200 * time.Millisecond)
+	}
+
+	transport := &http.Transport{
+		MaxIdleConns:        100,
+		MaxIdleConnsPerHost: 100,
+		IdleConnTimeout:     120 * time.Second,
+	}
+	client := &http.Client{Transport: transport, Timeout: 10 * time.Second}
+
+	remote := &RemoteHostGRPC{
+		Host:          host,
+		HTTPBase:      fmt.Sprintf("http://%s:8080/cart", host),
+		Conn:          conn,
+		Transport:     transport,
+		Client:        client,
+		ControlClient: controlClient,
+	}
+
+	p.remoteMu.Lock()
+	p.remoteHosts[host] =
remote + p.remoteMu.Unlock() + connectedRemotes.Set(float64(p.RemoteCount())) + + log.Printf("Connected to remote host %s", host) + + go p.pingLoop(remote) + go p.initializeRemote(remote) + go p.Negotiate() +} + +func (p *CartPool) initializeRemote(remote *RemoteHostGRPC) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + reply, err := remote.ControlClient.GetCartIds(ctx, &messages.Empty{}) + if err != nil { + log.Printf("Init remote %s: GetCartIds error: %v", remote.Host, err) + return + } + count := 0 + p.remoteMu.Lock() + for _, cid := range reply.CartIds { + id := CartId(cid) + if _, exists := p.remoteOwners[id]; !exists { + p.remoteOwners[id] = remote + } + count++ + } + p.remoteMu.Unlock() + log.Printf("Remote %s reported %d remote-owned carts", remote.Host, count) +} + +func (p *CartPool) RemoveHost(host string) { + p.remoteMu.Lock() + remote, exists := p.remoteHosts[host] + if exists { + delete(p.remoteHosts, host) + } + for id, owner := range p.remoteOwners { + if owner.Host == host { + delete(p.remoteOwners, id) + } + } + p.remoteMu.Unlock() + + if exists { + remote.Conn.Close() + } + connectedRemotes.Set(float64(p.RemoteCount())) +} + +func (p *CartPool) RemoteCount() int { + p.remoteMu.RLock() + defer p.remoteMu.RUnlock() + return len(p.remoteHosts) +} + +// RemoteHostNames returns a snapshot of connected remote host identifiers. +func (p *CartPool) RemoteHostNames() []string { + p.remoteMu.RLock() + defer p.remoteMu.RUnlock() + hosts := make([]string, 0, len(p.remoteHosts)) + for host := range p.remoteHosts { + hosts = append(hosts, host) + } + return hosts +} + +func (p *CartPool) IsKnown(host string) bool { + if host == p.hostname { + return true + } + p.remoteMu.RLock() + defer p.remoteMu.RUnlock() + _, ok := p.remoteHosts[host] + return ok +} + +func (p *CartPool) pingLoop(remote *RemoteHostGRPC) { + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + for range ticker.C { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + _, err := remote.ControlClient.Ping(ctx, &messages.Empty{}) + cancel() + if err != nil { + remote.MissedPings++ + log.Printf("Ping %s failed (%d)", remote.Host, remote.MissedPings) + if !remote.IsHealthy() { + log.Printf("Remote %s unhealthy, removing", remote.Host) + p.RemoveHost(remote.Host) + return + } + continue + } + remote.MissedPings = 0 + } +} + +func (p *CartPool) IsHealthy() bool { + p.remoteMu.RLock() + defer p.remoteMu.RUnlock() + for _, r := range p.remoteHosts { + if !r.IsHealthy() { + return false + } + } + return true +} + +func (p *CartPool) Negotiate() { + negotiationCount.Inc() + + p.remoteMu.RLock() + hosts := make([]string, 0, len(p.remoteHosts)+1) + hosts = append(hosts, p.hostname) + remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts)) + for h, r := range p.remoteHosts { + hosts = append(hosts, h) + remotes = append(remotes, r) + } + p.remoteMu.RUnlock() + + for _, r := range remotes { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + reply, err := r.ControlClient.Negotiate(ctx, &messages.NegotiateRequest{KnownHosts: hosts}) + cancel() + if err != nil { + log.Printf("Negotiate with %s failed: %v", r.Host, err) + continue + } + for _, h := range reply.Hosts { + if !p.IsKnown(h) { + p.AddRemote(h) + } + } + } +} + +func (p *CartPool) broadcastOwnership(ids []CartId) { + if len(ids) == 0 { + return + } + uids := make([]uint64, len(ids)) + for i, id := range ids { + uids[i] = uint64(id) + } + + p.remoteMu.RLock() + remotes := 
make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
+	for _, r := range p.remoteHosts {
+		if r.IsHealthy() {
+			remotes = append(remotes, r)
+		}
+	}
+	p.remoteMu.RUnlock()
+
+	for _, remote := range remotes {
+		go func(rh *RemoteHostGRPC) {
+			_, err := rh.ControlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{
+				Host:    p.hostname,
+				CartIds: uids,
+			})
+			if err != nil {
+				log.Printf("ownership announce to %s failed: %v", rh.Host, err)
+			}
+		}(remote)
+	}
+}
+
+func (p *CartPool) broadcastExpiry(ids []CartId) {
+	if len(ids) == 0 {
+		return
+	}
+	uids := make([]uint64, len(ids))
+	for i, id := range ids {
+		uids[i] = uint64(id)
+	}
+
+	p.remoteMu.RLock()
+	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
+	for _, r := range p.remoteHosts {
+		if r.IsHealthy() {
+			remotes = append(remotes, r)
+		}
+	}
+	p.remoteMu.RUnlock()
+
+	for _, remote := range remotes {
+		go func(rh *RemoteHostGRPC) {
+			_, err := rh.ControlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{
+				Host:    p.hostname,
+				CartIds: uids,
+			})
+			if err != nil {
+				log.Printf("expiry announce to %s failed: %v", rh.Host, err)
+			}
+		}(remote)
+	}
+}
+
+func (p *CartPool) AdoptRemoteOwnership(host string, ids []string) {
+	if host == "" || host == p.hostname {
+		return
+	}
+	p.remoteMu.Lock()
+	defer p.remoteMu.Unlock()
+	// Look the host up under the lock and bail out early when it is unknown,
+	// instead of racing an unguarded map read and continuing with a nil owner.
+	remoteHost, ok := p.remoteHosts[host]
+	if !ok {
+		log.Printf("AdoptRemoteOwnership: unknown host %s", host)
+		return
+	}
+	for _, s := range ids {
+		if s == "" {
+			continue
+		}
+		parsed, ok := ParseCartId(s)
+		if !ok {
+			continue
+		}
+		if existing, ok := p.remoteOwners[parsed]; ok && existing != remoteHost {
+			continue
+		}
+		p.localMu.RLock()
+		_, localHas := p.grains[uint64(parsed)]
+		p.localMu.RUnlock()
+		if localHas {
+			continue
+		}
+		p.remoteOwners[parsed] = remoteHost
+	}
+}
+
+func (p *CartPool) HandleRemoteExpiry(host string, ids []uint64) {
+	if host == "" || host == p.hostname {
+		return
+	}
+	p.remoteMu.Lock()
+	defer p.remoteMu.Unlock()
+	for _, raw := range ids {
+		id := CartId(raw)
+		if owner, ok := p.remoteOwners[id]; ok && owner.Host == host {
+			delete(p.remoteOwners, id)
+		}
+	}
+}
+
+func (p *CartPool) getOrClaimGrain(id CartId) (*CartGrain, error) {
+	p.localMu.RLock()
+	grain, exists := p.grains[uint64(id)]
+	p.localMu.RUnlock()
+	if exists && grain != nil {
+		return grain, nil
+	}
+
+	p.remoteMu.RLock()
+	remoteHost, found := p.remoteOwners[id]
+	p.remoteMu.RUnlock()
+	if found && remoteHost != nil && remoteHost.Host != "" {
+		return nil, ErrNotOwner
+	}
+
+	grain, err := p.getLocalGrain(id)
+	if err != nil {
 		return nil, err
 	}
-	result, applyErr := grain.Apply(mutation, false)
-	// Sliding TTL: refresh expiry on successful non-replay mutation (Apply always non-replay here)
-	if applyErr == nil && result != nil {
-		p.RefreshExpiry(id)
+	go p.broadcastOwnership([]CartId{id})
+	return grain, nil
+}
+
+// ErrNotOwner is returned when a cart belongs to another host.
+var ErrNotOwner = fmt.Errorf("not owner")
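Taken together, getOrClaimGrain, ErrNotOwner and the announce RPCs implement a first-touch ownership model: a cart is served locally unless a peer is already recorded as its owner, in which case the HTTP layer is expected to proxy to that peer and only reclaim the cart when the proxy fails (see ProxyHandler in pool-server.go below). A minimal sketch of that call pattern against the GrainPool interface; handleMutate and AddItemMutation are illustrative names, not types defined in this patch, and the function assumes this package's imports:

```go
// Hypothetical glue code for this package; AddItemMutation stands in for a
// real mutation payload.
func handleMutate(pool GrainPool, id CartId, w http.ResponseWriter, r *http.Request) error {
	// Prefer the recorded owner: forward the raw request to it.
	if owner, ok := pool.OwnerHost(id); ok {
		handled, err := owner.Proxy(id, w, r)
		if err == nil && handled {
			return nil // served by the owning host
		}
		// Owner unreachable or refused: claim the cart locally.
		pool.TakeOwnership(id)
	}
	if _, err := pool.Apply(id, &AddItemMutation{Sku: "demo-sku"}); err != nil {
		return err // can still be ErrNotOwner if a remote claim raced us
	}
	w.WriteHeader(http.StatusNoContent)
	return nil
}
```

+
+// Apply applies a mutation to a grain.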
+func (p *CartPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) { + grain, err := p.getOrClaimGrain(id) + if err != nil { + return nil, err } + start := time.Now() + result, applyErr := grain.Apply(mutation, false) + mutationType := "unknown" + if mutation != nil { + if t := reflect.TypeOf(mutation); t != nil { + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + if t.Name() != "" { + mutationType = t.Name() + } + } + } + cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds()) + + if applyErr == nil && result != nil { + cartMutationsTotal.Inc() + p.RefreshExpiry(id) + cartActiveGrains.Set(float64(p.DebugGrainCount())) + } else if applyErr != nil { + cartMutationFailuresTotal.Inc() + } + return result, applyErr } -// Get returns current state (legacy wrapper). -func (p *GrainLocalPool) Get(id CartId) (*CartGrain, error) { - return p.GetGrain(id) +// Get returns the current state of a grain. +func (p *CartPool) Get(id CartId) (*CartGrain, error) { + grain, err := p.getOrClaimGrain(id) + if err != nil { + return nil, err + } + return grain.GetCurrentState() } -// DebugGrainCount returns counts for debugging. -func (p *GrainLocalPool) DebugGrainCount() (authoritative int) { - p.mu.RLock() - defer p.mu.RUnlock() - return len(p.grains) +// OwnerHost reports the remote owner (if any) for the supplied cart id. +func (p *CartPool) OwnerHost(id CartId) (Host, bool) { + p.remoteMu.RLock() + defer p.remoteMu.RUnlock() + owner, ok := p.remoteOwners[id] + return owner, ok } -// UnsafePointerToLegacyMap exposes the legacy map pointer (for transitional -// tests that still poke the field directly). DO NOT rely on this long-term. -func (p *GrainLocalPool) UnsafePointerToLegacyMap() uintptr { - // Legacy map removed; retained only to satisfy any transitional callers. - return 0 +// Hostname returns the local hostname (pod IP). +func (p *CartPool) Hostname() string { + return p.hostname } -// OwnerHost implements the extended GrainPool interface for the standalone -// local pool. Since the local pool has no concept of multi-host ownership, -// it returns an empty string. Callers can treat empty as "local host". -func (p *GrainLocalPool) OwnerHost(id CartId) (Host, bool) { - return nil, false -} - -// Hostname returns a blank string because GrainLocalPool does not track a node -// identity. (SyncedPool will return the real hostname.) -func (p *GrainLocalPool) Hostname() string { - return "" +// Close notifies remotes that this host is shutting down. +func (p *CartPool) Close() { + p.remoteMu.Lock() + defer p.remoteMu.Unlock() + for _, r := range p.remoteHosts { + go func(rh *RemoteHostGRPC) { + _, err := rh.ControlClient.Closing(context.Background(), &messages.ClosingNotice{Host: p.hostname}) + if err != nil { + log.Printf("Close notify to %s failed: %v", rh.Host, err) + } + }(r) + } + if p.purgeTicker != nil { + p.purgeTicker.Stop() + } } diff --git a/grpc_server.go b/grpc_server.go index d4c4990..a570eab 100644 --- a/grpc_server.go +++ b/grpc_server.go @@ -17,21 +17,19 @@ import ( type cartActorGRPCServer struct { messages.UnimplementedControlPlaneServer - //pool GrainPool // For cart state mutations and queries - syncedPool *SyncedPool // For cluster membership and control + pool *CartPool } // NewCartActorGRPCServer creates and initializes the server. 
-func NewCartActorGRPCServer(syncedPool *SyncedPool) *cartActorGRPCServer { +func NewCartActorGRPCServer(pool *CartPool) *cartActorGRPCServer { return &cartActorGRPCServer{ - //pool: pool, - syncedPool: syncedPool, + pool: pool, } } func (s *cartActorGRPCServer) AnnounceOwnership(ctx context.Context, req *messages.OwnershipAnnounce) (*messages.OwnerChangeAck, error) { for _, cartId := range req.CartIds { - s.syncedPool.removeLocalGrain(CartId(cartId)) + s.pool.removeLocalGrain(CartId(cartId)) } log.Printf("Ack count: %d", len(req.CartIds)) return &messages.OwnerChangeAck{ @@ -40,13 +38,21 @@ func (s *cartActorGRPCServer) AnnounceOwnership(ctx context.Context, req *messag }, nil } +func (s *cartActorGRPCServer) AnnounceExpiry(ctx context.Context, req *messages.ExpiryAnnounce) (*messages.OwnerChangeAck, error) { + s.pool.HandleRemoteExpiry(req.GetHost(), req.GetCartIds()) + return &messages.OwnerChangeAck{ + Accepted: true, + Message: "expiry acknowledged", + }, nil +} + // ControlPlane: Ping func (s *cartActorGRPCServer) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) { // Expose cart owner cookie (first-touch owner = this host) for HTTP gateways translating gRPC metadata. // Gateways that propagate Set-Cookie can help establish sticky sessions at the edge. //_ = grpc.SendHeader(ctx, metadata.Pairs("set-cookie", fmt.Sprintf("cartowner=%s; Path=/; HttpOnly", s.syncedPool.Hostname()))) return &messages.PingReply{ - Host: s.syncedPool.Hostname(), + Host: s.pool.Hostname(), UnixTime: time.Now().Unix(), }, nil } @@ -61,13 +67,11 @@ func (s *cartActorGRPCServer) Negotiate(ctx context.Context, req *messages.Negot } } // This host - hostSet[s.syncedPool.Hostname()] = struct{}{} + hostSet[s.pool.Hostname()] = struct{}{} // Known remotes - s.syncedPool.mu.RLock() - for h := range s.syncedPool.remoteHosts { + for _, h := range s.pool.RemoteHostNames() { hostSet[h] = struct{}{} } - s.syncedPool.mu.RUnlock() out := make([]string, 0, len(hostSet)) for h := range hostSet { @@ -78,22 +82,13 @@ func (s *cartActorGRPCServer) Negotiate(ctx context.Context, req *messages.Negot // ControlPlane: GetCartIds (locally owned carts only) func (s *cartActorGRPCServer) GetCartIds(ctx context.Context, _ *messages.Empty) (*messages.CartIdsReply, error) { - s.syncedPool.local.mu.RLock() - ids := make([]uint64, 0, len(s.syncedPool.local.grains)) - for _, g := range s.syncedPool.local.grains { - if g == nil { - continue - } - ids = append(ids, uint64(g.GetId())) - } - s.syncedPool.local.mu.RUnlock() - return &messages.CartIdsReply{CartIds: ids}, nil + return &messages.CartIdsReply{CartIds: s.pool.LocalCartIDs()}, nil } // ControlPlane: Closing (peer shutdown notification) func (s *cartActorGRPCServer) Closing(ctx context.Context, req *messages.ClosingNotice) (*messages.OwnerChangeAck, error) { if req.GetHost() != "" { - s.syncedPool.RemoveHost(req.GetHost()) + s.pool.RemoveHost(req.GetHost()) } return &messages.OwnerChangeAck{ Accepted: true, @@ -103,14 +98,14 @@ func (s *cartActorGRPCServer) Closing(ctx context.Context, req *messages.Closing // StartGRPCServer configures and starts the unified gRPC server on the given address. // It registers both the CartActor and ControlPlane services. 
-func StartGRPCServer(addr string, syncedPool *SyncedPool) (*grpc.Server, error) { +func StartGRPCServer(addr string, pool *CartPool) (*grpc.Server, error) { lis, err := net.Listen("tcp", addr) if err != nil { return nil, fmt.Errorf("failed to listen: %w", err) } grpcServer := grpc.NewServer() - server := NewCartActorGRPCServer(syncedPool) + server := NewCartActorGRPCServer(pool) messages.RegisterControlPlaneServer(grpcServer, server) reflection.Register(grpcServer) diff --git a/main.go b/main.go index cf7426b..634112c 100644 --- a/main.go +++ b/main.go @@ -60,15 +60,12 @@ func init() { } type App struct { - pool *GrainLocalPool + pool *CartPool storage *DiskStorage } func (a *App) Save() error { - - a.pool.mu.RLock() - defer a.pool.mu.RUnlock() - for id, grain := range a.pool.GetGrains() { + for id, grain := range a.pool.SnapshotGrains() { if grain == nil { continue } @@ -80,19 +77,7 @@ func (a *App) Save() error { } } } - return nil - -} - -func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) { - err := a.Save() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - } else { - w.WriteHeader(http.StatusCreated) - } } var podIp = os.Getenv("POD_IP") @@ -121,24 +106,6 @@ func getCountryFromHost(host string) string { return "se" } -func getCheckoutOrder(host string, cartId CartId) *messages.CreateCheckoutOrder { - baseUrl := fmt.Sprintf("https://%s", host) - cartBaseUrl := os.Getenv("CART_BASE_URL") - if cartBaseUrl == "" { - cartBaseUrl = "https://cart.tornberg.me" - } - country := getCountryFromHost(host) - - return &messages.CreateCheckoutOrder{ - Terms: fmt.Sprintf("%s/terms", baseUrl), - Checkout: fmt.Sprintf("%s/checkout?order_id={checkout.order.id}", baseUrl), - Confirmation: fmt.Sprintf("%s/confirmation/{checkout.order.id}", baseUrl), - Validation: fmt.Sprintf("%s/validation", cartBaseUrl), - Push: fmt.Sprintf("%s/push?order_id={checkout.order.id}", cartBaseUrl), - Country: country, - } -} - func GetDiscovery() Discovery { if podIp == "" { return nil @@ -157,32 +124,27 @@ func GetDiscovery() Discovery { } func main() { - storage, err := NewDiskStorage(fmt.Sprintf("data/s_%s.gob", name)) if err != nil { log.Printf("Error loading state: %v\n", err) } - localPool := NewGrainLocalPool(2*65535, 15*time.Minute, spawn) + pool, err := NewCartPool(2*65535, 15*time.Minute, podIp, spawn, GetDiscovery()) + if err != nil { + log.Fatalf("Error creating cart pool: %v\n", err) + } app := &App{ - pool: localPool, + pool: pool, storage: storage, } - syncedPool, err := NewSyncedPool(localPool, podIp, GetDiscovery()) - if err != nil { - log.Fatalf("Error creating synced pool: %v\n", err) - } - - // Start unified gRPC server (CartActor + ControlPlane) replacing legacy RPC server on :1337 - // TODO: Remove any remaining legacy RPC server references and deprecated frame-based code after full gRPC migration is validated. 
- grpcSrv, err := StartGRPCServer(":1337", syncedPool) + grpcSrv, err := StartGRPCServer(":1337", pool) if err != nil { log.Fatalf("Error starting gRPC server: %v\n", err) } defer grpcSrv.GracefulStop() go func() { - for range time.Tick(time.Minute * 10) { + for range time.Tick(time.Minute * 5) { err := app.Save() if err != nil { log.Printf("Error saving: %v\n", err) @@ -193,7 +155,7 @@ func main() { Url: amqpUrl, } - syncedServer := NewPoolServer(syncedPool, fmt.Sprintf("%s, %s", name, podIp)) + syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp)) mux := http.NewServeMux() mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve())) // only for local @@ -210,16 +172,13 @@ func main() { mux.Handle("/metrics", promhttp.Handler()) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { // Grain pool health: simple capacity check (mirrors previous GrainHandler.IsHealthy) - app.pool.mu.RLock() - grainCount := len(app.pool.grains) - capacity := app.pool.PoolSize - app.pool.mu.RUnlock() + grainCount, capacity := app.pool.LocalUsage() if grainCount >= capacity { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("grain pool at capacity")) return } - if !syncedPool.IsHealthy() { + if !pool.IsHealthy() { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("control plane not healthy")) return @@ -382,8 +341,9 @@ func main() { go func() { sig := <-sigs fmt.Println("Shutting down due to signal:", sig) - go syncedPool.Close() app.Save() + pool.Close() + done <- true }() diff --git a/pool-server.go b/pool-server.go index fee45bb..769b2ab 100644 --- a/pool-server.go +++ b/pool-server.go @@ -265,14 +265,6 @@ func (s *PoolServer) HandleCheckout(w http.ResponseWriter, r *http.Request, id C return json.NewEncoder(w).Encode(klarnaOrder) } -/* -Legacy wrapper NewCartId removed. -Use the unified generator in cart_id.go: - id, err := NewCartId() -or panic-on-error helper: - id := MustNewCartId() -*/ - func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) error { return func(w http.ResponseWriter, r *http.Request) error { var id CartId @@ -307,11 +299,7 @@ func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.R id = parsed } } - // if ownershipProxyAfterExtraction != nil { - // if handled, err := ownershipProxyAfterExtraction(id, w, r); handled || err != nil { - // return err - // } - // } + return fn(id, w, r) } } @@ -356,11 +344,11 @@ func CartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request, cartId CartId) error) func(cartId CartId, w http.ResponseWriter, r *http.Request) error { return func(cartId CartId, w http.ResponseWriter, r *http.Request) error { if ownerHost, ok := s.pool.OwnerHost(cartId); ok { - ok, err := ownerHost.Proxy(cartId, w, r) - if ok || err != nil { - log.Printf("proxy failed: %v", err) - // todo take ownership!! 
- } else { + handled, err := ownerHost.Proxy(cartId, w, r) + if err != nil { + log.Printf("proxy failed: %v, taking ownership", err) + s.pool.TakeOwnership(cartId) + } else if handled { return nil } } @@ -371,75 +359,7 @@ func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request } } -//var ownershipProxyAfterExtraction func(cartId CartId, w http.ResponseWriter, r *http.Request) (handled bool, err error) - func (s *PoolServer) Serve() *http.ServeMux { - // // Install ownership proxy hook that runs AFTER id extraction (cookie OR path) - // ownershipProxyAfterExtraction = func(cartId CartId, w http.ResponseWriter, r *http.Request) (bool, error) { - // if cartId.String() == "" { - // return false, nil - // } - // owner := s.pool.OwnerHost(cartId) - // if owner == "" || owner == s.pool.Hostname() { - // // Set / refresh cartowner cookie pointing to the local host (claim or already owned). - // localHost := owner - // if localHost == "" { - // localHost = s.pool.Hostname() - // } - // http.SetCookie(w, &http.Cookie{ - // Name: "cartowner", - // Value: localHost, - // Path: "/", - // HttpOnly: true, - // SameSite: http.SameSiteLaxMode, - // }) - // return false, nil - // } - // // For remote ownership set cartowner cookie to remote host for sticky sessions. - // http.SetCookie(w, &http.Cookie{ - // Name: "cartowner", - // Value: owner, - // Path: "/", - // HttpOnly: true, - // SameSite: http.SameSiteLaxMode, - // }) - // // Proxy logic (simplified): reuse existing request to owning host on same port. - // target := "http://" + owner + r.URL.Path - // if q := r.URL.RawQuery; q != "" { - // target += "?" + q - // } - // req, err := http.NewRequestWithContext(r.Context(), r.Method, target, r.Body) - // if err != nil { - // http.Error(w, "proxy build error", http.StatusBadGateway) - // return true, err - // } - // for k, v := range r.Header { - // for _, vv := range v { - // req.Header.Add(k, vv) - // } - // } - // req.Header.Set("X-Forwarded-Host", r.Host) - // req.Header.Set("X-Cart-Id", cartId.String()) - // req.Header.Set("X-Cart-Owner", owner) - // resp, err := http.DefaultClient.Do(req) - // if err != nil { - // http.Error(w, "proxy upstream error", http.StatusBadGateway) - // return true, err - // } - // defer resp.Body.Close() - // for k, v := range resp.Header { - // for _, vv := range v { - // w.Header().Add(k, vv) - // } - // } - // w.Header().Set("X-Cart-Owner-Routed", "true") - // w.WriteHeader(resp.StatusCode) - // _, copyErr := io.Copy(w, resp.Body) - // if copyErr != nil { - // return true, copyErr - // } - // return true, nil - // } mux := http.NewServeMux() mux.HandleFunc("OPTIONS /", func(w http.ResponseWriter, r *http.Request) { diff --git a/proto/control_plane.pb.go b/proto/control_plane.pb.go index 3a7483a..6478a54 100644 --- a/proto/control_plane.pb.go +++ b/proto/control_plane.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: -// protoc-gen-go v1.36.10 -// protoc v3.21.12 -// source: control_plane.proto +// protoc-gen-go v1.36.9 +// protoc v6.32.1 +// source: proto/control_plane.proto package messages @@ -30,7 +30,7 @@ type Empty struct { func (x *Empty) Reset() { *x = Empty{} - mi := &file_control_plane_proto_msgTypes[0] + mi := &file_proto_control_plane_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -42,7 +42,7 @@ func (x *Empty) String() string { func (*Empty) ProtoMessage() {} func (x *Empty) ProtoReflect() protoreflect.Message { - mi := &file_control_plane_proto_msgTypes[0] + mi := &file_proto_control_plane_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -55,7 +55,7 @@ func (x *Empty) ProtoReflect() protoreflect.Message { // Deprecated: Use Empty.ProtoReflect.Descriptor instead. func (*Empty) Descriptor() ([]byte, []int) { - return file_control_plane_proto_rawDescGZIP(), []int{0} + return file_proto_control_plane_proto_rawDescGZIP(), []int{0} } // Ping reply includes responding host and its current unix time (seconds). @@ -69,7 +69,7 @@ type PingReply struct { func (x *PingReply) Reset() { *x = PingReply{} - mi := &file_control_plane_proto_msgTypes[1] + mi := &file_proto_control_plane_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -81,7 +81,7 @@ func (x *PingReply) String() string { func (*PingReply) ProtoMessage() {} func (x *PingReply) ProtoReflect() protoreflect.Message { - mi := &file_control_plane_proto_msgTypes[1] + mi := &file_proto_control_plane_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -94,7 +94,7 @@ func (x *PingReply) ProtoReflect() protoreflect.Message { // Deprecated: Use PingReply.ProtoReflect.Descriptor instead. func (*PingReply) Descriptor() ([]byte, []int) { - return file_control_plane_proto_rawDescGZIP(), []int{1} + return file_proto_control_plane_proto_rawDescGZIP(), []int{1} } func (x *PingReply) GetHost() string { @@ -121,7 +121,7 @@ type NegotiateRequest struct { func (x *NegotiateRequest) Reset() { *x = NegotiateRequest{} - mi := &file_control_plane_proto_msgTypes[2] + mi := &file_proto_control_plane_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -133,7 +133,7 @@ func (x *NegotiateRequest) String() string { func (*NegotiateRequest) ProtoMessage() {} func (x *NegotiateRequest) ProtoReflect() protoreflect.Message { - mi := &file_control_plane_proto_msgTypes[2] + mi := &file_proto_control_plane_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -146,7 +146,7 @@ func (x *NegotiateRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use NegotiateRequest.ProtoReflect.Descriptor instead. 
func (*NegotiateRequest) Descriptor() ([]byte, []int) { - return file_control_plane_proto_rawDescGZIP(), []int{2} + return file_proto_control_plane_proto_rawDescGZIP(), []int{2} } func (x *NegotiateRequest) GetKnownHosts() []string { @@ -166,7 +166,7 @@ type NegotiateReply struct { func (x *NegotiateReply) Reset() { *x = NegotiateReply{} - mi := &file_control_plane_proto_msgTypes[3] + mi := &file_proto_control_plane_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -178,7 +178,7 @@ func (x *NegotiateReply) String() string { func (*NegotiateReply) ProtoMessage() {} func (x *NegotiateReply) ProtoReflect() protoreflect.Message { - mi := &file_control_plane_proto_msgTypes[3] + mi := &file_proto_control_plane_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -191,7 +191,7 @@ func (x *NegotiateReply) ProtoReflect() protoreflect.Message { // Deprecated: Use NegotiateReply.ProtoReflect.Descriptor instead. func (*NegotiateReply) Descriptor() ([]byte, []int) { - return file_control_plane_proto_rawDescGZIP(), []int{3} + return file_proto_control_plane_proto_rawDescGZIP(), []int{3} } func (x *NegotiateReply) GetHosts() []string { @@ -211,7 +211,7 @@ type CartIdsReply struct { func (x *CartIdsReply) Reset() { *x = CartIdsReply{} - mi := &file_control_plane_proto_msgTypes[4] + mi := &file_proto_control_plane_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -223,7 +223,7 @@ func (x *CartIdsReply) String() string { func (*CartIdsReply) ProtoMessage() {} func (x *CartIdsReply) ProtoReflect() protoreflect.Message { - mi := &file_control_plane_proto_msgTypes[4] + mi := &file_proto_control_plane_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -236,7 +236,7 @@ func (x *CartIdsReply) ProtoReflect() protoreflect.Message { // Deprecated: Use CartIdsReply.ProtoReflect.Descriptor instead. func (*CartIdsReply) Descriptor() ([]byte, []int) { - return file_control_plane_proto_rawDescGZIP(), []int{4} + return file_proto_control_plane_proto_rawDescGZIP(), []int{4} } func (x *CartIdsReply) GetCartIds() []uint64 { @@ -257,7 +257,7 @@ type OwnerChangeAck struct { func (x *OwnerChangeAck) Reset() { *x = OwnerChangeAck{} - mi := &file_control_plane_proto_msgTypes[5] + mi := &file_proto_control_plane_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -269,7 +269,7 @@ func (x *OwnerChangeAck) String() string { func (*OwnerChangeAck) ProtoMessage() {} func (x *OwnerChangeAck) ProtoReflect() protoreflect.Message { - mi := &file_control_plane_proto_msgTypes[5] + mi := &file_proto_control_plane_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -282,7 +282,7 @@ func (x *OwnerChangeAck) ProtoReflect() protoreflect.Message { // Deprecated: Use OwnerChangeAck.ProtoReflect.Descriptor instead. 
func (*OwnerChangeAck) Descriptor() ([]byte, []int) { - return file_control_plane_proto_rawDescGZIP(), []int{5} + return file_proto_control_plane_proto_rawDescGZIP(), []int{5} } func (x *OwnerChangeAck) GetAccepted() bool { @@ -309,7 +309,7 @@ type ClosingNotice struct { func (x *ClosingNotice) Reset() { *x = ClosingNotice{} - mi := &file_control_plane_proto_msgTypes[6] + mi := &file_proto_control_plane_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -321,7 +321,7 @@ func (x *ClosingNotice) String() string { func (*ClosingNotice) ProtoMessage() {} func (x *ClosingNotice) ProtoReflect() protoreflect.Message { - mi := &file_control_plane_proto_msgTypes[6] + mi := &file_proto_control_plane_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -334,7 +334,7 @@ func (x *ClosingNotice) ProtoReflect() protoreflect.Message { // Deprecated: Use ClosingNotice.ProtoReflect.Descriptor instead. func (*ClosingNotice) Descriptor() ([]byte, []int) { - return file_control_plane_proto_rawDescGZIP(), []int{6} + return file_proto_control_plane_proto_rawDescGZIP(), []int{6} } func (x *ClosingNotice) GetHost() string { @@ -356,7 +356,7 @@ type OwnershipAnnounce struct { func (x *OwnershipAnnounce) Reset() { *x = OwnershipAnnounce{} - mi := &file_control_plane_proto_msgTypes[7] + mi := &file_proto_control_plane_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -368,7 +368,7 @@ func (x *OwnershipAnnounce) String() string { func (*OwnershipAnnounce) ProtoMessage() {} func (x *OwnershipAnnounce) ProtoReflect() protoreflect.Message { - mi := &file_control_plane_proto_msgTypes[7] + mi := &file_proto_control_plane_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -381,7 +381,7 @@ func (x *OwnershipAnnounce) ProtoReflect() protoreflect.Message { // Deprecated: Use OwnershipAnnounce.ProtoReflect.Descriptor instead. func (*OwnershipAnnounce) Descriptor() ([]byte, []int) { - return file_control_plane_proto_rawDescGZIP(), []int{7} + return file_proto_control_plane_proto_rawDescGZIP(), []int{7} } func (x *OwnershipAnnounce) GetHost() string { @@ -398,11 +398,64 @@ func (x *OwnershipAnnounce) GetCartIds() []uint64 { return nil } -var File_control_plane_proto protoreflect.FileDescriptor +// ExpiryAnnounce broadcasts that a host evicted the provided cart IDs. 
+type ExpiryAnnounce struct { + state protoimpl.MessageState `protogen:"open.v1"` + Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` + CartIds []uint64 `protobuf:"varint,2,rep,packed,name=cart_ids,json=cartIds,proto3" json:"cart_ids,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} -const file_control_plane_proto_rawDesc = "" + +func (x *ExpiryAnnounce) Reset() { + *x = ExpiryAnnounce{} + mi := &file_proto_control_plane_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ExpiryAnnounce) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExpiryAnnounce) ProtoMessage() {} + +func (x *ExpiryAnnounce) ProtoReflect() protoreflect.Message { + mi := &file_proto_control_plane_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExpiryAnnounce.ProtoReflect.Descriptor instead. +func (*ExpiryAnnounce) Descriptor() ([]byte, []int) { + return file_proto_control_plane_proto_rawDescGZIP(), []int{8} +} + +func (x *ExpiryAnnounce) GetHost() string { + if x != nil { + return x.Host + } + return "" +} + +func (x *ExpiryAnnounce) GetCartIds() []uint64 { + if x != nil { + return x.CartIds + } + return nil +} + +var File_proto_control_plane_proto protoreflect.FileDescriptor + +const file_proto_control_plane_proto_rawDesc = "" + "\n" + - "\x13control_plane.proto\x12\bmessages\"\a\n" + + "\x19proto/control_plane.proto\x12\bmessages\"\a\n" + "\x05Empty\"<\n" + "\tPingReply\x12\x12\n" + "\x04host\x18\x01 \x01(\tR\x04host\x12\x1b\n" + @@ -421,29 +474,33 @@ const file_control_plane_proto_rawDesc = "" + "\x04host\x18\x01 \x01(\tR\x04host\"B\n" + "\x11OwnershipAnnounce\x12\x12\n" + "\x04host\x18\x01 \x01(\tR\x04host\x12\x19\n" + - "\bcart_ids\x18\x02 \x03(\x04R\acartIds2\xc0\x02\n" + + "\bcart_ids\x18\x02 \x03(\x04R\acartIds\"?\n" + + "\x0eExpiryAnnounce\x12\x12\n" + + "\x04host\x18\x01 \x01(\tR\x04host\x12\x19\n" + + "\bcart_ids\x18\x02 \x03(\x04R\acartIds2\x86\x03\n" + "\fControlPlane\x12,\n" + "\x04Ping\x12\x0f.messages.Empty\x1a\x13.messages.PingReply\x12A\n" + "\tNegotiate\x12\x1a.messages.NegotiateRequest\x1a\x18.messages.NegotiateReply\x125\n" + "\n" + "GetCartIds\x12\x0f.messages.Empty\x1a\x16.messages.CartIdsReply\x12J\n" + - "\x11AnnounceOwnership\x12\x1b.messages.OwnershipAnnounce\x1a\x18.messages.OwnerChangeAck\x12<\n" + + "\x11AnnounceOwnership\x12\x1b.messages.OwnershipAnnounce\x1a\x18.messages.OwnerChangeAck\x12D\n" + + "\x0eAnnounceExpiry\x12\x18.messages.ExpiryAnnounce\x1a\x18.messages.OwnerChangeAck\x12<\n" + "\aClosing\x12\x17.messages.ClosingNotice\x1a\x18.messages.OwnerChangeAckB.Z,git.tornberg.me/go-cart-actor/proto;messagesb\x06proto3" var ( - file_control_plane_proto_rawDescOnce sync.Once - file_control_plane_proto_rawDescData []byte + file_proto_control_plane_proto_rawDescOnce sync.Once + file_proto_control_plane_proto_rawDescData []byte ) -func file_control_plane_proto_rawDescGZIP() []byte { - file_control_plane_proto_rawDescOnce.Do(func() { - file_control_plane_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_control_plane_proto_rawDesc), len(file_control_plane_proto_rawDesc))) +func file_proto_control_plane_proto_rawDescGZIP() []byte { + file_proto_control_plane_proto_rawDescOnce.Do(func() { + file_proto_control_plane_proto_rawDescData = 
protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_proto_control_plane_proto_rawDesc), len(file_proto_control_plane_proto_rawDesc))) }) - return file_control_plane_proto_rawDescData + return file_proto_control_plane_proto_rawDescData } -var file_control_plane_proto_msgTypes = make([]protoimpl.MessageInfo, 8) -var file_control_plane_proto_goTypes = []any{ +var file_proto_control_plane_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_proto_control_plane_proto_goTypes = []any{ (*Empty)(nil), // 0: messages.Empty (*PingReply)(nil), // 1: messages.PingReply (*NegotiateRequest)(nil), // 2: messages.NegotiateRequest @@ -452,45 +509,48 @@ var file_control_plane_proto_goTypes = []any{ (*OwnerChangeAck)(nil), // 5: messages.OwnerChangeAck (*ClosingNotice)(nil), // 6: messages.ClosingNotice (*OwnershipAnnounce)(nil), // 7: messages.OwnershipAnnounce + (*ExpiryAnnounce)(nil), // 8: messages.ExpiryAnnounce } -var file_control_plane_proto_depIdxs = []int32{ +var file_proto_control_plane_proto_depIdxs = []int32{ 0, // 0: messages.ControlPlane.Ping:input_type -> messages.Empty 2, // 1: messages.ControlPlane.Negotiate:input_type -> messages.NegotiateRequest 0, // 2: messages.ControlPlane.GetCartIds:input_type -> messages.Empty 7, // 3: messages.ControlPlane.AnnounceOwnership:input_type -> messages.OwnershipAnnounce - 6, // 4: messages.ControlPlane.Closing:input_type -> messages.ClosingNotice - 1, // 5: messages.ControlPlane.Ping:output_type -> messages.PingReply - 3, // 6: messages.ControlPlane.Negotiate:output_type -> messages.NegotiateReply - 4, // 7: messages.ControlPlane.GetCartIds:output_type -> messages.CartIdsReply - 5, // 8: messages.ControlPlane.AnnounceOwnership:output_type -> messages.OwnerChangeAck - 5, // 9: messages.ControlPlane.Closing:output_type -> messages.OwnerChangeAck - 5, // [5:10] is the sub-list for method output_type - 0, // [0:5] is the sub-list for method input_type + 8, // 4: messages.ControlPlane.AnnounceExpiry:input_type -> messages.ExpiryAnnounce + 6, // 5: messages.ControlPlane.Closing:input_type -> messages.ClosingNotice + 1, // 6: messages.ControlPlane.Ping:output_type -> messages.PingReply + 3, // 7: messages.ControlPlane.Negotiate:output_type -> messages.NegotiateReply + 4, // 8: messages.ControlPlane.GetCartIds:output_type -> messages.CartIdsReply + 5, // 9: messages.ControlPlane.AnnounceOwnership:output_type -> messages.OwnerChangeAck + 5, // 10: messages.ControlPlane.AnnounceExpiry:output_type -> messages.OwnerChangeAck + 5, // 11: messages.ControlPlane.Closing:output_type -> messages.OwnerChangeAck + 6, // [6:12] is the sub-list for method output_type + 0, // [0:6] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name } -func init() { file_control_plane_proto_init() } -func file_control_plane_proto_init() { - if File_control_plane_proto != nil { +func init() { file_proto_control_plane_proto_init() } +func file_proto_control_plane_proto_init() { + if File_proto_control_plane_proto != nil { return } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_control_plane_proto_rawDesc), len(file_control_plane_proto_rawDesc)), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_control_plane_proto_rawDesc), len(file_proto_control_plane_proto_rawDesc)), NumEnums: 0, - NumMessages: 8, + NumMessages: 
9, NumExtensions: 0, NumServices: 1, }, - GoTypes: file_control_plane_proto_goTypes, - DependencyIndexes: file_control_plane_proto_depIdxs, - MessageInfos: file_control_plane_proto_msgTypes, + GoTypes: file_proto_control_plane_proto_goTypes, + DependencyIndexes: file_proto_control_plane_proto_depIdxs, + MessageInfos: file_proto_control_plane_proto_msgTypes, }.Build() - File_control_plane_proto = out.File - file_control_plane_proto_goTypes = nil - file_control_plane_proto_depIdxs = nil + File_proto_control_plane_proto = out.File + file_proto_control_plane_proto_goTypes = nil + file_proto_control_plane_proto_depIdxs = nil } diff --git a/proto/control_plane.proto b/proto/control_plane.proto index 03bf6c8..316e46b 100644 --- a/proto/control_plane.proto +++ b/proto/control_plane.proto @@ -59,6 +59,12 @@ message OwnershipAnnounce { repeated uint64 cart_ids = 2; // newly claimed cart ids } +// ExpiryAnnounce broadcasts that a host evicted the provided cart IDs. +message ExpiryAnnounce { + string host = 1; + repeated uint64 cart_ids = 2; +} + // ControlPlane defines cluster coordination and ownership operations. service ControlPlane { // Ping for liveness; lightweight health signal. @@ -75,6 +81,9 @@ service ControlPlane { // Ownership announcement: first-touch claim broadcast (idempotent; best-effort). rpc AnnounceOwnership(OwnershipAnnounce) returns (OwnerChangeAck); + // Expiry announcement: drop remote ownership hints when local TTL expires. + rpc AnnounceExpiry(ExpiryAnnounce) returns (OwnerChangeAck); + // Closing announces graceful shutdown so peers can proactively adjust. rpc Closing(ClosingNotice) returns (OwnerChangeAck); } diff --git a/proto/control_plane_grpc.pb.go b/proto/control_plane_grpc.pb.go index 77e61ad..271fbad 100644 --- a/proto/control_plane_grpc.pb.go +++ b/proto/control_plane_grpc.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.5.1 -// - protoc v3.21.12 -// source: control_plane.proto +// - protoc v6.32.1 +// source: proto/control_plane.proto package messages @@ -23,6 +23,7 @@ const ( ControlPlane_Negotiate_FullMethodName = "/messages.ControlPlane/Negotiate" ControlPlane_GetCartIds_FullMethodName = "/messages.ControlPlane/GetCartIds" ControlPlane_AnnounceOwnership_FullMethodName = "/messages.ControlPlane/AnnounceOwnership" + ControlPlane_AnnounceExpiry_FullMethodName = "/messages.ControlPlane/AnnounceExpiry" ControlPlane_Closing_FullMethodName = "/messages.ControlPlane/Closing" ) @@ -40,6 +41,8 @@ type ControlPlaneClient interface { GetCartIds(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*CartIdsReply, error) // Ownership announcement: first-touch claim broadcast (idempotent; best-effort). AnnounceOwnership(ctx context.Context, in *OwnershipAnnounce, opts ...grpc.CallOption) (*OwnerChangeAck, error) + // Expiry announcement: drop remote ownership hints when local TTL expires. + AnnounceExpiry(ctx context.Context, in *ExpiryAnnounce, opts ...grpc.CallOption) (*OwnerChangeAck, error) // Closing announces graceful shutdown so peers can proactively adjust. Closing(ctx context.Context, in *ClosingNotice, opts ...grpc.CallOption) (*OwnerChangeAck, error) } @@ -92,6 +95,16 @@ func (c *controlPlaneClient) AnnounceOwnership(ctx context.Context, in *Ownershi return out, nil } +func (c *controlPlaneClient) AnnounceExpiry(ctx context.Context, in *ExpiryAnnounce, opts ...grpc.CallOption) (*OwnerChangeAck, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) 
+ out := new(OwnerChangeAck) + err := c.cc.Invoke(ctx, ControlPlane_AnnounceExpiry_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *controlPlaneClient) Closing(ctx context.Context, in *ClosingNotice, opts ...grpc.CallOption) (*OwnerChangeAck, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(OwnerChangeAck) @@ -116,6 +129,8 @@ type ControlPlaneServer interface { GetCartIds(context.Context, *Empty) (*CartIdsReply, error) // Ownership announcement: first-touch claim broadcast (idempotent; best-effort). AnnounceOwnership(context.Context, *OwnershipAnnounce) (*OwnerChangeAck, error) + // Expiry announcement: drop remote ownership hints when local TTL expires. + AnnounceExpiry(context.Context, *ExpiryAnnounce) (*OwnerChangeAck, error) // Closing announces graceful shutdown so peers can proactively adjust. Closing(context.Context, *ClosingNotice) (*OwnerChangeAck, error) mustEmbedUnimplementedControlPlaneServer() @@ -140,6 +155,9 @@ func (UnimplementedControlPlaneServer) GetCartIds(context.Context, *Empty) (*Car func (UnimplementedControlPlaneServer) AnnounceOwnership(context.Context, *OwnershipAnnounce) (*OwnerChangeAck, error) { return nil, status.Errorf(codes.Unimplemented, "method AnnounceOwnership not implemented") } +func (UnimplementedControlPlaneServer) AnnounceExpiry(context.Context, *ExpiryAnnounce) (*OwnerChangeAck, error) { + return nil, status.Errorf(codes.Unimplemented, "method AnnounceExpiry not implemented") +} func (UnimplementedControlPlaneServer) Closing(context.Context, *ClosingNotice) (*OwnerChangeAck, error) { return nil, status.Errorf(codes.Unimplemented, "method Closing not implemented") } @@ -236,6 +254,24 @@ func _ControlPlane_AnnounceOwnership_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } +func _ControlPlane_AnnounceExpiry_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ExpiryAnnounce) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ControlPlaneServer).AnnounceExpiry(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ControlPlane_AnnounceExpiry_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ControlPlaneServer).AnnounceExpiry(ctx, req.(*ExpiryAnnounce)) + } + return interceptor(ctx, in, info, handler) +} + func _ControlPlane_Closing_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(ClosingNotice) if err := dec(in); err != nil { @@ -277,11 +313,15 @@ var ControlPlane_ServiceDesc = grpc.ServiceDesc{ MethodName: "AnnounceOwnership", Handler: _ControlPlane_AnnounceOwnership_Handler, }, + { + MethodName: "AnnounceExpiry", + Handler: _ControlPlane_AnnounceExpiry_Handler, + }, { MethodName: "Closing", Handler: _ControlPlane_Closing_Handler, }, }, Streams: []grpc.StreamDesc{}, - Metadata: "control_plane.proto", + Metadata: "proto/control_plane.proto", } diff --git a/synced-pool.go b/synced-pool.go deleted file mode 100644 index a886e31..0000000 --- a/synced-pool.go +++ /dev/null @@ -1,642 +0,0 @@ -package main - -import ( - "context" - "fmt" - "io" - "log" - "net/http" - "reflect" - "sync" - "time" - - messages "git.tornberg.me/go-cart-actor/proto" - proto "git.tornberg.me/go-cart-actor/proto" - 
"github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "k8s.io/apimachinery/pkg/watch" -) - -// SyncedPool coordinates cart grain ownership across nodes using gRPC control plane -// and cart actor services. -// -// Responsibilities: -// - Local grain access (delegates to GrainLocalPool) -// - Cluster membership (AddRemote via discovery + negotiation) -// - Health/ping monitoring & remote removal -// - (Legacy) ring-based ownership removed in first-touch model -// -// Thread-safety: public methods that mutate internal maps lock p.mu (RWMutex). -type SyncedPool struct { - LocalHostname string - local *GrainLocalPool - - // New ownership tracking (first-touch / announcement model) - // remoteOwners maps cart id -> owning host (excluding locally owned carts which live in local.grains) - remoteOwners map[CartId]*RemoteHostGRPC - - mu sync.RWMutex - - // Remote host state (gRPC only) - remoteHosts map[string]*RemoteHostGRPC // host -> remote host - - // Discovery handler for re-adding hosts after failures - discardedHostHandler *DiscardedHostHandler -} - -// RemoteHostGRPC tracks a remote host's clients & health. -type RemoteHostGRPC struct { - Host string - Conn *grpc.ClientConn - Transport *http.Transport - Client *http.Client - ControlClient proto.ControlPlaneClient - MissedPings int -} - -func (h *RemoteHostGRPC) Name() string { - return h.Host -} - -func (h *RemoteHostGRPC) Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error) { - - req, err := http.NewRequestWithContext(r.Context(), r.Method, h.Host, r.Body) - - if err != nil { - http.Error(w, "proxy build error", http.StatusBadGateway) - return true, err - } - for k, v := range r.Header { - for _, vv := range v { - req.Header.Add(k, vv) - } - } - res, err := h.Client.Do(req) - if err != nil { - http.Error(w, "proxy request error", http.StatusBadGateway) - return true, err - } - defer res.Body.Close() - - for k, v := range res.Header { - for _, vv := range v { - w.Header().Add(k, vv) - } - } - w.Header().Set("X-Cart-Owner-Routed", "true") - if res.StatusCode >= 200 && res.StatusCode <= 299 { - w.WriteHeader(res.StatusCode) - _, copyErr := io.Copy(w, res.Body) - if copyErr != nil { - return true, copyErr - } - return true, nil - } - - return false, fmt.Errorf("proxy response status %d", res.StatusCode) - -} - -func (r *RemoteHostGRPC) IsHealthy() bool { - return r.MissedPings < 3 -} - -var ( - negotiationCount = promauto.NewCounter(prometheus.CounterOpts{ - Name: "cart_remote_negotiation_total", - Help: "The total number of remote negotiations", - }) - connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cart_connected_remotes", - Help: "The number of connected remotes", - }) - cartMutationsTotal = promauto.NewCounter(prometheus.CounterOpts{ - Name: "cart_mutations_total", - Help: "Total number of cart state mutations applied.", - }) - cartMutationFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{ - Name: "cart_mutation_failures_total", - Help: "Total number of failed cart state mutations.", - }) - cartMutationLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: "cart_mutation_latency_seconds", - Help: "Latency of cart mutations in seconds.", - Buckets: prometheus.DefBuckets, - }, []string{"mutation"}) - cartActiveGrains = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cart_active_grains", - Help: "Number of active (resident) local grains.", - }) 
-) - -func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) { - p := &SyncedPool{ - LocalHostname: hostname, - local: local, - remoteHosts: make(map[string]*RemoteHostGRPC), - remoteOwners: make(map[CartId]*RemoteHostGRPC), - discardedHostHandler: NewDiscardedHostHandler(1338), - } - p.discardedHostHandler.SetReconnectHandler(p.AddRemote) - - if discovery != nil { - go func() { - time.Sleep(3 * time.Second) // allow gRPC server startup - log.Printf("Starting discovery watcher") - ch, err := discovery.Watch() - if err != nil { - log.Printf("Discovery error: %v", err) - return - } - for evt := range ch { - if evt.Host == "" { - continue - } - switch evt.Type { - case watch.Deleted: - if p.IsKnown(evt.Host) { - p.RemoveHost(evt.Host) - } - default: - if !p.IsKnown(evt.Host) { - log.Printf("Discovered host %s", evt.Host) - p.AddRemote(evt.Host) - } - } - } - }() - } else { - log.Printf("No discovery configured; expecting manual AddRemote or static host injection") - } - - return p, nil -} - -// ------------------------- Remote Host Management ----------------------------- - -// AddRemote dials a remote host and initializes grain proxies. -func (p *SyncedPool) AddRemote(host string) { - if host == "" || host == p.LocalHostname { - return - } - - p.mu.Lock() - if _, exists := p.remoteHosts[host]; exists { - p.mu.Unlock() - return - } - p.mu.Unlock() - - target := fmt.Sprintf("%s:1337", host) - //ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - //defer cancel() - conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials())) //grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithBlock()) - if err != nil { - log.Printf("AddRemote: dial %s failed: %v", target, err) - return - } - - controlClient := proto.NewControlPlaneClient(conn) - - // Health check (Ping) with limited retries - pings := 3 - for pings > 0 { - ctxPing, cancelPing := context.WithTimeout(context.Background(), 1*time.Second) - _, pingErr := controlClient.Ping(ctxPing, &proto.Empty{}) - cancelPing() - if pingErr == nil { - break - } - pings-- - time.Sleep(200 * time.Millisecond) - if pings == 0 { - log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr) - conn.Close() - return - } - } - transport := &http.Transport{ - MaxIdleConns: 100, // Maximum idle connections - MaxIdleConnsPerHost: 100, // Maximum idle connections per host - IdleConnTimeout: 120 * time.Second, // Timeout for idle connections - } - - client := &http.Client{ - Transport: transport, - Timeout: 10 * time.Second, // Request timeout - } - - remote := &RemoteHostGRPC{ - Host: host, - Conn: conn, - Transport: transport, - Client: client, - ControlClient: controlClient, - MissedPings: 0, - } - - p.mu.Lock() - p.remoteHosts[host] = remote - p.mu.Unlock() - connectedRemotes.Set(float64(p.RemoteCount())) - // Rebuild consistent hashing ring including this new host - //p.rebuildRing() - - log.Printf("Connected to remote host %s", host) - - go p.pingLoop(remote) - go p.initializeRemote(remote) - go p.Negotiate() -} - -// initializeRemote fetches remote cart ids and sets up remote grain proxies. 
-func (p *SyncedPool) initializeRemote(remote *RemoteHostGRPC) { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - reply, err := remote.ControlClient.GetCartIds(ctx, &proto.Empty{}) - if err != nil { - log.Printf("Init remote %s: GetCartIds error: %v", remote.Host, err) - return - } - count := 0 - // Record remote ownership (first-touch model) instead of spawning remote grain proxies. - p.mu.Lock() - for _, cid := range reply.CartIds { - - // Only set if not already claimed (first claim wins) - if _, exists := p.remoteOwners[CartId(cid)]; !exists { - p.remoteOwners[CartId(cid)] = remote - } - count++ - } - p.mu.Unlock() - log.Printf("Remote %s reported %d remote-owned carts (ownership cached)", remote.Host, count) -} - -// RemoveHost removes remote host and its grains. -func (p *SyncedPool) RemoveHost(host string) { - p.mu.Lock() - remote, exists := p.remoteHosts[host] - if exists { - delete(p.remoteHosts, host) - } - // purge remote ownership entries for this host - for id, h := range p.remoteOwners { - if h.Host == host { - delete(p.remoteOwners, id) - } - } - p.mu.Unlock() - - if exists { - remote.Conn.Close() - } - connectedRemotes.Set(float64(p.RemoteCount())) - // Rebuild ring after host removal - // p.rebuildRing() -} - -// RemoteCount returns number of tracked remote hosts. -func (p *SyncedPool) RemoteCount() int { - p.mu.RLock() - defer p.mu.RUnlock() - return len(p.remoteHosts) -} - -func (p *SyncedPool) IsKnown(host string) bool { - if host == p.LocalHostname { - return true - } - p.mu.RLock() - defer p.mu.RUnlock() - _, ok := p.remoteHosts[host] - return ok -} - -func (p *SyncedPool) ExcludeKnown(hosts []string) []string { - ret := make([]string, 0, len(hosts)) - for _, h := range hosts { - if !p.IsKnown(h) { - ret = append(ret, h) - } - } - return ret -} - -// ------------------------- Health / Ping ------------------------------------- - -func (p *SyncedPool) pingLoop(remote *RemoteHostGRPC) { - ticker := time.NewTicker(3 * time.Second) - defer ticker.Stop() - for range ticker.C { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - _, err := remote.ControlClient.Ping(ctx, &proto.Empty{}) - cancel() - if err != nil { - remote.MissedPings++ - log.Printf("Ping %s failed (%d)", remote.Host, remote.MissedPings) - if !remote.IsHealthy() { - log.Printf("Remote %s unhealthy, removing", remote.Host) - p.RemoveHost(remote.Host) - return - } - continue - } - remote.MissedPings = 0 - } -} - -func (p *SyncedPool) IsHealthy() bool { - p.mu.RLock() - defer p.mu.RUnlock() - for _, r := range p.remoteHosts { - if !r.IsHealthy() { - return false - } - } - return true -} - -// ------------------------- Negotiation --------------------------------------- - -func (p *SyncedPool) Negotiate() { - negotiationCount.Inc() - - p.mu.RLock() - hosts := make([]string, 0, len(p.remoteHosts)+1) - hosts = append(hosts, p.LocalHostname) - for h := range p.remoteHosts { - hosts = append(hosts, h) - } - remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts)) - for _, r := range p.remoteHosts { - remotes = append(remotes, r) - } - p.mu.RUnlock() - - for _, r := range remotes { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - reply, err := r.ControlClient.Negotiate(ctx, &proto.NegotiateRequest{KnownHosts: hosts}) - cancel() - if err != nil { - log.Printf("Negotiate with %s failed: %v", r.Host, err) - continue - } - for _, h := range reply.Hosts { - if !p.IsKnown(h) { - p.AddRemote(h) - } - } - } - - // Ring rebuild 
removed (first-touch ownership model no longer uses ring) -} - -// ------------------------- Grain / Ring Ownership ---------------------------- - -// RemoveRemoteGrain obsolete in first-touch model (no remote grain proxies retained) - -// SpawnRemoteGrain removed (remote grain proxies eliminated in first-touch model) - -// GetHealthyRemotes retained (still useful for broadcasting ownership) -func (p *SyncedPool) GetHealthyRemotes() []*RemoteHostGRPC { - p.mu.RLock() - defer p.mu.RUnlock() - ret := make([]*RemoteHostGRPC, 0, len(p.remoteHosts)) - for _, r := range p.remoteHosts { - if r.IsHealthy() { - ret = append(ret, r) - } - } - return ret -} - -func (p *SyncedPool) removeLocalGrain(id CartId) { - p.mu.Lock() - delete(p.local.grains, uint64(id)) - p.mu.Unlock() -} - -// ------------------------- First-Touch Ownership Resolution ------------------ - -// ErrNotOwner is returned when an operation is attempted on a cart that is -// owned by a different host (according to first-touch ownership mapping). -var ErrNotOwner = fmt.Errorf("not owner") - -// resolveOwnerFirstTouch implements the new semantics: -// 1. If local grain exists -> local host owns it. -// 2. Else if remoteOwners has an entry -> return that host. -// 3. Else: claim locally (spawn), insert into remoteOwners map locally for -// idempotency, and asynchronously announce ownership to all remotes. -// -// NOTE: This does NOT (yet) reconcile conflicting announcements; first claim -// wins. Later improvements can add tie-break via timestamp or host ordering. -func (p *SyncedPool) resolveOwnerFirstTouch(id CartId) error { - // Fast local existence check - p.local.mu.RLock() - _, existsLocal := p.local.grains[uint64(id)] - p.local.mu.RUnlock() - if existsLocal { - return nil - } - - // Remote ownership map lookup - p.mu.RLock() - remoteHost, foundRemote := p.remoteOwners[id] - p.mu.RUnlock() - if foundRemote && remoteHost.Host != "" { - log.Printf("other owner exists %s", remoteHost.Host) - return nil - } - - // Claim: spawn locally - _, err := p.local.GetGrain(id) - if err != nil { - return err - } - - // Announce asynchronously - go p.broadcastOwnership([]CartId{id}) - return nil -} - -// broadcastOwnership sends an AnnounceOwnership RPC to all healthy remotes. -// Best-effort: failures are logged and ignored. -func (p *SyncedPool) broadcastOwnership(ids []CartId) { - if len(ids) == 0 { - return - } - - uids := make([]uint64, 0, len(ids)) - for _, id := range ids { - uids = append(uids, uint64(id)) - } - - p.mu.RLock() - defer p.mu.RUnlock() - - for _, r := range p.remoteHosts { - if r.IsHealthy() { - go func(rh *RemoteHostGRPC) { - rh.ControlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{ - Host: p.LocalHostname, - CartIds: uids, - }) - }(r) - } - } -} - -// AdoptRemoteOwnership processes an incoming ownership announcement for cart ids. -func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) { - if host == "" || host == p.LocalHostname { - return - } - remoteHost, ok := p.remoteHosts[host] - if !ok { - log.Printf("remote host does not exist!!") - } - p.mu.Lock() - defer p.mu.Unlock() - for _, s := range ids { - if s == "" { - continue - } - parsed, ok := ParseCartId(s) - if !ok { - continue // skip invalid cart id strings - } - id := parsed - // Do not overwrite if already claimed by another host (first wins). 
- if existing, ok := p.remoteOwners[id]; ok && existing != remoteHost { - continue - } - // Skip if we own locally (local wins for our own process) - p.local.mu.RLock() - _, localHas := p.local.grains[uint64(id)] - p.local.mu.RUnlock() - if localHas { - continue - } - p.remoteOwners[id] = remoteHost - } -} - -// getGrain returns a local grain if this host is (or becomes) the owner under -// the first-touch model. If another host owns the cart, ErrNotOwner is returned. -// Remote grain proxy logic and ring-based spawning have been removed. -func (p *SyncedPool) getGrain(id CartId) (Grain, error) { - - // Owner is local (either existing or just claimed), fetch/create grain. - grain, err := p.local.GetGrain(id) - if err != nil { - return nil, err - } - p.resolveOwnerFirstTouch(id) - return grain, nil -} - -// Apply applies a single mutation to a grain (local or remote). -// Replication (RF>1) scaffolding: future enhancement will fan-out mutations -// to replica owners (best-effort) and reconcile quorum on read. -func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) { - grain, err := p.getGrain(id) - if err != nil { - log.Printf("could not get grain %v", err) - return nil, err - } - // if err == ErrNotOwner { - // // Remote owner reported but either unreachable or failed earlier in stack. - // // Takeover strategy: remove remote mapping (first-touch override) and claim locally. - // p.mu.Lock() - // delete(p.remoteOwners, id) - // p.mu.Unlock() - // if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil { - // return nil, terr - // } else if owner == p.LocalHostname { - // // Fetch (now-local) grain - // grain, err = p.local.GetGrain(id) - // if err != nil { - // return nil, err - // } - // } else { - // // Another host reclaimed before us; treat as not owner. - // return nil, ErrNotOwner - // } - // } else if err != nil { - // return nil, err - // } - - start := time.Now() - result, applyErr := grain.Apply(mutation, false) - - // Derive mutation type label (strip pointer) - mutationType := "unknown" - if mutation != nil { - if t := reflect.TypeOf(mutation); t != nil { - if t.Kind() == reflect.Ptr { - t = t.Elem() - } - if t.Name() != "" { - mutationType = t.Name() - } - } - } - cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds()) - - if applyErr == nil && result != nil { - cartMutationsTotal.Inc() - //if p.ownerHostFor(id) == p.LocalHostname { - // Update active grains gauge only for local ownership - cartActiveGrains.Set(float64(p.local.DebugGrainCount())) - //} - } else if applyErr != nil { - cartMutationFailuresTotal.Inc() - } - return result, applyErr -} - -// Get returns current state of a grain (local or remote). -// Future replication hook: Read-repair or quorum read can be added here. -func (p *SyncedPool) Get(id CartId) (*CartGrain, error) { - grain, err := p.getGrain(id) - if err != nil { - log.Printf("could not get grain %v", err) - return nil, err - } - return grain.GetCurrentState() -} - -// Close notifies remotes this host is terminating. 
-func (p *SyncedPool) Close() { - p.mu.RLock() - remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts)) - for _, r := range p.remoteHosts { - remotes = append(remotes, r) - } - p.mu.RUnlock() - - for _, r := range remotes { - go func(rh *RemoteHostGRPC) { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - _, err := rh.ControlClient.Closing(ctx, &proto.ClosingNotice{Host: p.LocalHostname}) - cancel() - if err != nil { - log.Printf("Close notify to %s failed: %v", rh.Host, err) - } - }(r) - } -} - -// Hostname implements the GrainPool interface, returning this node's hostname. -func (p *SyncedPool) Hostname() string { - return p.LocalHostname -} - -// OwnerHost returns the primary owning host for a given cart id (ring lookup). -func (p *SyncedPool) OwnerHost(id CartId) (Host, bool) { - ownerHost, ok := p.remoteOwners[id] - return ownerHost, ok -}
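
Note on the AnnounceExpiry flow added above: this part of the patch carries only the proto definition and the regenerated stubs; the handler and its caller live in grpc_server.go and grain-pool.go (see the diffstat). The sketch below illustrates the intended semantics: a peer drops its cached remote-ownership hints for the announced cart ids, the inverse of AnnounceOwnership, and the announcing host fans the notice out after TTL eviction. The controlPlaneServer wrapper, the broadcastExpiry helper, the empty OwnerChangeAck reply, and the RemoteHostGRPC members it touches (Host, ControlClient, IsHealthy) are assumptions for illustration, not the patch's actual implementation; only ExpiryAnnounce, OwnerChangeAck, and the generated client/server interfaces come from the code above.

    package main

    // Illustrative sketch only: assumes the CartPool fields introduced in
    // grain-pool.go (remoteMu, remoteOwners, remoteHosts, hostname) and a
    // RemoteHostGRPC shaped like the one removed from synced-pool.go.

    import (
        "context"
        "log"
        "time"

        messages "git.tornberg.me/go-cart-actor/proto"
    )

    // controlPlaneServer stands in for the real gRPC server wrapper in
    // grpc_server.go.
    type controlPlaneServer struct {
        messages.UnimplementedControlPlaneServer
        pool *CartPool
    }

    // AnnounceExpiry drops cached ownership hints for carts the announcing
    // host has evicted, freeing them for a fresh first-touch claim.
    func (s *controlPlaneServer) AnnounceExpiry(ctx context.Context, in *messages.ExpiryAnnounce) (*messages.OwnerChangeAck, error) {
        s.pool.remoteMu.Lock()
        defer s.pool.remoteMu.Unlock()
        for _, cid := range in.CartIds {
            id := CartId(cid)
            // Clear a hint only if it still points at the announcing host;
            // a cart claimed by another peer in the meantime must survive.
            if owner, ok := s.pool.remoteOwners[id]; ok && owner.Host == in.Host {
                delete(s.pool.remoteOwners, id)
            }
        }
        return &messages.OwnerChangeAck{}, nil
    }

    // broadcastExpiry is the announcing side: after the purge ticker evicts
    // expired local grains, tell every healthy remote to forget the hints.
    // Best-effort, mirroring the ownership broadcast.
    func (p *CartPool) broadcastExpiry(ids []CartId) {
        if len(ids) == 0 {
            return
        }
        uids := make([]uint64, 0, len(ids))
        for _, id := range ids {
            uids = append(uids, uint64(id))
        }
        p.remoteMu.RLock()
        defer p.remoteMu.RUnlock()
        for _, r := range p.remoteHosts {
            if !r.IsHealthy() {
                continue
            }
            go func(rh *RemoteHostGRPC) {
                ctx, cancel := context.WithTimeout(context.Background(), time.Second)
                defer cancel()
                if _, err := rh.ControlClient.AnnounceExpiry(ctx, &messages.ExpiryAnnounce{
                    Host:    p.hostname,
                    CartIds: uids,
                }); err != nil {
                    log.Printf("AnnounceExpiry to %s failed: %v", rh.Host, err)
                }
            }(r)
        }
    }

As with AnnounceOwnership, the call is advisory and idempotent: once a hint is gone, re-delivering the same ExpiryAnnounce is a no-op, so failures can simply be logged and ignored.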