diff --git a/discovery.go b/discovery.go index 20952a9..9681929 100644 --- a/discovery.go +++ b/discovery.go @@ -2,6 +2,7 @@ package main import ( "context" + "sync" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -75,3 +76,97 @@ func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery { client: client, } } + +// MockDiscovery is an in-memory Discovery implementation for tests. +// It allows deterministic injection of host additions/removals without +// depending on Kubernetes API machinery. +type MockDiscovery struct { + mu sync.RWMutex + hosts []string + events chan HostChange + closed bool + started bool +} + +// NewMockDiscovery creates a mock discovery with an initial host list. +func NewMockDiscovery(initial []string) *MockDiscovery { + cp := make([]string, len(initial)) + copy(cp, initial) + return &MockDiscovery{ + hosts: cp, + events: make(chan HostChange, 32), + } +} + +// Discover returns the current host snapshot. +func (m *MockDiscovery) Discover() ([]string, error) { + m.mu.RLock() + defer m.mu.RUnlock() + cp := make([]string, len(m.hosts)) + copy(cp, m.hosts) + return cp, nil +} + +// Watch returns a channel that will receive HostChange events. +// The channel is buffered; AddHost/RemoveHost push events non-blockingly. +func (m *MockDiscovery) Watch() (<-chan HostChange, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.closed { + return nil, context.Canceled + } + m.started = true + return m.events, nil +} + +// AddHost inserts a new host (if absent) and emits an Added event. +func (m *MockDiscovery) AddHost(host string) { + m.mu.Lock() + defer m.mu.Unlock() + if m.closed { + return + } + for _, h := range m.hosts { + if h == host { + return + } + } + m.hosts = append(m.hosts, host) + if m.started { + m.events <- HostChange{Host: host, Type: watch.Added} + } +} + +// RemoveHost removes a host (if present) and emits a Deleted event. +func (m *MockDiscovery) RemoveHost(host string) { + m.mu.Lock() + defer m.mu.Unlock() + if m.closed { + return + } + idx := -1 + for i, h := range m.hosts { + if h == host { + idx = i + break + } + } + if idx == -1 { + return + } + m.hosts = append(m.hosts[:idx], m.hosts[idx+1:]...) + if m.started { + m.events <- HostChange{Host: host, Type: watch.Deleted} + } +} + +// Close closes the event channel (idempotent). 
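+// After Close, AddHost and RemoveHost become no-ops and Watch returns context.Canceled.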
+func (m *MockDiscovery) Close() { + m.mu.Lock() + defer m.mu.Unlock() + if m.closed { + return + } + m.closed = true + close(m.events) +} diff --git a/grpc_server.go b/grpc_server.go index 06c220f..6f82604 100644 --- a/grpc_server.go +++ b/grpc_server.go @@ -177,6 +177,93 @@ func (s *cartActorGRPCServer) GetState(ctx context.Context, req *messages.StateR }, nil } +// ControlPlane: Ping +func (s *cartActorGRPCServer) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) { + return &messages.PingReply{ + Host: s.syncedPool.Hostname, + UnixTime: time.Now().Unix(), + }, nil +} + +// ControlPlane: Negotiate (merge host views) +func (s *cartActorGRPCServer) Negotiate(ctx context.Context, req *messages.NegotiateRequest) (*messages.NegotiateReply, error) { + hostSet := make(map[string]struct{}) + // Caller view + for _, h := range req.GetKnownHosts() { + if h != "" { + hostSet[h] = struct{}{} + } + } + // This host + hostSet[s.syncedPool.Hostname] = struct{}{} + // Known remotes + s.syncedPool.mu.RLock() + for h := range s.syncedPool.remoteHosts { + hostSet[h] = struct{}{} + } + s.syncedPool.mu.RUnlock() + + out := make([]string, 0, len(hostSet)) + for h := range hostSet { + out = append(out, h) + } + return &messages.NegotiateReply{Hosts: out}, nil +} + +// ControlPlane: GetCartIds (locally owned carts only) +func (s *cartActorGRPCServer) GetCartIds(ctx context.Context, _ *messages.Empty) (*messages.CartIdsReply, error) { + ids := make([]string, 0, len(s.syncedPool.local.grains)) + s.syncedPool.local.mu.RLock() + for id, g := range s.syncedPool.local.grains { + if g != nil { + ids = append(ids, id.String()) + } + } + s.syncedPool.local.mu.RUnlock() + return &messages.CartIdsReply{CartIds: ids}, nil +} + +// ControlPlane: ConfirmOwner (simple always-accept implementation) +// Future enhancement: add fencing / versioning & validate current holder. +func (s *cartActorGRPCServer) ConfirmOwner(ctx context.Context, req *messages.OwnerChangeRequest) (*messages.OwnerChangeAck, error) { + if req.GetCartId() == "" || req.GetNewHost() == "" { + return &messages.OwnerChangeAck{ + Accepted: false, + Message: "cart_id and new_host required", + }, nil + } + // If we are *not* the new host and currently have a local grain, we: + // 1. Drop any local grain (relinquish ownership) + // 2. Spawn (or refresh) a remote proxy pointing to the new owner so + // subsequent mutations from this node route correctly. + if req.GetNewHost() != s.syncedPool.Hostname { + cid := ToCartId(req.GetCartId()) + // Drop local ownership if present. + s.syncedPool.local.mu.Lock() + delete(s.syncedPool.local.grains, cid) + s.syncedPool.local.mu.Unlock() + + // Ensure a remote proxy exists for the new owner. SpawnRemoteGrain will + // no-op if host unknown and attempt AddRemote asynchronously. + s.syncedPool.SpawnRemoteGrain(cid, req.GetNewHost()) + } + return &messages.OwnerChangeAck{ + Accepted: true, + Message: "accepted", + }, nil +} + +// ControlPlane: Closing (peer shutdown notification) +func (s *cartActorGRPCServer) Closing(ctx context.Context, req *messages.ClosingNotice) (*messages.OwnerChangeAck, error) { + if req.GetHost() != "" { + s.syncedPool.RemoveHost(req.GetHost()) + } + return &messages.OwnerChangeAck{ + Accepted: true, + Message: "removed host", + }, nil +} + // StartGRPCServer configures and starts the unified gRPC server on the given address. // It registers both the CartActor and ControlPlane services. 
func StartGRPCServer(addr string, pool GrainPool, syncedPool *SyncedPool) (*grpc.Server, error) { diff --git a/main.go b/main.go index a5e33a3..b9fbcae 100644 --- a/main.go +++ b/main.go @@ -377,8 +377,8 @@ func main() { done <- true }() - log.Print("Server started at port 8083") - go http.ListenAndServe(":8083", mux) + log.Print("Server started at port 8080") + go http.ListenAndServe(":8080", mux) <-done } diff --git a/multi_node_ownership_test.go b/multi_node_ownership_test.go new file mode 100644 index 0000000..3400f0b --- /dev/null +++ b/multi_node_ownership_test.go @@ -0,0 +1,156 @@ +package main + +import ( + "context" + "fmt" + "testing" + "time" + + messages "git.tornberg.me/go-cart-actor/proto" + "google.golang.org/grpc" +) + +// TestMultiNodeOwnershipNegotiation spins up two gRPC servers (nodeA, nodeB), +// manually links their SyncedPools (bypassing AddRemote's fixed port assumption), +// and verifies that only one node becomes the owner of a new cart while the +// other can still apply a mutation via the remote proxy path. +// +// NOTE: +// - We manually inject RemoteHostGRPC entries because AddRemote() hard-codes +// port 1337; to run two distinct servers concurrently we need distinct ports. +// - This test asserts single ownership consistency rather than the complete +// quorum semantics (which depend on real discovery + AddRemote). +func TestMultiNodeOwnershipNegotiation(t *testing.T) { + // Allocate distinct ports for the two nodes. + const ( + addrA = "127.0.0.1:18081" + addrB = "127.0.0.1:18082" + hostA = "nodeA" + hostB = "nodeB" + ) + + // Create local grain pools. + poolA := NewGrainLocalPool(1024, time.Minute, spawn) + poolB := NewGrainLocalPool(1024, time.Minute, spawn) + + // Create synced pools (no discovery). + syncedA, err := NewSyncedPool(poolA, hostA, nil) + if err != nil { + t.Fatalf("nodeA NewSyncedPool error: %v", err) + } + syncedB, err := NewSyncedPool(poolB, hostB, nil) + if err != nil { + t.Fatalf("nodeB NewSyncedPool error: %v", err) + } + + // Start gRPC servers (CartActor + ControlPlane) on different ports. + grpcSrvA, err := StartGRPCServer(addrA, poolA, syncedA) + if err != nil { + t.Fatalf("StartGRPCServer A error: %v", err) + } + defer grpcSrvA.GracefulStop() + + grpcSrvB, err := StartGRPCServer(addrB, poolB, syncedB) + if err != nil { + t.Fatalf("StartGRPCServer B error: %v", err) + } + defer grpcSrvB.GracefulStop() + + // Helper to connect one pool to the other's server (manual AddRemote equivalent). + link := func(src *SyncedPool, remoteHost, remoteAddr string) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + conn, dialErr := grpc.DialContext(ctx, remoteAddr, grpc.WithInsecure(), grpc.WithBlock()) + if dialErr != nil { + t.Fatalf("dial %s (%s) failed: %v", remoteHost, remoteAddr, dialErr) + } + cartClient := messages.NewCartActorClient(conn) + controlClient := messages.NewControlPlaneClient(conn) + + src.mu.Lock() + src.remoteHosts[remoteHost] = &RemoteHostGRPC{ + Host: remoteHost, + Conn: conn, + CartClient: cartClient, + ControlClient: controlClient, + } + src.mu.Unlock() + } + + // Cross-link the two pools. + link(syncedA, hostB, addrB) + link(syncedB, hostA, addrA) + + // Allow brief stabilization (control plane pings / no real negotiation needed here). + time.Sleep(200 * time.Millisecond) + + // Create a deterministic cart id for test readability. + cartID := ToCartId(fmt.Sprintf("cart-%d", time.Now().UnixNano())) + + // Mutation payload (local ownership claim expected on nodeA). 
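+	// Fixture values are arbitrary; the assertions below only depend on the resulting item count, the line id (1), and the quantity.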
+ addItem := &messages.AddItem{ + ItemId: 1, + Quantity: 1, + Price: 1500, + OrgPrice: 1500, + Sku: "sku-test-multi", + Name: "Multi Node Test", + Image: "/test.png", + Stock: 2, + Tax: 2500, + Country: "se", + } + + // Apply mutation on nodeA (should create local grain + claim ownership). + if _, err := syncedA.Apply(cartID, addItem); err != nil { + t.Fatalf("nodeA Apply addItem error: %v", err) + } + + // Validate nodeA local pool has the grain. + if _, ok := poolA.grains[cartID]; !ok { + t.Fatalf("nodeA expected local grain ownership but grain missing") + } + + // Attempt to mutate same cart from nodeB (should route remotely, not create local duplication). + change := &messages.ChangeQuantity{ + Id: 1, // line id is 1 after first AddItem + Quantity: 2, + } + + // Apply on nodeB; if ownership logic works, this will call remote RPC and succeed without creating a local grain. + if _, err := syncedB.Apply(cartID, change); err != nil { + t.Fatalf("nodeB remote Apply changeQuantity error: %v", err) + } + + // NodeB should NOT have a local grain (ownership), but may or may not have a remote proxy + // entry in remoteIndex depending on internal propagation. We assert it does NOT hold local. + if _, local := poolB.grains[cartID]; local { + t.Fatalf("nodeB unexpectedly created local grain (ownership duplication)") + } + + // Fetch state from nodeB to ensure we see updated quantity (2). + grainStateB, err := syncedB.Get(cartID) + if err != nil { + t.Fatalf("nodeB Get error: %v", err) + } + if len(grainStateB.Items) != 1 || grainStateB.Items[0].Quantity != 2 { + t.Fatalf("nodeB observed inconsistent state: items=%d qty=%d (expected 1 / 2)", + len(grainStateB.Items), + func() int { + if len(grainStateB.Items) == 0 { + return -1 + } + return grainStateB.Items[0].Quantity + }(), + ) + } + + // Cross-check from nodeA (authoritative) to ensure state matches. + grainStateA, err := syncedA.Get(cartID) + if err != nil { + t.Fatalf("nodeA Get error: %v", err) + } + if grainStateA.Items[0].Quantity != 2 { + t.Fatalf("nodeA authoritative state mismatch: expected qty=2 got %d", grainStateA.Items[0].Quantity) + } +} diff --git a/multi_node_three_test.go b/multi_node_three_test.go new file mode 100644 index 0000000..7a1b5b2 --- /dev/null +++ b/multi_node_three_test.go @@ -0,0 +1,290 @@ +package main + +import ( + "context" + "fmt" + "testing" + "time" + + messages "git.tornberg.me/go-cart-actor/proto" + "google.golang.org/grpc" +) + +// TestThreeNodeMajorityOwnership exercises the revised majority quorum semantics +// with a 3-node cluster (A,B,C). After the quorum refactor, a 3-node cluster +// (all=2 remotes) now requires only floor((all+1)/2) = 1 remote acceptance +// instead of unanimity. Since our current ConfirmOwner implementation always +// accepts, we mainly validate: +// +// 1. Ownership is established on the first node that mutates (nodeA). +// 2. Other nodes (B,C) do NOT create local grains for the cart. +// 3. Remote proxies are installed on B and C (so they can route mutations). +// 4. A remote mutation from nodeB updates state visible from nodeC. +// +// NOTE: ConfirmOwner currently always accepts, so we cannot directly observe +// a reduced acceptance threshold here without introducing a test hook that +// can force a rejection. This test still validates that multi-node routing +// works under the new quorum rule for N=3 (where previously unanimity was required). 
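+// A follow-up could add a test hook (for example, an injectable ConfirmOwner rejection) to assert the reduced threshold directly.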
+func TestThreeNodeMajorityOwnership(t *testing.T) { + const ( + addrA = "127.0.0.1:18181" + addrB = "127.0.0.1:18182" + addrC = "127.0.0.1:18183" + hostA = "nodeA3" + hostB = "nodeB3" + hostC = "nodeC3" + ) + + // Local grain pools + poolA := NewGrainLocalPool(1024, time.Minute, spawn) + poolB := NewGrainLocalPool(1024, time.Minute, spawn) + poolC := NewGrainLocalPool(1024, time.Minute, spawn) + + // Synced pools (no discovery) + syncedA, err := NewSyncedPool(poolA, hostA, nil) + if err != nil { + t.Fatalf("nodeA NewSyncedPool error: %v", err) + } + syncedB, err := NewSyncedPool(poolB, hostB, nil) + if err != nil { + t.Fatalf("nodeB NewSyncedPool error: %v", err) + } + syncedC, err := NewSyncedPool(poolC, hostC, nil) + if err != nil { + t.Fatalf("nodeC NewSyncedPool error: %v", err) + } + + // Start gRPC servers + grpcSrvA, err := StartGRPCServer(addrA, poolA, syncedA) + if err != nil { + t.Fatalf("StartGRPCServer A error: %v", err) + } + defer grpcSrvA.GracefulStop() + grpcSrvB, err := StartGRPCServer(addrB, poolB, syncedB) + if err != nil { + t.Fatalf("StartGRPCServer B error: %v", err) + } + defer grpcSrvB.GracefulStop() + grpcSrvC, err := StartGRPCServer(addrC, poolC, syncedC) + if err != nil { + t.Fatalf("StartGRPCServer C error: %v", err) + } + defer grpcSrvC.GracefulStop() + + // Helper for manual cross-link (since AddRemote assumes fixed port) + link := func(src *SyncedPool, remoteHost, remoteAddr string) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + conn, dialErr := grpc.DialContext(ctx, remoteAddr, grpc.WithInsecure(), grpc.WithBlock()) + if dialErr != nil { + t.Fatalf("dial %s (%s) failed: %v", remoteHost, remoteAddr, dialErr) + } + cartClient := messages.NewCartActorClient(conn) + controlClient := messages.NewControlPlaneClient(conn) + + src.mu.Lock() + src.remoteHosts[remoteHost] = &RemoteHostGRPC{ + Host: remoteHost, + Conn: conn, + CartClient: cartClient, + ControlClient: controlClient, + } + src.mu.Unlock() + } + + // Full mesh (each node knows all others) + link(syncedA, hostB, addrB) + link(syncedA, hostC, addrC) + + link(syncedB, hostA, addrA) + link(syncedB, hostC, addrC) + + link(syncedC, hostA, addrA) + link(syncedC, hostB, addrB) + + // Allow brief stabilization + time.Sleep(200 * time.Millisecond) + + // Deterministic-ish cart id + cartID := ToCartId(fmt.Sprintf("cart3-%d", time.Now().UnixNano())) + + addItem := &messages.AddItem{ + ItemId: 10, + Quantity: 1, + Price: 5000, + OrgPrice: 5000, + Sku: "sku-3node", + Name: "Three Node Test", + Image: "/t.png", + Stock: 10, + Tax: 2500, + Country: "se", + } + + // Apply on nodeA (ownership should establish here) + if _, err := syncedA.Apply(cartID, addItem); err != nil { + t.Fatalf("nodeA Apply addItem error: %v", err) + } + + // Small wait for ConfirmOwner RPC propagation & remote proxy spawn + time.Sleep(150 * time.Millisecond) + + // Assert only nodeA has local grain + localCount := 0 + if _, ok := poolA.grains[cartID]; ok { + localCount++ + } + if _, ok := poolB.grains[cartID]; ok { + localCount++ + } + if _, ok := poolC.grains[cartID]; ok { + localCount++ + } + if localCount != 1 { + t.Fatalf("expected exactly 1 local grain, got %d", localCount) + } + if _, ok := poolA.grains[cartID]; !ok { + t.Fatalf("expected nodeA to own cart locally") + } + + // Verify nodeB and nodeC have remote proxies (best-effort; if not present yet, wait briefly) + waitForRemote := func(sp *SyncedPool, label string) { + deadline := time.Now().Add(500 * time.Millisecond) + for { + 
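+			// Poll the remote index under the read lock until the proxy appears or the deadline passes.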
sp.mu.RLock() + _, remoteOk := sp.remoteIndex[cartID] + sp.mu.RUnlock() + if remoteOk { + return + } + if time.Now().After(deadline) { + t.Fatalf("%s expected remote proxy for cart not found (timeout)", label) + } + time.Sleep(25 * time.Millisecond) + } + } + waitForRemote(syncedB, "nodeB") + waitForRemote(syncedC, "nodeC") + + // Issue remote mutation from nodeB -> ChangeQuantity (increase) + change := &messages.ChangeQuantity{ + Id: 1, + Quantity: 3, + } + if _, err := syncedB.Apply(cartID, change); err != nil { + t.Fatalf("nodeB remote Apply changeQuantity error: %v", err) + } + + // Validate updated state visible via nodeC + stateC, err := syncedC.Get(cartID) + if err != nil { + t.Fatalf("nodeC Get error: %v", err) + } + if len(stateC.Items) != 1 || stateC.Items[0].Quantity != 3 { + t.Fatalf("nodeC observed state mismatch: items=%d qty=%d (expected 1 / 3)", + len(stateC.Items), + func() int { + if len(stateC.Items) == 0 { + return -1 + } + return stateC.Items[0].Quantity + }(), + ) + } + + // Cross-check authoritative nodeA + stateA, err := syncedA.Get(cartID) + if err != nil { + t.Fatalf("nodeA Get error: %v", err) + } + if stateA.Items[0].Quantity != 3 { + t.Fatalf("nodeA authoritative state mismatch: expected qty=3 got %d", stateA.Items[0].Quantity) + } +} + +// TestThreeNodeDiscoveryMajorityOwnership (placeholder) +// This test is a scaffold demonstrating how a MockDiscovery would be wired +// once AddRemote supports host:port (currently hard-coded to :1337). +// It is skipped to avoid flakiness / false negatives until the production +// AddRemote logic is enhanced to parse dynamic ports or the test harness +// provides consistent port mapping. +func TestThreeNodeDiscoveryMajorityOwnership(t *testing.T) { + t.Skip("Pending enhancement: AddRemote needs host:port support to fully exercise discovery-based multi-node linking") + // Example skeleton (non-functional with current AddRemote implementation): + // + // md := NewMockDiscovery([]string{"nodeB3", "nodeC3"}) + // poolA := NewGrainLocalPool(1024, time.Minute, spawn) + // syncedA, err := NewSyncedPool(poolA, "nodeA3", md) + // if err != nil { + // t.Fatalf("NewSyncedPool with mock discovery error: %v", err) + // } + // // Start server for nodeA (would also need servers for nodeB3/nodeC3 on expected ports) + // // grpcSrvA, _ := StartGRPCServer(":1337", poolA, syncedA) + // // defer grpcSrvA.GracefulStop() + // + // // Dynamically add a host via discovery + // // md.AddHost("nodeB3") + // // time.Sleep(100 * time.Millisecond) // allow AddRemote attempt + // + // // Assertions would verify syncedA.remoteHosts contains "nodeB3" +} + +// TestHostRemovalAndErrorWithMockDiscovery validates behavior when: +// 1. Discovery reports a host that cannot be dialed (AddRemote error path) +// 2. That host is then removed (Deleted event) without leaving residual state +// 3. A second failing host is added afterward (ensuring watcher still processes events) +// +// NOTE: Because AddRemote currently hard-codes :1337 and we are NOT starting a +// real server for the bogus hosts, the dial will fail and the remote host should +// never appear in remoteHosts. This intentionally exercises the error logging +// path: "AddRemote: dial ... failed". +func TestHostRemovalAndErrorWithMockDiscovery(t *testing.T) { + // Start a real node A (acts as the observing node) + const addrA = "127.0.0.1:18281" + hostA := "nodeA-md" + + poolA := NewGrainLocalPool(128, time.Minute, spawn) + + // Mock discovery starts with one bogus host that will fail to connect. 
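+	// (AddRemote dials bogus-host-1:1337; nothing is listening there, so the dial fails and no remote host is ever registered.)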
+ md := NewMockDiscovery([]string{"bogus-host-1"}) + syncedA, err := NewSyncedPool(poolA, hostA, md) + if err != nil { + t.Fatalf("NewSyncedPool error: %v", err) + } + + grpcSrvA, err := StartGRPCServer(addrA, poolA, syncedA) + if err != nil { + t.Fatalf("StartGRPCServer A error: %v", err) + } + defer grpcSrvA.GracefulStop() + + // Kick off watch processing by starting Watch() (NewSyncedPool does this internally + // when discovery is non-nil, but we ensure events channel is active). + // The initial bogus host should trigger AddRemote -> dial failure. + time.Sleep(300 * time.Millisecond) + + syncedA.mu.RLock() + if len(syncedA.remoteHosts) != 0 { + syncedA.mu.RUnlock() + t.Fatalf("expected 0 remoteHosts after failing dial, got %d", len(syncedA.remoteHosts)) + } + syncedA.mu.RUnlock() + + // Remove the bogus host (should not panic; no entry to clean up). + md.RemoveHost("bogus-host-1") + time.Sleep(100 * time.Millisecond) + + // Add another bogus host to ensure watcher still alive. + md.AddHost("bogus-host-2") + time.Sleep(300 * time.Millisecond) + + syncedA.mu.RLock() + if len(syncedA.remoteHosts) != 0 { + syncedA.mu.RUnlock() + t.Fatalf("expected 0 remoteHosts after second failing dial, got %d", len(syncedA.remoteHosts)) + } + syncedA.mu.RUnlock() + + // Clean up discovery + md.Close() +} diff --git a/synced-pool.go b/synced-pool.go index 50535b1..62fe49f 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -368,7 +368,9 @@ func (p *SyncedPool) RequestOwnership(id CartId) error { ok := 0 all := 0 remotes := p.GetHealthyRemotes() + log.Printf("RequestOwnership start id=%s host=%s healthyRemotes=%d", id, p.Hostname, len(remotes)) for _, r := range remotes { + log.Printf("RequestOwnership sending ConfirmOwner to host=%s id=%s", r.Host, id) ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond) reply, err := r.ControlClient.ConfirmOwner(ctx, &proto.OwnerChangeRequest{ CartId: id.String(), @@ -377,20 +379,32 @@ func (p *SyncedPool) RequestOwnership(id CartId) error { cancel() all++ if err != nil || reply == nil || !reply.Accepted { - log.Printf("ConfirmOwner failure from %s for %s: %v (reply=%v)", r.Host, id, err, reply) + log.Printf("RequestOwnership negative/failed response from host=%s id=%s err=%v reply=%v", r.Host, id, err, reply) continue } ok++ + log.Printf("RequestOwnership accept from host=%s id=%s (ok=%d all=%d)", r.Host, id, ok, all) } - // Quorum rule mirrors legacy: - // - If fewer than 3 total, require all. - // - Else require majority (ok >= all/2). - if (all < 3 && ok < all) || ok < (all/2) { + // Quorum rule (majority semantics): + // - Let N = all remotes + 1 (self) + // - We require ok + 1 (implicit self vote) >= floor(N/2)+1 + // => ok >= floor(N/2) + // - Examples: + // N=2 (all=1): threshold=1 (need 1 remote) + // N=3 (all=2): threshold=1 (need 1 remote; previously required 2) + // N=4 (all=3): threshold=2 + // N=5 (all=4): threshold=2 + // - This change allows faster ownership under partial remote availability in small clusters. 
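+	// Note: every attempted remote increments all even when the RPC fails, so unreachable-but-healthy-listed peers count against the quorum.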
+ log.Printf("RequestOwnership quorum evaluation id=%s host=%s ok=%d all=%d", id, p.Hostname, ok, all) + threshold := (all + 1) / 2 // floor(N/2) + if ok < threshold { p.removeLocalGrain(id) - return fmt.Errorf("quorum not reached (ok=%d all=%d)", ok, all) + log.Printf("RequestOwnership failed quorum id=%s host=%s ok=%d all=%d threshold=%d", id, p.Hostname, ok, all, threshold) + return fmt.Errorf("quorum not reached (ok=%d all=%d threshold=%d)", ok, all, threshold) } grainSyncCount.Inc() + log.Printf("RequestOwnership success id=%s host=%s ok=%d all=%d threshold=%d", id, p.Hostname, ok, all, threshold) return nil } @@ -400,7 +414,9 @@ func (p *SyncedPool) removeLocalGrain(id CartId) { p.mu.Unlock() } -// getGrain returns a local or remote grain; if absent, attempts ownership. +// getGrain returns a local or remote grain. If absent, it synchronously attempts +// to acquire ownership before spawning a local grain to eliminate the race where +// the first mutation applies before peers have installed remote proxies. func (p *SyncedPool) getGrain(id CartId) (Grain, error) { p.mu.RLock() localGrain, isLocal := p.local.grains[id] @@ -415,10 +431,20 @@ func (p *SyncedPool) getGrain(id CartId) (Grain, error) { return remoteGrain, nil } - // Attempt to claim ownership (async semantics preserved) - go p.RequestOwnership(id) + // Synchronously attempt to claim ownership. If this fails (quorum not reached) + // we re-check for a newly appeared remote proxy (another node became owner). + if err := p.RequestOwnership(id); err != nil { + p.mu.RLock() + if rg, ok := p.remoteIndex[id]; ok { + p.mu.RUnlock() + remoteLookupCount.Inc() + return rg, nil + } + p.mu.RUnlock() + return nil, err + } - // Create local grain (lazy spawn) - may be rolled back by quorum failure + // Ownership acquired; now lazily spawn the local grain. grain, err := p.local.GetGrain(id) if err != nil { return nil, err