Complete refactor to new grpc control plane and only http proxy for carts #4

Merged
mats merged 75 commits from refactor/http-proxy into main 2025-10-14 22:31:28 +02:00
10 changed files with 4 additions and 3187 deletions
Showing only changes of commit fb111ebf97 - Show all commits

View File

@@ -19,7 +19,7 @@
MODULE_PATH := git.tornberg.me/go-cart-actor
PROTO_DIR := proto
PROTOS := $(PROTO_DIR)/messages.proto $(PROTO_DIR)/cart_actor.proto $(PROTO_DIR)/control_plane.proto
PROTOS := $(PROTO_DIR)/messages.proto $(PROTO_DIR)/control_plane.proto
# Allow override: make PROTOC=/path/to/protoc
PROTOC ?= protoc

View File

@@ -1,212 +0,0 @@
package main
import (
messages "git.tornberg.me/go-cart-actor/proto"
)
// cart_state_mapper.go
//
// Utilities to translate between internal CartGrain state and the gRPC
// (typed) protobuf representation CartState. This replaces the previous
// JSON blob framing and enables type-safe replies over gRPC, as well as
// internal reuse for HTTP handlers without an extra marshal / unmarshal
// hop (you can marshal CartState directly for JSON responses if desired).
//
// Only the oneway mapping (CartGrain -> CartState) is strictly required
// for mutation / state replies. A reverse helper is included in case
// future features (e.g. snapshot import, replay, or migration) need it.
// ToCartState converts the inmemory CartGrain into a protobuf CartState.
func ToCartState(c *CartGrain) *messages.CartState {
if c == nil {
return nil
}
items := make([]*messages.CartItemState, 0, len(c.Items))
for _, it := range c.Items {
if it == nil {
continue
}
itemDiscountPerUnit := max(0, it.OrgPrice-it.Price)
itemTotalDiscount := itemDiscountPerUnit * int64(it.Quantity)
items = append(items, &messages.CartItemState{
Id: int64(it.Id),
ItemId: int64(it.ItemId),
Sku: it.Sku,
Name: it.Name,
Price: it.Price,
Qty: int32(it.Quantity),
TotalPrice: it.TotalPrice,
TotalTax: it.TotalTax,
OrgPrice: it.OrgPrice,
TaxRate: int32(it.TaxRate),
TotalDiscount: itemTotalDiscount,
Brand: it.Brand,
Category: it.Category,
Category2: it.Category2,
Category3: it.Category3,
Category4: it.Category4,
Category5: it.Category5,
Image: it.Image,
Type: it.ArticleType,
SellerId: it.SellerId,
SellerName: it.SellerName,
Disclaimer: it.Disclaimer,
Outlet: deref(it.Outlet),
StoreId: deref(it.StoreId),
Stock: int32(it.Stock),
})
}
deliveries := make([]*messages.DeliveryState, 0, len(c.Deliveries))
for _, d := range c.Deliveries {
if d == nil {
continue
}
itemIds := make([]int64, 0, len(d.Items))
for _, id := range d.Items {
itemIds = append(itemIds, int64(id))
}
var pp *messages.PickupPoint
if d.PickupPoint != nil {
// Copy to avoid accidental shared mutation (proto points are fine but explicit).
pp = &messages.PickupPoint{
Id: d.PickupPoint.Id,
Name: d.PickupPoint.Name,
Address: d.PickupPoint.Address,
City: d.PickupPoint.City,
Zip: d.PickupPoint.Zip,
Country: d.PickupPoint.Country,
}
}
deliveries = append(deliveries, &messages.DeliveryState{
Id: int64(d.Id),
Provider: d.Provider,
Price: d.Price,
Items: itemIds,
PickupPoint: pp,
})
}
return &messages.CartState{
Id: c.Id.String(),
Items: items,
TotalPrice: c.TotalPrice,
TotalTax: c.TotalTax,
TotalDiscount: c.TotalDiscount,
Deliveries: deliveries,
PaymentInProgress: c.PaymentInProgress,
OrderReference: c.OrderReference,
PaymentStatus: c.PaymentStatus,
}
}
// FromCartState merges a protobuf CartState into an existing CartGrain.
// This is optional and primarily useful for snapshot import or testing.
func FromCartState(cs *messages.CartState, g *CartGrain) *CartGrain {
if cs == nil {
return g
}
if g == nil {
g = &CartGrain{}
}
g.Id = ToCartId(cs.Id)
g.TotalPrice = cs.TotalPrice
g.TotalTax = cs.TotalTax
g.TotalDiscount = cs.TotalDiscount
g.PaymentInProgress = cs.PaymentInProgress
g.OrderReference = cs.OrderReference
g.PaymentStatus = cs.PaymentStatus
// Items
g.Items = g.Items[:0]
for _, it := range cs.Items {
if it == nil {
continue
}
outlet := toPtr(it.Outlet)
storeId := toPtr(it.StoreId)
g.Items = append(g.Items, &CartItem{
Id: int(it.Id),
ItemId: int(it.ItemId),
Sku: it.Sku,
Name: it.Name,
Price: it.Price,
Quantity: int(it.Qty),
TotalPrice: it.TotalPrice,
TotalTax: it.TotalTax,
OrgPrice: it.OrgPrice,
TaxRate: int(it.TaxRate),
Brand: it.Brand,
Category: it.Category,
Category2: it.Category2,
Category3: it.Category3,
Category4: it.Category4,
Category5: it.Category5,
Image: it.Image,
ArticleType: it.Type,
SellerId: it.SellerId,
SellerName: it.SellerName,
Disclaimer: it.Disclaimer,
Outlet: outlet,
StoreId: storeId,
Stock: StockStatus(it.Stock),
// Tax, TaxRate already set via Price / Totals if needed
})
if it.Id > int64(g.lastItemId) {
g.lastItemId = int(it.Id)
}
}
// Deliveries
g.Deliveries = g.Deliveries[:0]
for _, d := range cs.Deliveries {
if d == nil {
continue
}
intIds := make([]int, 0, len(d.Items))
for _, id := range d.Items {
intIds = append(intIds, int(id))
}
var pp *messages.PickupPoint
if d.PickupPoint != nil {
pp = &messages.PickupPoint{
Id: d.PickupPoint.Id,
Name: d.PickupPoint.Name,
Address: d.PickupPoint.Address,
City: d.PickupPoint.City,
Zip: d.PickupPoint.Zip,
Country: d.PickupPoint.Country,
}
}
g.Deliveries = append(g.Deliveries, &CartDelivery{
Id: int(d.Id),
Provider: d.Provider,
Price: d.Price,
Items: intIds,
PickupPoint: pp,
})
if d.Id > int64(g.lastDeliveryId) {
g.lastDeliveryId = int(d.Id)
}
}
return g
}
// Helper to safely de-reference optional string pointers to value or "".
func deref(p *string) string {
if p == nil {
return ""
}
return *p
}
func toPtr(s string) *string {
if s == "" {
return nil
}
return &s
}

View File

@@ -1,115 +0,0 @@
package main
import (
"context"
"fmt"
"testing"
"time"
messages "git.tornberg.me/go-cart-actor/proto"
"google.golang.org/grpc"
)
// TestCartActorMutationAndState validates end-to-end gRPC mutation + state retrieval
// against a locally started gRPC server (single-node scenario).
// This test uses the new per-mutation AddItem RPC (breaking v2 API) to avoid external product fetch logic
// fetching logic (FetchItem) which would require network I/O.
func TestCartActorMutationAndState(t *testing.T) {
// Setup local grain pool + synced pool (no discovery, single host)
pool := NewGrainLocalPool(1024, time.Minute, spawn)
synced, err := NewSyncedPool(pool, "127.0.0.1", nil)
if err != nil {
t.Fatalf("NewSyncedPool error: %v", err)
}
// Start gRPC server (CartActor + ControlPlane) on :1337
grpcSrv, err := StartGRPCServer(":1337", pool, synced)
if err != nil {
t.Fatalf("StartGRPCServer error: %v", err)
}
defer grpcSrv.GracefulStop()
// Dial the local server
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, "127.0.0.1:1337",
grpc.WithInsecure(),
grpc.WithBlock(),
)
if err != nil {
t.Fatalf("grpc.Dial error: %v", err)
}
defer conn.Close()
cartClient := messages.NewCartActorClient(conn)
// Create a short cart id (<=16 chars so it fits into the fixed CartId 16-byte array cleanly)
cartID := fmt.Sprintf("cart-%d", time.Now().UnixNano())
// Build an AddItem payload (bypasses FetchItem to keep test deterministic)
addItem := &messages.AddItem{
ItemId: 1,
Quantity: 1,
Price: 1000,
OrgPrice: 1000,
Sku: "test-sku",
Name: "Test SKU",
Image: "/img.png",
Stock: 2, // InStock
Tax: 2500,
Country: "se",
}
// Issue AddItem RPC directly (breaking v2 API)
addResp, err := cartClient.AddItem(context.Background(), &messages.AddItemRequest{
CartId: cartID,
ClientTimestamp: time.Now().Unix(),
Payload: addItem,
})
if err != nil {
t.Fatalf("AddItem RPC error: %v", err)
}
if addResp.StatusCode != 200 {
t.Fatalf("AddItem returned non-200 status: %d, error: %s", addResp.StatusCode, addResp.GetError())
}
// Validate the response state (from AddItem)
state := addResp.GetState()
if state == nil {
t.Fatalf("AddItem response state is nil")
}
// (Removed obsolete Mutate response handling)
if len(state.Items) != 1 {
t.Fatalf("Expected 1 item after AddItem, got %d", len(state.Items))
}
if state.Items[0].Sku != "test-sku" {
t.Fatalf("Unexpected item SKU: %s", state.Items[0].Sku)
}
// Issue GetState RPC
getResp, err := cartClient.GetState(context.Background(), &messages.StateRequest{
CartId: cartID,
})
if err != nil {
t.Fatalf("GetState RPC error: %v", err)
}
if getResp.StatusCode != 200 {
t.Fatalf("GetState returned non-200 status: %d, error: %s", getResp.StatusCode, getResp.GetError())
}
state2 := getResp.GetState()
if state2 == nil {
t.Fatalf("GetState response state is nil")
}
if len(state2.Items) != 1 {
t.Fatalf("Expected 1 item in GetState, got %d", len(state2.Items))
}
if state2.Items[0].Sku != "test-sku" {
t.Fatalf("Unexpected SKU in GetState: %s", state2.Items[0].Sku)
}
}
// Legacy serialization helper removed (oneof envelope used directly)

View File

@@ -16,7 +16,6 @@ import (
// cartActorGRPCServer implements the CartActor and ControlPlane gRPC services.
// It delegates cart operations to a grain pool and cluster operations to a synced pool.
type cartActorGRPCServer struct {
messages.UnimplementedCartActorServer
messages.UnimplementedControlPlaneServer
pool GrainPool // For cart state mutations and queries
@@ -31,172 +30,6 @@ func NewCartActorGRPCServer(pool GrainPool, syncedPool *SyncedPool) *cartActorGR
}
}
// applyMutation routes a single cart mutation to the target grain (used by per-mutation RPC handlers).
func (s *cartActorGRPCServer) applyMutation(cartID string, mutation interface{}) *messages.CartMutationReply {
// Canonicalize or preserve legacy id (do NOT hash-rewrite legacy textual ids)
cid, _, wasBase62, cerr := CanonicalizeOrLegacy(cartID)
if cerr != nil {
return &messages.CartMutationReply{
StatusCode: 500,
Result: &messages.CartMutationReply_Error{Error: fmt.Sprintf("cart_id canonicalization failed: %v", cerr)},
ServerTimestamp: time.Now().Unix(),
}
}
_ = wasBase62 // placeholder; future: propagate canonical id in reply metadata
legacy := CartIDToLegacy(cid)
grain, err := s.pool.Apply(legacy, mutation)
if err != nil {
return &messages.CartMutationReply{
StatusCode: 500,
Result: &messages.CartMutationReply_Error{Error: err.Error()},
ServerTimestamp: time.Now().Unix(),
}
}
cartState := ToCartState(grain)
return &messages.CartMutationReply{
StatusCode: 200,
Result: &messages.CartMutationReply_State{State: cartState},
ServerTimestamp: time.Now().Unix(),
}
}
func (s *cartActorGRPCServer) AddRequest(ctx context.Context, req *messages.AddRequestRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
func (s *cartActorGRPCServer) AddItem(ctx context.Context, req *messages.AddItemRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
func (s *cartActorGRPCServer) RemoveItem(ctx context.Context, req *messages.RemoveItemRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
func (s *cartActorGRPCServer) RemoveDelivery(ctx context.Context, req *messages.RemoveDeliveryRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
func (s *cartActorGRPCServer) ChangeQuantity(ctx context.Context, req *messages.ChangeQuantityRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
func (s *cartActorGRPCServer) SetDelivery(ctx context.Context, req *messages.SetDeliveryRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
func (s *cartActorGRPCServer) SetPickupPoint(ctx context.Context, req *messages.SetPickupPointRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
/*
Checkout RPC removed. Checkout is handled at the HTTP layer (PoolServer.HandleCheckout).
*/
func (s *cartActorGRPCServer) SetCartItems(ctx context.Context, req *messages.SetCartItemsRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
func (s *cartActorGRPCServer) OrderCompleted(ctx context.Context, req *messages.OrderCompletedRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
// GetState retrieves the current state of a cart grain.
func (s *cartActorGRPCServer) GetState(ctx context.Context, req *messages.StateRequest) (*messages.StateReply, error) {
if req.GetCartId() == "" {
return &messages.StateReply{
StatusCode: 400,
Result: &messages.StateReply_Error{Error: "cart_id is required"},
}, nil
}
// Canonicalize / upgrade incoming cart id (preserve legacy strings)
cid, _, _, cerr := CanonicalizeOrLegacy(req.GetCartId())
if cerr != nil {
return &messages.StateReply{
StatusCode: 500,
Result: &messages.StateReply_Error{Error: fmt.Sprintf("cart_id canonicalization failed: %v", cerr)},
}, nil
}
legacy := CartIDToLegacy(cid)
grain, err := s.pool.Get(legacy)
if err != nil {
return &messages.StateReply{
StatusCode: 500,
Result: &messages.StateReply_Error{Error: err.Error()},
}, nil
}
cartState := ToCartState(grain)
return &messages.StateReply{
StatusCode: 200,
Result: &messages.StateReply_State{State: cartState},
}, nil
}
// ControlPlane: Ping
func (s *cartActorGRPCServer) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) {
// Expose cart owner cookie (first-touch owner = this host) for HTTP gateways translating gRPC metadata.
@@ -269,7 +102,6 @@ func StartGRPCServer(addr string, pool GrainPool, syncedPool *SyncedPool) (*grpc
grpcServer := grpc.NewServer()
server := NewCartActorGRPCServer(pool, syncedPool)
messages.RegisterCartActorServer(grpcServer, server)
messages.RegisterControlPlaneServer(grpcServer, server)
reflection.Register(grpcServer)

View File

@@ -1,180 +0,0 @@
package main
import (
"context"
"fmt"
"testing"
"time"
messages "git.tornberg.me/go-cart-actor/proto"
"google.golang.org/grpc"
)
// TestMultiNodeOwnershipNegotiation spins up two gRPC servers (nodeA, nodeB),
// manually links their SyncedPools (bypassing AddRemote's fixed port assumption),
// and verifies that only one node becomes the owner of a new cart while the
// other can still apply a mutation via the remote proxy path.
//
// NOTE:
// - We manually inject RemoteHostGRPC entries because AddRemote() hard-codes
// port 1337; to run two distinct servers concurrently we need distinct ports.
// - This test asserts single ownership consistency rather than the complete
// quorum semantics (which depend on real discovery + AddRemote).
func TestMultiNodeOwnershipNegotiation(t *testing.T) {
// Allocate distinct ports for the two nodes.
const (
addrA = "127.0.0.1:18081"
addrB = "127.0.0.1:18082"
hostA = "nodeA"
hostB = "nodeB"
)
// Create local grain pools.
poolA := NewGrainLocalPool(1024, time.Minute, spawn)
poolB := NewGrainLocalPool(1024, time.Minute, spawn)
// Create synced pools (no discovery).
syncedA, err := NewSyncedPool(poolA, hostA, nil)
if err != nil {
t.Fatalf("nodeA NewSyncedPool error: %v", err)
}
syncedB, err := NewSyncedPool(poolB, hostB, nil)
if err != nil {
t.Fatalf("nodeB NewSyncedPool error: %v", err)
}
// Start gRPC servers (CartActor + ControlPlane) on different ports.
grpcSrvA, err := StartGRPCServer(addrA, poolA, syncedA)
if err != nil {
t.Fatalf("StartGRPCServer A error: %v", err)
}
defer grpcSrvA.GracefulStop()
grpcSrvB, err := StartGRPCServer(addrB, poolB, syncedB)
if err != nil {
t.Fatalf("StartGRPCServer B error: %v", err)
}
defer grpcSrvB.GracefulStop()
// Helper to connect one pool to the other's server (manual AddRemote equivalent).
link := func(src *SyncedPool, remoteHost, remoteAddr string) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
conn, dialErr := grpc.DialContext(ctx, remoteAddr, grpc.WithInsecure(), grpc.WithBlock())
if dialErr != nil {
t.Fatalf("dial %s (%s) failed: %v", remoteHost, remoteAddr, dialErr)
}
cartClient := messages.NewCartActorClient(conn)
controlClient := messages.NewControlPlaneClient(conn)
src.mu.Lock()
src.remoteHosts[remoteHost] = &RemoteHostGRPC{
Host: remoteHost,
Conn: conn,
CartClient: cartClient,
ControlClient: controlClient,
}
src.mu.Unlock()
}
// Cross-link the two pools.
link(syncedA, hostB, addrB)
link(syncedB, hostA, addrA)
// Ring-based ownership removed; no ring refresh needed.
// Allow brief stabilization (control plane pings / no real negotiation needed here).
time.Sleep(200 * time.Millisecond)
// Create a deterministic cart id for test readability.
cartID := ToCartId(fmt.Sprintf("cart-%d", time.Now().UnixNano()))
// Mutation payload (ring-determined ownership; no assumption about which node owns).
addItem := &messages.AddItem{
ItemId: 1,
Quantity: 1,
Price: 1500,
OrgPrice: 1500,
Sku: "sku-test-multi",
Name: "Multi Node Test",
Image: "/test.png",
Stock: 2,
Tax: 2500,
Country: "se",
}
// Determine ring owner and set primary / secondary references.
ownerHost := syncedA.DebugOwnerHost(cartID)
var ownerSynced, otherSynced *SyncedPool
var ownerPool, otherPool *GrainLocalPool
switch ownerHost {
case hostA:
ownerSynced, ownerPool = syncedA, poolA
otherSynced, otherPool = syncedB, poolB
case hostB:
ownerSynced, ownerPool = syncedB, poolB
otherSynced, otherPool = syncedA, poolA
default:
t.Fatalf("unexpected ring owner %s (expected %s or %s)", ownerHost, hostA, hostB)
}
// Apply mutation on the ring-designated owner.
if _, err := ownerSynced.Apply(cartID, addItem); err != nil {
t.Fatalf("owner %s Apply addItem error: %v", ownerHost, err)
}
// Validate owner pool has the grain and the other does not.
if _, ok := ownerPool.GetGrains()[cartID]; !ok {
t.Fatalf("expected owner %s to have local grain", ownerHost)
}
if _, ok := otherPool.GetGrains()[cartID]; ok {
t.Fatalf("non-owner unexpectedly holds local grain")
}
// Prepare change mutation to be applied from the non-owner (should route remotely).
change := &messages.ChangeQuantity{
Id: 1, // line id after first AddItem
Quantity: 2,
}
// Apply remotely via the non-owner.
if _, err := otherSynced.Apply(cartID, change); err != nil {
t.Fatalf("non-owner remote Apply changeQuantity error: %v", err)
}
// Remote re-mutation already performed via otherSynced; removed duplicate block.
// NodeB local grain assertion:
// Only assert absence if nodeB is NOT the ring-designated owner. If nodeB is the owner,
// it is expected to have a local grain (previous generic ownership assertions already ran).
if ownerHost != hostB {
if _, local := poolB.GetGrains()[cartID]; local {
t.Fatalf("nodeB unexpectedly created local grain (ownership duplication)")
}
}
// Fetch state from nodeB to ensure we see updated quantity (2).
grainStateB, err := syncedB.Get(cartID)
if err != nil {
t.Fatalf("nodeB Get error: %v", err)
}
if len(grainStateB.Items) != 1 || grainStateB.Items[0].Quantity != 2 {
t.Fatalf("nodeB observed inconsistent state: items=%d qty=%d (expected 1 / 2)",
len(grainStateB.Items),
func() int {
if len(grainStateB.Items) == 0 {
return -1
}
return grainStateB.Items[0].Quantity
}(),
)
}
// Cross-check from nodeA (authoritative) to ensure state matches.
grainStateA, err := syncedA.Get(cartID)
if err != nil {
t.Fatalf("nodeA Get error: %v", err)
}
if grainStateA.Items[0].Quantity != 2 {
t.Fatalf("nodeA authoritative state mismatch: expected qty=2 got %d", grainStateA.Items[0].Quantity)
}
}

View File

@@ -1,301 +0,0 @@
package main
import (
"context"
"fmt"
"testing"
"time"
messages "git.tornberg.me/go-cart-actor/proto"
"google.golang.org/grpc"
)
// TestThreeNodeMajorityOwnership validates ring-determined ownership and routing
// in a 3-node cluster (A,B,C) using the consistent hashing ring (no quorum RPC).
// The previous ConfirmOwner / quorum semantics have been removed; ownership is
// deterministic and derived from the ring.
//
// It validates:
// 1. The ring selects exactly one primary owner for a new cart.
// 2. Other nodes (B,C) do NOT create local grains for the cart.
// 3. Remote proxies are installed lazily so remote mutations can route.
// 4. A remote mutation from one non-owner updates state visible on another.
// 5. Authoritative state on the owner matches remote observations.
// 6. (Future) This scaffolds replication tests when RF>1 is enabled.
//
// (Legacy comments about ConfirmOwner acceptance thresholds have been removed.)
// (Function name retained for historical continuity.)
func TestThreeNodeMajorityOwnership(t *testing.T) {
const (
addrA = "127.0.0.1:18181"
addrB = "127.0.0.1:18182"
addrC = "127.0.0.1:18183"
hostA = "nodeA3"
hostB = "nodeB3"
hostC = "nodeC3"
)
// Local grain pools
poolA := NewGrainLocalPool(1024, time.Minute, spawn)
poolB := NewGrainLocalPool(1024, time.Minute, spawn)
poolC := NewGrainLocalPool(1024, time.Minute, spawn)
// Synced pools (no discovery)
syncedA, err := NewSyncedPool(poolA, hostA, nil)
if err != nil {
t.Fatalf("nodeA NewSyncedPool error: %v", err)
}
syncedB, err := NewSyncedPool(poolB, hostB, nil)
if err != nil {
t.Fatalf("nodeB NewSyncedPool error: %v", err)
}
syncedC, err := NewSyncedPool(poolC, hostC, nil)
if err != nil {
t.Fatalf("nodeC NewSyncedPool error: %v", err)
}
// Start gRPC servers
grpcSrvA, err := StartGRPCServer(addrA, poolA, syncedA)
if err != nil {
t.Fatalf("StartGRPCServer A error: %v", err)
}
defer grpcSrvA.GracefulStop()
grpcSrvB, err := StartGRPCServer(addrB, poolB, syncedB)
if err != nil {
t.Fatalf("StartGRPCServer B error: %v", err)
}
defer grpcSrvB.GracefulStop()
grpcSrvC, err := StartGRPCServer(addrC, poolC, syncedC)
if err != nil {
t.Fatalf("StartGRPCServer C error: %v", err)
}
defer grpcSrvC.GracefulStop()
// Helper for manual cross-link (since AddRemote assumes fixed port)
link := func(src *SyncedPool, remoteHost, remoteAddr string) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
conn, dialErr := grpc.DialContext(ctx, remoteAddr, grpc.WithInsecure(), grpc.WithBlock())
if dialErr != nil {
t.Fatalf("dial %s (%s) failed: %v", remoteHost, remoteAddr, dialErr)
}
cartClient := messages.NewCartActorClient(conn)
controlClient := messages.NewControlPlaneClient(conn)
src.mu.Lock()
src.remoteHosts[remoteHost] = &RemoteHostGRPC{
Host: remoteHost,
Conn: conn,
CartClient: cartClient,
ControlClient: controlClient,
}
src.mu.Unlock()
}
// Full mesh (each node knows all others)
link(syncedA, hostB, addrB)
link(syncedA, hostC, addrC)
link(syncedB, hostA, addrA)
link(syncedB, hostC, addrC)
link(syncedC, hostA, addrA)
link(syncedC, hostB, addrB)
// Ring-based ownership removed; no ring refresh needed.
// Allow brief stabilization
time.Sleep(200 * time.Millisecond)
// Deterministic-ish cart id
cartID := ToCartId(fmt.Sprintf("cart3-%d", time.Now().UnixNano()))
addItem := &messages.AddItem{
ItemId: 10,
Quantity: 1,
Price: 5000,
OrgPrice: 5000,
Sku: "sku-3node",
Name: "Three Node Test",
Image: "/t.png",
Stock: 10,
Tax: 2500,
Country: "se",
}
// Determine ring-designated owner (may be any of the three hosts)
ownerPre := syncedA.DebugOwnerHost(cartID)
if ownerPre != hostA && ownerPre != hostB && ownerPre != hostC {
t.Fatalf("ring returned unexpected owner %s (not in set {%s,%s,%s})", ownerPre, hostA, hostB, hostC)
}
var ownerSynced *SyncedPool
var ownerPool *GrainLocalPool
switch ownerPre {
case hostA:
ownerSynced, ownerPool = syncedA, poolA
case hostB:
ownerSynced, ownerPool = syncedB, poolB
case hostC:
ownerSynced, ownerPool = syncedC, poolC
}
// Pick two distinct non-owner nodes for remote mutation assertions
var remote1Synced, remote2Synced *SyncedPool
switch ownerPre {
case hostA:
remote1Synced, remote2Synced = syncedB, syncedC
case hostB:
remote1Synced, remote2Synced = syncedA, syncedC
case hostC:
remote1Synced, remote2Synced = syncedA, syncedB
}
// Apply on the ring-designated owner
if _, err := ownerSynced.Apply(cartID, addItem); err != nil {
t.Fatalf("owner %s Apply addItem error: %v", ownerPre, err)
}
// Small wait for remote proxy spawn (ring ownership already deterministic)
time.Sleep(150 * time.Millisecond)
// Assert only nodeA has local grain
localCount := 0
if _, ok := poolA.GetGrains()[cartID]; ok {
localCount++
}
if _, ok := poolB.GetGrains()[cartID]; ok {
localCount++
}
if _, ok := poolC.GetGrains()[cartID]; ok {
localCount++
}
if localCount != 1 {
t.Fatalf("expected exactly 1 local grain, got %d", localCount)
}
if _, ok := ownerPool.GetGrains()[cartID]; !ok {
t.Fatalf("expected owner %s to hold local grain", ownerPre)
}
// First-touch ownership: remote mutation claims ownership on first access (no remote proxies).
// Issue remote mutation from one non-owner -> ChangeQuantity (increase)
change := &messages.ChangeQuantity{
Id: 1,
Quantity: 3,
}
if _, err := remote1Synced.Apply(cartID, change); err != nil {
t.Fatalf("remote mutation (remote1) changeQuantity error: %v", err)
}
// Validate updated state visible via nodeC
stateC, err := remote2Synced.Get(cartID)
if err != nil {
t.Fatalf("nodeC Get error: %v", err)
}
if len(stateC.Items) != 1 || stateC.Items[0].Quantity != 3 {
t.Fatalf("nodeC observed state mismatch: items=%d qty=%d (expected 1 / 3)",
len(stateC.Items),
func() int {
if len(stateC.Items) == 0 {
return -1
}
return stateC.Items[0].Quantity
}(),
)
}
// Cross-check authoritative nodeA
stateA, err := syncedA.Get(cartID)
if err != nil {
t.Fatalf("nodeA Get error: %v", err)
}
if stateA.Items[0].Quantity != 3 {
t.Fatalf("nodeA authoritative state mismatch: expected qty=3 got %d", stateA.Items[0].Quantity)
}
}
// TestThreeNodeDiscoveryMajorityOwnership (placeholder)
// This test is a scaffold demonstrating how a MockDiscovery would be wired
// once AddRemote supports host:port (currently hard-coded to :1337).
// It is skipped to avoid flakiness / false negatives until the production
// AddRemote logic is enhanced to parse dynamic ports or the test harness
// provides consistent port mapping.
func TestThreeNodeDiscoveryMajorityOwnership(t *testing.T) {
t.Skip("Pending enhancement: AddRemote needs host:port support to fully exercise discovery-based multi-node linking")
// Example skeleton (non-functional with current AddRemote implementation):
//
// md := NewMockDiscovery([]string{"nodeB3", "nodeC3"})
// poolA := NewGrainLocalPool(1024, time.Minute, spawn)
// syncedA, err := NewSyncedPool(poolA, "nodeA3", md)
// if err != nil {
// t.Fatalf("NewSyncedPool with mock discovery error: %v", err)
// }
// // Start server for nodeA (would also need servers for nodeB3/nodeC3 on expected ports)
// // grpcSrvA, _ := StartGRPCServer(":1337", poolA, syncedA)
// // defer grpcSrvA.GracefulStop()
//
// // Dynamically add a host via discovery
// // md.AddHost("nodeB3")
// // time.Sleep(100 * time.Millisecond) // allow AddRemote attempt
//
// // Assertions would verify syncedA.remoteHosts contains "nodeB3"
}
// TestHostRemovalAndErrorWithMockDiscovery validates behavior when:
// 1. Discovery reports a host that cannot be dialed (AddRemote error path)
// 2. That host is then removed (Deleted event) without leaving residual state
// 3. A second failing host is added afterward (ensuring watcher still processes events)
//
// NOTE: Because AddRemote currently hard-codes :1337 and we are NOT starting a
// real server for the bogus hosts, the dial will fail and the remote host should
// never appear in remoteHosts. This intentionally exercises the error logging
// path: "AddRemote: dial ... failed".
func TestHostRemovalAndErrorWithMockDiscovery(t *testing.T) {
// Start a real node A (acts as the observing node)
const addrA = "127.0.0.1:18281"
hostA := "nodeA-md"
poolA := NewGrainLocalPool(128, time.Minute, spawn)
// Mock discovery starts with one bogus host that will fail to connect.
md := NewMockDiscovery([]string{"bogus-host-1"})
syncedA, err := NewSyncedPool(poolA, hostA, md)
if err != nil {
t.Fatalf("NewSyncedPool error: %v", err)
}
grpcSrvA, err := StartGRPCServer(addrA, poolA, syncedA)
if err != nil {
t.Fatalf("StartGRPCServer A error: %v", err)
}
defer grpcSrvA.GracefulStop()
// Kick off watch processing by starting Watch() (NewSyncedPool does this internally
// when discovery is non-nil, but we ensure events channel is active).
// The initial bogus host should trigger AddRemote -> dial failure.
time.Sleep(300 * time.Millisecond)
syncedA.mu.RLock()
if len(syncedA.remoteHosts) != 0 {
syncedA.mu.RUnlock()
t.Fatalf("expected 0 remoteHosts after failing dial, got %d", len(syncedA.remoteHosts))
}
syncedA.mu.RUnlock()
// Remove the bogus host (should not panic; no entry to clean up).
md.RemoveHost("bogus-host-1")
time.Sleep(100 * time.Millisecond)
// Add another bogus host to ensure watcher still alive.
md.AddHost("bogus-host-2")
time.Sleep(300 * time.Millisecond)
syncedA.mu.RLock()
if len(syncedA.remoteHosts) != 0 {
syncedA.mu.RUnlock()
t.Fatalf("expected 0 remoteHosts after second failing dial, got %d", len(syncedA.remoteHosts))
}
syncedA.mu.RUnlock()
// Clean up discovery
md.Close()
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,187 +0,0 @@
syntax = "proto3";
package messages;
option go_package = "git.tornberg.me/go-cart-actor/proto;messages";
import "messages.proto";
// -----------------------------------------------------------------------------
// Cart Actor gRPC API (Breaking v2 - Per-Mutation RPCs)
// -----------------------------------------------------------------------------
// This version removes the previous MutationEnvelope + Mutate RPC.
// Each mutation now has its own request wrapper and dedicated RPC method
// providing simpler, type-focused client stubs and enabling per-mutation
// metrics, auth and rate limiting.
//
// Regenerate Go code after editing:
// protoc --go_out=. --go_opt=paths=source_relative \
// --go-grpc_out=. --go-grpc_opt=paths=source_relative \
// proto/cart_actor.proto proto/messages.proto
//
// Backward compatibility: This is a breaking change (old clients must update).
// -----------------------------------------------------------------------------
// Shared reply for all mutation RPCs.
message CartMutationReply {
int32 status_code = 1; // HTTP-like status (200 success, 4xx client, 5xx server)
oneof result {
CartState state = 2; // Updated cart state on success
string error = 3; // Error message on failure
}
int64 server_timestamp = 4; // Server-assigned Unix timestamp (optional auditing)
}
// Fetch current cart state without mutation.
message StateRequest {
string cart_id = 1;
}
message StateReply {
int32 status_code = 1;
oneof result {
CartState state = 2;
string error = 3;
}
}
// Per-mutation request wrappers. We wrap the existing inner mutation
// messages (defined in messages.proto) to add cart_id + optional metadata
// without altering the inner message definitions.
message AddRequestRequest {
string cart_id = 1;
int64 client_timestamp = 2;
AddRequest payload = 10;
}
message AddItemRequest {
string cart_id = 1;
int64 client_timestamp = 2;
AddItem payload = 10;
}
message RemoveItemRequest {
string cart_id = 1;
int64 client_timestamp = 2;
RemoveItem payload = 10;
}
message RemoveDeliveryRequest {
string cart_id = 1;
int64 client_timestamp = 2;
RemoveDelivery payload = 10;
}
message ChangeQuantityRequest {
string cart_id = 1;
int64 client_timestamp = 2;
ChangeQuantity payload = 10;
}
message SetDeliveryRequest {
string cart_id = 1;
int64 client_timestamp = 2;
SetDelivery payload = 10;
}
message SetPickupPointRequest {
string cart_id = 1;
int64 client_timestamp = 2;
SetPickupPoint payload = 10;
}
message CreateCheckoutOrderRequest {
string cart_id = 1;
int64 client_timestamp = 2;
CreateCheckoutOrder payload = 10;
}
message SetCartItemsRequest {
string cart_id = 1;
int64 client_timestamp = 2;
SetCartRequest payload = 10;
}
message OrderCompletedRequest {
string cart_id = 1;
int64 client_timestamp = 2;
OrderCreated payload = 10;
}
// Excerpt: updated messages for camelCase JSON output
message CartState {
string id = 1; // was cart_id
repeated CartItemState items = 2;
int64 totalPrice = 3; // was total_price
int64 totalTax = 4; // was total_tax
int64 totalDiscount = 5; // was total_discount
repeated DeliveryState deliveries = 6;
bool paymentInProgress = 7; // was payment_in_progress
string orderReference = 8; // was order_reference
string paymentStatus = 9; // was payment_status
bool processing = 10; // NEW (mirrors legacy CartGrain.processing)
}
message CartItemState {
int64 id = 1;
int64 itemId = 2; // was source_item_id
string sku = 3;
string name = 4;
int64 price = 5; // was unit_price
int32 qty = 6; // was quantity
int64 totalPrice = 7; // was total_price
int64 totalTax = 8; // was total_tax
int64 orgPrice = 9; // was org_price
int32 taxRate = 10; // was tax_rate
int64 totalDiscount = 11;
string brand = 12;
string category = 13;
string category2 = 14;
string category3 = 15;
string category4 = 16;
string category5 = 17;
string image = 18;
string type = 19; // was article_type
string sellerId = 20; // was seller_id
string sellerName = 21; // was seller_name
string disclaimer = 22;
string outlet = 23;
string storeId = 24; // was store_id
int32 stock = 25;
}
message DeliveryState {
int64 id = 1;
string provider = 2;
int64 price = 3;
repeated int64 items = 4; // was item_ids
PickupPoint pickupPoint = 5; // was pickup_point
}
// (CheckoutRequest / CheckoutReply removed - checkout handled at HTTP layer)
// -----------------------------------------------------------------------------
// Service definition (per-mutation RPCs + checkout)
// -----------------------------------------------------------------------------
service CartActor {
rpc AddRequest(AddRequestRequest) returns (CartMutationReply);
rpc AddItem(AddItemRequest) returns (CartMutationReply);
rpc RemoveItem(RemoveItemRequest) returns (CartMutationReply);
rpc RemoveDelivery(RemoveDeliveryRequest) returns (CartMutationReply);
rpc ChangeQuantity(ChangeQuantityRequest) returns (CartMutationReply);
rpc SetDelivery(SetDeliveryRequest) returns (CartMutationReply);
rpc SetPickupPoint(SetPickupPointRequest) returns (CartMutationReply);
// (Checkout RPC removed - handled externally)
rpc SetCartItems(SetCartItemsRequest) returns (CartMutationReply);
rpc OrderCompleted(OrderCompletedRequest) returns (CartMutationReply);
rpc GetState(StateRequest) returns (StateReply);
}
// -----------------------------------------------------------------------------
// Future enhancements:
// * BatchMutate RPC (repeated heterogeneous mutations)
// * Streaming state updates (WatchState)
// * Versioning / optimistic concurrency control
// -----------------------------------------------------------------------------

View File

@@ -1,473 +0,0 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v3.21.12
// source: cart_actor.proto
package messages
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
CartActor_AddRequest_FullMethodName = "/messages.CartActor/AddRequest"
CartActor_AddItem_FullMethodName = "/messages.CartActor/AddItem"
CartActor_RemoveItem_FullMethodName = "/messages.CartActor/RemoveItem"
CartActor_RemoveDelivery_FullMethodName = "/messages.CartActor/RemoveDelivery"
CartActor_ChangeQuantity_FullMethodName = "/messages.CartActor/ChangeQuantity"
CartActor_SetDelivery_FullMethodName = "/messages.CartActor/SetDelivery"
CartActor_SetPickupPoint_FullMethodName = "/messages.CartActor/SetPickupPoint"
CartActor_SetCartItems_FullMethodName = "/messages.CartActor/SetCartItems"
CartActor_OrderCompleted_FullMethodName = "/messages.CartActor/OrderCompleted"
CartActor_GetState_FullMethodName = "/messages.CartActor/GetState"
)
// CartActorClient is the client API for CartActor service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// -----------------------------------------------------------------------------
// Service definition (per-mutation RPCs + checkout)
// -----------------------------------------------------------------------------
type CartActorClient interface {
AddRequest(ctx context.Context, in *AddRequestRequest, opts ...grpc.CallOption) (*CartMutationReply, error)
AddItem(ctx context.Context, in *AddItemRequest, opts ...grpc.CallOption) (*CartMutationReply, error)
RemoveItem(ctx context.Context, in *RemoveItemRequest, opts ...grpc.CallOption) (*CartMutationReply, error)
RemoveDelivery(ctx context.Context, in *RemoveDeliveryRequest, opts ...grpc.CallOption) (*CartMutationReply, error)
ChangeQuantity(ctx context.Context, in *ChangeQuantityRequest, opts ...grpc.CallOption) (*CartMutationReply, error)
SetDelivery(ctx context.Context, in *SetDeliveryRequest, opts ...grpc.CallOption) (*CartMutationReply, error)
SetPickupPoint(ctx context.Context, in *SetPickupPointRequest, opts ...grpc.CallOption) (*CartMutationReply, error)
// (Checkout RPC removed - handled externally)
SetCartItems(ctx context.Context, in *SetCartItemsRequest, opts ...grpc.CallOption) (*CartMutationReply, error)
OrderCompleted(ctx context.Context, in *OrderCompletedRequest, opts ...grpc.CallOption) (*CartMutationReply, error)
GetState(ctx context.Context, in *StateRequest, opts ...grpc.CallOption) (*StateReply, error)
}
type cartActorClient struct {
cc grpc.ClientConnInterface
}
func NewCartActorClient(cc grpc.ClientConnInterface) CartActorClient {
return &cartActorClient{cc}
}
func (c *cartActorClient) AddRequest(ctx context.Context, in *AddRequestRequest, opts ...grpc.CallOption) (*CartMutationReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CartMutationReply)
err := c.cc.Invoke(ctx, CartActor_AddRequest_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *cartActorClient) AddItem(ctx context.Context, in *AddItemRequest, opts ...grpc.CallOption) (*CartMutationReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CartMutationReply)
err := c.cc.Invoke(ctx, CartActor_AddItem_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *cartActorClient) RemoveItem(ctx context.Context, in *RemoveItemRequest, opts ...grpc.CallOption) (*CartMutationReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CartMutationReply)
err := c.cc.Invoke(ctx, CartActor_RemoveItem_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *cartActorClient) RemoveDelivery(ctx context.Context, in *RemoveDeliveryRequest, opts ...grpc.CallOption) (*CartMutationReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CartMutationReply)
err := c.cc.Invoke(ctx, CartActor_RemoveDelivery_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *cartActorClient) ChangeQuantity(ctx context.Context, in *ChangeQuantityRequest, opts ...grpc.CallOption) (*CartMutationReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CartMutationReply)
err := c.cc.Invoke(ctx, CartActor_ChangeQuantity_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *cartActorClient) SetDelivery(ctx context.Context, in *SetDeliveryRequest, opts ...grpc.CallOption) (*CartMutationReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CartMutationReply)
err := c.cc.Invoke(ctx, CartActor_SetDelivery_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *cartActorClient) SetPickupPoint(ctx context.Context, in *SetPickupPointRequest, opts ...grpc.CallOption) (*CartMutationReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CartMutationReply)
err := c.cc.Invoke(ctx, CartActor_SetPickupPoint_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *cartActorClient) SetCartItems(ctx context.Context, in *SetCartItemsRequest, opts ...grpc.CallOption) (*CartMutationReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CartMutationReply)
err := c.cc.Invoke(ctx, CartActor_SetCartItems_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *cartActorClient) OrderCompleted(ctx context.Context, in *OrderCompletedRequest, opts ...grpc.CallOption) (*CartMutationReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CartMutationReply)
err := c.cc.Invoke(ctx, CartActor_OrderCompleted_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *cartActorClient) GetState(ctx context.Context, in *StateRequest, opts ...grpc.CallOption) (*StateReply, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(StateReply)
err := c.cc.Invoke(ctx, CartActor_GetState_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// CartActorServer is the server API for CartActor service.
// All implementations must embed UnimplementedCartActorServer
// for forward compatibility.
//
// -----------------------------------------------------------------------------
// Service definition (per-mutation RPCs + checkout)
// -----------------------------------------------------------------------------
type CartActorServer interface {
AddRequest(context.Context, *AddRequestRequest) (*CartMutationReply, error)
AddItem(context.Context, *AddItemRequest) (*CartMutationReply, error)
RemoveItem(context.Context, *RemoveItemRequest) (*CartMutationReply, error)
RemoveDelivery(context.Context, *RemoveDeliveryRequest) (*CartMutationReply, error)
ChangeQuantity(context.Context, *ChangeQuantityRequest) (*CartMutationReply, error)
SetDelivery(context.Context, *SetDeliveryRequest) (*CartMutationReply, error)
SetPickupPoint(context.Context, *SetPickupPointRequest) (*CartMutationReply, error)
// (Checkout RPC removed - handled externally)
SetCartItems(context.Context, *SetCartItemsRequest) (*CartMutationReply, error)
OrderCompleted(context.Context, *OrderCompletedRequest) (*CartMutationReply, error)
GetState(context.Context, *StateRequest) (*StateReply, error)
mustEmbedUnimplementedCartActorServer()
}
// UnimplementedCartActorServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedCartActorServer struct{}
func (UnimplementedCartActorServer) AddRequest(context.Context, *AddRequestRequest) (*CartMutationReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddRequest not implemented")
}
func (UnimplementedCartActorServer) AddItem(context.Context, *AddItemRequest) (*CartMutationReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddItem not implemented")
}
func (UnimplementedCartActorServer) RemoveItem(context.Context, *RemoveItemRequest) (*CartMutationReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method RemoveItem not implemented")
}
func (UnimplementedCartActorServer) RemoveDelivery(context.Context, *RemoveDeliveryRequest) (*CartMutationReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method RemoveDelivery not implemented")
}
func (UnimplementedCartActorServer) ChangeQuantity(context.Context, *ChangeQuantityRequest) (*CartMutationReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method ChangeQuantity not implemented")
}
func (UnimplementedCartActorServer) SetDelivery(context.Context, *SetDeliveryRequest) (*CartMutationReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method SetDelivery not implemented")
}
func (UnimplementedCartActorServer) SetPickupPoint(context.Context, *SetPickupPointRequest) (*CartMutationReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method SetPickupPoint not implemented")
}
func (UnimplementedCartActorServer) SetCartItems(context.Context, *SetCartItemsRequest) (*CartMutationReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method SetCartItems not implemented")
}
func (UnimplementedCartActorServer) OrderCompleted(context.Context, *OrderCompletedRequest) (*CartMutationReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method OrderCompleted not implemented")
}
func (UnimplementedCartActorServer) GetState(context.Context, *StateRequest) (*StateReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetState not implemented")
}
func (UnimplementedCartActorServer) mustEmbedUnimplementedCartActorServer() {}
func (UnimplementedCartActorServer) testEmbeddedByValue() {}
// UnsafeCartActorServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to CartActorServer will
// result in compilation errors.
type UnsafeCartActorServer interface {
mustEmbedUnimplementedCartActorServer()
}
func RegisterCartActorServer(s grpc.ServiceRegistrar, srv CartActorServer) {
// If the following call pancis, it indicates UnimplementedCartActorServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&CartActor_ServiceDesc, srv)
}
func _CartActor_AddRequest_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AddRequestRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CartActorServer).AddRequest(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: CartActor_AddRequest_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CartActorServer).AddRequest(ctx, req.(*AddRequestRequest))
}
return interceptor(ctx, in, info, handler)
}
func _CartActor_AddItem_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AddItemRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CartActorServer).AddItem(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: CartActor_AddItem_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CartActorServer).AddItem(ctx, req.(*AddItemRequest))
}
return interceptor(ctx, in, info, handler)
}
func _CartActor_RemoveItem_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RemoveItemRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CartActorServer).RemoveItem(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: CartActor_RemoveItem_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CartActorServer).RemoveItem(ctx, req.(*RemoveItemRequest))
}
return interceptor(ctx, in, info, handler)
}
func _CartActor_RemoveDelivery_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RemoveDeliveryRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CartActorServer).RemoveDelivery(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: CartActor_RemoveDelivery_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CartActorServer).RemoveDelivery(ctx, req.(*RemoveDeliveryRequest))
}
return interceptor(ctx, in, info, handler)
}
func _CartActor_ChangeQuantity_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ChangeQuantityRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CartActorServer).ChangeQuantity(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: CartActor_ChangeQuantity_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CartActorServer).ChangeQuantity(ctx, req.(*ChangeQuantityRequest))
}
return interceptor(ctx, in, info, handler)
}
func _CartActor_SetDelivery_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SetDeliveryRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CartActorServer).SetDelivery(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: CartActor_SetDelivery_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CartActorServer).SetDelivery(ctx, req.(*SetDeliveryRequest))
}
return interceptor(ctx, in, info, handler)
}
func _CartActor_SetPickupPoint_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SetPickupPointRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CartActorServer).SetPickupPoint(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: CartActor_SetPickupPoint_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CartActorServer).SetPickupPoint(ctx, req.(*SetPickupPointRequest))
}
return interceptor(ctx, in, info, handler)
}
func _CartActor_SetCartItems_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SetCartItemsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CartActorServer).SetCartItems(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: CartActor_SetCartItems_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CartActorServer).SetCartItems(ctx, req.(*SetCartItemsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _CartActor_OrderCompleted_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(OrderCompletedRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CartActorServer).OrderCompleted(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: CartActor_OrderCompleted_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CartActorServer).OrderCompleted(ctx, req.(*OrderCompletedRequest))
}
return interceptor(ctx, in, info, handler)
}
func _CartActor_GetState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StateRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CartActorServer).GetState(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: CartActor_GetState_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CartActorServer).GetState(ctx, req.(*StateRequest))
}
return interceptor(ctx, in, info, handler)
}
// CartActor_ServiceDesc is the grpc.ServiceDesc for CartActor service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var CartActor_ServiceDesc = grpc.ServiceDesc{
ServiceName: "messages.CartActor",
HandlerType: (*CartActorServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "AddRequest",
Handler: _CartActor_AddRequest_Handler,
},
{
MethodName: "AddItem",
Handler: _CartActor_AddItem_Handler,
},
{
MethodName: "RemoveItem",
Handler: _CartActor_RemoveItem_Handler,
},
{
MethodName: "RemoveDelivery",
Handler: _CartActor_RemoveDelivery_Handler,
},
{
MethodName: "ChangeQuantity",
Handler: _CartActor_ChangeQuantity_Handler,
},
{
MethodName: "SetDelivery",
Handler: _CartActor_SetDelivery_Handler,
},
{
MethodName: "SetPickupPoint",
Handler: _CartActor_SetPickupPoint_Handler,
},
{
MethodName: "SetCartItems",
Handler: _CartActor_SetCartItems_Handler,
},
{
MethodName: "OrderCompleted",
Handler: _CartActor_OrderCompleted_Handler,
},
{
MethodName: "GetState",
Handler: _CartActor_GetState_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "cart_actor.proto",
}

View File

@@ -46,7 +46,6 @@ type SyncedPool struct {
type RemoteHostGRPC struct {
Host string
Conn *grpc.ClientConn
CartClient proto.CartActorClient
ControlClient proto.ControlPlaneClient
MissedPings int
}
@@ -152,7 +151,6 @@ func (p *SyncedPool) AddRemote(host string) {
return
}
cartClient := proto.NewCartActorClient(conn)
controlClient := proto.NewControlPlaneClient(conn)
// Health check (Ping) with limited retries
@@ -174,9 +172,9 @@ func (p *SyncedPool) AddRemote(host string) {
}
remote := &RemoteHostGRPC{
Host: host,
Conn: conn,
CartClient: cartClient,
Host: host,
Conn: conn,
ControlClient: controlClient,
MissedPings: 0,
}