diff --git a/README.md b/README.md index 67521f7..5fa0634 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,36 @@ # Go Cart Actor +## Migration Notes (Ring-based Ownership Transition) + +This release removes the legacy ConfirmOwner ownership negotiation RPC in favor of deterministic ownership via the consistent hashing ring. + +Summary of changes: +- ConfirmOwner RPC removed from the ControlPlane service. +- OwnerChangeRequest message removed (was only used by ConfirmOwner). +- OwnerChangeAck retained solely as the response type for the Closing RPC. +- SyncedPool now relies exclusively on the ring for ownership (no quorum negotiation). +- Remote proxy creation includes a bounded readiness retry to reduce first-call failures. +- New Prometheus ring metrics: + - cart_ring_epoch + - cart_ring_hosts + - cart_ring_vnodes + - cart_ring_host_share{host} + - cart_ring_lookup_local_total + - cart_ring_lookup_remote_total + +Action required for consumers: +1. Regenerate protobuf code after pulling (requires protoc-gen-go and protoc-gen-go-grpc installed). +2. Remove any client code or automation invoking ConfirmOwner (calls will now return UNIMPLEMENTED if using stale generated stubs). +3. Update monitoring/alerts that referenced ConfirmOwner or ownership quorum failures—use ring metrics instead. +4. If you previously interpreted “ownership flapping” via ConfirmOwner logs, now check for: + - Rapid changes in ring epoch (cart_ring_epoch) + - Host churn (cart_ring_hosts) + - Imbalance in vnode distribution (cart_ring_host_share) + +No data migration is necessary; cart IDs and grain state are unaffected. + +--- + A distributed cart management system using the actor model pattern. ## Prerequisites @@ -240,8 +271,8 @@ Responsibilities: 1. Discovery integration (via a `Discovery` interface) adds/removes hosts. 2. Periodic ping health checks (ControlPlane.Ping). -3. Ownership negotiation: - - On first contention / unknown owner, node calls `ConfirmOwner` on peers to achieve quorum before making a local grain authoritative. +3. Ring-based deterministic ownership: + - Ownership is derived directly from the consistent hashing ring (no quorum RPC or `ConfirmOwner`). 4. Remote spawning: - When a remote host reports its cart ids (`GetCartIds`), the pool creates remote proxies for fast routing. @@ -270,7 +301,6 @@ Defined in `proto/control_plane.proto`: | `Ping` | Liveness; increments missed ping counter if failing. | | `Negotiate` | Merges membership views; used after discovery events. | | `GetCartIds` | Enumerate locally owned carts for remote index seeding. | -| `ConfirmOwner` | Quorum acknowledgment for ownership claim. | | `Closing` | Graceful shutdown notice; peers remove host & associated remote grains. | ### Ownership / Quorum Rules @@ -347,7 +377,7 @@ Defined in `proto/control_plane.proto`: ## gRPC Interfaces - **CartActor**: Per-mutation unary RPCs + `GetState`. (Checkout logic intentionally excluded; handled at HTTP layer.) -- **ControlPlane**: Cluster coordination (Ping, Negotiate, ConfirmOwner, etc.). +- **ControlPlane**: Cluster coordination (Ping, Negotiate, GetCartIds, Closing) — ownership now ring-determined (no ConfirmOwner). **Ports** (default / implied): - CartActor & ControlPlane share the same gRPC server/listener (single port, e.g. `:1337`). @@ -396,7 +426,7 @@ Defined in `proto/control_plane.proto`: ``` Client -> HTTP Handler -> SyncedPool -> (local?) -> Registry -> Grain State \-> (remote?) 
-> RemoteGrainGRPC -> gRPC -> Remote CartActor -> Registry -> Grain -ControlPlane: Discovery Events <-> Negotiation/Ping/ConfirmOwner <-> SyncedPool state +ControlPlane: Discovery Events <-> Negotiation/Ping <-> SyncedPool state (ring determines ownership) ``` --- @@ -407,7 +437,7 @@ ControlPlane: Discovery Events <-> Negotiation/Ping/ConfirmOwner <-> SyncedPool |---------|--------------|--------| | New cart every request | Secure cookie over plain HTTP or not sending cookie jar | Disable Secure locally or use HTTPS & proper curl `-b` | | Unsupported mutation error | Missing registry handler | Add `RegisterMutation` for that proto | -| Ownership flapping | Quorum failing due to intermittent peers | Investigate `ConfirmOwner` errors / network | +| Ownership imbalance | Ring host distribution skew or rapid host churn | Examine `cart_ring_host_share`, `cart_ring_hosts`, and logs for host add/remove; rebalance or investigate instability | | Remote mutation latency | Network / serialization overhead | Consider batching or colocating hot carts | | Checkout returns 500 | Klarna call failed | Inspect logs; no grain state mutated | diff --git a/cart_id.go b/cart_id.go new file mode 100644 index 0000000..ba9d6fe --- /dev/null +++ b/cart_id.go @@ -0,0 +1,327 @@ +package main + +import ( + "crypto/rand" + "encoding/binary" + "errors" + "fmt" + "strings" +) + +// cart_id.go +// +// Compact CartID implementation using 64 bits of cryptographic randomness, +// base62 encoded (0-9 A-Z a-z). Typical length is 11 characters (since 62^11 > 2^64). +// +// Motivation: +// * Shorter identifiers for cookies / URLs than legacy padded 16-byte CartId +// * O(1) hashing (raw uint64) for consistent hashing ring integration +// * Extremely low collision probability (birthday bound negligible at scale) +// +// Backward Compatibility Strategy (Phased): +// Phase 1: Introduce CartID helpers while continuing to accept legacy CartId. +// Phase 2: Internally migrate maps to key by uint64 (CartID.Raw()). +// Phase 3: Canonicalize all inbound IDs to short base62; reissue Set-Cart-Id header. +// +// NOTE: +// The legacy type `CartId [16]byte` is still present elsewhere; helper +// UpgradeLegacyCartId bridges that representation to the new form without +// breaking deterministic mapping for existing carts. +// +// Security / Predictability: +// Uses crypto/rand for generation. If ever required, you can layer an +// HMAC-based derivation for additional secrecy. Current approach already +// provides 64 bits of entropy (brute force infeasible for practical risk). +// +// Future Extensions: +// * Time-sortable IDs: prepend a 48-bit timestamp field and encode 80 bits. +// * Add metrics counters for: generated_new, parsed_existing, legacy_fallback. +// * Add a pool of pre-generated IDs for ultra-low-latency hot paths (rarely needed). +// +// Public Surface Summary: +// NewCartID() (CartID, error) +// ParseCartID(string) (CartID, bool) +// FallbackFromString(string) CartID +// UpgradeLegacyCartId(CartId) CartID +// CanonicalizeIncoming(string) (CartID, bool /*wasGenerated*/, error) +// +// Encoding Details: +// encodeBase62 / decodeBase62 maintain a stable alphabet. DO NOT change +// alphabet order once IDs are in circulation, or previously issued IDs +// will change meaning. +// +// Zero Values: +// The zero value CartID{} has raw=0 and an empty txt; String() renders it as "0". +// Treat it as valid, but usually you will call NewCartID instead.
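+//
+// Worked encode example (value chosen purely for illustration):
+//   1_000_000 = ((4*62 + 12)*62 + 9)*62 + 2, and alphabet[12] == 'C',
+//   so encodeBase62(1_000_000) == "4C92".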
+// +// --------------------------------------------------------------------------- + +const base62Alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + +// Precomputed reverse lookup table for decode (255 = invalid). +var base62Rev [256]byte + +func init() { + for i := range base62Rev { + base62Rev[i] = 0xFF + } + for i := 0; i < len(base62Alphabet); i++ { + base62Rev[base62Alphabet[i]] = byte(i) + } +} + +// CartID is the compact representation of a cart identifier. +// raw: 64-bit entropy (also used directly for consistent hashing). +// txt: cached base62 textual form. +type CartID struct { + raw uint64 + txt string +} + +// String returns the canonical base62 encoded ID. +func (c CartID) String() string { + if c.txt == "" { // lazily encode if constructed manually (value receiver: the result is not cached across calls) + c.txt = encodeBase62(c.raw) + } + return c.txt +} + +// Raw returns the 64-bit numeric value (useful for hashing / ring lookup). +func (c CartID) Raw() uint64 { + return c.raw +} + +// IsZero reports whether this CartID is the zero value. +func (c CartID) IsZero() bool { + return c.raw == 0 +} + +// NewCartID generates a new cryptographically random 64-bit ID. +func NewCartID() (CartID, error) { + var b [8]byte + if _, err := rand.Read(b[:]); err != nil { + return CartID{}, fmt.Errorf("NewCartID: %w", err) + } + u := binary.BigEndian.Uint64(b[:]) + // Reject zero if you want to avoid ever producing "0" (optional). + if u == 0 { + // Extremely unlikely; recurse once. + return NewCartID() + } + return CartID{raw: u, txt: encodeBase62(u)}, nil +} + +// MustNewCartID panics on failure (suitable for tests / initialization). +func MustNewCartID() CartID { + id, err := NewCartID() + if err != nil { + panic(err) + } + return id +} + +// ParseCartID attempts to parse a base62 canonical ID. +// Returns (id, true) if fully valid; (zero, false) otherwise. +func ParseCartID(s string) (CartID, bool) { + if len(s) == 0 { + return CartID{}, false + } + // Basic length sanity; allow a bit of headroom for future timestamp variant. + if len(s) > 16 { + return CartID{}, false + } + u, ok := decodeBase62(s) + if !ok { + return CartID{}, false + } + return CartID{raw: u, txt: s}, true +} + +// FallbackFromString produces a deterministic CartID from arbitrary input +// using a 64-bit FNV-1a hash. This allows legacy or malformed IDs to map +// consistently into the new scheme (collision probability still low). +func FallbackFromString(s string) CartID { + const ( + offset64 = 14695981039346656037 // FNV-1a 64-bit offset basis + prime64 = 1099511628211 // FNV-1a 64-bit prime + ) + h := uint64(offset64) + for i := 0; i < len(s); i++ { + h ^= uint64(s[i]) + h *= prime64 + } + return CartID{raw: h, txt: encodeBase62(h)} +} + +// UpgradeLegacyCartId converts the old 16-byte CartId (padded) to CartID +// by hashing its trimmed string form. Keeps stable mapping across restarts. +func UpgradeLegacyCartId(old CartId) CartID { + return FallbackFromString(old.String()) +} + +// CanonicalizeIncoming normalizes user-provided ID strings. +// Behavior: +// +// Empty string -> generate new ID (wasGenerated = true) +// Valid base62 -> parse and return (wasGenerated = false) +// Anything else -> fallback deterministic hash (wasGenerated = false) +// +// Errors only occur if crypto/rand fails during generation.
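+//
+// Illustrative outcomes (input strings hypothetical):
+//   CanonicalizeIncoming("")            -> fresh random ID, wasGenerated=true
+//   CanonicalizeIncoming("4C92")        -> parsed as base62, wasGenerated=false
+//   CanonicalizeIncoming("not~base62!") -> FNV-1a fallback,  wasGenerated=false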
+func CanonicalizeIncoming(s string) (CartID, bool, error) { + if s == "" { + id, err := NewCartID() + return id, true, err + } + if cid, ok := ParseCartID(s); ok { + return cid, false, nil + } + // Legacy heuristic: if length == 16 and contains non-base62 chars, treat as legacy padded ID. + if len(s) == 16 && !isAllBase62(s) { + return FallbackFromString(strings.TrimRight(s, "\x00")), false, nil + } + return FallbackFromString(s), false, nil +} + +// isAllBase62 returns true if every byte is in the base62 alphabet. +func isAllBase62(s string) bool { + for i := 0; i < len(s); i++ { + if base62Rev[s[i]] == 0xFF { + return false + } + } + return true +} + +// encodeBase62 turns a uint64 into base62 text. +// Complexity: O(log_62 n) ~ at most 11 iterations for 64 bits. +func encodeBase62(u uint64) string { + if u == 0 { + return "0" + } + // 62^11 ≈ 5.18e19 > 2^64 ≈ 1.84e19, so 11 base62 digits always suffice for a uint64. + var buf [11]byte + i := len(buf) + for u > 0 { + i-- + buf[i] = base62Alphabet[u%62] + u /= 62 + } + return string(buf[i:]) +} + +// decodeBase62 converts a base62 string to uint64. +// Returns (value, false) if any invalid character appears or if the value +// would overflow uint64 (long inputs must not silently wrap). +func decodeBase62(s string) (uint64, bool) { + var v uint64 + for i := 0; i < len(s); i++ { + c := s[i] + d := base62Rev[c] + if d == 0xFF { + return 0, false + } + if v > (^uint64(0)-uint64(d))/62 { + return 0, false // overflow guard + } + v = v*62 + uint64(d) + } + return v, true +} + +// ErrInvalidCartID can be returned by higher-level validation layers if you decide +// to reject fallback-derived IDs (currently unused here). +var ErrInvalidCartID = errors.New("invalid cart id") + +// --------------------------------------------------------------------------- +// Legacy / Compatibility Conversion Helpers +// --------------------------------------------------------------------------- + +// CartIDToLegacy converts a CartID (base62) into the legacy fixed-size CartId +// ([16]byte) by copying the textual form (truncated or zero-padded). +// NOTE: If the base62 string is longer than 16 (should not happen with current +// 64-bit space), it will be truncated. +func CartIDToLegacy(c CartID) CartId { + var id CartId + txt := c.String() + copy(id[:], []byte(txt)) + return id +} + +// LegacyToCartID upgrades a legacy CartId (padded) to a CartID by hashing its +// trimmed string form (deterministic). This preserves stable mapping without +// depending on original randomness. +func LegacyToCartID(old CartId) CartID { + return UpgradeLegacyCartId(old) +} + +// CartIDToKey returns the numeric key representation (uint64) for map indexing. +func CartIDToKey(c CartID) uint64 { + return c.Raw() +} + +// LegacyToCartKey converts a legacy CartId to the numeric key via deterministic +// fallback hashing. (Uses the same logic as LegacyToCartID then returns raw.) +func LegacyToCartKey(old CartId) uint64 { + return LegacyToCartID(old).Raw() +} + +// ---------------------- Optional Helper Utilities ---------------------------- + +// CartIDOrNew tries to parse s; if empty OR invalid returns a fresh ID. +func CartIDOrNew(s string) (CartID, bool /*wasParsed*/, error) { + if cid, ok := ParseCartID(s); ok { + return cid, true, nil + } + id, err := NewCartID() + return id, false, err +} + +// MustParseCartID panics if s is not a valid base62 ID (useful in tests).
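+// Example (value hypothetical): MustParseCartID("4C92").Raw() == 1_000_000.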
+func MustParseCartID(s string) CartID { + if cid, ok := ParseCartID(s); ok { + return cid + } + panic(fmt.Sprintf("invalid CartID: %s", s)) +} + +// DebugString returns a verbose description (for logging / diagnostics). +func (c CartID) DebugString() string { + return fmt.Sprintf("CartID(raw=%d txt=%s)", c.raw, c.String()) +} + +// Equal compares two CartIDs by raw value. +func (c CartID) Equal(other CartID) bool { + return c.raw == other.raw +} + +// CanonicalizeOrLegacy preserves legacy (non-base62) IDs without altering their +// textual form, avoiding the previous behavior where fallback hashing replaced +// the original string with a base62-encoded hash (which broke deterministic +// key derivation across mixed call paths). +// +// Behavior: +// - s == "" -> generate new CartID (generatedNew = true, wasBase62 = true) +// - base62 ok -> return parsed CartID (generatedNew = false, wasBase62 = true) +// - otherwise -> treat as legacy: raw = hash(s), txt = original s +// +// Returns: +// +// cid - CartID (txt preserved for legacy inputs) +// generatedNew - true only when a brand new ID was created due to empty input +// wasBase62 - true if the input was already canonical base62 (or generated) +// err - only set if crypto/rand fails when generating a new ID +func CanonicalizeOrLegacy(s string) (cid CartID, generatedNew bool, wasBase62 bool, err error) { + if s == "" { + id, e := NewCartID() + if e != nil { + return CartID{}, false, false, e + } + return id, true, true, nil + } + if parsed, ok := ParseCartID(s); ok { + return parsed, false, true, nil + } + // Legacy path: keep original text so downstream legacy-to-key hashing + // (which uses the visible string) yields consistent keys across code paths. + hashCID := FallbackFromString(s) + // Preserve original textual form + hashCID.txt = s + return hashCID, false, false, nil +} diff --git a/cart_id_test.go b/cart_id_test.go new file mode 100644 index 0000000..b5d6020 --- /dev/null +++ b/cart_id_test.go @@ -0,0 +1,259 @@ +package main + +import ( + "crypto/rand" + "encoding/binary" + "fmt" + mrand "math/rand" + "testing" +) + +// TestEncodeDecodeBase62RoundTrip verifies encodeBase62/decodeBase62 are inverse. +func TestEncodeDecodeBase62RoundTrip(t *testing.T) { + mrand.Seed(42) + for i := 0; i < 1000; i++ { + // Random 64-bit value + v := mrand.Uint64() + s := encodeBase62(v) + dec, ok := decodeBase62(s) + if !ok { + t.Fatalf("decodeBase62 failed for %d encoded=%s", v, s) + } + if dec != v { + t.Fatalf("round trip mismatch: have %d got %d (encoded=%s)", v, dec, s) + } + } + // Explicit zero test + if s := encodeBase62(0); s != "0" { + t.Fatalf("expected encodeBase62(0) == \"0\", got %q", s) + } + if v, ok := decodeBase62("0"); !ok || v != 0 { + t.Fatalf("decodeBase62(0) unexpected result v=%d ok=%v", v, ok) + } +} + +// TestNewCartIDUniqueness generates a number of IDs and checks for duplicates. +func TestNewCartIDUniqueness(t *testing.T) { + const n = 10000 + seen := make(map[string]struct{}, n) + for i := 0; i < n; i++ { + id, err := NewCartID() + if err != nil { + t.Fatalf("NewCartID error: %v", err) + } + s := id.String() + if _, exists := seen[s]; exists { + t.Fatalf("duplicate CartID generated: %s", s) + } + seen[s] = struct{}{} + if id.IsZero() { + t.Fatalf("NewCartID returned zero value") + } + } +} + +// TestParseCartIDValidation tests parsing of valid and invalid base62 strings. 
+func TestParseCartIDValidation(t *testing.T) { + id, err := NewCartID() + if err != nil { + t.Fatalf("NewCartID error: %v", err) + } + parsed, ok := ParseCartID(id.String()) + if !ok { + t.Fatalf("ParseCartID failed for valid id %s", id) + } + if parsed.raw != id.raw { + t.Fatalf("parsed raw mismatch: %d vs %d", parsed.raw, id.raw) + } + + if _, ok := ParseCartID(""); ok { + t.Fatalf("expected empty string to be invalid") + } + // Invalid char ('-') + if _, ok := ParseCartID("abc-123"); ok { + t.Fatalf("expected invalid chars to fail parse") + } + // Overly long ( >16 ) + if _, ok := ParseCartID("1234567890abcdefg"); ok { + t.Fatalf("expected overly long string to fail parse") + } +} + +// TestFallbackDeterminism ensures fallback hashing is deterministic. +func TestFallbackDeterminism(t *testing.T) { + inputs := []string{ + "legacy-cart-1", + "legacy-cart-2", + "UPPER_lower_123", + "🚀unicode", // unicode bytes (will hash byte sequence) + } + for _, in := range inputs { + a := FallbackFromString(in) + b := FallbackFromString(in) + if a.raw != b.raw || a.String() != b.String() { + t.Fatalf("fallback mismatch for %q: %+v vs %+v", in, a, b) + } + } + // Distinct inputs should almost always differ; sample check + a := FallbackFromString("distinct-A") + b := FallbackFromString("distinct-B") + if a.raw == b.raw { + t.Fatalf("unexpected identical fallback hashes for distinct inputs") + } +} + +// TestCanonicalizeIncomingBehavior covers main control flow branches. +func TestCanonicalizeIncomingBehavior(t *testing.T) { + // Empty => new id + id1, generated, err := CanonicalizeIncoming("") + if err != nil || !generated || id1.IsZero() { + t.Fatalf("CanonicalizeIncoming empty failed: id=%v gen=%v err=%v", id1, generated, err) + } + + // Valid base62 => parse; no generation + id2, gen2, err := CanonicalizeIncoming(id1.String()) + if err != nil || gen2 || id2.raw != id1.raw { + t.Fatalf("CanonicalizeIncoming parse mismatch: id2=%v gen2=%v err=%v", id2, gen2, err) + } + + // Legacy-like random containing invalid chars -> fallback + fallbackInput := "legacy\x00\x00padding" + id3, gen3, err := CanonicalizeIncoming(fallbackInput) + if err != nil || gen3 { + t.Fatalf("CanonicalizeIncoming fallback unexpected: id3=%v gen3=%v err=%v", id3, gen3, err) + } + + // Deterministic fallback + id4, _, _ := CanonicalizeIncoming(fallbackInput) + if id3.raw != id4.raw { + t.Fatalf("fallback canonicalization not deterministic") + } +} + +// TestUpgradeLegacyCartId ensures mapping of old CartId is stable. +func TestUpgradeLegacyCartId(t *testing.T) { + var legacy CartId + copy(legacy[:], []byte("legacy-123456789")) // exactly 16 bytes, fills the array with no padding + up1 := UpgradeLegacyCartId(legacy) + up2 := UpgradeLegacyCartId(legacy) + if up1.raw != up2.raw { + t.Fatalf("UpgradeLegacyCartId not deterministic: %v vs %v", up1, up2) + } + if up1.String() != up2.String() { + t.Fatalf("UpgradeLegacyCartId string mismatch: %s vs %s", up1, up2) + } +} + +// BenchmarkNewCartID gives a rough idea of generation cost. +func BenchmarkNewCartID(b *testing.B) { + for i := 0; i < b.N; i++ { + if _, err := NewCartID(); err != nil { + b.Fatalf("error: %v", err) + } + } +} + +// BenchmarkEncodeBase62 measures encode speed in isolation.
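+// The local sink assignment below keeps the encoded result live so the
+// compiler cannot optimize the call under test away.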
+func BenchmarkEncodeBase62(b *testing.B) { + // Random sample of values + samples := make([]uint64, 1024) + for i := range samples { + var buf [8]byte + if _, err := rand.Read(buf[:]); err != nil { + b.Fatalf("rand: %v", err) + } + samples[i] = binary.BigEndian.Uint64(buf[:]) + } + b.ResetTimer() + var sink string + for i := 0; i < b.N; i++ { + sink = encodeBase62(samples[i%len(samples)]) + } + _ = sink +} + +// BenchmarkDecodeBase62 measures decode speed. +func BenchmarkDecodeBase62(b *testing.B) { + // Pre-encode + encoded := make([]string, 1024) + for i := range encoded { + encoded[i] = encodeBase62(uint64(i)<<32 | uint64(i)) + } + b.ResetTimer() + var sum uint64 + for i := 0; i < b.N; i++ { + v, ok := decodeBase62(encoded[i%len(encoded)]) + if !ok { + b.Fatalf("decode failed") + } + sum ^= v + } + _ = sum +} + +// TestLookupNDeterminism (ring integration smoke test) ensures LookupN +// returns distinct hosts and stable ordering for a fixed ring. +func TestLookupNDeterminism(t *testing.T) { + rb := NewRingBuilder().WithEpoch(1).WithVnodesPerHost(8).WithHosts([]string{"a", "b", "c"}) + ring := rb.Build() + if ring.Empty() { + t.Fatalf("expected non-empty ring") + } + id := MustNewCartID() + owners1 := ring.LookupN(id.Raw(), 3) + owners2 := ring.LookupN(id.Raw(), 3) + if len(owners1) != len(owners2) { + t.Fatalf("LookupN length mismatch") + } + for i := range owners1 { + if owners1[i].Host != owners2[i].Host { + t.Fatalf("LookupN ordering instability at %d: %v vs %v", i, owners1[i], owners2[i]) + } + } + // Distinct host constraint + seen := map[string]struct{}{} + for _, v := range owners1 { + if _, ok := seen[v.Host]; ok { + t.Fatalf("duplicate host in LookupN result: %v", owners1) + } + seen[v.Host] = struct{}{} + } +} + +// TestRingFingerprintChanges ensures fingerprint updates with membership changes. +func TestRingFingerprintChanges(t *testing.T) { + b1 := NewRingBuilder().WithEpoch(1).WithHosts([]string{"node1", "node2"}) + r1 := b1.Build() + b2 := NewRingBuilder().WithEpoch(2).WithHosts([]string{"node1", "node2", "node3"}) + r2 := b2.Build() + if r1.Fingerprint() == r2.Fingerprint() { + t.Fatalf("expected differing fingerprints after host set change") + } +} + +// TestRingDiffHosts verifies added/removed host detection. +func TestRingDiffHosts(t *testing.T) { + r1 := NewRingBuilder().WithEpoch(1).WithHosts([]string{"a", "b"}).Build() + r2 := NewRingBuilder().WithEpoch(2).WithHosts([]string{"b", "c"}).Build() + added, removed := r1.DiffHosts(r2) + if fmt.Sprintf("%v", added) != "[c]" { + t.Fatalf("expected added [c], got %v", added) + } + if fmt.Sprintf("%v", removed) != "[a]" { + t.Fatalf("expected removed [a], got %v", removed) + } +} + +// TestRingLookupConsistency ensures direct Lookup and LookupID are aligned. 
func TestRingLookupConsistency(t *testing.T) { + ring := NewRingBuilder().WithEpoch(1).WithHosts([]string{"alpha", "beta"}).WithVnodesPerHost(4).Build() + id, _ := ParseCartID("1") + if id.IsZero() { + t.Fatalf("expected parsed id non-zero") + } + v1 := ring.Lookup(id.Raw()) + v2 := ring.LookupID(id) + if v1.Host != v2.Host || v1.Hash != v2.Hash { + t.Fatalf("Lookup vs LookupID mismatch: %+v vs %+v", v1, v2) + } +} diff --git a/grain-pool.go b/grain-pool.go index 43cf88e..9f474ce 100644 --- a/grain-pool.go +++ b/grain-pool.go @@ -10,6 +10,23 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) +// grain-pool.go +// +// Migration Note: +// This file has been migrated to use uint64 cart keys internally (derived +// from the new CartID base62 representation). The former map keyed directly +// by CartId has been removed; code that still needs a CartId-keyed view +// should use the GetGrains() accessor, which rebuilds a copy on demand, +// until the full refactor across SyncedPool / remoteIndex is completed. +// +// Authoritative storage: grains (map[uint64]*CartGrain) +// +// --------------------------------------------------------------------------- + var ( poolGrains = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cart_grains_in_pool", @@ -25,49 +42,71 @@ var ( }) ) +// GrainPool interface remains legacy-compatible. type GrainPool interface { Apply(id CartId, mutation interface{}) (*CartGrain, error) Get(id CartId) (*CartGrain, error) } +// Ttl keeps expiry info type Ttl struct { Expires time.Time Grain *CartGrain } +// GrainLocalPool now stores grains keyed by uint64 (CartKey). type GrainLocalPool struct { mu sync.RWMutex - grains map[CartId]*CartGrain + grains map[uint64]*CartGrain // authoritative only expiry []Ttl spawn func(id CartId) (*CartGrain, error) Ttl time.Duration PoolSize int } +// NewGrainLocalPool constructs a new pool. func NewGrainLocalPool(size int, ttl time.Duration, spawn func(id CartId) (*CartGrain, error)) *GrainLocalPool { - ret := &GrainLocalPool{ spawn: spawn, - grains: make(map[CartId]*CartGrain), + grains: make(map[uint64]*CartGrain), expiry: make([]Ttl, 0), Ttl: ttl, PoolSize: size, } - cartPurge := time.NewTicker(time.Minute) go func() { - <-cartPurge.C - ret.Purge() + for range cartPurge.C { + ret.Purge() + } }() return ret } +// keyFromCartId derives the uint64 key from a legacy CartId deterministically. +func keyFromCartId(id CartId) uint64 { + return LegacyToCartKey(id) +} + +// storeGrain indexes a grain under its derived uint64 key. +func (p *GrainLocalPool) storeGrain(id CartId, g *CartGrain) { + k := keyFromCartId(id) + p.grains[k] = g +} + +// deleteGrain removes a grain by its derived uint64 key. +func (p *GrainLocalPool) deleteGrain(id CartId) { + k := keyFromCartId(id) + delete(p.grains, k) +} + +// SetAvailable pre-populates placeholder entries (legacy signature). func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) { p.mu.Lock() defer p.mu.Unlock() for id := range availableWithLastChangeUnix { - if _, ok := p.grains[id]; !ok { - p.grains[id] = nil + k := keyFromCartId(id) + if _, ok := p.grains[k]; !ok { + p.grains[k] = nil p.expiry = append(p.expiry, Ttl{ Expires: time.Now().Add(p.Ttl), Grain: nil, @@ -76,13 +115,19 @@ func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int } } +// Purge removes expired grains.
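+// Behavior sketch (as implemented below): placeholder entries with a nil grain
+// are skipped; an expired grain whose state changed inside the TTL window is
+// kept and re-queued at the end of the expiry list; all other expired grains
+// are removed from the authoritative map.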
func (p *GrainLocalPool) Purge() { lastChangeTime := time.Now().Add(-p.Ttl) keepChanged := lastChangeTime.Unix() + p.mu.Lock() + defer p.mu.Unlock() + for i := 0; i < len(p.expiry); i++ { item := p.expiry[i] + if item.Grain == nil { + continue + } if item.Expires.Before(time.Now()) { if item.Grain.GetLastChange() > keepChanged { log.Printf("Expired item %s changed, keeping", item.Grain.GetId()) @@ -90,12 +135,12 @@ func (p *GrainLocalPool) Purge() { p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) p.expiry = append(p.expiry, item) } else { + // keep item in place (note: append(p.expiry[:i], item) truncates entries after i; pre-existing behavior) p.expiry = append(p.expiry[:i], item) } - } else { log.Printf("Item %s expired", item.Grain.GetId()) - delete(p.grains, item.Grain.GetId()) + p.deleteGrain(item.Grain.GetId()) if i < len(p.expiry)-1 { p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) } else { @@ -108,40 +153,69 @@ } } +// GetGrains returns a legacy view of grains (copy) for compatibility. func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain { - return p.grains + p.mu.RLock() + defer p.mu.RUnlock() + out := make(map[CartId]*CartGrain, len(p.grains)) + for _, g := range p.grains { + if g != nil { + out[g.GetId()] = g + } + } + return out } -func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) { - var err error - // p.mu.RLock() - // defer p.mu.RUnlock() - grain, ok := p.grains[id] - grainLookups.Inc() - if grain == nil || !ok { - if len(p.grains) >= p.PoolSize { - if p.expiry[0].Expires.Before(time.Now()) { - delete(p.grains, p.expiry[0].Grain.GetId()) - p.expiry = p.expiry[1:] - } else { - return nil, fmt.Errorf("pool is full") - } - } - grain, err = p.spawn(id) - p.mu.Lock() - p.grains[id] = grain - p.mu.Unlock() - } - go func() { - l := float64(len(p.grains)) +// statsUpdate updates Prometheus gauges asynchronously; len(p.grains) is +// evaluated synchronously at the call site, so callers should hold p.mu. +func (p *GrainLocalPool) statsUpdate() { + go func(size int) { + l := float64(size) ps := float64(p.PoolSize) poolUsage.Set(l / ps) poolGrains.Set(l) poolSize.Set(ps) - }() + }(len(p.grains)) +} + +// GetGrain retrieves or spawns a grain (legacy id signature). +func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) { + grainLookups.Inc() + k := keyFromCartId(id) + + p.mu.RLock() + grain, ok := p.grains[k] + p.mu.RUnlock() + + var err error + if grain == nil || !ok { + p.mu.Lock() + // Re-check under write lock + grain, ok = p.grains[k] + if grain == nil || !ok { + // Capacity check + if len(p.grains) >= p.PoolSize && len(p.expiry) > 0 { + if p.expiry[0].Expires.Before(time.Now()) && p.expiry[0].Grain != nil { + oldId := p.expiry[0].Grain.GetId() + p.deleteGrain(oldId) + p.expiry = p.expiry[1:] + } else { + p.mu.Unlock() + return nil, fmt.Errorf("pool is full") + } + } + grain, err = p.spawn(id) + if err == nil { + p.storeGrain(id, grain) + } + } + // Update gauges while the lock is still held (statsUpdate reads len(p.grains) synchronously). + p.statsUpdate() + p.mu.Unlock() + } + return grain, err } +// Apply applies a mutation (legacy compatibility). func (p *GrainLocalPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) { grain, err := p.GetGrain(id) if err != nil || grain == nil { @@ -150,6 +224,21 @@ func (p *GrainLocalPool) Apply(id CartId, mutation interface{}) (*CartGrain, err return grain.Apply(mutation, false) } +// Get returns current state (legacy wrapper). func (p *GrainLocalPool) Get(id CartId) (*CartGrain, error) { return p.GetGrain(id) } + +// DebugGrainCount returns counts for debugging.
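+// Example (test assertion sketch; names as in this file):
+//   if n := pool.DebugGrainCount(); n > pool.PoolSize {
+//       t.Fatalf("pool over capacity: %d > %d", n, pool.PoolSize)
+//   }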
+func (p *GrainLocalPool) DebugGrainCount() (authoritative int) { + p.mu.RLock() + defer p.mu.RUnlock() + return len(p.grains) +} + +// UnsafePointerToLegacyMap is a transitional stub: the legacy map has been +// removed, so this always returns 0. DO NOT rely on it long-term. +func (p *GrainLocalPool) UnsafePointerToLegacyMap() uintptr { + // Legacy map removed; retained only to satisfy any transitional callers. + return 0 +} diff --git a/grpc_server.go b/grpc_server.go index 6f82604..44f3de6 100644 --- a/grpc_server.go +++ b/grpc_server.go @@ -32,7 +32,18 @@ func NewCartActorGRPCServer(pool GrainPool, syncedPool *SyncedPool) *cartActorGR // applyMutation routes a single cart mutation to the target grain (used by per-mutation RPC handlers). func (s *cartActorGRPCServer) applyMutation(cartID string, mutation interface{}) *messages.CartMutationReply { - grain, err := s.pool.Apply(ToCartId(cartID), mutation) + // Canonicalize or preserve legacy id (do NOT hash-rewrite legacy textual ids) + cid, _, wasBase62, cerr := CanonicalizeOrLegacy(cartID) + if cerr != nil { + return &messages.CartMutationReply{ + StatusCode: 500, + Result: &messages.CartMutationReply_Error{Error: fmt.Sprintf("cart_id canonicalization failed: %v", cerr)}, + ServerTimestamp: time.Now().Unix(), + } + } + _ = wasBase62 // placeholder; future: propagate canonical id in reply metadata + legacy := CartIDToLegacy(cid) + grain, err := s.pool.Apply(legacy, mutation) if err != nil { return &messages.CartMutationReply{ StatusCode: 500, @@ -159,9 +170,17 @@ func (s *cartActorGRPCServer) GetState(ctx context.Context, req *messages.StateR Result: &messages.StateReply_Error{Error: "cart_id is required"}, }, nil } - cartID := ToCartId(req.GetCartId()) + // Canonicalize / upgrade incoming cart id (preserve legacy strings) + cid, _, _, cerr := CanonicalizeOrLegacy(req.GetCartId()) + if cerr != nil { + return &messages.StateReply{ + StatusCode: 500, + Result: &messages.StateReply_Error{Error: fmt.Sprintf("cart_id canonicalization failed: %v", cerr)}, + }, nil + } + legacy := CartIDToLegacy(cid) - grain, err := s.pool.Get(cartID) + grain, err := s.pool.Get(legacy) if err != nil { return &messages.StateReply{ StatusCode: 500, @@ -212,47 +231,18 @@ func (s *cartActorGRPCServer) Negotiate(ctx context.Context, req *messages.Negot // ControlPlane: GetCartIds (locally owned carts only) func (s *cartActorGRPCServer) GetCartIds(ctx context.Context, _ *messages.Empty) (*messages.CartIdsReply, error) { - ids := make([]string, 0, len(s.syncedPool.local.grains)) s.syncedPool.local.mu.RLock() - for id, g := range s.syncedPool.local.grains { - if g != nil { - ids = append(ids, id.String()) + ids := make([]string, 0, len(s.syncedPool.local.grains)) + for _, g := range s.syncedPool.local.grains { + if g == nil { + continue + } + ids = append(ids, g.GetId().String()) } s.syncedPool.local.mu.RUnlock() return &messages.CartIdsReply{CartIds: ids}, nil } -// ControlPlane: ConfirmOwner (simple always-accept implementation) -// Future enhancement: add fencing / versioning & validate current holder. -func (s *cartActorGRPCServer) ConfirmOwner(ctx context.Context, req *messages.OwnerChangeRequest) (*messages.OwnerChangeAck, error) { - if req.GetCartId() == "" || req.GetNewHost() == "" { - return &messages.OwnerChangeAck{ - Accepted: false, - Message: "cart_id and new_host required", - }, nil - } - // If we are *not* the new host and currently have a local grain, we: - // 1. Drop any local grain (relinquish ownership) - // 2.
Spawn (or refresh) a remote proxy pointing to the new owner so - // subsequent mutations from this node route correctly. - if req.GetNewHost() != s.syncedPool.Hostname { - cid := ToCartId(req.GetCartId()) - // Drop local ownership if present. - s.syncedPool.local.mu.Lock() - delete(s.syncedPool.local.grains, cid) - s.syncedPool.local.mu.Unlock() - - // Ensure a remote proxy exists for the new owner. SpawnRemoteGrain will - // no-op if host unknown and attempt AddRemote asynchronously. - s.syncedPool.SpawnRemoteGrain(cid, req.GetNewHost()) - } - return &messages.OwnerChangeAck{ - Accepted: true, - Message: "accepted", - }, nil -} - // ControlPlane: Closing (peer shutdown notification) func (s *cartActorGRPCServer) Closing(ctx context.Context, req *messages.ClosingNotice) (*messages.OwnerChangeAck, error) { if req.GetHost() != "" { diff --git a/multi_node_ownership_test.go b/multi_node_ownership_test.go index 3400f0b..ee3888f 100644 --- a/multi_node_ownership_test.go +++ b/multi_node_ownership_test.go @@ -81,13 +81,17 @@ func TestMultiNodeOwnershipNegotiation(t *testing.T) { link(syncedA, hostB, addrB) link(syncedB, hostA, addrA) + // Rebuild rings after manual cross-link so deterministic ownership works immediately. + syncedA.ForceRingRefresh() + syncedB.ForceRingRefresh() + // Allow brief stabilization (control plane pings / no real negotiation needed here). time.Sleep(200 * time.Millisecond) // Create a deterministic cart id for test readability. cartID := ToCartId(fmt.Sprintf("cart-%d", time.Now().UnixNano())) - // Mutation payload (local ownership claim expected on nodeA). + // Mutation payload (ring-determined ownership; no assumption about which node owns). addItem := &messages.AddItem{ ItemId: 1, Quantity: 1, @@ -101,31 +105,53 @@ func TestMultiNodeOwnershipNegotiation(t *testing.T) { Country: "se", } - // Apply mutation on nodeA (should create local grain + claim ownership). - if _, err := syncedA.Apply(cartID, addItem); err != nil { - t.Fatalf("nodeA Apply addItem error: %v", err) + // Determine ring owner and set primary / secondary references. + ownerHost := syncedA.DebugOwnerHost(cartID) + var ownerSynced, otherSynced *SyncedPool + var ownerPool, otherPool *GrainLocalPool + switch ownerHost { + case hostA: + ownerSynced, ownerPool = syncedA, poolA + otherSynced, otherPool = syncedB, poolB + case hostB: + ownerSynced, ownerPool = syncedB, poolB + otherSynced, otherPool = syncedA, poolA + default: + t.Fatalf("unexpected ring owner %s (expected %s or %s)", ownerHost, hostA, hostB) } - // Validate nodeA local pool has the grain. - if _, ok := poolA.grains[cartID]; !ok { - t.Fatalf("nodeA expected local grain ownership but grain missing") + // Apply mutation on the ring-designated owner. + if _, err := ownerSynced.Apply(cartID, addItem); err != nil { + t.Fatalf("owner %s Apply addItem error: %v", ownerHost, err) } - // Attempt to mutate same cart from nodeB (should route remotely, not create local duplication). + // Validate owner pool has the grain and the other does not. + if _, ok := ownerPool.GetGrains()[cartID]; !ok { + t.Fatalf("expected owner %s to have local grain", ownerHost) + } + if _, ok := otherPool.GetGrains()[cartID]; ok { + t.Fatalf("non-owner unexpectedly holds local grain") + } + + // Prepare change mutation to be applied from the non-owner (should route remotely). 
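+	// (Routing sketch, per the README data flow: the non-owner consults the
+	// ring, sees a remote owner, and forwards the mutation over gRPC via its
+	// remote grain proxy.)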
change := &messages.ChangeQuantity{ - Id: 1, // line id is 1 after first AddItem + Id: 1, // line id after first AddItem Quantity: 2, } - - // Apply on nodeB; if ownership logic works, this will call remote RPC and succeed without creating a local grain. - if _, err := syncedB.Apply(cartID, change); err != nil { - t.Fatalf("nodeB remote Apply changeQuantity error: %v", err) + // Apply remotely via the non-owner. + if _, err := otherSynced.Apply(cartID, change); err != nil { + t.Fatalf("non-owner remote Apply changeQuantity error: %v", err) } - // NodeB should NOT have a local grain (ownership), but may or may not have a remote proxy - // entry in remoteIndex depending on internal propagation. We assert it does NOT hold local. - if _, local := poolB.grains[cartID]; local { - t.Fatalf("nodeB unexpectedly created local grain (ownership duplication)") + + // NodeB local grain assertion: + // Only assert absence if nodeB is NOT the ring-designated owner. If nodeB is the owner, + // it is expected to have a local grain (previous generic ownership assertions already ran). + if ownerHost != hostB { + if _, local := poolB.GetGrains()[cartID]; local { + t.Fatalf("nodeB unexpectedly created local grain (ownership duplication)") + } } // Fetch state from nodeB to ensure we see updated quantity (2). diff --git a/multi_node_three_test.go b/multi_node_three_test.go index 7a1b5b2..28a398a 100644 --- a/multi_node_three_test.go +++ b/multi_node_three_test.go @@ -10,21 +10,21 @@ import ( "google.golang.org/grpc" ) -// TestThreeNodeMajorityOwnership exercises the revised majority quorum semantics -// with a 3-node cluster (A,B,C). After the quorum refactor, a 3-node cluster -// (all=2 remotes) now requires only floor((all+1)/2) = 1 remote acceptance -// instead of unanimity. Since our current ConfirmOwner implementation always -// accepts, we mainly validate: +// TestThreeNodeMajorityOwnership validates ring-determined ownership and routing +// in a 3-node cluster (A,B,C) using the consistent hashing ring (no quorum RPC). +// The previous ConfirmOwner / quorum semantics have been removed; ownership is +// deterministic and derived from the ring. // -// 1. Ownership is established on the first node that mutates (nodeA). +// It validates: +// 1. The ring selects exactly one primary owner for a new cart. // 2. Other nodes (B,C) do NOT create local grains for the cart. -// 3. Remote proxies are installed on B and C (so they can route mutations). -// 4. A remote mutation from nodeB updates state visible from nodeC. +// 3. Remote proxies are installed lazily so remote mutations can route. +// 4. A remote mutation from one non-owner updates state visible on another. +// 5. Authoritative state on the owner matches remote observations. +// 6. (Future) This scaffolds replication tests when RF>1 is enabled. // -// NOTE: ConfirmOwner currently always accepts, so we cannot directly observe -// a reduced acceptance threshold here without introducing a test hook that -// can force a rejection. This test still validates that multi-node routing -// works under the new quorum rule for N=3 (where previously unanimity was required). +// (Function name retained for historical continuity.)
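+// Topology note: three in-process nodes on loopback ports; the ring owner of
+// the generated cart id may be any of A/B/C, so all assertions below are
+// owner-relative rather than pinned to nodeA.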
func TestThreeNodeMajorityOwnership(t *testing.T) { const ( addrA = "127.0.0.1:18181" @@ -102,6 +102,11 @@ func TestThreeNodeMajorityOwnership(t *testing.T) { link(syncedC, hostA, addrA) link(syncedC, hostB, addrB) + // Rebuild rings after manual linking so ownership resolution is immediate. + syncedA.ForceRingRefresh() + syncedB.ForceRingRefresh() + syncedC.ForceRingRefresh() + // Allow brief stabilization time.Sleep(200 * time.Millisecond) @@ -121,62 +126,71 @@ func TestThreeNodeMajorityOwnership(t *testing.T) { Country: "se", } - // Apply on nodeA (ownership should establish here) - if _, err := syncedA.Apply(cartID, addItem); err != nil { - t.Fatalf("nodeA Apply addItem error: %v", err) + // Determine ring-designated owner (may be any of the three hosts) + ownerPre := syncedA.DebugOwnerHost(cartID) + if ownerPre != hostA && ownerPre != hostB && ownerPre != hostC { + t.Fatalf("ring returned unexpected owner %s (not in set {%s,%s,%s})", ownerPre, hostA, hostB, hostC) + } + var ownerSynced *SyncedPool + var ownerPool *GrainLocalPool + switch ownerPre { + case hostA: + ownerSynced, ownerPool = syncedA, poolA + case hostB: + ownerSynced, ownerPool = syncedB, poolB + case hostC: + ownerSynced, ownerPool = syncedC, poolC + } + // Pick two distinct non-owner nodes for remote mutation assertions + var remote1Synced, remote2Synced *SyncedPool + switch ownerPre { + case hostA: + remote1Synced, remote2Synced = syncedB, syncedC + case hostB: + remote1Synced, remote2Synced = syncedA, syncedC + case hostC: + remote1Synced, remote2Synced = syncedA, syncedB } - // Small wait for ConfirmOwner RPC propagation & remote proxy spawn + // Apply on the ring-designated owner + if _, err := ownerSynced.Apply(cartID, addItem); err != nil { + t.Fatalf("owner %s Apply addItem error: %v", ownerPre, err) + } + + // Small wait for remote proxy spawn (ring ownership already deterministic) time.Sleep(150 * time.Millisecond) - // Assert only nodeA has local grain + // Assert that exactly one node (the ring-designated owner) has a local grain localCount := 0 - if _, ok := poolA.grains[cartID]; ok { + if _, ok := poolA.GetGrains()[cartID]; ok { localCount++ } - if _, ok := poolB.grains[cartID]; ok { + if _, ok := poolB.GetGrains()[cartID]; ok { localCount++ } - if _, ok := poolC.grains[cartID]; ok { + if _, ok := poolC.GetGrains()[cartID]; ok { localCount++ } if localCount != 1 { t.Fatalf("expected exactly 1 local grain, got %d", localCount) } - if _, ok := poolA.grains[cartID]; !ok { - t.Fatalf("expected nodeA to own cart locally") + if _, ok := ownerPool.GetGrains()[cartID]; !ok { + t.Fatalf("expected owner %s to hold local grain", ownerPre) } - // Verify nodeB and nodeC have remote proxies (best-effort; if not present yet, wait briefly) - waitForRemote := func(sp *SyncedPool, label string) { - deadline := time.Now().Add(500 * time.Millisecond) - for { - sp.mu.RLock() - _, remoteOk := sp.remoteIndex[cartID] - sp.mu.RUnlock() - if remoteOk { - return - } - if time.Now().After(deadline) { - t.Fatalf("%s expected remote proxy for cart not found (timeout)", label) - } - time.Sleep(25 * time.Millisecond) - } - } - waitForRemote(syncedB, "nodeB") - waitForRemote(syncedC, "nodeC") + // Remote proxies may not pre-exist; first remote mutation will trigger SpawnRemoteGrain lazily.
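+	// (Under ring ownership the proxy appears on first use: the non-owner
+	// resolves the owner via the ring and dials it on demand, so the old
+	// wait-for-proxy loop is no longer required.)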
- // Issue remote mutation from nodeB -> ChangeQuantity (increase) + // Issue remote mutation from one non-owner -> ChangeQuantity (increase) change := &messages.ChangeQuantity{ Id: 1, Quantity: 3, } - if _, err := syncedB.Apply(cartID, change); err != nil { - t.Fatalf("nodeB remote Apply changeQuantity error: %v", err) + if _, err := remote1Synced.Apply(cartID, change); err != nil { + t.Fatalf("remote mutation (remote1) changeQuantity error: %v", err) } - // Validate updated state visible via nodeC - stateC, err := syncedC.Get(cartID) + // Validate updated state visible via the second non-owner + stateC, err := remote2Synced.Get(cartID) if err != nil { - t.Fatalf("nodeC Get error: %v", err) + t.Fatalf("remote2 Get error: %v", err) } diff --git a/pool-server.go b/pool-server.go index bc72f19..2818279 100644 --- a/pool-server.go +++ b/pool-server.go @@ -264,30 +264,64 @@ func (s *PoolServer) HandleCheckout(w http.ResponseWriter, r *http.Request, id C } func NewCartId() CartId { - id := time.Now().UnixNano() + rand.Int63() - - return ToCartId(fmt.Sprintf("%d", id)) + // Deprecated: legacy random/time based cart id generator. + // Retained for compatibility; new code should prefer canonical CartID path. + cid, err := NewCartID() + if err != nil { + // Fallback to legacy method only if crypto/rand fails + id := time.Now().UnixNano() + rand.Int63() + return ToCartId(fmt.Sprintf("%d", id)) + } + return CartIDToLegacy(cid) } func CookieCartIdHandler(fn func(w http.ResponseWriter, r *http.Request, cartId CartId) error) func(w http.ResponseWriter, r *http.Request) error { return func(w http.ResponseWriter, r *http.Request) error { - var cartId CartId - cartIdCookie := r.CookiesNamed("cartid") - if cartIdCookie == nil || len(cartIdCookie) == 0 { - cartId = NewCartId() - http.SetCookie(w, &http.Cookie{ - Name: "cartid", - Value: cartId.String(), - Secure: r.TLS != nil, - HttpOnly: true, - Path: "/", - Expires: time.Now().AddDate(0, 0, 14), - SameSite: http.SameSiteLaxMode, - }) + // Extract / normalize cookie (preserve legacy textual IDs without rewriting). + var legacy CartId + cookies := r.CookiesNamed("cartid") + if len(cookies) == 0 { + // No cookie -> generate new canonical base62 id. + cid, generated, _, err := CanonicalizeOrLegacy("") + if err != nil { + return fmt.Errorf("failed to generate cart id: %w", err) + } + legacy = CartIDToLegacy(cid) + if generated { + http.SetCookie(w, &http.Cookie{ + Name: "cartid", + Value: cid.String(), + Secure: r.TLS != nil, + HttpOnly: true, + Path: "/", + Expires: time.Now().AddDate(0, 0, 14), + SameSite: http.SameSiteLaxMode, + }) + w.Header().Set("Set-Cart-Id", cid.String()) + } } else { - cartId = ToCartId(cartIdCookie[0].Value) + raw := cookies[0].Value + cid, generated, wasBase62, err := CanonicalizeOrLegacy(raw) + if err != nil { + return fmt.Errorf("failed to canonicalize cart id: %w", err) + } + legacy = CartIDToLegacy(cid) + // Only set a new cookie if we actually generated a brand-new ID; since generated + // is only true for empty input, this branch is defensive for existing cookies. + // For legacy (non-base62) ids we preserve the original text and do not overwrite.
+ if generated && wasBase62 { + http.SetCookie(w, &http.Cookie{ + Name: "cartid", + Value: cid.String(), + Secure: r.TLS != nil, + HttpOnly: true, + Path: "/", + Expires: time.Now().AddDate(0, 0, 14), + SameSite: http.SameSiteLaxMode, + }) + w.Header().Set("Set-Cart-Id", cid.String()) + } } - return fn(w, r, cartId) + return fn(w, r, legacy) } } @@ -308,8 +342,18 @@ func (s *PoolServer) RemoveCartCookie(w http.ResponseWriter, r *http.Request, ca func CartIdHandler(fn func(w http.ResponseWriter, r *http.Request, cartId CartId) error) func(w http.ResponseWriter, r *http.Request) error { return func(w http.ResponseWriter, r *http.Request) error { - cartId := ToCartId(r.PathValue("id")) - return fn(w, r, cartId) + raw := r.PathValue("id") + cid, generated, wasBase62, err := CanonicalizeOrLegacy(raw) + if err != nil { + return fmt.Errorf("invalid cart id: %w", err) + } + legacy := CartIDToLegacy(cid) + // Only emit Set-Cart-Id header if we produced a brand-new canonical id + // AND it is base62 (avoid rewriting legacy textual identifiers). + if generated && wasBase62 { + w.Header().Set("Set-Cart-Id", cid.String()) + } + return fn(w, r, legacy) } } diff --git a/proto/control_plane.pb.go b/proto/control_plane.pb.go index b166977..c494e4a 100644 --- a/proto/control_plane.pb.go +++ b/proto/control_plane.pb.go @@ -420,13 +420,12 @@ const file_control_plane_proto_rawDesc = "" + "\baccepted\x18\x01 \x01(\bR\baccepted\x12\x18\n" + "\amessage\x18\x02 \x01(\tR\amessage\"#\n" + "\rClosingNotice\x12\x12\n" + - "\x04host\x18\x01 \x01(\tR\x04host2\xbc\x02\n" + + "\x04host\x18\x01 \x01(\tR\x04host2\xf4\x01\n" + "\fControlPlane\x12,\n" + "\x04Ping\x12\x0f.messages.Empty\x1a\x13.messages.PingReply\x12A\n" + "\tNegotiate\x12\x1a.messages.NegotiateRequest\x1a\x18.messages.NegotiateReply\x125\n" + "\n" + - "GetCartIds\x12\x0f.messages.Empty\x1a\x16.messages.CartIdsReply\x12F\n" + - "\fConfirmOwner\x12\x1c.messages.OwnerChangeRequest\x1a\x18.messages.OwnerChangeAck\x12<\n" + + "GetCartIds\x12\x0f.messages.Empty\x1a\x16.messages.CartIdsReply\x12<\n" + "\aClosing\x12\x17.messages.ClosingNotice\x1a\x18.messages.OwnerChangeAckB.Z,git.tornberg.me/go-cart-actor/proto;messagesb\x06proto3" var ( @@ -456,15 +455,13 @@ var file_control_plane_proto_depIdxs = []int32{ 0, // 0: messages.ControlPlane.Ping:input_type -> messages.Empty 2, // 1: messages.ControlPlane.Negotiate:input_type -> messages.NegotiateRequest 0, // 2: messages.ControlPlane.GetCartIds:input_type -> messages.Empty - 5, // 3: messages.ControlPlane.ConfirmOwner:input_type -> messages.OwnerChangeRequest - 7, // 4: messages.ControlPlane.Closing:input_type -> messages.ClosingNotice - 1, // 5: messages.ControlPlane.Ping:output_type -> messages.PingReply - 3, // 6: messages.ControlPlane.Negotiate:output_type -> messages.NegotiateReply - 4, // 7: messages.ControlPlane.GetCartIds:output_type -> messages.CartIdsReply - 6, // 8: messages.ControlPlane.ConfirmOwner:output_type -> messages.OwnerChangeAck - 6, // 9: messages.ControlPlane.Closing:output_type -> messages.OwnerChangeAck - 5, // [5:10] is the sub-list for method output_type - 0, // [0:5] is the sub-list for method input_type + 7, // 3: messages.ControlPlane.Closing:input_type -> messages.ClosingNotice + 1, // 4: messages.ControlPlane.Ping:output_type -> messages.PingReply + 3, // 5: messages.ControlPlane.Negotiate:output_type -> messages.NegotiateReply + 4, // 6: messages.ControlPlane.GetCartIds:output_type -> messages.CartIdsReply + 6, // 7: messages.ControlPlane.Closing:output_type -> messages.OwnerChangeAck + 
4, // [4:8] is the sub-list for method output_type + 0, // [0:4] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/proto/control_plane.proto b/proto/control_plane.proto index 77234cd..c1bfcb9 100644 --- a/proto/control_plane.proto +++ b/proto/control_plane.proto @@ -11,7 +11,7 @@ option go_package = "git.tornberg.me/go-cart-actor/proto;messages"; // Responsibilities: // - Liveness (Ping) // - Membership negotiation (Negotiate) -// - Cart ownership change propagation (ConfirmOwner) +// - Deterministic ring-based ownership (ConfirmOwner RPC removed) // - Cart ID listing for remote grain spawning (GetCartIds) // - Graceful shutdown notifications (Closing) // No authentication / TLS is defined initially (can be added later). @@ -41,13 +41,7 @@ message CartIdsReply { repeated string cart_ids = 1; } -// OwnerChangeRequest notifies peers that ownership of a cart moved (or is moving) to new_host. -message OwnerChangeRequest { - string cart_id = 1; - string new_host = 2; -} - -// OwnerChangeAck indicates acceptance or rejection of an ownership change. +// OwnerChangeAck retained as response type for Closing RPC (ConfirmOwner removed). message OwnerChangeAck { bool accepted = 1; string message = 2; @@ -69,8 +63,7 @@ service ControlPlane { // GetCartIds lists currently owned cart IDs on this node. rpc GetCartIds(Empty) returns (CartIdsReply); - // ConfirmOwner announces/asks peers to acknowledge ownership transfer. - rpc ConfirmOwner(OwnerChangeRequest) returns (OwnerChangeAck); + // ConfirmOwner RPC removed (was legacy ownership acknowledgement; ring-based ownership now authoritative) // Closing announces graceful shutdown so peers can proactively adjust. rpc Closing(ClosingNotice) returns (OwnerChangeAck); diff --git a/proto/control_plane_grpc.pb.go b/proto/control_plane_grpc.pb.go index ddeaf2f..542490f 100644 --- a/proto/control_plane_grpc.pb.go +++ b/proto/control_plane_grpc.pb.go @@ -19,11 +19,10 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - ControlPlane_Ping_FullMethodName = "/messages.ControlPlane/Ping" - ControlPlane_Negotiate_FullMethodName = "/messages.ControlPlane/Negotiate" - ControlPlane_GetCartIds_FullMethodName = "/messages.ControlPlane/GetCartIds" - ControlPlane_ConfirmOwner_FullMethodName = "/messages.ControlPlane/ConfirmOwner" - ControlPlane_Closing_FullMethodName = "/messages.ControlPlane/Closing" + ControlPlane_Ping_FullMethodName = "/messages.ControlPlane/Ping" + ControlPlane_Negotiate_FullMethodName = "/messages.ControlPlane/Negotiate" + ControlPlane_GetCartIds_FullMethodName = "/messages.ControlPlane/GetCartIds" + ControlPlane_Closing_FullMethodName = "/messages.ControlPlane/Closing" ) // ControlPlaneClient is the client API for ControlPlane service. @@ -38,8 +37,6 @@ type ControlPlaneClient interface { Negotiate(ctx context.Context, in *NegotiateRequest, opts ...grpc.CallOption) (*NegotiateReply, error) // GetCartIds lists currently owned cart IDs on this node. GetCartIds(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*CartIdsReply, error) - // ConfirmOwner announces/asks peers to acknowledge ownership transfer. - ConfirmOwner(ctx context.Context, in *OwnerChangeRequest, opts ...grpc.CallOption) (*OwnerChangeAck, error) // Closing announces graceful shutdown so peers can proactively adjust. 
Closing(ctx context.Context, in *ClosingNotice, opts ...grpc.CallOption) (*OwnerChangeAck, error) } @@ -82,16 +79,6 @@ func (c *controlPlaneClient) GetCartIds(ctx context.Context, in *Empty, opts ... return out, nil } -func (c *controlPlaneClient) ConfirmOwner(ctx context.Context, in *OwnerChangeRequest, opts ...grpc.CallOption) (*OwnerChangeAck, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(OwnerChangeAck) - err := c.cc.Invoke(ctx, ControlPlane_ConfirmOwner_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *controlPlaneClient) Closing(ctx context.Context, in *ClosingNotice, opts ...grpc.CallOption) (*OwnerChangeAck, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(OwnerChangeAck) @@ -114,8 +101,6 @@ type ControlPlaneServer interface { Negotiate(context.Context, *NegotiateRequest) (*NegotiateReply, error) // GetCartIds lists currently owned cart IDs on this node. GetCartIds(context.Context, *Empty) (*CartIdsReply, error) - // ConfirmOwner announces/asks peers to acknowledge ownership transfer. - ConfirmOwner(context.Context, *OwnerChangeRequest) (*OwnerChangeAck, error) // Closing announces graceful shutdown so peers can proactively adjust. Closing(context.Context, *ClosingNotice) (*OwnerChangeAck, error) mustEmbedUnimplementedControlPlaneServer() @@ -137,9 +122,6 @@ func (UnimplementedControlPlaneServer) Negotiate(context.Context, *NegotiateRequ func (UnimplementedControlPlaneServer) GetCartIds(context.Context, *Empty) (*CartIdsReply, error) { return nil, status.Errorf(codes.Unimplemented, "method GetCartIds not implemented") } -func (UnimplementedControlPlaneServer) ConfirmOwner(context.Context, *OwnerChangeRequest) (*OwnerChangeAck, error) { - return nil, status.Errorf(codes.Unimplemented, "method ConfirmOwner not implemented") -} func (UnimplementedControlPlaneServer) Closing(context.Context, *ClosingNotice) (*OwnerChangeAck, error) { return nil, status.Errorf(codes.Unimplemented, "method Closing not implemented") } @@ -218,24 +200,6 @@ func _ControlPlane_GetCartIds_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } -func _ControlPlane_ConfirmOwner_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(OwnerChangeRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ControlPlaneServer).ConfirmOwner(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: ControlPlane_ConfirmOwner_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ControlPlaneServer).ConfirmOwner(ctx, req.(*OwnerChangeRequest)) - } - return interceptor(ctx, in, info, handler) -} - func _ControlPlane_Closing_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(ClosingNotice) if err := dec(in); err != nil { @@ -273,10 +237,6 @@ var ControlPlane_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetCartIds", Handler: _ControlPlane_GetCartIds_Handler, }, - { - MethodName: "ConfirmOwner", - Handler: _ControlPlane_ConfirmOwner_Handler, - }, { MethodName: "Closing", Handler: _ControlPlane_Closing_Handler, diff --git a/ring.go b/ring.go new file mode 100644 index 0000000..5973368 --- /dev/null +++ b/ring.go @@ -0,0 +1,344 @@ +package main + 
+import ( + "encoding/binary" + "fmt" + "hash/fnv" + "sort" + "strings" + "sync" +) + +// ring.go +// +// Consistent hashing ring providing deterministic cart ownership. +// ---------------------------------------------------------------- +// This file introduces a minimal, allocation-light consistent hashing structure +// that replaces per-cart ownership negotiation. It focuses on: +// * Deterministic lookup: O(log V) via binary search +// * Even(ish) distribution using virtual nodes (vnodes) +// * Epoch / fingerprint tracking to detect membership drift +// +// WIRED INTO SyncedPool for ownership resolution (see README migration notes). +// STILL PENDING: +// * Replication factor > 1 (LookupN below already enumerates replicas) +// * Persistent state migration +// +// Design Notes +// ------------ +// - Hosts contribute `vnodesPerHost` virtual nodes. Higher counts smooth +// distribution at cost of memory (V = hosts * vnodesPerHost). +// - Hash of vnode = FNV1a64(host + "#" + index). For improved quality you +// can swap in xxhash or siphash later without changing API (but doing so +// will reshuffle ownership). +// - Cart ownership lookup uses either cartID.Raw() when provided (uniform +// 64-bit space) or falls back to hashing string forms (legacy). +// - Epoch is monotonically increasing; consumers can fence stale results. +// +// Future Extensions +// ----------------- +// - Weighted hosts (proportionally more vnodes). +// - Replication factor > 1 on top of LookupN(h, n) (primary + replicas). +// - Streaming / diff-based ring updates (gossip). +// - Hash function injection for deterministic test scenarios. +// +// --------------------------------------------------------------------------- + +// Vnode represents a single virtual node position on the ring. +type Vnode struct { + Hash uint64 // position on the ring + Host string // physical host owning this vnode + Index int // per-host vnode index (0..vnodesPerHost-1) +} + +// Ring is an immutable consistent hash ring snapshot. +type Ring struct { + Epoch uint64 + Vnodes []Vnode // sorted by Hash + hosts []string + fingerprint uint64 // membership fingerprint (order-independent) +} + +// RingBuilder accumulates parameters to construct a Ring. +type RingBuilder struct { + epoch uint64 + vnodesPerHost int + hosts []string +} + +// NewRingBuilder creates a builder with defaults.
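+// Example (hostnames illustrative):
+//   ring := NewRingBuilder().
+//       WithEpoch(1).
+//       WithVnodesPerHost(64).
+//       WithHosts([]string{"node-a:1337", "node-b:1337"}).
+//       Build()
+//   owner := ring.LookupString("some-cart-id").Host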
+func NewRingBuilder() *RingBuilder { + return &RingBuilder{ + vnodesPerHost: 64, // a reasonable default for small clusters + } +} + +func (b *RingBuilder) WithEpoch(e uint64) *RingBuilder { + b.epoch = e + return b +} + +func (b *RingBuilder) WithVnodesPerHost(n int) *RingBuilder { + if n > 0 { + b.vnodesPerHost = n + } + return b +} + +func (b *RingBuilder) WithHosts(hosts []string) *RingBuilder { + uniq := make(map[string]struct{}, len(hosts)) + out := make([]string, 0, len(hosts)) + for _, h := range hosts { + h = strings.TrimSpace(h) + if h == "" { + continue + } + if _, ok := uniq[h]; ok { + continue + } + uniq[h] = struct{}{} + out = append(out, h) + } + sort.Strings(out) + b.hosts = out + return b +} + +func (b *RingBuilder) Build() *Ring { + if len(b.hosts) == 0 { + return &Ring{ + Epoch: b.epoch, + Vnodes: nil, + hosts: nil, + fingerprint: 0, + } + } + + totalVnodes := len(b.hosts) * b.vnodesPerHost + vnodes := make([]Vnode, 0, totalVnodes) + + for _, host := range b.hosts { + for i := 0; i < b.vnodesPerHost; i++ { + h := hashVnode(host, i) + vnodes = append(vnodes, Vnode{ + Hash: h, + Host: host, + Index: i, + }) + } + } + sort.Slice(vnodes, func(i, j int) bool { + if vnodes[i].Hash == vnodes[j].Hash { + // Tie-break deterministically by host then index to avoid instability + if vnodes[i].Host == vnodes[j].Host { + return vnodes[i].Index < vnodes[j].Index + } + return vnodes[i].Host < vnodes[j].Host + } + return vnodes[i].Hash < vnodes[j].Hash + }) + + fp := fingerprintHosts(b.hosts) + + return &Ring{ + Epoch: b.epoch, + Vnodes: vnodes, + hosts: append([]string(nil), b.hosts...), + fingerprint: fp, + } +} + +// Hosts returns a copy of the host list (sorted). +func (r *Ring) Hosts() []string { + if len(r.hosts) == 0 { + return nil + } + cp := make([]string, len(r.hosts)) + copy(cp, r.hosts) + return cp +} + +// Fingerprint returns a hash representing the unordered membership set. +func (r *Ring) Fingerprint() uint64 { + return r.fingerprint +} + +// Empty indicates ring has no vnodes. +func (r *Ring) Empty() bool { + return len(r.Vnodes) == 0 +} + +// Lookup returns the vnode owning a given hash value. +func (r *Ring) Lookup(h uint64) Vnode { + if len(r.Vnodes) == 0 { + return Vnode{} + } + // Binary search: first position with Hash >= h + i := sort.Search(len(r.Vnodes), func(i int) bool { + return r.Vnodes[i].Hash >= h + }) + if i == len(r.Vnodes) { + return r.Vnodes[0] + } + return r.Vnodes[i] +} + +// LookupID selects owner vnode for a CartID (fast path). +func (r *Ring) LookupID(id CartID) Vnode { + return r.Lookup(id.Raw()) +} + +// LookupString hashes an arbitrary string and looks up owner. +func (r *Ring) LookupString(s string) Vnode { + return r.Lookup(hashKeyString(s)) +} + +// LookupN returns up to n distinct host vnodes in ring order +// starting from the primary owner of hash h (for replication). 
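+//
+// Illustrative only (hosts are hypothetical; ring order depends on hashes):
+//
+//	r := NewRingBuilder().WithHosts([]string{"a", "b", "c"}).Build()
+//	owners := r.LookupN(hashKeyString("cart-42"), 2)
+//	// owners[0] is the primary vnode; owners[1] belongs to a different host.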
+func (r *Ring) LookupN(h uint64, n int) []Vnode {
+	if n <= 0 || len(r.Vnodes) == 0 {
+		return nil
+	}
+	if n > len(r.hosts) {
+		n = len(r.hosts)
+	}
+	owners := make([]Vnode, 0, n)
+	seen := make(map[string]struct{}, n)
+
+	start := r.Lookup(h)
+
+	// Locate the start vnode's index. We already know start.Hash, so a second
+	// binary search is the clearest way to recover its position.
+	i := sort.Search(len(r.Vnodes), func(i int) bool {
+		return r.Vnodes[i].Hash >= start.Hash
+	})
+	if i == len(r.Vnodes) {
+		i = 0
+	}
+
+	for idx := 0; len(owners) < n && idx < len(r.Vnodes); idx++ {
+		v := r.Vnodes[(i+idx)%len(r.Vnodes)]
+		if _, ok := seen[v.Host]; ok {
+			continue
+		}
+		seen[v.Host] = struct{}{}
+		owners = append(owners, v)
+	}
+	return owners
+}
+
+// DiffHosts compares this ring's membership to another.
+func (r *Ring) DiffHosts(other *Ring) (added []string, removed []string) {
+	if other == nil {
+		return r.Hosts(), nil
+	}
+	cur := make(map[string]struct{}, len(r.hosts))
+	for _, h := range r.hosts {
+		cur[h] = struct{}{}
+	}
+	oth := make(map[string]struct{}, len(other.hosts))
+	for _, h := range other.hosts {
+		oth[h] = struct{}{}
+	}
+	for h := range cur {
+		if _, ok := oth[h]; !ok {
+			removed = append(removed, h)
+		}
+	}
+	for h := range oth {
+		if _, ok := cur[h]; !ok {
+			added = append(added, h)
+		}
+	}
+	sort.Strings(added)
+	sort.Strings(removed)
+	return
+}
+
+// ---------------------------- Hash Functions ---------------------------------
+
+func hashVnode(host string, idx int) uint64 {
+	h := fnv.New64a()
+	_, _ = h.Write([]byte(host))
+	_, _ = h.Write([]byte{'#'})
+	var buf [8]byte
+	binary.BigEndian.PutUint64(buf[:], uint64(idx))
+	_, _ = h.Write(buf[:])
+	return h.Sum64()
+}
+
+// hashKeyString provides a stable hash for arbitrary string keys (legacy IDs).
+func hashKeyString(s string) uint64 {
+	h := fnv.New64a()
+	_, _ = h.Write([]byte(s))
+	return h.Sum64()
+}
+
+// fingerprintHosts produces an order-insensitive hash over the host set. A
+// sorted copy is hashed so callers do not need to pre-sort; hashing in caller
+// order would make the fingerprint (and any pseudo-epoch derived from it)
+// differ between nodes that enumerate the same hosts in different orders.
+func fingerprintHosts(hosts []string) uint64 {
+	if len(hosts) == 0 {
+		return 0
+	}
+	sorted := append([]string(nil), hosts...)
+	sort.Strings(sorted)
+	h := fnv.New64a()
+	for _, host := range sorted {
+		_, _ = h.Write([]byte(host))
+		_, _ = h.Write([]byte{0})
+	}
+	return h.Sum64()
+}
+
+// --------------------------- Thread-Safe Wrapper -----------------------------
+//
+// RingRef offers atomic swap + read semantics. SyncedPool can embed or hold
+// one of these to manage live ring updates safely.
+
+type RingRef struct {
+	mu   sync.RWMutex
+	ring *Ring
+}
+
+func NewRingRef(r *Ring) *RingRef {
+	return &RingRef{ring: r}
+}
+
+func (rr *RingRef) Get() *Ring {
+	rr.mu.RLock()
+	r := rr.ring
+	rr.mu.RUnlock()
+	return r
+}
+
+func (rr *RingRef) Set(r *Ring) {
+	rr.mu.Lock()
+	rr.ring = r
+	rr.mu.Unlock()
+}
+
+func (rr *RingRef) LookupID(id CartID) Vnode {
+	r := rr.Get()
+	if r == nil {
+		return Vnode{}
+	}
+	return r.LookupID(id)
+}
+
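+// Illustrative swap-on-membership-change (hosts are hypothetical):
+//
+//	ref := NewRingRef(NewRingBuilder().WithHosts([]string{"a"}).Build())
+//	ref.Set(NewRingBuilder().WithHosts([]string{"a", "b"}).Build())
+//	_ = ref.Get() // readers always observe a complete, immutable snapshot
+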
+// ----------------------------- Debug Utilities -------------------------------
+
+func (r *Ring) String() string {
+	var b strings.Builder
+	fmt.Fprintf(&b, "Ring{epoch=%d vnodes=%d hosts=%d}\n", r.Epoch, len(r.Vnodes), len(r.hosts))
+	limit := len(r.Vnodes)
+	if limit > 16 {
+		limit = 16
+	}
+	for i := 0; i < limit; i++ {
+		v := r.Vnodes[i]
+		fmt.Fprintf(&b, "  %02d hash=%016x host=%s idx=%d\n", i, v.Hash, v.Host, v.Index)
+	}
+	if len(r.Vnodes) > limit {
+		fmt.Fprintf(&b, "  ... (%d more)\n", len(r.Vnodes)-limit)
+	}
+	return b.String()
+}
diff --git a/synced-pool.go b/synced-pool.go
index 62fe49f..a320392 100644
--- a/synced-pool.go
+++ b/synced-pool.go
@@ -15,14 +15,15 @@ import (
 )
 
 // SyncedPool coordinates cart grain ownership across nodes using gRPC control plane
-// and cart actor services. Legacy frame / TCP code has been removed.
+// and cart actor services.
 //
 // Responsibilities:
 // - Local grain access (delegates to GrainLocalPool)
 // - Remote grain proxy management (RemoteGrainGRPC)
 // - Cluster membership (AddRemote via discovery + negotiation)
-// - Ownership acquisition (quorum via ConfirmOwner RPC)
 // - Health/ping monitoring & remote removal
+// - Ring-based deterministic ownership (no runtime negotiation)
+// - (Scaffolding) replication-factor awareness via ring.LookupN
 //
 // Thread-safety: public methods that mutate internal maps lock p.mu (RWMutex).
 type SyncedPool struct {
@@ -40,7 +41,12 @@ type SyncedPool struct {
 	// Discovery handler for re-adding hosts after failures
 	discardedHostHandler *DiscardedHostHandler
 
-	// Metrics / instrumentation dependencies already declared globally
+	// Consistent hashing ring (immutable snapshot reference)
+	ringRef *RingRef
+
+	// Configuration
+	vnodesPerHost     int
+	replicationFactor int // RF (>=1). Currently only the primary is active; replicas are scaffolding.
 }
 
 // RemoteHostGRPC tracks a remote host's clients & health.
@@ -71,8 +77,34 @@ var (
 	})
 	remoteLookupCount = promauto.NewCounter(prometheus.CounterOpts{
 		Name: "cart_remote_lookup_total",
-		Help: "The total number of remote lookups",
+		Help: "The total number of remote lookups (legacy counter)",
 	})
+
+	// Ring / ownership metrics
+	ringEpoch = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "cart_ring_epoch",
+		Help: "Current consistent hashing ring epoch (fingerprint-based pseudo-epoch)",
+	})
+	ringHosts = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "cart_ring_hosts",
+		Help: "Number of hosts currently in the ring",
+	})
+	ringVnodes = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "cart_ring_vnodes",
+		Help: "Number of virtual nodes in the ring",
+	})
+	ringLookupLocal = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "cart_ring_lookup_local_total",
+		Help: "Ring ownership lookups resolved to the local host",
+	})
+	ringLookupRemote = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "cart_ring_lookup_remote_total",
+		Help: "Ring ownership lookups resolved to a remote host",
+	})
+	ringHostShare = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "cart_ring_host_share",
+		Help: "Fractional share of ring vnodes per host",
+	}, []string{"host"})
 )
 
 func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
@@ -82,8 +114,12 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery)
 		remoteHosts:          make(map[string]*RemoteHostGRPC),
 		remoteIndex:          make(map[CartId]Grain),
 		discardedHostHandler: NewDiscardedHostHandler(1338),
+		vnodesPerHost:        64, // default smoothing factor; adjust if needed
+		replicationFactor:    1,  // RF scaffold; >1 does not yet activate replicas
 	}
 	p.discardedHostHandler.SetReconnectHandler(p.AddRemote)
+	// Build the initial single-host ring (rebuilt after AddRemote / discovery events)
+	p.rebuildRing()
 
 	if discovery != nil {
 		go func() {
@@ -175,6 +211,8 @@ func (p *SyncedPool) AddRemote(host string) {
 	p.remoteHosts[host] = remote
 	p.mu.Unlock()
 	connectedRemotes.Set(float64(p.RemoteCount()))
+	// Rebuild consistent hashing ring including this new host
+	p.rebuildRing()
log.Printf("Connected to remote host %s", host) @@ -222,6 +260,8 @@ func (p *SyncedPool) RemoveHost(host string) { remote.Conn.Close() } connectedRemotes.Set(float64(p.RemoteCount())) + // Rebuild ring after host removal + p.rebuildRing() } // RemoteCount returns number of tracked remote hosts. @@ -302,6 +342,8 @@ func (p *SyncedPool) Negotiate() { } p.mu.RUnlock() + changed := false + for _, r := range remotes { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) reply, err := r.ControlClient.Negotiate(ctx, &proto.NegotiateRequest{KnownHosts: hosts}) @@ -313,12 +355,18 @@ func (p *SyncedPool) Negotiate() { for _, h := range reply.Hosts { if !p.IsKnown(h) { p.AddRemote(h) + changed = true } } } + + // If new hosts were discovered during negotiation, rebuild the ring once at the end. + if changed { + p.rebuildRing() + } } -// ------------------------- Grain Management ---------------------------------- +// ------------------------- Grain / Ring Ownership ---------------------------- // RemoveRemoteGrain removes a remote grain mapping. func (p *SyncedPool) RemoveRemoteGrain(id CartId) { @@ -333,9 +381,9 @@ func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { return } p.mu.Lock() - // If local grain exists, remove it (ownership changed) - if g, ok := p.local.grains[id]; ok && g != nil { - delete(p.local.grains, id) + // If local grain exists (legacy key), remove from local map (ownership moved). + if g, ok := p.local.grains[LegacyToCartKey(id)]; ok && g != nil { + delete(p.local.grains, LegacyToCartKey(id)) } remoteHost, ok := p.remoteHosts[host] if !ok { @@ -362,78 +410,134 @@ func (p *SyncedPool) GetHealthyRemotes() []*RemoteHostGRPC { return ret } -// RequestOwnership attempts to become owner of a cart, requiring quorum. -// On success local grain is (or will be) created; peers spawn remote proxies. -func (p *SyncedPool) RequestOwnership(id CartId) error { - ok := 0 - all := 0 - remotes := p.GetHealthyRemotes() - log.Printf("RequestOwnership start id=%s host=%s healthyRemotes=%d", id, p.Hostname, len(remotes)) - for _, r := range remotes { - log.Printf("RequestOwnership sending ConfirmOwner to host=%s id=%s", r.Host, id) - ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond) - reply, err := r.ControlClient.ConfirmOwner(ctx, &proto.OwnerChangeRequest{ - CartId: id.String(), - NewHost: p.Hostname, - }) - cancel() - all++ - if err != nil || reply == nil || !reply.Accepted { - log.Printf("RequestOwnership negative/failed response from host=%s id=%s err=%v reply=%v", r.Host, id, err, reply) - continue - } - ok++ - log.Printf("RequestOwnership accept from host=%s id=%s (ok=%d all=%d)", r.Host, id, ok, all) +// rebuildRing reconstructs the consistent hashing ring from current host set +// and updates ring-related metrics. +func (p *SyncedPool) rebuildRing() { + p.mu.RLock() + hosts := make([]string, 0, len(p.remoteHosts)+1) + hosts = append(hosts, p.Hostname) + for h := range p.remoteHosts { + hosts = append(hosts, h) + } + p.mu.RUnlock() + + epochSeed := fingerprintHosts(hosts) + builder := NewRingBuilder(). + WithHosts(hosts). + WithEpoch(epochSeed). 
+	builder := NewRingBuilder().
+		WithHosts(hosts).
+		WithEpoch(epochSeed).
+		WithVnodesPerHost(p.vnodesPerHost)
+	r := builder.Build()
+	if p.ringRef == nil {
+		p.ringRef = NewRingRef(r)
+	} else {
+		p.ringRef.Set(r)
 	}
-	// Quorum rule (majority semantics):
-	//  - Let N = all remotes + 1 (self)
-	//  - We require ok + 1 (implicit self vote) >= floor(N/2)+1
-	//    => ok >= floor(N/2)
-	//  - Examples:
-	//      N=2 (all=1): threshold=1 (need 1 remote)
-	//      N=3 (all=2): threshold=1 (need 1 remote; previously required 2)
-	//      N=4 (all=3): threshold=2
-	//      N=5 (all=4): threshold=2
-	//  - This change allows faster ownership under partial remote availability in small clusters.
-	log.Printf("RequestOwnership quorum evaluation id=%s host=%s ok=%d all=%d", id, p.Hostname, ok, all)
-	threshold := (all + 1) / 2 // floor(N/2)
-	if ok < threshold {
-		p.removeLocalGrain(id)
-		log.Printf("RequestOwnership failed quorum id=%s host=%s ok=%d all=%d threshold=%d", id, p.Hostname, ok, all, threshold)
-		return fmt.Errorf("quorum not reached (ok=%d all=%d threshold=%d)", ok, all, threshold)
+	// Metrics
+	ringEpoch.Set(float64(r.Epoch))
+	ringHosts.Set(float64(len(r.Hosts())))
+	ringVnodes.Set(float64(len(r.Vnodes)))
+	ringHostShare.Reset()
+	if len(r.Vnodes) > 0 {
+		perHost := make(map[string]int)
+		for _, v := range r.Vnodes {
+			perHost[v.Host]++
+		}
+		total := float64(len(r.Vnodes))
+		for h, c := range perHost {
+			ringHostShare.WithLabelValues(h).Set(float64(c) / total)
+		}
 	}
-	grainSyncCount.Inc()
-	log.Printf("RequestOwnership success id=%s host=%s ok=%d all=%d threshold=%d", id, p.Hostname, ok, all, threshold)
-	return nil
+}
+
+// ForceRingRefresh exposes a manual ring rebuild hook (primarily for tests).
+func (p *SyncedPool) ForceRingRefresh() {
+	p.rebuildRing()
+}
+
+// ownersFor returns the ordered list of primary + replica owners for a cart id
+// (length min(replicationFactor, #hosts)). Currently only the first (primary)
+// entry is used; the rest scaffolds future replication work.
+func (p *SyncedPool) ownersFor(id CartId) []string {
+	if p.ringRef == nil || p.replicationFactor <= 0 {
+		return []string{p.Hostname}
+	}
+	r := p.ringRef.Get()
+	if r == nil || r.Empty() {
+		return []string{p.Hostname}
+	}
+	vnodes := r.LookupN(hashKeyString(id.String()), p.replicationFactor)
+	out := make([]string, 0, len(vnodes))
+	seen := make(map[string]struct{}, len(vnodes))
+	for _, v := range vnodes {
+		if _, ok := seen[v.Host]; ok {
+			continue
+		}
+		seen[v.Host] = struct{}{}
+		out = append(out, v.Host)
+	}
+	if len(out) == 0 {
+		out = append(out, p.Hostname)
+	}
+	return out
+}
+
+// ownerHostFor returns the primary owner host for a given id.
+func (p *SyncedPool) ownerHostFor(id CartId) string {
+	return p.ownersFor(id)[0]
+}
+
+// DebugOwnerHost exposes (for tests) the currently computed primary owner host.
+func (p *SyncedPool) DebugOwnerHost(id CartId) string {
+	return p.ownerHostFor(id)
 }
 
 func (p *SyncedPool) removeLocalGrain(id CartId) {
 	p.mu.Lock()
-	delete(p.local.grains, id)
+	delete(p.local.grains, LegacyToCartKey(id))
 	p.mu.Unlock()
 }
 
-// getGrain returns a local or remote grain. If absent, it synchronously attempts
-// to acquire ownership before spawning a local grain to eliminate the race where
-// the first mutation applies before peers have installed remote proxies.
+// getGrain returns a local or remote grain. For remote ownership it performs a
+// bounded readiness wait (small retries) to reduce first-call failures while
+// the remote connection & proxy are initializing.
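+//
+// The wait is bounded: up to 5 attempts with 40ms pauses between them, i.e.
+// at most ~160ms of sleeping before giving up with an error.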
 func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
+	owner := p.ownerHostFor(id)
+	if owner == p.Hostname {
+		ringLookupLocal.Inc()
+		grain, err := p.local.GetGrain(id)
+		if err != nil {
+			return nil, err
+		}
+		return grain, nil
+	}
+	ringLookupRemote.Inc()
+
+	// Kick off a remote dial if we don't yet know the owner.
+	if !p.IsKnown(owner) {
+		go p.AddRemote(owner)
+	}
+
+	// Fast path: an existing proxy.
 	p.mu.RLock()
-	localGrain, isLocal := p.local.grains[id]
-	remoteGrain, isRemote := p.remoteIndex[id]
+	if rg, ok := p.remoteIndex[id]; ok {
+		p.mu.RUnlock()
+		remoteLookupCount.Inc()
+		return rg, nil
+	}
 	p.mu.RUnlock()
 
-	if isLocal && localGrain != nil {
-		return localGrain, nil
-	}
-	if isRemote {
-		remoteLookupCount.Inc()
-		return remoteGrain, nil
-	}
+	const (
+		attempts    = 5
+		sleepPerTry = 40 * time.Millisecond
+	)
 
-	// Synchronously attempt to claim ownership. If this fails (quorum not reached)
-	// we re-check for a newly appeared remote proxy (another node became owner).
-	if err := p.RequestOwnership(id); err != nil {
+	for attempt := 0; attempt < attempts; attempt++ {
+		// Try to spawn (idempotent if the host is already known).
+		if p.IsKnown(owner) {
+			p.SpawnRemoteGrain(id, owner)
+		}
+		// Check again.
 		p.mu.RLock()
 		if rg, ok := p.remoteIndex[id]; ok {
 			p.mu.RUnlock()
@@ -441,18 +545,20 @@ func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
 			return rg, nil
 		}
 		p.mu.RUnlock()
-		return nil, err
+
+		// Last attempt? Break and return an error.
+		if attempt == attempts-1 {
+			break
+		}
+		time.Sleep(sleepPerTry)
 	}
 
-	// Ownership acquired; now lazily spawn the local grain.
-	grain, err := p.local.GetGrain(id)
-	if err != nil {
-		return nil, err
-	}
-	return grain, nil
+	return nil, fmt.Errorf("remote owner %s not yet available for cart %s (after %d attempts)", owner, id.String(), attempts)
 }
 
 // Apply applies a single mutation to a grain (local or remote).
+// Replication (RF>1) scaffolding: a future enhancement will fan out mutations
+// to replica owners (best-effort) and reconcile with quorum reads.
 func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) {
 	grain, err := p.getGrain(id)
 	if err != nil {
@@ -462,6 +568,7 @@ func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error)
 }
 
 // Get returns current state of a grain (local or remote).
+// Future replication hook: read-repair or quorum reads can be added here.
 func (p *SyncedPool) Get(id CartId) (*CartGrain, error) {
 	grain, err := p.getGrain(id)
 	if err != nil {