package main

import (
	"context"
	"fmt"
	"testing"
	"time"

	messages "git.tornberg.me/go-cart-actor/proto"
	"google.golang.org/grpc"
)

// TestThreeNodeMajorityOwnership validates ring-determined ownership and routing
// in a 3-node cluster (A,B,C) using the consistent hashing ring (no quorum RPC).
// The previous ConfirmOwner / quorum semantics have been removed; ownership is
// deterministic and derived from the ring.
//
// It validates:
// 1. The ring selects exactly one primary owner for a new cart.
// 2. The non-owner nodes do NOT create local grains for the cart.
// 3. Remote proxies are installed lazily so remote mutations can route.
// 4. A remote mutation from one non-owner updates state visible on another.
// 5. Authoritative state on the owner matches remote observations.
// 6. (Future) This scaffolds replication tests when RF>1 is enabled.
//
// (Legacy comments about ConfirmOwner acceptance thresholds have been removed.)
// (Function name retained for historical continuity.)
func TestThreeNodeMajorityOwnership(t *testing.T) {
	const (
		addrA = "127.0.0.1:18181"
		addrB = "127.0.0.1:18182"
		addrC = "127.0.0.1:18183"
		hostA = "nodeA3"
		hostB = "nodeB3"
		hostC = "nodeC3"
	)

	// Local grain pools
	poolA := NewGrainLocalPool(1024, time.Minute, spawn)
	poolB := NewGrainLocalPool(1024, time.Minute, spawn)
	poolC := NewGrainLocalPool(1024, time.Minute, spawn)

	// Synced pools (no discovery)
	syncedA, err := NewSyncedPool(poolA, hostA, nil)
	if err != nil {
		t.Fatalf("nodeA NewSyncedPool error: %v", err)
	}
	syncedB, err := NewSyncedPool(poolB, hostB, nil)
	if err != nil {
		t.Fatalf("nodeB NewSyncedPool error: %v", err)
	}
	syncedC, err := NewSyncedPool(poolC, hostC, nil)
	if err != nil {
		t.Fatalf("nodeC NewSyncedPool error: %v", err)
	}

	// Start gRPC servers
	grpcSrvA, err := StartGRPCServer(addrA, poolA, syncedA)
	if err != nil {
		t.Fatalf("StartGRPCServer A error: %v", err)
	}
	defer grpcSrvA.GracefulStop()
	grpcSrvB, err := StartGRPCServer(addrB, poolB, syncedB)
	if err != nil {
		t.Fatalf("StartGRPCServer B error: %v", err)
	}
	defer grpcSrvB.GracefulStop()
	grpcSrvC, err := StartGRPCServer(addrC, poolC, syncedC)
	if err != nil {
		t.Fatalf("StartGRPCServer C error: %v", err)
	}
	defer grpcSrvC.GracefulStop()

	// Helper for manual cross-linking (AddRemote assumes a fixed port).
	link := func(src *SyncedPool, remoteHost, remoteAddr string) {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()
		conn, dialErr := grpc.DialContext(ctx, remoteAddr, grpc.WithInsecure(), grpc.WithBlock())
		if dialErr != nil {
			t.Fatalf("dial %s (%s) failed: %v", remoteHost, remoteAddr, dialErr)
		}
		cartClient := messages.NewCartActorClient(conn)
		controlClient := messages.NewControlPlaneClient(conn)
		src.mu.Lock()
		src.remoteHosts[remoteHost] = &RemoteHostGRPC{
			Host:          remoteHost,
			Conn:          conn,
			CartClient:    cartClient,
			ControlClient: controlClient,
		}
		src.mu.Unlock()
	}

	// Full mesh (each node knows all others)
	link(syncedA, hostB, addrB)
	link(syncedA, hostC, addrC)
	link(syncedB, hostA, addrA)
	link(syncedB, hostC, addrC)
	link(syncedC, hostA, addrA)
	link(syncedC, hostB, addrB)

	// Rebuild rings after manual linking so ownership resolution is immediate.
	syncedA.ForceRingRefresh()
	syncedB.ForceRingRefresh()
	syncedC.ForceRingRefresh()

	// Allow brief stabilization
	time.Sleep(200 * time.Millisecond)

	// Deterministic-ish cart id
	cartID := ToCartId(fmt.Sprintf("cart3-%d", time.Now().UnixNano()))

	addItem := &messages.AddItem{
		ItemId:   10,
		Quantity: 1,
		Price:    5000,
		OrgPrice: 5000,
		Sku:      "sku-3node",
		Name:     "Three Node Test",
		Image:    "/t.png",
		Stock:    10,
		Tax:      2500,
		Country:  "se",
	}

	// Determine the ring-designated owner (may be any of the three hosts)
	ownerPre := syncedA.DebugOwnerHost(cartID)
	if ownerPre != hostA && ownerPre != hostB && ownerPre != hostC {
		t.Fatalf("ring returned unexpected owner %s (not in set {%s,%s,%s})", ownerPre, hostA, hostB, hostC)
	}
	var ownerSynced *SyncedPool
	var ownerPool *GrainLocalPool
	switch ownerPre {
	case hostA:
		ownerSynced, ownerPool = syncedA, poolA
	case hostB:
		ownerSynced, ownerPool = syncedB, poolB
	case hostC:
		ownerSynced, ownerPool = syncedC, poolC
	}

	// Pick two distinct non-owner nodes for remote mutation assertions
	var remote1Synced, remote2Synced *SyncedPool
	switch ownerPre {
	case hostA:
		remote1Synced, remote2Synced = syncedB, syncedC
	case hostB:
		remote1Synced, remote2Synced = syncedA, syncedC
	case hostC:
		remote1Synced, remote2Synced = syncedA, syncedB
	}

	// Apply on the ring-designated owner
	if _, err := ownerSynced.Apply(cartID, addItem); err != nil {
		t.Fatalf("owner %s Apply addItem error: %v", ownerPre, err)
	}

	// Small wait for remote proxy spawn (ring ownership already deterministic)
	time.Sleep(150 * time.Millisecond)

	// Assert that exactly one node (the ring-designated owner) holds a local grain
	localCount := 0
	if _, ok := poolA.GetGrains()[cartID]; ok {
		localCount++
	}
	if _, ok := poolB.GetGrains()[cartID]; ok {
		localCount++
	}
	if _, ok := poolC.GetGrains()[cartID]; ok {
		localCount++
	}
	if localCount != 1 {
		t.Fatalf("expected exactly 1 local grain, got %d", localCount)
	}
	if _, ok := ownerPool.GetGrains()[cartID]; !ok {
		t.Fatalf("expected owner %s to hold local grain", ownerPre)
	}

	// Remote proxies may not pre-exist; the first remote mutation triggers SpawnRemoteGrain lazily.
	// Issue a remote mutation from one non-owner -> ChangeQuantity (increase)
	change := &messages.ChangeQuantity{
		Id:       1,
		Quantity: 3,
	}
	if _, err := remote1Synced.Apply(cartID, change); err != nil {
		t.Fatalf("remote mutation (remote1) changeQuantity error: %v", err)
	}

	// Validate the updated state is visible via the second non-owner
	stateRemote2, err := remote2Synced.Get(cartID)
	if err != nil {
		t.Fatalf("remote2 Get error: %v", err)
	}
	if len(stateRemote2.Items) != 1 || stateRemote2.Items[0].Quantity != 3 {
		t.Fatalf("remote2 observed state mismatch: items=%d qty=%d (expected 1 / 3)",
			len(stateRemote2.Items),
			func() int {
				if len(stateRemote2.Items) == 0 {
					return -1
				}
				return stateRemote2.Items[0].Quantity
			}(),
		)
	}

	// Cross-check via nodeA; the Get routes to the ring-designated owner, so the result is authoritative.
	stateA, err := syncedA.Get(cartID)
	if err != nil {
		t.Fatalf("nodeA Get error: %v", err)
	}
	if stateA.Items[0].Quantity != 3 {
		t.Fatalf("nodeA authoritative state mismatch: expected qty=3 got %d", stateA.Items[0].Quantity)
	}
}

// TestThreeNodeDiscoveryMajorityOwnership (placeholder)
// This test is a scaffold demonstrating how a MockDiscovery would be wired
// once AddRemote supports host:port (currently hard-coded to :1337).
// It is skipped to avoid flakiness / false negatives until the production
// AddRemote logic is enhanced to parse dynamic ports or the test harness
// provides a consistent port mapping.
func TestThreeNodeDiscoveryMajorityOwnership(t *testing.T) {
	t.Skip("Pending enhancement: AddRemote needs host:port support to fully exercise discovery-based multi-node linking")

	// Example skeleton (non-functional with the current AddRemote implementation):
	//
	// md := NewMockDiscovery([]string{"nodeB3", "nodeC3"})
	// poolA := NewGrainLocalPool(1024, time.Minute, spawn)
	// syncedA, err := NewSyncedPool(poolA, "nodeA3", md)
	// if err != nil {
	// 	t.Fatalf("NewSyncedPool with mock discovery error: %v", err)
	// }
	// // Start server for nodeA (would also need servers for nodeB3/nodeC3 on expected ports)
	// // grpcSrvA, _ := StartGRPCServer(":1337", poolA, syncedA)
	// // defer grpcSrvA.GracefulStop()
	//
	// // Dynamically add a host via discovery
	// // md.AddHost("nodeB3")
	// // time.Sleep(100 * time.Millisecond) // allow AddRemote attempt
	//
	// // Assertions would verify syncedA.remoteHosts contains "nodeB3"
}

// TestHostRemovalAndErrorWithMockDiscovery validates behavior when:
// 1. Discovery reports a host that cannot be dialed (AddRemote error path)
// 2. That host is then removed (Deleted event) without leaving residual state
// 3. A second failing host is added afterward (ensuring the watcher still processes events)
//
// NOTE: Because AddRemote currently hard-codes :1337 and we are NOT starting a
// real server for the bogus hosts, the dial will fail and the remote host should
// never appear in remoteHosts. This intentionally exercises the error logging
// path: "AddRemote: dial ... failed".
func TestHostRemovalAndErrorWithMockDiscovery(t *testing.T) {
	// Start a real node A (acts as the observing node)
	const addrA = "127.0.0.1:18281"
	hostA := "nodeA-md"

	poolA := NewGrainLocalPool(128, time.Minute, spawn)

	// Mock discovery starts with one bogus host that will fail to connect.
	md := NewMockDiscovery([]string{"bogus-host-1"})

	syncedA, err := NewSyncedPool(poolA, hostA, md)
	if err != nil {
		t.Fatalf("NewSyncedPool error: %v", err)
	}
	grpcSrvA, err := StartGRPCServer(addrA, poolA, syncedA)
	if err != nil {
		t.Fatalf("StartGRPCServer A error: %v", err)
	}
	defer grpcSrvA.GracefulStop()

	// Watch processing is started internally by NewSyncedPool when discovery is
	// non-nil, so the events channel is already active. The initial bogus host
	// should trigger AddRemote -> dial failure.
	time.Sleep(300 * time.Millisecond)

	syncedA.mu.RLock()
	if len(syncedA.remoteHosts) != 0 {
		syncedA.mu.RUnlock()
		t.Fatalf("expected 0 remoteHosts after failing dial, got %d", len(syncedA.remoteHosts))
	}
	syncedA.mu.RUnlock()

	// Remove the bogus host (should not panic; there is no entry to clean up).
	md.RemoveHost("bogus-host-1")
	time.Sleep(100 * time.Millisecond)

	// Add another bogus host to ensure the watcher is still alive.
	md.AddHost("bogus-host-2")
	time.Sleep(300 * time.Millisecond)

	syncedA.mu.RLock()
	if len(syncedA.remoteHosts) != 0 {
		syncedA.mu.RUnlock()
		t.Fatalf("expected 0 remoteHosts after second failing dial, got %d", len(syncedA.remoteHosts))
	}
	syncedA.mu.RUnlock()

	// Clean up discovery
	md.Close()
}