package main

import (
	"context"
	"fmt"
	"testing"
	"time"

	messages "git.tornberg.me/go-cart-actor/proto"
	"google.golang.org/grpc"
)

// TestMultiNodeOwnershipNegotiation spins up two gRPC servers (nodeA, nodeB),
// manually links their SyncedPools (bypassing AddRemote's fixed port assumption),
// and verifies that only one node becomes the owner of a new cart while the
// other can still apply a mutation via the remote proxy path.
//
// NOTE:
//   - We manually inject RemoteHostGRPC entries because AddRemote() hard-codes
//     port 1337; to run two distinct servers concurrently we need distinct ports.
//   - This test asserts single ownership consistency rather than the complete
//     quorum semantics (which depend on real discovery + AddRemote).
func TestMultiNodeOwnershipNegotiation(t *testing.T) {
	// Allocate distinct ports for the two nodes.
	const (
		addrA = "127.0.0.1:18081"
		addrB = "127.0.0.1:18082"
		hostA = "nodeA"
		hostB = "nodeB"
	)

	// Create local grain pools.
	poolA := NewGrainLocalPool(1024, time.Minute, spawn)
	poolB := NewGrainLocalPool(1024, time.Minute, spawn)

	// Create synced pools (no discovery).
	syncedA, err := NewSyncedPool(poolA, hostA, nil)
	if err != nil {
		t.Fatalf("nodeA NewSyncedPool error: %v", err)
	}
	syncedB, err := NewSyncedPool(poolB, hostB, nil)
	if err != nil {
		t.Fatalf("nodeB NewSyncedPool error: %v", err)
	}

	// Start gRPC servers (CartActor + ControlPlane) on different ports.
	// Shutdown is registered with t.Cleanup rather than defer: cleanups run in
	// LIFO order, so the client connections registered later (in link) are
	// closed before the servers they talk to are stopped.
	grpcSrvA, err := StartGRPCServer(addrA, poolA, syncedA)
	if err != nil {
		t.Fatalf("StartGRPCServer A error: %v", err)
	}
	t.Cleanup(grpcSrvA.GracefulStop)

	grpcSrvB, err := StartGRPCServer(addrB, poolB, syncedB)
	if err != nil {
		t.Fatalf("StartGRPCServer B error: %v", err)
	}
	t.Cleanup(grpcSrvB.GracefulStop)

	// link connects one pool to the other's server (manual AddRemote equivalent)
	// by dialing remoteAddr and registering the resulting clients in
	// src.remoteHosts under remoteHost. The connection is closed on test cleanup.
	link := func(src *SyncedPool, remoteHost, remoteAddr string) {
		t.Helper()

		// Bound the blocking dial so an unreachable server fails the test quickly.
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()

		conn, dialErr := grpc.DialContext(ctx, remoteAddr, grpc.WithInsecure(), grpc.WithBlock())
		if dialErr != nil {
			t.Fatalf("dial %s (%s) failed: %v", remoteHost, remoteAddr, dialErr)
		}
		// Release the client connection when the test ends; without this the
		// connection stays open for the remainder of the test binary.
		t.Cleanup(func() {
			if closeErr := conn.Close(); closeErr != nil {
				t.Logf("closing connection to %s (%s): %v", remoteHost, remoteAddr, closeErr)
			}
		})

		cartClient := messages.NewCartActorClient(conn)
		controlClient := messages.NewControlPlaneClient(conn)

		src.mu.Lock()
		src.remoteHosts[remoteHost] = &RemoteHostGRPC{
			Host:          remoteHost,
			Conn:          conn,
			CartClient:    cartClient,
			ControlClient: controlClient,
		}
		src.mu.Unlock()
	}

	// Cross-link the two pools.
	link(syncedA, hostB, addrB)
	link(syncedB, hostA, addrA)

	// Rebuild rings after manual cross-link so deterministic ownership works immediately.
	syncedA.ForceRingRefresh()
	syncedB.ForceRingRefresh()

	// Allow brief stabilization (control plane pings / no real negotiation needed here).
	time.Sleep(200 * time.Millisecond)

	// Create a unique (timestamp-based) cart id so repeated runs do not collide.
	cartID := ToCartId(fmt.Sprintf("cart-%d", time.Now().UnixNano()))

	// Mutation payload (ring-determined ownership; no assumption about which node owns).
	addItem := &messages.AddItem{
		ItemId:   1,
		Quantity: 1,
		Price:    1500,
		OrgPrice: 1500,
		Sku:      "sku-test-multi",
		Name:     "Multi Node Test",
		Image:    "/test.png",
		Stock:    2,
		Tax:      2500,
		Country:  "se",
	}

	// Determine ring owner and set primary / secondary references.
	ownerHost := syncedA.DebugOwnerHost(cartID)
	var ownerSynced, otherSynced *SyncedPool
	var ownerPool, otherPool *GrainLocalPool
	switch ownerHost {
	case hostA:
		ownerSynced, ownerPool = syncedA, poolA
		otherSynced, otherPool = syncedB, poolB
	case hostB:
		ownerSynced, ownerPool = syncedB, poolB
		otherSynced, otherPool = syncedA, poolA
	default:
		t.Fatalf("unexpected ring owner %s (expected %s or %s)", ownerHost, hostA, hostB)
	}

	// Apply mutation on the ring-designated owner.
	if _, err := ownerSynced.Apply(cartID, addItem); err != nil {
		t.Fatalf("owner %s Apply addItem error: %v", ownerHost, err)
	}

	// Validate owner pool has the grain and the other does not.
	if _, ok := ownerPool.GetGrains()[cartID]; !ok {
		t.Fatalf("expected owner %s to have local grain", ownerHost)
	}
	if _, ok := otherPool.GetGrains()[cartID]; ok {
		t.Fatalf("non-owner unexpectedly holds local grain")
	}

	// Prepare change mutation to be applied from the non-owner (should route remotely).
	change := &messages.ChangeQuantity{
		Id:       1, // line id after first AddItem
		Quantity: 2,
	}

	// Apply remotely via the non-owner.
	if _, err := otherSynced.Apply(cartID, change); err != nil {
		t.Fatalf("non-owner remote Apply changeQuantity error: %v", err)
	}

	// NodeB local grain assertion (after the remote mutation):
	// Only assert absence if nodeB is NOT the ring-designated owner. If nodeB is the owner,
	// it is expected to have a local grain (previous generic ownership assertions already ran).
	if ownerHost != hostB {
		if _, local := poolB.GetGrains()[cartID]; local {
			t.Fatalf("nodeB unexpectedly created local grain (ownership duplication)")
		}
	}

	// Fetch state from nodeB to ensure we see updated quantity (2).
	grainStateB, err := syncedB.Get(cartID)
	if err != nil {
		t.Fatalf("nodeB Get error: %v", err)
	}
	if len(grainStateB.Items) != 1 || grainStateB.Items[0].Quantity != 2 {
		t.Fatalf("nodeB observed inconsistent state: items=%d qty=%d (expected 1 / 2)",
			len(grainStateB.Items),
			func() int {
				// Report -1 when there is no item to read a quantity from.
				if len(grainStateB.Items) == 0 {
					return -1
				}
				return grainStateB.Items[0].Quantity
			}(),
		)
	}

	// Cross-check from nodeA (which may or may not be the ring owner) to ensure
	// both nodes observe the same state.
	grainStateA, err := syncedA.Get(cartID)
	if err != nil {
		t.Fatalf("nodeA Get error: %v", err)
	}
	// Guard the index below: an empty item list must fail the test, not panic.
	if len(grainStateA.Items) == 0 {
		t.Fatalf("nodeA authoritative state mismatch: expected qty=2 got no items")
	}
	if grainStateA.Items[0].Quantity != 2 {
		t.Fatalf("nodeA authoritative state mismatch: expected qty=2 got %d", grainStateA.Items[0].Quantity)
	}
}