// go-cart-actor/multi_node_three_test.go

package main

import (
	"context"
	"fmt"
	"testing"
	"time"

	messages "git.tornberg.me/go-cart-actor/proto"
	"google.golang.org/grpc"
)

// TestThreeNodeMajorityOwnership validates ring-determined ownership and routing
// in a 3-node cluster (A,B,C) using the consistent hashing ring (no quorum RPC).
// The previous ConfirmOwner / quorum semantics have been removed; ownership is
// deterministic and derived from the ring.
//
// It validates:
// 1. The ring selects exactly one primary owner for a new cart.
// 2. The two non-owner nodes do NOT create local grains for the cart.
// 3. Remote proxies are installed lazily so remote mutations can route.
// 4. A remote mutation from one non-owner updates state visible on another.
// 5. Authoritative state on the owner matches remote observations.
// 6. (Future) This scaffolds replication tests when RF>1 is enabled.
//
// (Legacy comments about ConfirmOwner acceptance thresholds have been removed.)
// (Function name retained for historical continuity.)
func TestThreeNodeMajorityOwnership(t *testing.T) {
	const (
		addrA = "127.0.0.1:18181"
		addrB = "127.0.0.1:18182"
		addrC = "127.0.0.1:18183"
		hostA = "nodeA3"
		hostB = "nodeB3"
		hostC = "nodeC3"
	)

	// Local grain pools
	poolA := NewGrainLocalPool(1024, time.Minute, spawn)
	poolB := NewGrainLocalPool(1024, time.Minute, spawn)
	poolC := NewGrainLocalPool(1024, time.Minute, spawn)

	// Synced pools (no discovery)
	syncedA, err := NewSyncedPool(poolA, hostA, nil)
	if err != nil {
		t.Fatalf("nodeA NewSyncedPool error: %v", err)
	}
	syncedB, err := NewSyncedPool(poolB, hostB, nil)
	if err != nil {
		t.Fatalf("nodeB NewSyncedPool error: %v", err)
	}
	syncedC, err := NewSyncedPool(poolC, hostC, nil)
	if err != nil {
		t.Fatalf("nodeC NewSyncedPool error: %v", err)
	}

	// Start gRPC servers
	grpcSrvA, err := StartGRPCServer(addrA, poolA, syncedA)
	if err != nil {
		t.Fatalf("StartGRPCServer A error: %v", err)
	}
	defer grpcSrvA.GracefulStop()
	grpcSrvB, err := StartGRPCServer(addrB, poolB, syncedB)
	if err != nil {
		t.Fatalf("StartGRPCServer B error: %v", err)
	}
	defer grpcSrvB.GracefulStop()
	grpcSrvC, err := StartGRPCServer(addrC, poolC, syncedC)
	if err != nil {
		t.Fatalf("StartGRPCServer C error: %v", err)
	}
	defer grpcSrvC.GracefulStop()

	// Helper for manual cross-link (since AddRemote assumes fixed port)
	link := func(src *SyncedPool, remoteHost, remoteAddr string) {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()
		conn, dialErr := grpc.DialContext(ctx, remoteAddr, grpc.WithInsecure(), grpc.WithBlock())
		if dialErr != nil {
			t.Fatalf("dial %s (%s) failed: %v", remoteHost, remoteAddr, dialErr)
		}
		cartClient := messages.NewCartActorClient(conn)
		controlClient := messages.NewControlPlaneClient(conn)
		src.mu.Lock()
		src.remoteHosts[remoteHost] = &RemoteHostGRPC{
			Host:          remoteHost,
			Conn:          conn,
			CartClient:    cartClient,
			ControlClient: controlClient,
		}
		src.mu.Unlock()
	}

	// Full mesh (each node knows all others)
	link(syncedA, hostB, addrB)
	link(syncedA, hostC, addrC)
	link(syncedB, hostA, addrA)
	link(syncedB, hostC, addrC)
	link(syncedC, hostA, addrA)
	link(syncedC, hostB, addrB)

	// Rebuild rings after manual linking so ownership resolution is immediate.
	syncedA.ForceRingRefresh()
	syncedB.ForceRingRefresh()
	syncedC.ForceRingRefresh()

	// Allow brief stabilization
	time.Sleep(200 * time.Millisecond)
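
	// Hedged sketch (not part of the original test): ownership is derived purely
	// from the ring, so after the forced refreshes every node should resolve the
	// same owner for any key. Assuming DebugOwnerHost can be called on any node,
	// a quick agreement check could look like this:
	probe := ToCartId(fmt.Sprintf("ring-probe-%d", time.Now().UnixNano()))
	if a, b, c := syncedA.DebugOwnerHost(probe), syncedB.DebugOwnerHost(probe), syncedC.DebugOwnerHost(probe); a != b || b != c {
		t.Fatalf("ring views disagree on owner: A=%s B=%s C=%s", a, b, c)
	}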

	// Deterministic-ish cart id
	cartID := ToCartId(fmt.Sprintf("cart3-%d", time.Now().UnixNano()))
	addItem := &messages.AddItem{
		ItemId:   10,
		Quantity: 1,
		Price:    5000,
		OrgPrice: 5000,
		Sku:      "sku-3node",
		Name:     "Three Node Test",
		Image:    "/t.png",
		Stock:    10,
		Tax:      2500,
		Country:  "se",
	}

	// Determine the ring-designated owner (may be any of the three hosts)
	ownerPre := syncedA.DebugOwnerHost(cartID)
	if ownerPre != hostA && ownerPre != hostB && ownerPre != hostC {
		t.Fatalf("ring returned unexpected owner %s (not in set {%s,%s,%s})", ownerPre, hostA, hostB, hostC)
	}
	var ownerSynced *SyncedPool
	var ownerPool *GrainLocalPool
	switch ownerPre {
	case hostA:
		ownerSynced, ownerPool = syncedA, poolA
	case hostB:
		ownerSynced, ownerPool = syncedB, poolB
	case hostC:
		ownerSynced, ownerPool = syncedC, poolC
	}

	// Pick the two distinct non-owner nodes for remote mutation assertions
	var remote1Synced, remote2Synced *SyncedPool
	switch ownerPre {
	case hostA:
		remote1Synced, remote2Synced = syncedB, syncedC
	case hostB:
		remote1Synced, remote2Synced = syncedA, syncedC
	case hostC:
		remote1Synced, remote2Synced = syncedA, syncedB
	}

	// Apply on the ring-designated owner
	if _, err := ownerSynced.Apply(cartID, addItem); err != nil {
		t.Fatalf("owner %s Apply addItem error: %v", ownerPre, err)
	}

	// Small wait for remote proxy spawn (ring ownership is already deterministic)
	time.Sleep(150 * time.Millisecond)

	// Assert that only the ring-designated owner holds a local grain
	localCount := 0
	if _, ok := poolA.GetGrains()[cartID]; ok {
		localCount++
	}
	if _, ok := poolB.GetGrains()[cartID]; ok {
		localCount++
	}
	if _, ok := poolC.GetGrains()[cartID]; ok {
		localCount++
	}
	if localCount != 1 {
		t.Fatalf("expected exactly 1 local grain, got %d", localCount)
	}
	if _, ok := ownerPool.GetGrains()[cartID]; !ok {
		t.Fatalf("expected owner %s to hold local grain", ownerPre)
	}

	// Remote proxies may not pre-exist; first remote mutation will trigger SpawnRemoteGrain lazily.
	// Issue remote mutation from one non-owner -> ChangeQuantity (increase)
	change := &messages.ChangeQuantity{
		Id:       1,
		Quantity: 3,
	}
	if _, err := remote1Synced.Apply(cartID, change); err != nil {
		t.Fatalf("remote mutation (remote1) changeQuantity error: %v", err)
	}
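
	// Hedged addition (not in the original test): the non-owner that issued the
	// mutation should also observe the new quantity through its own routed Get,
	// mirroring the check done via the second non-owner below.
	if stateR1, getErr := remote1Synced.Get(cartID); getErr != nil {
		t.Fatalf("remote1 Get error: %v", getErr)
	} else if len(stateR1.Items) != 1 || stateR1.Items[0].Quantity != 3 {
		t.Fatalf("remote1 observed state mismatch after its own mutation: items=%d", len(stateR1.Items))
	}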

	// Validate the updated state is visible via the second non-owner (remote2)
	stateRemote2, err := remote2Synced.Get(cartID)
	if err != nil {
		t.Fatalf("remote2 Get error: %v", err)
	}
	if len(stateRemote2.Items) != 1 || stateRemote2.Items[0].Quantity != 3 {
		t.Fatalf("remote2 observed state mismatch: items=%d qty=%d (expected 1 / 3)",
			len(stateRemote2.Items),
			func() int {
				if len(stateRemote2.Items) == 0 {
					return -1
				}
				return stateRemote2.Items[0].Quantity
			}(),
		)
	}

	// Cross-check the authoritative state via node A (resolves through the ring to the owner)
	stateA, err := syncedA.Get(cartID)
	if err != nil {
		t.Fatalf("nodeA Get error: %v", err)
	}
	if len(stateA.Items) == 0 {
		t.Fatalf("nodeA state mismatch: expected 1 item with qty=3, got no items")
	}
	if stateA.Items[0].Quantity != 3 {
		t.Fatalf("nodeA authoritative state mismatch: expected qty=3 got %d", stateA.Items[0].Quantity)
	}
}
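
// Hedged sketch for the "(Future) ... RF>1" note above: with replication
// enabled, the "exactly 1 local grain" assertion would become "exactly RF
// local grains". The DebugReplicaHosts helper below is hypothetical (it does
// not exist in this codebase) and only illustrates the shape of such a check:
//
//	replicas := syncedA.DebugReplicaHosts(cartID) // hypothetical helper
//	if len(replicas) != rf {
//		t.Fatalf("expected %d replica hosts, got %d", rf, len(replicas))
//	}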

// TestThreeNodeDiscoveryMajorityOwnership (placeholder)
// This test is a scaffold demonstrating how a MockDiscovery would be wired
// once AddRemote supports host:port (currently hard-coded to :1337).
// It is skipped to avoid flakiness / false negatives until the production
// AddRemote logic is enhanced to parse dynamic ports or the test harness
// provides consistent port mapping.
func TestThreeNodeDiscoveryMajorityOwnership(t *testing.T) {
	t.Skip("Pending enhancement: AddRemote needs host:port support to fully exercise discovery-based multi-node linking")

	// Example skeleton (non-functional with current AddRemote implementation):
	//
	// md := NewMockDiscovery([]string{"nodeB3", "nodeC3"})
	// poolA := NewGrainLocalPool(1024, time.Minute, spawn)
	// syncedA, err := NewSyncedPool(poolA, "nodeA3", md)
	// if err != nil {
	// 	t.Fatalf("NewSyncedPool with mock discovery error: %v", err)
	// }
	//
	// // Start server for nodeA (would also need servers for nodeB3/nodeC3 on expected ports)
	// // grpcSrvA, _ := StartGRPCServer(":1337", poolA, syncedA)
	// // defer grpcSrvA.GracefulStop()
	//
	// // Dynamically add a host via discovery
	// // md.AddHost("nodeB3")
	// // time.Sleep(100 * time.Millisecond) // allow AddRemote attempt
	//
	// // Assertions would verify syncedA.remoteHosts contains "nodeB3"
}

// TestHostRemovalAndErrorWithMockDiscovery validates behavior when:
// 1. Discovery reports a host that cannot be dialed (AddRemote error path)
// 2. That host is then removed (Deleted event) without leaving residual state
// 3. A second failing host is added afterward (ensuring watcher still processes events)
//
// NOTE: Because AddRemote currently hard-codes :1337 and we are NOT starting a
// real server for the bogus hosts, the dial will fail and the remote host should
// never appear in remoteHosts. This intentionally exercises the error logging
// path: "AddRemote: dial ... failed".
func TestHostRemovalAndErrorWithMockDiscovery(t *testing.T) {
	// Start a real node A (acts as the observing node)
	const addrA = "127.0.0.1:18281"
	hostA := "nodeA-md"
	poolA := NewGrainLocalPool(128, time.Minute, spawn)

	// Mock discovery starts with one bogus host that will fail to connect.
	md := NewMockDiscovery([]string{"bogus-host-1"})
	syncedA, err := NewSyncedPool(poolA, hostA, md)
	if err != nil {
		t.Fatalf("NewSyncedPool error: %v", err)
	}
	grpcSrvA, err := StartGRPCServer(addrA, poolA, syncedA)
	if err != nil {
		t.Fatalf("StartGRPCServer A error: %v", err)
	}
	defer grpcSrvA.GracefulStop()

	// NewSyncedPool starts Watch() internally when discovery is non-nil, so the
	// events channel is already active; give it time to process the initial event.
	// The initial bogus host should trigger AddRemote -> dial failure.
	time.Sleep(300 * time.Millisecond)
	syncedA.mu.RLock()
	if len(syncedA.remoteHosts) != 0 {
		syncedA.mu.RUnlock()
		t.Fatalf("expected 0 remoteHosts after failing dial, got %d", len(syncedA.remoteHosts))
	}
	syncedA.mu.RUnlock()

	// Remove the bogus host (should not panic; no entry to clean up).
	md.RemoveHost("bogus-host-1")
	time.Sleep(100 * time.Millisecond)

	// Add another bogus host to ensure watcher still alive.
	md.AddHost("bogus-host-2")
	time.Sleep(300 * time.Millisecond)

	syncedA.mu.RLock()
	if len(syncedA.remoteHosts) != 0 {
		syncedA.mu.RUnlock()
		t.Fatalf("expected 0 remoteHosts after second failing dial, got %d", len(syncedA.remoteHosts))
	}
	syncedA.mu.RUnlock()

	// Clean up discovery
	md.Close()
}