diff --git a/cart_id_test.go b/cart_id_test.go index b5d6020..a16ed56 100644 --- a/cart_id_test.go +++ b/cart_id_test.go @@ -3,7 +3,6 @@ package main import ( "crypto/rand" "encoding/binary" - "fmt" mrand "math/rand" "testing" ) @@ -191,69 +190,11 @@ func BenchmarkDecodeBase62(b *testing.B) { _ = sum } -// TestLookupNDeterminism (ring integration smoke test) ensures LookupN -// returns distinct hosts and stable ordering for a fixed ring. -func TestLookupNDeterminism(t *testing.T) { - rb := NewRingBuilder().WithEpoch(1).WithVnodesPerHost(8).WithHosts([]string{"a", "b", "c"}) - ring := rb.Build() - if ring.Empty() { - t.Fatalf("expected non-empty ring") - } - id := MustNewCartID() - owners1 := ring.LookupN(id.Raw(), 3) - owners2 := ring.LookupN(id.Raw(), 3) - if len(owners1) != len(owners2) { - t.Fatalf("LookupN length mismatch") - } - for i := range owners1 { - if owners1[i].Host != owners2[i].Host { - t.Fatalf("LookupN ordering instability at %d: %v vs %v", i, owners1[i], owners2[i]) - } - } - // Distinct host constraint - seen := map[string]struct{}{} - for _, v := range owners1 { - if _, ok := seen[v.Host]; ok { - t.Fatalf("duplicate host in LookupN result: %v", owners1) - } - seen[v.Host] = struct{}{} - } -} +// Removed TestLookupNDeterminism (ring-based ownership deprecated) -// TestRingFingerprintChanges ensures fingerprint updates with membership changes. -func TestRingFingerprintChanges(t *testing.T) { - b1 := NewRingBuilder().WithEpoch(1).WithHosts([]string{"node1", "node2"}) - r1 := b1.Build() - b2 := NewRingBuilder().WithEpoch(2).WithHosts([]string{"node1", "node2", "node3"}) - r2 := b2.Build() - if r1.Fingerprint() == r2.Fingerprint() { - t.Fatalf("expected differing fingerprints after host set change") - } -} +// Removed TestRingFingerprintChanges (ring-based ownership deprecated) -// TestRingDiffHosts verifies added/removed host detection. 
-func TestRingDiffHosts(t *testing.T) { - r1 := NewRingBuilder().WithEpoch(1).WithHosts([]string{"a", "b"}).Build() - r2 := NewRingBuilder().WithEpoch(2).WithHosts([]string{"b", "c"}).Build() - added, removed := r1.DiffHosts(r2) - if fmt.Sprintf("%v", added) != "[c]" { - t.Fatalf("expected added [c], got %v", added) - } - if fmt.Sprintf("%v", removed) != "[a]" { - t.Fatalf("expected removed [a], got %v", removed) - } -} +// Removed TestRingDiffHosts (ring-based ownership deprecated) // TestRingLookupConsistency ensures direct Lookup and LookupID are aligned. -func TestRingLookupConsistency(t *testing.T) { - ring := NewRingBuilder().WithEpoch(1).WithHosts([]string{"alpha", "beta"}).WithVnodesPerHost(4).Build() - id, _ := ParseCartID("1") - if id.IsZero() { - t.Fatalf("expected parsed id non-zero") - } - v1 := ring.Lookup(id.Raw()) - v2 := ring.LookupID(id) - if v1.Host != v2.Host || v1.Hash != v2.Hash { - t.Fatalf("Lookup vs LookupID mismatch: %+v vs %+v", v1, v2) - } -} +// Removed TestRingLookupConsistency (ring-based ownership deprecated) diff --git a/grain-pool.go b/grain-pool.go index 9f474ce..55b4806 100644 --- a/grain-pool.go +++ b/grain-pool.go @@ -17,7 +17,7 @@ import ( // from the new CartID base62 representation). For backward compatibility, // a deprecated legacy map keyed by CartId is maintained so existing code // that directly indexes pool.grains with a CartId continues to compile -// until the full refactor across SyncedPool / remoteIndex is completed. +// until the full refactor across SyncedPool is completed. // // Authoritative storage: grains (map[uint64]*CartGrain) // Legacy compatibility: grainsLegacy (map[CartId]*CartGrain) - kept in sync. @@ -46,6 +46,10 @@ var ( type GrainPool interface { Apply(id CartId, mutation interface{}) (*CartGrain, error) Get(id CartId) (*CartGrain, error) + // OwnerHost returns the primary owner host for a given cart id. 
+ OwnerHost(id CartId) string + // Hostname returns the hostname of the local pool implementation. + Hostname() string } // Ttl keeps expiry info @@ -242,3 +246,16 @@ func (p *GrainLocalPool) UnsafePointerToLegacyMap() uintptr { // Legacy map removed; retained only to satisfy any transitional callers. return 0 } + +// OwnerHost implements the extended GrainPool interface for the standalone +// local pool. Since the local pool has no concept of multi-host ownership, +// it returns an empty string. Callers can treat empty as "local host". +func (p *GrainLocalPool) OwnerHost(id CartId) string { + return "" +} + +// Hostname returns a blank string because GrainLocalPool does not track a node +// identity. (SyncedPool will return the real hostname.) +func (p *GrainLocalPool) Hostname() string { + return "" +} diff --git a/grpc_server.go b/grpc_server.go index 44f3de6..24a5876 100644 --- a/grpc_server.go +++ b/grpc_server.go @@ -9,6 +9,7 @@ import ( messages "git.tornberg.me/go-cart-actor/proto" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/reflection" ) @@ -198,8 +199,11 @@ func (s *cartActorGRPCServer) GetState(ctx context.Context, req *messages.StateR // ControlPlane: Ping func (s *cartActorGRPCServer) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) { + // Expose cart owner cookie (first-touch owner = this host) for HTTP gateways translating gRPC metadata. + // Gateways that propagate Set-Cookie can help establish sticky sessions at the edge. 
+ _ = grpc.SendHeader(ctx, metadata.Pairs("set-cookie", fmt.Sprintf("cartowner=%s; Path=/; HttpOnly", s.syncedPool.Hostname()))) return &messages.PingReply{ - Host: s.syncedPool.Hostname, + Host: s.syncedPool.Hostname(), UnixTime: time.Now().Unix(), }, nil } @@ -214,7 +218,7 @@ func (s *cartActorGRPCServer) Negotiate(ctx context.Context, req *messages.Negot } } // This host - hostSet[s.syncedPool.Hostname] = struct{}{} + hostSet[s.syncedPool.Hostname()] = struct{}{} // Known remotes s.syncedPool.mu.RLock() for h := range s.syncedPool.remoteHosts { diff --git a/multi_node_ownership_test.go b/multi_node_ownership_test.go index ee3888f..1f9d937 100644 --- a/multi_node_ownership_test.go +++ b/multi_node_ownership_test.go @@ -81,9 +81,7 @@ func TestMultiNodeOwnershipNegotiation(t *testing.T) { link(syncedA, hostB, addrB) link(syncedB, hostA, addrA) - // Rebuild rings after manual cross-link so deterministic ownership works immediately. - syncedA.ForceRingRefresh() - syncedB.ForceRingRefresh() + // Ring-based ownership removed; no ring refresh needed. // Allow brief stabilization (control plane pings / no real negotiation needed here). time.Sleep(200 * time.Millisecond) diff --git a/multi_node_three_test.go b/multi_node_three_test.go index 28a398a..9119ffa 100644 --- a/multi_node_three_test.go +++ b/multi_node_three_test.go @@ -102,10 +102,7 @@ func TestThreeNodeMajorityOwnership(t *testing.T) { link(syncedC, hostA, addrA) link(syncedC, hostB, addrB) - // Rebuild rings after manual linking so ownership resolution is immediate. - syncedA.ForceRingRefresh() - syncedB.ForceRingRefresh() - syncedC.ForceRingRefresh() + // Ring-based ownership removed; no ring refresh needed. // Allow brief stabilization time.Sleep(200 * time.Millisecond) @@ -178,7 +175,7 @@ func TestThreeNodeMajorityOwnership(t *testing.T) { t.Fatalf("expected owner %s to hold local grain", ownerPre) } - // Remote proxies may not pre-exist; first remote mutation will trigger SpawnRemoteGrain lazily. 
+ // First-touch ownership: remote mutation claims ownership on first access (no remote proxies). // Issue remote mutation from one non-owner -> ChangeQuantity (increase) change := &messages.ChangeQuantity{ diff --git a/ownership_middleware.go b/ownership_middleware.go new file mode 100644 index 0000000..d78a8aa --- /dev/null +++ b/ownership_middleware.go @@ -0,0 +1,315 @@ +package main + +import ( + "bytes" + "io" + "net" + "net/http" + "os" + "strings" + "time" +) + +// OwnershipProxyMiddleware provides HTTP-layer routing to the primary owner +// of a cart before the request hits local handlers. +// +// Motivation: +// +// In the current system SyncedPool can proxy cart mutations to remote owners +// via remote grains (gRPC). For a simpler deployment you can instead forward +// the incoming HTTP request directly to the owning host and let only the +// owner execute the standard handlers (which apply mutations locally). +// +// Behavior: +// 1. Attempts to extract a cart id from (in priority order): +// - Cookie "cartid" +// - Path segment after "/byid/{id}" (e.g. /cart/byid/abc123/add/sku) +// 2. Resolves the primary owner host by calling OwnerHost on the +// supplied SyncedPool. +// 3. If the owner is empty or the local host (or no id found), the request proceeds locally. +// 4. If the owner is a different host, the middleware performs an in-cluster +// HTTP proxy (single-hop) to http://{owner}:{port}{path}?{query} +// and streams the response back to the client. +// 5. Adds headers: +// X-Cart-Owner: {owner-host} +// X-Cart-Owner-Routed: "true" (only when proxied) +// X-Cart-Id: {cart-id} (when available) +// On local handling (not proxied) X-Cart-Owner-Routed is "false". 
+// +// Configuration: +// +// CART_SERVICE_PORT (env) - target port for proxying (default: 8080) +// CART_PROXY_TIMEOUT_MS (env) - timeout for outbound proxy calls; parsed with time.ParseDuration, so the value needs a unit (e.g. 800ms) and unparsable values fall back to the default (default: 800ms) +// +// Integration: +// +// Wrap just the cart mux: +// +// cartMux := syncedServer.Serve() // existing cart handlers +// wrapped := OwnershipProxyMiddleware(syncedPool)(cartMux) +// mux.Handle("/cart/", http.StripPrefix("/cart", wrapped)) +// +// Fallbacks: +// +// If extraction or proxying fails, a 502 is returned (except missing cart id +// which simply skips routing). Timeouts produce 504. +// +// NOTE: +// - This does NOT (yet) support sticky upgrade / websockets. +// - Only primary ownership is considered (replicas ignored). +// - This keeps control plane & ring logic unmodified. +// +// You can gradually phase out remote grain logic by placing this middleware +// in front while leaving the rest of the code untouched. +func OwnershipProxyMiddleware(pool *SyncedPool) func(http.Handler) http.Handler { + localHost := pool.Hostname() + targetPort := envOr("CART_SERVICE_PORT", "8080") + timeout := envDurationOr("CART_PROXY_TIMEOUT_MS", 800*time.Millisecond) + + client := &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + MaxIdleConnsPerHost: 32, + IdleConnTimeout: 90 * time.Second, + // Dialer with small timeouts to fail fast inside cluster + DialContext: (&net.Dialer{ + Timeout: 300 * time.Millisecond, + KeepAlive: 30 * time.Second, + }).DialContext, + }, + } + + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + // CORS preflight: OPTIONS requests are always handled locally and never routed. + if r.Method == http.MethodOptions { + next.ServeHTTP(w, r) + return + } + + cartId, ok := extractCartIdFromRequest(r) + if !ok || cartId.String() == "" { + // No cart id available -> cannot determine ownership; proceed locally. 
+ w.Header().Set("X-Cart-Owner-Routed", "false") + next.ServeHTTP(w, r) + return + } + + owner := pool.OwnerHost(cartId) + w.Header().Set("X-Cart-Id", cartId.String()) + w.Header().Set("X-Cart-Owner", owner) + + // Route locally if we're the owner or owner resolution empty. + if owner == "" || owner == localHost { + w.Header().Set("X-Cart-Owner-Routed", "false") + next.ServeHTTP(w, r) + return + } + + // Proxy to remote owner + proxyURL := buildProxyURL(r, owner, targetPort) + bodyBuf, err := readBodyDuplicate(r) + if err != nil { + http.Error(w, "failed to read request body", http.StatusBadGateway) + return + } + + req, err := http.NewRequestWithContext(r.Context(), r.Method, proxyURL, bodyBuf) + if err != nil { + http.Error(w, "failed to create proxy request", http.StatusBadGateway) + return + } + copyHeaders(req.Header, r.Header) + // Ensure we don't forward hop-by-hop headers + cleanHopHeaders(req.Header) + req.Header.Set("X-Forwarded-For", appendForwardedFor(r)) + req.Header.Set("X-Forwarded-Host", r.Host) + req.Header.Set("X-Forwarded-Proto", schemeFromRequest(r)) + req.Header.Set("X-Cart-Forwarded", "true") + + start := time.Now() + resp, err := client.Do(req) + if err != nil { + if os.IsTimeout(err) || strings.Contains(err.Error(), "timeout") { + http.Error(w, "gateway timeout contacting owner", http.StatusGatewayTimeout) + return + } + http.Error(w, "upstream owner error", http.StatusBadGateway) + return + } + defer resp.Body.Close() + + // Copy status + headers + copyHeaders(w.Header(), resp.Header) + w.Header().Set("X-Cart-Owner-Routed", "true") + w.Header().Set("X-Cart-Owner-Latency-Ms", durationMs(time.Since(start))) + w.WriteHeader(resp.StatusCode) + io.Copy(w, resp.Body) + }) + } +} + +// (Removed duplicate OwnerHost method; single implementation now lives in synced-pool.go) + +// extractCartIdFromRequest tries cookie first, then path form /byid/{id}/... 
+func extractCartIdFromRequest(r *http.Request) (CartId, bool) { + // Cookie + if c, err := r.Cookie("cartid"); err == nil && c.Value != "" { + if cid, _, _, err2 := CanonicalizeOrLegacy(c.Value); err2 == nil { + return CartIDToLegacy(cid), true + } + } + // Path-based: locate "byid" segment + parts := splitPath(r.URL.Path) + for i := 0; i < len(parts); i++ { + if parts[i] == "byid" && i+1 < len(parts) { + raw := parts[i+1] + if raw != "" { + if cid, _, _, err := CanonicalizeOrLegacy(raw); err == nil { + return CartIDToLegacy(cid), true + } + } + } + } + var zero CartId + return zero, false +} + +// Helpers + +func envOr(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func envDurationOr(key string, def time.Duration) time.Duration { + if v := os.Getenv(key); v != "" { + if d, err := time.ParseDuration(v); err == nil { + return d + } + } + return def +} + +func buildProxyURL(r *http.Request, host, port string) string { + sb := &strings.Builder{} + sb.WriteString("http://") + sb.WriteString(host) + if port != "" { + sb.WriteString(":") + sb.WriteString(port) + } + // Preserve original path & query (already includes /cart prefix stripped? 
depends on where middleware placed) + sb.WriteString(r.URL.Path) + if rq := r.URL.RawQuery; rq != "" { + sb.WriteString("?") + sb.WriteString(rq) + } + return sb.String() +} + +func readBodyDuplicate(r *http.Request) (io.ReadCloser, error) { + if r.Body == nil { + return http.NoBody, nil + } + defer r.Body.Close() + buf, err := io.ReadAll(r.Body) + if err != nil { + return nil, err + } + // Restore original for downstream if local (we only call when proxying, but safe) + r.Body = io.NopCloser(bytes.NewReader(buf)) + return io.NopCloser(bytes.NewReader(buf)), nil +} + +func copyHeaders(dst, src http.Header) { + for k, vv := range src { + // Skip hop-by-hop; they'll be cleaned anyway + for _, v := range vv { + dst.Add(k, v) + } + } +} + +var hopHeaders = map[string]struct{}{ + "Connection": {}, + "Proxy-Connection": {}, + "Keep-Alive": {}, + "Proxy-Authenticate": {}, + "Proxy-Authorization": {}, + "Te": {}, + "Trailer": {}, + "Transfer-Encoding": {}, + "Upgrade": {}, +} + +func cleanHopHeaders(h http.Header) { + for k := range hopHeaders { + h.Del(k) + } +} + +func appendForwardedFor(r *http.Request) string { + host, _, _ := net.SplitHostPort(r.RemoteAddr) + if host == "" { + host = r.RemoteAddr + } + prior := r.Header.Get("X-Forwarded-For") + if prior == "" { + return host + } + return prior + ", " + host +} + +func schemeFromRequest(r *http.Request) string { + if r.TLS != nil { + return "https" + } + if proto := r.Header.Get("X-Forwarded-Proto"); proto != "" { + return proto + } + return "http" +} + +func splitPath(p string) []string { + if p == "" || p == "/" { + return nil + } + trimmed := strings.TrimPrefix(p, "/") + if trimmed == "" { + return nil + } + parts := strings.Split(trimmed, "/") + return parts +} + +func durationMs(d time.Duration) string { + return strconvFormatInt(int64(d / time.Millisecond)) +} + +// strconvFormatInt is a tiny helper to avoid importing strconv for one use. 
+func strconvFormatInt(i int64) string { + // Fast int64 -> string (base 10) without strconv for small dependency surface. + if i == 0 { + return "0" + } + neg := i < 0 + if neg { + i = -i + } + var buf [20]byte + pos := len(buf) + for i > 0 { + pos-- + buf[pos] = byte('0' + (i % 10)) + i /= 10 + } + if neg { + pos-- + buf[pos] = '-' + } + return string(buf[pos:]) +} diff --git a/pool-server.go b/pool-server.go index c23521b..56155f0 100644 --- a/pool-server.go +++ b/pool-server.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "log" "math/rand" "net/http" @@ -25,12 +26,12 @@ func NewPoolServer(pool GrainPool, pod_name string) *PoolServer { } } -func (s *PoolServer) process(id CartId, mutation interface{}) (*messages.CartState, error) { +func (s *PoolServer) process(id CartId, mutation interface{}) (*CartGrain, error) { grain, err := s.pool.Apply(id, mutation) if err != nil { return nil, err } - return ToCartState(grain), nil + return grain, nil } func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request, id CartId) error { @@ -39,7 +40,7 @@ func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request, id CartId return err } - return s.WriteResult(w, ToCartState(grain)) + return s.WriteResult(w, grain) } func (s *PoolServer) HandleAddSku(w http.ResponseWriter, r *http.Request, id CartId) error { @@ -62,7 +63,7 @@ func ErrorHandler(fn func(w http.ResponseWriter, r *http.Request) error) func(w } } -func (s *PoolServer) WriteResult(w http.ResponseWriter, result *messages.CartState) error { +func (s *PoolServer) WriteResult(w http.ResponseWriter, result *CartGrain) error { w.Header().Set("Content-Type", "application/json") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Access-Control-Allow-Origin", "*") @@ -280,11 +281,9 @@ func NewCartId() CartId { func CookieCartIdHandler(fn func(w http.ResponseWriter, r *http.Request, cartId CartId) error) func(w http.ResponseWriter, r *http.Request) error { return func(w 
http.ResponseWriter, r *http.Request) error { - // Extract / normalize cookie (preserve legacy textual IDs without rewriting). var legacy CartId cookies := r.CookiesNamed("cartid") if len(cookies) == 0 { - // No cookie -> generate new canonical base62 id. cid, generated, _, err := CanonicalizeOrLegacy("") if err != nil { return fmt.Errorf("failed to generate cart id: %w", err) @@ -309,8 +308,6 @@ func CookieCartIdHandler(fn func(w http.ResponseWriter, r *http.Request, cartId return fmt.Errorf("failed to canonicalize cart id: %w", err) } legacy = CartIDToLegacy(cid) - // Only set a new cookie if we actually generated a brand-new ID (empty input). - // For legacy (non-base62) ids we preserve the original text and do not overwrite. if generated && wasBase62 { http.SetCookie(w, &http.Cookie{ Name: "cartid", @@ -324,6 +321,12 @@ func CookieCartIdHandler(fn func(w http.ResponseWriter, r *http.Request, cartId w.Header().Set("Set-Cart-Id", cid.String()) } } + // Ownership proxy AFTER id extraction (cookie mode) + if ownershipProxyAfterExtraction != nil { + if handled, err := ownershipProxyAfterExtraction(legacy, w, r); handled || err != nil { + return err + } + } return fn(w, r, legacy) } } @@ -351,18 +354,90 @@ func CartIdHandler(fn func(w http.ResponseWriter, r *http.Request, cartId CartId return fmt.Errorf("invalid cart id: %w", err) } legacy := CartIDToLegacy(cid) - // Only emit Set-Cart-Id header if we produced a brand-new canonical id - // AND it is base62 (avoid rewriting legacy textual identifiers). 
if generated && wasBase62 { w.Header().Set("Set-Cart-Id", cid.String()) } + // Ownership proxy AFTER path id extraction (explicit id mode) + if ownershipProxyAfterExtraction != nil { + if handled, err := ownershipProxyAfterExtraction(legacy, w, r); handled || err != nil { + return err + } + } return fn(w, r, legacy) } } +var ownershipProxyAfterExtraction func(cartId CartId, w http.ResponseWriter, r *http.Request) (handled bool, err error) + func (s *PoolServer) Serve() *http.ServeMux { + // Install ownership proxy hook that runs AFTER id extraction (cookie OR path) + ownershipProxyAfterExtraction = func(cartId CartId, w http.ResponseWriter, r *http.Request) (bool, error) { + if cartId.String() == "" { + return false, nil + } + owner := s.pool.OwnerHost(cartId) + if owner == "" || owner == s.pool.Hostname() { + // Set / refresh cartowner cookie pointing to the local host (claim or already owned). + localHost := owner + if localHost == "" { + localHost = s.pool.Hostname() + } + http.SetCookie(w, &http.Cookie{ + Name: "cartowner", + Value: localHost, + Path: "/", + HttpOnly: true, + SameSite: http.SameSiteLaxMode, + }) + return false, nil + } + // For remote ownership set cartowner cookie to remote host for sticky sessions. + http.SetCookie(w, &http.Cookie{ + Name: "cartowner", + Value: owner, + Path: "/", + HttpOnly: true, + SameSite: http.SameSiteLaxMode, + }) + // Proxy logic (simplified): re-issue the incoming request against the owning host. The target URL is http://{owner}{path}; no port is appended. + target := "http://" + owner + r.URL.Path + if q := r.URL.RawQuery; q != "" { + target += "?" 
+ q + } + req, err := http.NewRequestWithContext(r.Context(), r.Method, target, r.Body) + if err != nil { + http.Error(w, "proxy build error", http.StatusBadGateway) + return true, err + } + for k, v := range r.Header { + for _, vv := range v { + req.Header.Add(k, vv) + } + } + req.Header.Set("X-Forwarded-Host", r.Host) + req.Header.Set("X-Cart-Id", cartId.String()) + req.Header.Set("X-Cart-Owner", owner) + resp, err := http.DefaultClient.Do(req) + if err != nil { + http.Error(w, "proxy upstream error", http.StatusBadGateway) + return true, err + } + defer resp.Body.Close() + for k, v := range resp.Header { + for _, vv := range v { + w.Header().Add(k, vv) + } + } + w.Header().Set("X-Cart-Owner-Routed", "true") + w.WriteHeader(resp.StatusCode) + _, copyErr := io.Copy(w, resp.Body) + if copyErr != nil { + return true, copyErr + } + return true, nil + } + mux := http.NewServeMux() - //mux.HandleFunc("/", s.RewritePath) mux.HandleFunc("OPTIONS /", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, DELETE") diff --git a/proto/control_plane.pb.go b/proto/control_plane.pb.go index ad533c2..57460b9 100644 --- a/proto/control_plane.pb.go +++ b/proto/control_plane.pb.go @@ -344,6 +344,60 @@ func (x *ClosingNotice) GetHost() string { return "" } +// OwnershipAnnounce broadcasts first-touch ownership claims for cart IDs. +// First claim wins; receivers SHOULD NOT overwrite an existing different owner. 
+type OwnershipAnnounce struct { + state protoimpl.MessageState `protogen:"open.v1"` + Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` // announcing host + CartIds []string `protobuf:"bytes,2,rep,name=cart_ids,json=cartIds,proto3" json:"cart_ids,omitempty"` // newly claimed cart ids + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *OwnershipAnnounce) Reset() { + *x = OwnershipAnnounce{} + mi := &file_control_plane_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *OwnershipAnnounce) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*OwnershipAnnounce) ProtoMessage() {} + +func (x *OwnershipAnnounce) ProtoReflect() protoreflect.Message { + mi := &file_control_plane_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use OwnershipAnnounce.ProtoReflect.Descriptor instead. 
+func (*OwnershipAnnounce) Descriptor() ([]byte, []int) { + return file_control_plane_proto_rawDescGZIP(), []int{7} +} + +func (x *OwnershipAnnounce) GetHost() string { + if x != nil { + return x.Host + } + return "" +} + +func (x *OwnershipAnnounce) GetCartIds() []string { + if x != nil { + return x.CartIds + } + return nil +} + var File_control_plane_proto protoreflect.FileDescriptor const file_control_plane_proto_rawDesc = "" + @@ -364,12 +418,16 @@ const file_control_plane_proto_rawDesc = "" + "\baccepted\x18\x01 \x01(\bR\baccepted\x12\x18\n" + "\amessage\x18\x02 \x01(\tR\amessage\"#\n" + "\rClosingNotice\x12\x12\n" + - "\x04host\x18\x01 \x01(\tR\x04host2\xf4\x01\n" + + "\x04host\x18\x01 \x01(\tR\x04host\"B\n" + + "\x11OwnershipAnnounce\x12\x12\n" + + "\x04host\x18\x01 \x01(\tR\x04host\x12\x19\n" + + "\bcart_ids\x18\x02 \x03(\tR\acartIds2\xc0\x02\n" + "\fControlPlane\x12,\n" + "\x04Ping\x12\x0f.messages.Empty\x1a\x13.messages.PingReply\x12A\n" + "\tNegotiate\x12\x1a.messages.NegotiateRequest\x1a\x18.messages.NegotiateReply\x125\n" + "\n" + - "GetCartIds\x12\x0f.messages.Empty\x1a\x16.messages.CartIdsReply\x12<\n" + + "GetCartIds\x12\x0f.messages.Empty\x1a\x16.messages.CartIdsReply\x12J\n" + + "\x11AnnounceOwnership\x12\x1b.messages.OwnershipAnnounce\x1a\x18.messages.OwnerChangeAck\x12<\n" + "\aClosing\x12\x17.messages.ClosingNotice\x1a\x18.messages.OwnerChangeAckB.Z,git.tornberg.me/go-cart-actor/proto;messagesb\x06proto3" var ( @@ -384,27 +442,30 @@ func file_control_plane_proto_rawDescGZIP() []byte { return file_control_plane_proto_rawDescData } -var file_control_plane_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_control_plane_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_control_plane_proto_goTypes = []any{ - (*Empty)(nil), // 0: messages.Empty - (*PingReply)(nil), // 1: messages.PingReply - (*NegotiateRequest)(nil), // 2: messages.NegotiateRequest - (*NegotiateReply)(nil), // 3: messages.NegotiateReply - (*CartIdsReply)(nil), // 
4: messages.CartIdsReply - (*OwnerChangeAck)(nil), // 5: messages.OwnerChangeAck - (*ClosingNotice)(nil), // 6: messages.ClosingNotice + (*Empty)(nil), // 0: messages.Empty + (*PingReply)(nil), // 1: messages.PingReply + (*NegotiateRequest)(nil), // 2: messages.NegotiateRequest + (*NegotiateReply)(nil), // 3: messages.NegotiateReply + (*CartIdsReply)(nil), // 4: messages.CartIdsReply + (*OwnerChangeAck)(nil), // 5: messages.OwnerChangeAck + (*ClosingNotice)(nil), // 6: messages.ClosingNotice + (*OwnershipAnnounce)(nil), // 7: messages.OwnershipAnnounce } var file_control_plane_proto_depIdxs = []int32{ 0, // 0: messages.ControlPlane.Ping:input_type -> messages.Empty 2, // 1: messages.ControlPlane.Negotiate:input_type -> messages.NegotiateRequest 0, // 2: messages.ControlPlane.GetCartIds:input_type -> messages.Empty - 6, // 3: messages.ControlPlane.Closing:input_type -> messages.ClosingNotice - 1, // 4: messages.ControlPlane.Ping:output_type -> messages.PingReply - 3, // 5: messages.ControlPlane.Negotiate:output_type -> messages.NegotiateReply - 4, // 6: messages.ControlPlane.GetCartIds:output_type -> messages.CartIdsReply - 5, // 7: messages.ControlPlane.Closing:output_type -> messages.OwnerChangeAck - 4, // [4:8] is the sub-list for method output_type - 0, // [0:4] is the sub-list for method input_type + 7, // 3: messages.ControlPlane.AnnounceOwnership:input_type -> messages.OwnershipAnnounce + 6, // 4: messages.ControlPlane.Closing:input_type -> messages.ClosingNotice + 1, // 5: messages.ControlPlane.Ping:output_type -> messages.PingReply + 3, // 6: messages.ControlPlane.Negotiate:output_type -> messages.NegotiateReply + 4, // 7: messages.ControlPlane.GetCartIds:output_type -> messages.CartIdsReply + 5, // 8: messages.ControlPlane.AnnounceOwnership:output_type -> messages.OwnerChangeAck + 5, // 9: messages.ControlPlane.Closing:output_type -> messages.OwnerChangeAck + 5, // [5:10] is the sub-list for method output_type + 0, // [0:5] is the sub-list for method 
input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name @@ -421,7 +482,7 @@ func file_control_plane_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_control_plane_proto_rawDesc), len(file_control_plane_proto_rawDesc)), NumEnums: 0, - NumMessages: 7, + NumMessages: 8, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/control_plane.proto b/proto/control_plane.proto index c1bfcb9..b741da5 100644 --- a/proto/control_plane.proto +++ b/proto/control_plane.proto @@ -52,6 +52,13 @@ message ClosingNotice { string host = 1; } +// OwnershipAnnounce broadcasts first-touch ownership claims for cart IDs. +// First claim wins; receivers SHOULD NOT overwrite an existing different owner. +message OwnershipAnnounce { + string host = 1; // announcing host + repeated string cart_ids = 2; // newly claimed cart ids +} + // ControlPlane defines cluster coordination and ownership operations. service ControlPlane { // Ping for liveness; lightweight health signal. @@ -65,6 +72,9 @@ service ControlPlane { // ConfirmOwner RPC removed (was legacy ownership acknowledgement; ring-based ownership now authoritative) + // Ownership announcement: first-touch claim broadcast (idempotent; best-effort). + rpc AnnounceOwnership(OwnershipAnnounce) returns (OwnerChangeAck); + // Closing announces graceful shutdown so peers can proactively adjust. 
rpc Closing(ClosingNotice) returns (OwnerChangeAck); } diff --git a/proto/control_plane_grpc.pb.go b/proto/control_plane_grpc.pb.go index 542490f..77e61ad 100644 --- a/proto/control_plane_grpc.pb.go +++ b/proto/control_plane_grpc.pb.go @@ -19,10 +19,11 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - ControlPlane_Ping_FullMethodName = "/messages.ControlPlane/Ping" - ControlPlane_Negotiate_FullMethodName = "/messages.ControlPlane/Negotiate" - ControlPlane_GetCartIds_FullMethodName = "/messages.ControlPlane/GetCartIds" - ControlPlane_Closing_FullMethodName = "/messages.ControlPlane/Closing" + ControlPlane_Ping_FullMethodName = "/messages.ControlPlane/Ping" + ControlPlane_Negotiate_FullMethodName = "/messages.ControlPlane/Negotiate" + ControlPlane_GetCartIds_FullMethodName = "/messages.ControlPlane/GetCartIds" + ControlPlane_AnnounceOwnership_FullMethodName = "/messages.ControlPlane/AnnounceOwnership" + ControlPlane_Closing_FullMethodName = "/messages.ControlPlane/Closing" ) // ControlPlaneClient is the client API for ControlPlane service. @@ -37,6 +38,8 @@ type ControlPlaneClient interface { Negotiate(ctx context.Context, in *NegotiateRequest, opts ...grpc.CallOption) (*NegotiateReply, error) // GetCartIds lists currently owned cart IDs on this node. GetCartIds(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*CartIdsReply, error) + // Ownership announcement: first-touch claim broadcast (idempotent; best-effort). + AnnounceOwnership(ctx context.Context, in *OwnershipAnnounce, opts ...grpc.CallOption) (*OwnerChangeAck, error) // Closing announces graceful shutdown so peers can proactively adjust. Closing(ctx context.Context, in *ClosingNotice, opts ...grpc.CallOption) (*OwnerChangeAck, error) } @@ -79,6 +82,16 @@ func (c *controlPlaneClient) GetCartIds(ctx context.Context, in *Empty, opts ... 
return out, nil } +func (c *controlPlaneClient) AnnounceOwnership(ctx context.Context, in *OwnershipAnnounce, opts ...grpc.CallOption) (*OwnerChangeAck, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(OwnerChangeAck) + err := c.cc.Invoke(ctx, ControlPlane_AnnounceOwnership_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *controlPlaneClient) Closing(ctx context.Context, in *ClosingNotice, opts ...grpc.CallOption) (*OwnerChangeAck, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(OwnerChangeAck) @@ -101,6 +114,8 @@ type ControlPlaneServer interface { Negotiate(context.Context, *NegotiateRequest) (*NegotiateReply, error) // GetCartIds lists currently owned cart IDs on this node. GetCartIds(context.Context, *Empty) (*CartIdsReply, error) + // Ownership announcement: first-touch claim broadcast (idempotent; best-effort). + AnnounceOwnership(context.Context, *OwnershipAnnounce) (*OwnerChangeAck, error) // Closing announces graceful shutdown so peers can proactively adjust. 
Closing(context.Context, *ClosingNotice) (*OwnerChangeAck, error) mustEmbedUnimplementedControlPlaneServer() @@ -122,6 +137,9 @@ func (UnimplementedControlPlaneServer) Negotiate(context.Context, *NegotiateRequ func (UnimplementedControlPlaneServer) GetCartIds(context.Context, *Empty) (*CartIdsReply, error) { return nil, status.Errorf(codes.Unimplemented, "method GetCartIds not implemented") } +func (UnimplementedControlPlaneServer) AnnounceOwnership(context.Context, *OwnershipAnnounce) (*OwnerChangeAck, error) { + return nil, status.Errorf(codes.Unimplemented, "method AnnounceOwnership not implemented") +} func (UnimplementedControlPlaneServer) Closing(context.Context, *ClosingNotice) (*OwnerChangeAck, error) { return nil, status.Errorf(codes.Unimplemented, "method Closing not implemented") } @@ -200,6 +218,24 @@ func _ControlPlane_GetCartIds_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } +func _ControlPlane_AnnounceOwnership_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(OwnershipAnnounce) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ControlPlaneServer).AnnounceOwnership(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ControlPlane_AnnounceOwnership_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ControlPlaneServer).AnnounceOwnership(ctx, req.(*OwnershipAnnounce)) + } + return interceptor(ctx, in, info, handler) +} + func _ControlPlane_Closing_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(ClosingNotice) if err := dec(in); err != nil { @@ -237,6 +273,10 @@ var ControlPlane_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetCartIds", Handler: _ControlPlane_GetCartIds_Handler, }, 
+ { + MethodName: "AnnounceOwnership", + Handler: _ControlPlane_AnnounceOwnership_Handler, + }, { MethodName: "Closing", Handler: _ControlPlane_Closing_Handler, diff --git a/remote_grain_grpc.go b/remote_grain_grpc.go deleted file mode 100644 index 12f5513..0000000 --- a/remote_grain_grpc.go +++ /dev/null @@ -1,341 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "time" - - proto "git.tornberg.me/go-cart-actor/proto" // generated package name is 'messages'; aliased as proto for consistency - "google.golang.org/grpc" -) - -// RemoteGrainGRPC is the gRPC-backed implementation of a remote grain. -// It mirrors the previous RemoteGrain (TCP/frame based) while using the -// new CartActor gRPC service. It implements the Grain interface so that -// SyncedPool can remain largely unchanged when swapping transport layers. -type RemoteGrainGRPC struct { - Id CartId - Host string - client proto.CartActorClient - // Optional: keep the underlying conn so higher-level code can close if needed - conn *grpc.ClientConn - - // Per-call timeout settings (tunable) - mutateTimeout time.Duration - stateTimeout time.Duration -} - -// NewRemoteGrainGRPC constructs a remote grain adapter from an existing gRPC client. -func NewRemoteGrainGRPC(id CartId, host string, client proto.CartActorClient) *RemoteGrainGRPC { - return &RemoteGrainGRPC{ - Id: id, - Host: host, - client: client, - mutateTimeout: 800 * time.Millisecond, - stateTimeout: 400 * time.Millisecond, - } -} - -// NewRemoteGrainGRPCWithConn dials the target and creates the gRPC client. -// target should be host:port (where the CartActor service is exposed). -func NewRemoteGrainGRPCWithConn(id CartId, host string, target string, dialOpts ...grpc.DialOption) (*RemoteGrainGRPC, error) { - // NOTE: insecure for initial migration; should be replaced with TLS later. - baseOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()} - baseOpts = append(baseOpts, dialOpts...) 
- - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - conn, err := grpc.DialContext(ctx, target, baseOpts...) - if err != nil { - return nil, err - } - - client := proto.NewCartActorClient(conn) - return &RemoteGrainGRPC{ - Id: id, - Host: host, - client: client, - conn: conn, - mutateTimeout: 800 * time.Millisecond, - stateTimeout: 400 * time.Millisecond, - }, nil -} - -func (g *RemoteGrainGRPC) GetId() CartId { - return g.Id -} - -// Apply executes a cart mutation via per-mutation RPCs (breaking v2 API) -// and returns a *CartGrain reconstructed from the CartMutationReply state. -func (g *RemoteGrainGRPC) Apply(content interface{}, isReplay bool) (*CartGrain, error) { - if isReplay { - return nil, fmt.Errorf("replay not supported for remote grains") - } - if content == nil { - return nil, fmt.Errorf("nil mutation content") - } - - ts := time.Now().Unix() - - var invoke func(ctx context.Context) (*proto.CartMutationReply, error) - - switch m := content.(type) { - case *proto.AddRequest: - invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { - return g.client.AddRequest(ctx, &proto.AddRequestRequest{ - CartId: g.Id.String(), - ClientTimestamp: ts, - Payload: m, - }) - } - case *proto.AddItem: - invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { - return g.client.AddItem(ctx, &proto.AddItemRequest{ - CartId: g.Id.String(), - ClientTimestamp: ts, - Payload: m, - }) - } - case *proto.RemoveItem: - invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { - return g.client.RemoveItem(ctx, &proto.RemoveItemRequest{ - CartId: g.Id.String(), - ClientTimestamp: ts, - Payload: m, - }) - } - case *proto.RemoveDelivery: - invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { - return g.client.RemoveDelivery(ctx, &proto.RemoveDeliveryRequest{ - CartId: g.Id.String(), - ClientTimestamp: ts, - Payload: m, - }) - } - case *proto.ChangeQuantity: - invoke = func(ctx 
context.Context) (*proto.CartMutationReply, error) { - return g.client.ChangeQuantity(ctx, &proto.ChangeQuantityRequest{ - CartId: g.Id.String(), - ClientTimestamp: ts, - Payload: m, - }) - } - case *proto.SetDelivery: - invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { - return g.client.SetDelivery(ctx, &proto.SetDeliveryRequest{ - CartId: g.Id.String(), - ClientTimestamp: ts, - Payload: m, - }) - } - case *proto.SetPickupPoint: - invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { - return g.client.SetPickupPoint(ctx, &proto.SetPickupPointRequest{ - CartId: g.Id.String(), - ClientTimestamp: ts, - Payload: m, - }) - } - case *proto.CreateCheckoutOrder: - return nil, fmt.Errorf("CreateCheckoutOrder deprecated: checkout is handled via HTTP endpoint (HandleCheckout)") - case *proto.SetCartRequest: - invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { - return g.client.SetCartItems(ctx, &proto.SetCartItemsRequest{ - CartId: g.Id.String(), - ClientTimestamp: ts, - Payload: m, - }) - } - case *proto.OrderCreated: - invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { - return g.client.OrderCompleted(ctx, &proto.OrderCompletedRequest{ - CartId: g.Id.String(), - ClientTimestamp: ts, - Payload: m, - }) - } - default: - return nil, fmt.Errorf("unsupported mutation type %T", content) - } - - if invoke == nil { - return nil, fmt.Errorf("no invocation mapped for mutation %T", content) - } - - ctx, cancel := context.WithTimeout(context.Background(), g.mutateTimeout) - defer cancel() - - resp, err := invoke(ctx) - if err != nil { - return nil, err - } - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - if e := resp.GetError(); e != "" { - return nil, fmt.Errorf("remote mutation failed %d: %s", resp.StatusCode, e) - } - return nil, fmt.Errorf("remote mutation failed %d", resp.StatusCode) - } - state := resp.GetState() - if state == nil { - return nil, fmt.Errorf("mutation reply missing state on 
success") - } - // Reconstruct a lightweight CartGrain (only fields we expose internally) - grain := &CartGrain{ - Id: ToCartId(state.Id), - TotalPrice: state.TotalPrice, - TotalTax: state.TotalTax, - TotalDiscount: state.TotalDiscount, - PaymentInProgress: state.PaymentInProgress, - OrderReference: state.OrderReference, - PaymentStatus: state.PaymentStatus, - } - // Items - for _, it := range state.Items { - if it == nil { - continue - } - outlet := toPtr(it.Outlet) - storeId := toPtr(it.StoreId) - grain.Items = append(grain.Items, &CartItem{ - Id: int(it.Id), - ItemId: int(it.ItemId), - Sku: it.Sku, - Name: it.Name, - Price: it.Price, - Quantity: int(it.Qty), - TotalPrice: it.TotalPrice, - TotalTax: it.TotalTax, - OrgPrice: it.OrgPrice, - TaxRate: int(it.TaxRate), - Brand: it.Brand, - Category: it.Category, - Category2: it.Category2, - Category3: it.Category3, - Category4: it.Category4, - Category5: it.Category5, - Image: it.Image, - ArticleType: it.Type, - SellerId: it.SellerId, - SellerName: it.SellerName, - Disclaimer: it.Disclaimer, - Outlet: outlet, - StoreId: storeId, - Stock: StockStatus(it.Stock), - }) - } - // Deliveries - for _, d := range state.Deliveries { - if d == nil { - continue - } - intIds := make([]int, 0, len(d.Items)) - for _, id := range d.Items { - intIds = append(intIds, int(id)) - } - grain.Deliveries = append(grain.Deliveries, &CartDelivery{ - Id: int(d.Id), - Provider: d.Provider, - Price: d.Price, - Items: intIds, - PickupPoint: d.PickupPoint, - }) - } - - return grain, nil -} - -// GetCurrentState retrieves the current cart state using the typed StateReply oneof. 
-func (g *RemoteGrainGRPC) GetCurrentState() (*CartGrain, error) { - ctx, cancel := context.WithTimeout(context.Background(), g.stateTimeout) - defer cancel() - resp, err := g.client.GetState(ctx, &proto.StateRequest{CartId: g.Id.String()}) - if err != nil { - return nil, err - } - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - if e := resp.GetError(); e != "" { - return nil, fmt.Errorf("remote get state failed %d: %s", resp.StatusCode, e) - } - return nil, fmt.Errorf("remote get state failed %d", resp.StatusCode) - } - state := resp.GetState() - if state == nil { - return nil, fmt.Errorf("state reply missing state on success") - } - grain := &CartGrain{ - Id: ToCartId(state.Id), - TotalPrice: state.TotalPrice, - TotalTax: state.TotalTax, - TotalDiscount: state.TotalDiscount, - PaymentInProgress: state.PaymentInProgress, - OrderReference: state.OrderReference, - PaymentStatus: state.PaymentStatus, - } - for _, it := range state.Items { - if it == nil { - continue - } - outlet := toPtr(it.Outlet) - storeId := toPtr(it.StoreId) - grain.Items = append(grain.Items, &CartItem{ - Id: int(it.Id), - ItemId: int(it.ItemId), - Sku: it.Sku, - Name: it.Name, - Price: it.Price, - Quantity: int(it.Qty), - TotalPrice: it.TotalPrice, - TotalTax: it.TotalTax, - OrgPrice: it.OrgPrice, - TaxRate: int(it.TaxRate), - Brand: it.Brand, - Category: it.Category, - Category2: it.Category2, - Category3: it.Category3, - Category4: it.Category4, - Category5: it.Category5, - Image: it.Image, - ArticleType: it.Type, - SellerId: it.SellerId, - SellerName: it.SellerName, - Disclaimer: it.Disclaimer, - Outlet: outlet, - StoreId: storeId, - Stock: StockStatus(it.Stock), - }) - } - for _, d := range state.Deliveries { - if d == nil { - continue - } - intIds := make([]int, 0, len(d.Items)) - for _, id := range d.Items { - intIds = append(intIds, int(id)) - } - grain.Deliveries = append(grain.Deliveries, &CartDelivery{ - Id: int(d.Id), - Provider: d.Provider, - Price: d.Price, - Items: intIds, 
- PickupPoint: d.PickupPoint, - }) - } - - return grain, nil -} - -// Close closes the underlying gRPC connection if this adapter created it. -func (g *RemoteGrainGRPC) Close() error { - if g.conn != nil { - return g.conn.Close() - } - return nil -} - -// Debug helper to log operations (optional). -func (g *RemoteGrainGRPC) logf(format string, args ...interface{}) { - log.Printf("[remote-grain-grpc host=%s id=%s] %s", g.Host, g.Id.String(), fmt.Sprintf(format, args...)) -} diff --git a/ring.go b/ring.go deleted file mode 100644 index 5973368..0000000 --- a/ring.go +++ /dev/null @@ -1,344 +0,0 @@ -package main - -import ( - "encoding/binary" - "fmt" - "hash/fnv" - "sort" - "strings" - "sync" -) - -// ring.go -// -// Consistent hashing ring skeleton for future integration. -// -------------------------------------------------------- -// This file introduces a minimal, allocation‑light consistent hashing structure -// intended to replace per-cart ownership negotiation. It focuses on: -// * Deterministic lookup: O(log V) via binary search -// * Even(ish) distribution using virtual nodes (vnodes) -// * Epoch / fingerprint tracking to detect membership drift -// -// NOT YET WIRED: -// * SyncedPool integration (ownerForCart, lazy migration) -// * Replication factor > 1 -// * Persistent state migration -// -// Safe to import now; unused until explicit integration code is added. -// -// Design Notes -// ------------ -// - Hosts contribute `vnodesPerHost` virtual nodes. Higher counts smooth -// distribution at cost of memory (V = hosts * vnodesPerHost). -// - Hash of vnode = FNV1a64(host + "#" + index). For improved quality you -// can swap in xxhash or siphash later without changing API (but doing so -// will reshuffle ownership). -// - Cart ownership lookup uses either cartID.Raw() when provided (uniform -// 64-bit space) or falls back to hashing string forms (legacy). -// - Epoch is monotonically increasing; consumers can fence stale results. 
-// -// Future Extensions -// ----------------- -// - Weighted hosts (proportionally more vnodes). -// - Replication: LookupN(h, n) to return primary + replicas. -// - Streaming / diff-based ring updates (gossip). -// - Hash function injection for deterministic test scenarios. -// -// --------------------------------------------------------------------------- - -// Vnode represents a single virtual node position on the ring. -type Vnode struct { - Hash uint64 // position on the ring - Host string // physical host owning this vnode - Index int // per-host vnode index (0..vnodesPerHost-1) -} - -// Ring is an immutable consistent hash ring snapshot. -type Ring struct { - Epoch uint64 - Vnodes []Vnode // sorted by Hash - hosts []string - fingerprint uint64 // membership fingerprint (order-independent) -} - -// RingBuilder accumulates parameters to construct a Ring. -type RingBuilder struct { - epoch uint64 - vnodesPerHost int - hosts []string -} - -// NewRingBuilder creates a builder with defaults. 
-func NewRingBuilder() *RingBuilder { - return &RingBuilder{ - vnodesPerHost: 64, // a reasonable default for small clusters - } -} - -func (b *RingBuilder) WithEpoch(e uint64) *RingBuilder { - b.epoch = e - return b -} - -func (b *RingBuilder) WithVnodesPerHost(n int) *RingBuilder { - if n > 0 { - b.vnodesPerHost = n - } - return b -} - -func (b *RingBuilder) WithHosts(hosts []string) *RingBuilder { - uniq := make(map[string]struct{}, len(hosts)) - out := make([]string, 0, len(hosts)) - for _, h := range hosts { - h = strings.TrimSpace(h) - if h == "" { - continue - } - if _, ok := uniq[h]; ok { - continue - } - uniq[h] = struct{}{} - out = append(out, h) - } - sort.Strings(out) - b.hosts = out - return b -} - -func (b *RingBuilder) Build() *Ring { - if len(b.hosts) == 0 { - return &Ring{ - Epoch: b.epoch, - Vnodes: nil, - hosts: nil, - fingerprint: 0, - } - } - - totalVnodes := len(b.hosts) * b.vnodesPerHost - vnodes := make([]Vnode, 0, totalVnodes) - - for _, host := range b.hosts { - for i := 0; i < b.vnodesPerHost; i++ { - h := hashVnode(host, i) - vnodes = append(vnodes, Vnode{ - Hash: h, - Host: host, - Index: i, - }) - } - } - sort.Slice(vnodes, func(i, j int) bool { - if vnodes[i].Hash == vnodes[j].Hash { - // Tie-break deterministically by host then index to avoid instability - if vnodes[i].Host == vnodes[j].Host { - return vnodes[i].Index < vnodes[j].Index - } - return vnodes[i].Host < vnodes[j].Host - } - return vnodes[i].Hash < vnodes[j].Hash - }) - - fp := fingerprintHosts(b.hosts) - - return &Ring{ - Epoch: b.epoch, - Vnodes: vnodes, - hosts: append([]string(nil), b.hosts...), - fingerprint: fp, - } -} - -// Hosts returns a copy of the host list (sorted). -func (r *Ring) Hosts() []string { - if len(r.hosts) == 0 { - return nil - } - cp := make([]string, len(r.hosts)) - copy(cp, r.hosts) - return cp -} - -// Fingerprint returns a hash representing the unordered membership set. 
-func (r *Ring) Fingerprint() uint64 { - return r.fingerprint -} - -// Empty indicates ring has no vnodes. -func (r *Ring) Empty() bool { - return len(r.Vnodes) == 0 -} - -// Lookup returns the vnode owning a given hash value. -func (r *Ring) Lookup(h uint64) Vnode { - if len(r.Vnodes) == 0 { - return Vnode{} - } - // Binary search: first position with Hash >= h - i := sort.Search(len(r.Vnodes), func(i int) bool { - return r.Vnodes[i].Hash >= h - }) - if i == len(r.Vnodes) { - return r.Vnodes[0] - } - return r.Vnodes[i] -} - -// LookupID selects owner vnode for a CartID (fast path). -func (r *Ring) LookupID(id CartID) Vnode { - return r.Lookup(id.Raw()) -} - -// LookupString hashes an arbitrary string and looks up owner. -func (r *Ring) LookupString(s string) Vnode { - return r.Lookup(hashKeyString(s)) -} - -// LookupN returns up to n distinct host vnodes in ring order -// starting from the primary owner of hash h (for replication). -func (r *Ring) LookupN(h uint64, n int) []Vnode { - if n <= 0 || len(r.Vnodes) == 0 { - return nil - } - if n > len(r.hosts) { - n = len(r.hosts) - } - owners := make([]Vnode, 0, n) - seen := make(map[string]struct{}, n) - - start := r.Lookup(h) - - // Find index of start (can binary search again or linear scan; since we - // already have start.Hash we do another search for clarity) - i := sort.Search(len(r.Vnodes), func(i int) bool { - return r.Vnodes[i].Hash >= start.Hash - }) - if i == len(r.Vnodes) { - i = 0 - } - - for idx := 0; len(owners) < n && idx < len(r.Vnodes); idx++ { - v := r.Vnodes[(i+idx)%len(r.Vnodes)] - if _, ok := seen[v.Host]; ok { - continue - } - seen[v.Host] = struct{}{} - owners = append(owners, v) - } - return owners -} - -// DiffHosts compares this ring's membership to another. 
-func (r *Ring) DiffHosts(other *Ring) (added []string, removed []string) { - if other == nil { - return r.Hosts(), nil - } - cur := make(map[string]struct{}, len(r.hosts)) - for _, h := range r.hosts { - cur[h] = struct{}{} - } - oth := make(map[string]struct{}, len(other.hosts)) - for _, h := range other.hosts { - oth[h] = struct{}{} - } - for h := range cur { - if _, ok := oth[h]; !ok { - removed = append(removed, h) - } - } - for h := range oth { - if _, ok := cur[h]; !ok { - added = append(added, h) - } - } - sort.Strings(added) - sort.Strings(removed) - return -} - -// ---------------------------- Hash Functions --------------------------------- - -func hashVnode(host string, idx int) uint64 { - h := fnv.New64a() - _, _ = h.Write([]byte(host)) - _, _ = h.Write([]byte{'#'}) - var buf [8]byte - binary.BigEndian.PutUint64(buf[:], uint64(idx)) - _, _ = h.Write(buf[:]) - return h.Sum64() -} - -// hashKeyString provides a stable hash for arbitrary string keys (legacy IDs). -func hashKeyString(s string) uint64 { - h := fnv.New64a() - _, _ = h.Write([]byte(s)) - return h.Sum64() -} - -// fingerprintHosts produces an order-insensitive hash over the host set. -func fingerprintHosts(hosts []string) uint64 { - if len(hosts) == 0 { - return 0 - } - h := fnv.New64a() - for _, host := range hosts { - _, _ = h.Write([]byte(host)) - _, _ = h.Write([]byte{0}) - } - return h.Sum64() -} - -// --------------------------- Thread-Safe Wrapper ----------------------------- -// -// RingRef offers atomic swap + read semantics. SyncedPool can embed or hold -// one of these to manage live ring updates safely. 
- -type RingRef struct { - mu sync.RWMutex - ring *Ring -} - -func NewRingRef(r *Ring) *RingRef { - return &RingRef{ring: r} -} - -func (rr *RingRef) Get() *Ring { - rr.mu.RLock() - r := rr.ring - rr.mu.RUnlock() - return r -} - -func (rr *RingRef) Set(r *Ring) { - rr.mu.Lock() - rr.ring = r - rr.mu.Unlock() -} - -func (rr *RingRef) LookupID(id CartID) Vnode { - r := rr.Get() - if r == nil { - return Vnode{} - } - return r.LookupID(id) -} - -// ----------------------------- Debug Utilities ------------------------------- - -func (r *Ring) String() string { - var b strings.Builder - fmt.Fprintf(&b, "Ring{epoch=%d vnodes=%d hosts=%d}\n", r.Epoch, len(r.Vnodes), len(r.hosts)) - limit := len(r.Vnodes) - if limit > 16 { - limit = 16 - } - for i := 0; i < limit; i++ { - v := r.Vnodes[i] - fmt.Fprintf(&b, " %02d hash=%016x host=%s idx=%d\n", i, v.Hash, v.Host, v.Index) - } - if len(r.Vnodes) > limit { - fmt.Fprintf(&b, " ... (%d more)\n", len(r.Vnodes)-limit) - } - return b.String() -} diff --git a/synced-pool.go b/synced-pool.go index 7406f82..d33cf12 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -20,34 +20,26 @@ import ( // // Responsibilities: // - Local grain access (delegates to GrainLocalPool) -// - Remote grain proxy management (RemoteGrainGRPC) // - Cluster membership (AddRemote via discovery + negotiation) // - Health/ping monitoring & remote removal -// - Ring based deterministic ownership (no runtime negotiation) -// - (Scaffolding) replication factor awareness via ring.LookupN +// - (Legacy) ring-based ownership removed in first-touch model // // Thread-safety: public methods that mutate internal maps lock p.mu (RWMutex). 
type SyncedPool struct { - Hostname string - local *GrainLocalPool + LocalHostname string + local *GrainLocalPool + + // New ownership tracking (first-touch / announcement model) + // remoteOwners maps cart id -> owning host (excluding locally owned carts which live in local.grains) + remoteOwners map[CartId]string mu sync.RWMutex // Remote host state (gRPC only) remoteHosts map[string]*RemoteHostGRPC // host -> remote host - // Remote grain proxies (by cart id) - remoteIndex map[CartId]Grain - // Discovery handler for re-adding hosts after failures discardedHostHandler *DiscardedHostHandler - - // Consistent hashing ring (immutable snapshot reference) - ringRef *RingRef - - // Configuration - vnodesPerHost int - replicationFactor int // RF (>=1). Currently only primary is active; replicas are scaffolding. } // RemoteHostGRPC tracks a remote host's clients & health. @@ -68,61 +60,23 @@ var ( Name: "cart_remote_negotiation_total", Help: "The total number of remote negotiations", }) - grainSyncCount = promauto.NewCounter(prometheus.CounterOpts{ - Name: "cart_grain_sync_total", - Help: "The total number of grain owner changes", - }) connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cart_connected_remotes", Help: "The number of connected remotes", }) - remoteLookupCount = promauto.NewCounter(prometheus.CounterOpts{ - Name: "cart_remote_lookup_total", - Help: "The total number of remote lookups (legacy counter)", - }) - - // Ring / ownership metrics - ringEpoch = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cart_ring_epoch", - Help: "Current consistent hashing ring epoch (fingerprint-based pseudo-epoch)", - }) - ringHosts = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cart_ring_hosts", - Help: "Number of hosts currently in the ring", - }) - ringVnodes = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cart_ring_vnodes", - Help: "Number of virtual nodes in the ring", - }) - ringLookupLocal = promauto.NewCounter(prometheus.CounterOpts{ - Name: 
"cart_ring_lookup_local_total", - Help: "Ring ownership lookups resolved to the local host", - }) - ringLookupRemote = promauto.NewCounter(prometheus.CounterOpts{ - Name: "cart_ring_lookup_remote_total", - Help: "Ring ownership lookups resolved to a remote host", - }) - ringHostShare = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "cart_ring_host_share", - Help: "Fractional share of ring vnodes per host", - }, []string{"host"}) - cartMutationsTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_mutations_total", - Help: "Total number of cart state mutations applied (local + remote routed).", + Help: "Total number of cart state mutations applied.", }) - cartMutationFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_mutation_failures_total", - Help: "Total number of failed cart state mutations (local apply errors or remote routing failures).", + Help: "Total number of failed cart state mutations.", }) - cartMutationLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ Name: "cart_mutation_latency_seconds", - Help: "Latency of cart mutations (successful or failed) in seconds.", + Help: "Latency of cart mutations in seconds.", Buckets: prometheus.DefBuckets, }, []string{"mutation"}) - cartActiveGrains = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cart_active_grains", Help: "Number of active (resident) local grains.", @@ -131,13 +85,11 @@ var ( func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) { p := &SyncedPool{ - Hostname: hostname, + LocalHostname: hostname, local: local, remoteHosts: make(map[string]*RemoteHostGRPC), - remoteIndex: make(map[CartId]Grain), + remoteOwners: make(map[CartId]string), discardedHostHandler: NewDiscardedHostHandler(1338), - vnodesPerHost: 64, // default smoothing factor; adjust if needed - replicationFactor: 1, // RF scaffold; >1 not yet activating replicas } p.discardedHostHandler.SetReconnectHandler(p.AddRemote) // Initialize empty 
ring (will be rebuilt after first AddRemote or discovery event) @@ -180,7 +132,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) // AddRemote dials a remote host and initializes grain proxies. func (p *SyncedPool) AddRemote(host string) { - if host == "" || host == p.Hostname { + if host == "" || host == p.LocalHostname { return } @@ -253,14 +205,21 @@ func (p *SyncedPool) initializeRemote(remote *RemoteHostGRPC) { return } count := 0 + // Record remote ownership (first-touch model) instead of spawning remote grain proxies. + p.mu.Lock() for _, idStr := range reply.CartIds { if idStr == "" { continue } - p.SpawnRemoteGrain(ToCartId(idStr), remote.Host) + cid := ToCartId(idStr) + // Only set if not already claimed (first claim wins) + if _, exists := p.remoteOwners[cid]; !exists { + p.remoteOwners[cid] = remote.Host + } count++ } - log.Printf("Remote %s reported %d grains", remote.Host, count) + p.mu.Unlock() + log.Printf("Remote %s reported %d remote-owned carts (ownership cached)", remote.Host, count) } // RemoveHost removes remote host and its grains. 
@@ -270,10 +229,10 @@ func (p *SyncedPool) RemoveHost(host string) { if exists { delete(p.remoteHosts, host) } - // remove grains pointing to host - for id, g := range p.remoteIndex { - if rg, ok := g.(*RemoteGrainGRPC); ok && rg.Host == host { - delete(p.remoteIndex, id) + // purge remote ownership entries for this host + for id, h := range p.remoteOwners { + if h == host { + delete(p.remoteOwners, id) } } p.mu.Unlock() @@ -294,7 +253,7 @@ func (p *SyncedPool) RemoteCount() int { } func (p *SyncedPool) IsKnown(host string) bool { - if host == p.Hostname { + if host == p.LocalHostname { return true } p.mu.RLock() @@ -354,7 +313,7 @@ func (p *SyncedPool) Negotiate() { p.mu.RLock() hosts := make([]string, 0, len(p.remoteHosts)+1) - hosts = append(hosts, p.Hostname) + hosts = append(hosts, p.LocalHostname) for h := range p.remoteHosts { hosts = append(hosts, h) } @@ -364,8 +323,6 @@ func (p *SyncedPool) Negotiate() { } p.mu.RUnlock() - changed := false - for _, r := range remotes { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) reply, err := r.ControlClient.Negotiate(ctx, &proto.NegotiateRequest{KnownHosts: hosts}) @@ -377,49 +334,20 @@ func (p *SyncedPool) Negotiate() { for _, h := range reply.Hosts { if !p.IsKnown(h) { p.AddRemote(h) - changed = true } } } - // If new hosts were discovered during negotiation, rebuild the ring once at the end. - if changed { - p.rebuildRing() - } + // Ring rebuild removed (first-touch ownership model no longer uses ring) } // ------------------------- Grain / Ring Ownership ---------------------------- -// RemoveRemoteGrain removes a remote grain mapping. -func (p *SyncedPool) RemoveRemoteGrain(id CartId) { - p.mu.Lock() - delete(p.remoteIndex, id) - p.mu.Unlock() -} +// RemoveRemoteGrain obsolete in first-touch model (no remote grain proxies retained) -// SpawnRemoteGrain creates/updates a remote grain proxy for a given host. 
-func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) { - if id.String() == "" { - return - } - p.mu.Lock() - // If local grain exists (legacy key), remove from local map (ownership moved). - if g, ok := p.local.grains[LegacyToCartKey(id)]; ok && g != nil { - delete(p.local.grains, LegacyToCartKey(id)) - } - remoteHost, ok := p.remoteHosts[host] - if !ok { - p.mu.Unlock() - log.Printf("SpawnRemoteGrain: host %s unknown (id=%s), attempting AddRemote", host, id) - go p.AddRemote(host) - return - } - rg := NewRemoteGrainGRPC(id, host, remoteHost.CartClient) - p.remoteIndex[id] = rg - p.mu.Unlock() -} +// SpawnRemoteGrain removed (remote grain proxies eliminated in first-touch model) -// GetHealthyRemotes returns a copy slice of healthy remote hosts. +// GetHealthyRemotes retained (still useful for broadcasting ownership) func (p *SyncedPool) GetHealthyRemotes() []*RemoteHostGRPC { p.mu.RLock() defer p.mu.RUnlock() @@ -432,81 +360,22 @@ func (p *SyncedPool) GetHealthyRemotes() []*RemoteHostGRPC { return ret } -// rebuildRing reconstructs the consistent hashing ring from current host set -// and updates ring-related metrics. -func (p *SyncedPool) rebuildRing() { - p.mu.RLock() - hosts := make([]string, 0, len(p.remoteHosts)+1) - hosts = append(hosts, p.Hostname) - for h := range p.remoteHosts { - hosts = append(hosts, h) - } - p.mu.RUnlock() +// rebuildRing removed (ring no longer used in first-touch ownership model) +func (p *SyncedPool) rebuildRing() {} - epochSeed := fingerprintHosts(hosts) - builder := NewRingBuilder(). - WithHosts(hosts). - WithEpoch(epochSeed). 
- WithVnodesPerHost(p.vnodesPerHost) - r := builder.Build() - if p.ringRef == nil { - p.ringRef = NewRingRef(r) - } else { - p.ringRef.Set(r) - } +// (All ring construction & metrics removed) - // Metrics - ringEpoch.Set(float64(r.Epoch)) - ringHosts.Set(float64(len(r.Hosts()))) - ringVnodes.Set(float64(len(r.Vnodes))) - ringHostShare.Reset() - if len(r.Vnodes) > 0 { - perHost := make(map[string]int) - for _, v := range r.Vnodes { - perHost[v.Host]++ - } - total := float64(len(r.Vnodes)) - for h, c := range perHost { - ringHostShare.WithLabelValues(h).Set(float64(c) / total) - } - } -} +// ForceRingRefresh kept as no-op for backward compatibility. +func (p *SyncedPool) ForceRingRefresh() {} -// ForceRingRefresh exposes a manual ring rebuild hook (primarily for tests). -func (p *SyncedPool) ForceRingRefresh() { - p.rebuildRing() -} - -// ownersFor returns the ordered list of primary + replica owners for a cart id -// (length min(replicationFactor, #hosts)). Currently only the first (primary) -// is used. This scaffolds future replication work. +// ownersFor removed (ring-based ownership deprecated) func (p *SyncedPool) ownersFor(id CartId) []string { - if p.ringRef == nil || p.replicationFactor <= 0 { - return []string{p.Hostname} - } - r := p.ringRef.Get() - if r == nil || r.Empty() { - return []string{p.Hostname} - } - vnodes := r.LookupN(hashKeyString(id.String()), p.replicationFactor) - out := make([]string, 0, len(vnodes)) - seen := make(map[string]struct{}, len(vnodes)) - for _, v := range vnodes { - if _, ok := seen[v.Host]; ok { - continue - } - seen[v.Host] = struct{}{} - out = append(out, v.Host) - } - if len(out) == 0 { - out = append(out, p.Hostname) - } - return out + return []string{p.LocalHostname} } -// ownerHostFor returns the primary owner host for a given id. 
+// ownerHostFor retained as wrapper to satisfy existing calls (always local) func (p *SyncedPool) ownerHostFor(id CartId) string { - return p.ownersFor(id)[0] + return p.LocalHostname } // DebugOwnerHost exposes (for tests) the currently computed primary owner host. @@ -520,62 +389,139 @@ func (p *SyncedPool) removeLocalGrain(id CartId) { p.mu.Unlock() } -// getGrain returns a local or remote grain. For remote ownership it performs a -// bounded readiness wait (small retries) to reduce first-call failures while -// the remote connection & proxy are initializing. -func (p *SyncedPool) getGrain(id CartId) (Grain, error) { - owner := p.ownerHostFor(id) - if owner == p.Hostname { - ringLookupLocal.Inc() - grain, err := p.local.GetGrain(id) - if err != nil { - return nil, err - } - return grain, nil - } - ringLookupRemote.Inc() +// ------------------------- First-Touch Ownership Resolution ------------------ - // Kick off remote dial if we don't yet know the owner. - if !p.IsKnown(owner) { - go p.AddRemote(owner) +// ErrNotOwner is returned when an operation is attempted on a cart that is +// owned by a different host (according to first-touch ownership mapping). +var ErrNotOwner = fmt.Errorf("not owner") + +// resolveOwnerFirstTouch implements the new semantics: +// 1. If local grain exists -> local host owns it. +// 2. Else if remoteOwners has an entry -> return that host. +// 3. Else: claim locally (spawn), insert into remoteOwners map locally for +// idempotency, and asynchronously announce ownership to all remotes. +// +// NOTE: This does NOT (yet) reconcile conflicting announcements; first claim +// wins. Later improvements can add tie-break via timestamp or host ordering. 
+func (p *SyncedPool) resolveOwnerFirstTouch(id CartId) (string, error) { + // Fast local existence check + p.local.mu.RLock() + _, existsLocal := p.local.grains[LegacyToCartKey(id)] + p.local.mu.RUnlock() + if existsLocal { + return p.LocalHostname, nil } - // Fast path existing proxy + // Remote ownership map lookup p.mu.RLock() - if rg, ok := p.remoteIndex[id]; ok { - p.mu.RUnlock() - remoteLookupCount.Inc() - return rg, nil + remoteHost, foundRemote := p.remoteOwners[id] + p.mu.RUnlock() + if foundRemote && remoteHost != "" { + return remoteHost, nil + } + + // Claim: spawn locally + _, err := p.local.GetGrain(id) + if err != nil { + return "", err + } + + // Record (defensive) in remoteOwners pointing to self (not strictly needed + // for local queries, but keeps a single lookup structure). + p.mu.Lock() + if _, stillMissing := p.remoteOwners[id]; !stillMissing { + // Another goroutine inserted meanwhile; keep theirs (first claim wins). + } else { + p.remoteOwners[id] = p.LocalHostname + } + p.mu.Unlock() + + // Announce asynchronously + go p.broadcastOwnership([]CartId{id}) + return p.LocalHostname, nil +} + +// broadcastOwnership sends an AnnounceOwnership RPC to all healthy remotes. +// Best-effort: failures are logged and ignored. +func (p *SyncedPool) broadcastOwnership(ids []CartId) { + if len(ids) == 0 { + return + } + // Prepare payload (convert to string slice) + payload := make([]string, 0, len(ids)) + for _, id := range ids { + if id.String() != "" { + payload = append(payload, id.String()) + } + } + if len(payload) == 0 { + return + } + + p.mu.RLock() + remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts)) + for _, r := range p.remoteHosts { + if r.IsHealthy() { + remotes = append(remotes, r) + } } p.mu.RUnlock() - const ( - attempts = 5 - sleepPerTry = 40 * time.Millisecond - ) + for _, r := range remotes { + go func(rh *RemoteHostGRPC) { + // AnnounceOwnership RPC not yet available (proto regeneration pending); no-op broadcast for now. 
+ // Intended announcement: host=p.LocalHostname ids=payload + _ = rh + }(r) + } +} - for attempt := 0; attempt < attempts; attempt++ { - // Try to spawn (idempotent if host already known) - if p.IsKnown(owner) { - p.SpawnRemoteGrain(id, owner) +// AdoptRemoteOwnership processes an incoming ownership announcement for cart ids. +func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) { + if host == "" || host == p.LocalHostname { + return + } + p.mu.Lock() + defer p.mu.Unlock() + for _, s := range ids { + if s == "" { + continue } - // Check again - p.mu.RLock() - if rg, ok := p.remoteIndex[id]; ok { - p.mu.RUnlock() - remoteLookupCount.Inc() - return rg, nil + id := ToCartId(s) + // Do not overwrite if already claimed by another host (first wins). + if existing, ok := p.remoteOwners[id]; ok && existing != host { + continue } - p.mu.RUnlock() + // Skip if we own locally (local wins for our own process) + p.local.mu.RLock() + _, localHas := p.local.grains[LegacyToCartKey(id)] + p.local.mu.RUnlock() + if localHas { + continue + } + p.remoteOwners[id] = host + } +} - // Last attempt? break to return error. - if attempt == attempts-1 { - break - } - time.Sleep(sleepPerTry) +// getGrain returns a local grain if this host is (or becomes) the owner under +// the first-touch model. If another host owns the cart, ErrNotOwner is returned. +// Remote grain proxy logic and ring-based spawning have been removed. +func (p *SyncedPool) getGrain(id CartId) (Grain, error) { + owner, err := p.resolveOwnerFirstTouch(id) + if err != nil { + return nil, err + } + if owner != p.LocalHostname { + // Another host owns it; signal caller to proxy / forward. + return nil, ErrNotOwner } - return nil, fmt.Errorf("remote owner %s not yet available for cart %s (after %d attempts)", owner, id.String(), attempts) + // Owner is local (either existing or just claimed), fetch/create grain. 
+ grain, err := p.local.GetGrain(id) + if err != nil { + return nil, err + } + return grain, nil } // Apply applies a single mutation to a grain (local or remote). @@ -583,9 +529,28 @@ func (p *SyncedPool) getGrain(id CartId) (Grain, error) { // to replica owners (best-effort) and reconcile quorum on read. func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) { grain, err := p.getGrain(id) - if err != nil { + if err == ErrNotOwner { + // Remote owner reported but either unreachable or failed earlier in stack. + // Takeover strategy: remove remote mapping (first-touch override) and claim locally. + p.mu.Lock() + delete(p.remoteOwners, id) + p.mu.Unlock() + if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil { + return nil, terr + } else if owner == p.LocalHostname { + // Fetch (now-local) grain + grain, err = p.local.GetGrain(id) + if err != nil { + return nil, err + } + } else { + // Another host reclaimed before us; treat as not owner. + return nil, ErrNotOwner + } + } else if err != nil { return nil, err } + start := time.Now() result, applyErr := grain.Apply(mutation, false) @@ -605,7 +570,7 @@ func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) if applyErr == nil && result != nil { cartMutationsTotal.Inc() - if p.ownerHostFor(id) == p.Hostname { + if p.ownerHostFor(id) == p.LocalHostname { // Update active grains gauge only for local ownership cartActiveGrains.Set(float64(p.local.DebugGrainCount())) } @@ -619,7 +584,22 @@ func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) // Future replication hook: Read-repair or quorum read can be added here. func (p *SyncedPool) Get(id CartId) (*CartGrain, error) { grain, err := p.getGrain(id) - if err != nil { + if err == ErrNotOwner { + // Attempt takeover on read as well (e.g. owner dead). 
+ p.mu.Lock() + delete(p.remoteOwners, id) + p.mu.Unlock() + if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil { + return nil, terr + } else if owner == p.LocalHostname { + grain, err = p.local.GetGrain(id) + if err != nil { + return nil, err + } + } else { + return nil, ErrNotOwner + } + } else if err != nil { return nil, err } return grain.GetCurrentState() @@ -637,7 +617,7 @@ func (p *SyncedPool) Close() { for _, r := range remotes { go func(rh *RemoteHostGRPC) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - _, err := rh.ControlClient.Closing(ctx, &proto.ClosingNotice{Host: p.Hostname}) + _, err := rh.ControlClient.Closing(ctx, &proto.ClosingNotice{Host: p.LocalHostname}) cancel() if err != nil { log.Printf("Close notify to %s failed: %v", rh.Host, err) @@ -645,3 +625,13 @@ func (p *SyncedPool) Close() { }(r) } } + +// Hostname implements the GrainPool interface, returning this node's hostname. +func (p *SyncedPool) Hostname() string { + return p.LocalHostname +} + +// OwnerHost returns the primary owning host for a given cart id (ring lookup). +func (p *SyncedPool) OwnerHost(id CartId) string { + return p.ownerHostFor(id) +}