package main import ( "context" "fmt" "log" "time" proto "git.tornberg.me/go-cart-actor/proto" // generated package name is 'messages'; aliased as proto for consistency "google.golang.org/grpc" ) // RemoteGrainGRPC is the gRPC-backed implementation of a remote grain. // It mirrors the previous RemoteGrain (TCP/frame based) while using the // new CartActor gRPC service. It implements the Grain interface so that // SyncedPool can remain largely unchanged when swapping transport layers. type RemoteGrainGRPC struct { Id CartId Host string client proto.CartActorClient // Optional: keep the underlying conn so higher-level code can close if needed conn *grpc.ClientConn // Per-call timeout settings (tunable) mutateTimeout time.Duration stateTimeout time.Duration } // NewRemoteGrainGRPC constructs a remote grain adapter from an existing gRPC client. func NewRemoteGrainGRPC(id CartId, host string, client proto.CartActorClient) *RemoteGrainGRPC { return &RemoteGrainGRPC{ Id: id, Host: host, client: client, mutateTimeout: 800 * time.Millisecond, stateTimeout: 400 * time.Millisecond, } } // NewRemoteGrainGRPCWithConn dials the target and creates the gRPC client. // target should be host:port (where the CartActor service is exposed). func NewRemoteGrainGRPCWithConn(id CartId, host string, target string, dialOpts ...grpc.DialOption) (*RemoteGrainGRPC, error) { // NOTE: insecure for initial migration; should be replaced with TLS later. baseOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()} baseOpts = append(baseOpts, dialOpts...) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() conn, err := grpc.DialContext(ctx, target, baseOpts...) if err != nil { return nil, err } client := proto.NewCartActorClient(conn) return &RemoteGrainGRPC{ Id: id, Host: host, client: client, conn: conn, mutateTimeout: 800 * time.Millisecond, stateTimeout: 400 * time.Millisecond, }, nil } func (g *RemoteGrainGRPC) GetId() CartId { return g.Id } // Apply executes a cart mutation via per-mutation RPCs (breaking v2 API) // and returns a *CartGrain reconstructed from the CartMutationReply state. func (g *RemoteGrainGRPC) Apply(content interface{}, isReplay bool) (*CartGrain, error) { if isReplay { return nil, fmt.Errorf("replay not supported for remote grains") } if content == nil { return nil, fmt.Errorf("nil mutation content") } ts := time.Now().Unix() var invoke func(ctx context.Context) (*proto.CartMutationReply, error) switch m := content.(type) { case *proto.AddRequest: invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { return g.client.AddRequest(ctx, &proto.AddRequestRequest{ CartId: g.Id.String(), ClientTimestamp: ts, Payload: m, }) } case *proto.AddItem: invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { return g.client.AddItem(ctx, &proto.AddItemRequest{ CartId: g.Id.String(), ClientTimestamp: ts, Payload: m, }) } case *proto.RemoveItem: invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { return g.client.RemoveItem(ctx, &proto.RemoveItemRequest{ CartId: g.Id.String(), ClientTimestamp: ts, Payload: m, }) } case *proto.RemoveDelivery: invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { return g.client.RemoveDelivery(ctx, &proto.RemoveDeliveryRequest{ CartId: g.Id.String(), ClientTimestamp: ts, Payload: m, }) } case *proto.ChangeQuantity: invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { return g.client.ChangeQuantity(ctx, &proto.ChangeQuantityRequest{ CartId: g.Id.String(), ClientTimestamp: ts, Payload: m, }) } case *proto.SetDelivery: invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { return g.client.SetDelivery(ctx, &proto.SetDeliveryRequest{ CartId: g.Id.String(), ClientTimestamp: ts, Payload: m, }) } case *proto.SetPickupPoint: invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { return g.client.SetPickupPoint(ctx, &proto.SetPickupPointRequest{ CartId: g.Id.String(), ClientTimestamp: ts, Payload: m, }) } case *proto.CreateCheckoutOrder: return nil, fmt.Errorf("CreateCheckoutOrder deprecated: checkout is handled via HTTP endpoint (HandleCheckout)") case *proto.SetCartRequest: invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { return g.client.SetCartItems(ctx, &proto.SetCartItemsRequest{ CartId: g.Id.String(), ClientTimestamp: ts, Payload: m, }) } case *proto.OrderCreated: invoke = func(ctx context.Context) (*proto.CartMutationReply, error) { return g.client.OrderCompleted(ctx, &proto.OrderCompletedRequest{ CartId: g.Id.String(), ClientTimestamp: ts, Payload: m, }) } default: return nil, fmt.Errorf("unsupported mutation type %T", content) } if invoke == nil { return nil, fmt.Errorf("no invocation mapped for mutation %T", content) } ctx, cancel := context.WithTimeout(context.Background(), g.mutateTimeout) defer cancel() resp, err := invoke(ctx) if err != nil { return nil, err } if resp.StatusCode < 200 || resp.StatusCode >= 300 { if e := resp.GetError(); e != "" { return nil, fmt.Errorf("remote mutation failed %d: %s", resp.StatusCode, e) } return nil, fmt.Errorf("remote mutation failed %d", resp.StatusCode) } state := resp.GetState() if state == nil { return nil, fmt.Errorf("mutation reply missing state on success") } // Reconstruct a lightweight CartGrain (only fields we expose internally) grain := &CartGrain{ Id: ToCartId(state.CartId), TotalPrice: state.TotalPrice, TotalTax: state.TotalTax, TotalDiscount: state.TotalDiscount, PaymentInProgress: state.PaymentInProgress, OrderReference: state.OrderReference, PaymentStatus: state.PaymentStatus, } // Items for _, it := range state.Items { if it == nil { continue } outlet := toPtr(it.Outlet) storeId := toPtr(it.StoreId) grain.Items = append(grain.Items, &CartItem{ Id: int(it.Id), ItemId: int(it.SourceItemId), Sku: it.Sku, Name: it.Name, Price: it.UnitPrice, Quantity: int(it.Quantity), TotalPrice: it.TotalPrice, TotalTax: it.TotalTax, OrgPrice: it.OrgPrice, TaxRate: int(it.TaxRate), Brand: it.Brand, Category: it.Category, Category2: it.Category2, Category3: it.Category3, Category4: it.Category4, Category5: it.Category5, Image: it.Image, ArticleType: it.ArticleType, SellerId: it.SellerId, SellerName: it.SellerName, Disclaimer: it.Disclaimer, Outlet: outlet, StoreId: storeId, Stock: StockStatus(it.Stock), }) } // Deliveries for _, d := range state.Deliveries { if d == nil { continue } intIds := make([]int, 0, len(d.ItemIds)) for _, id := range d.ItemIds { intIds = append(intIds, int(id)) } grain.Deliveries = append(grain.Deliveries, &CartDelivery{ Id: int(d.Id), Provider: d.Provider, Price: d.Price, Items: intIds, PickupPoint: d.PickupPoint, }) } return grain, nil } // GetCurrentState retrieves the current cart state using the typed StateReply oneof. func (g *RemoteGrainGRPC) GetCurrentState() (*CartGrain, error) { ctx, cancel := context.WithTimeout(context.Background(), g.stateTimeout) defer cancel() resp, err := g.client.GetState(ctx, &proto.StateRequest{CartId: g.Id.String()}) if err != nil { return nil, err } if resp.StatusCode < 200 || resp.StatusCode >= 300 { if e := resp.GetError(); e != "" { return nil, fmt.Errorf("remote get state failed %d: %s", resp.StatusCode, e) } return nil, fmt.Errorf("remote get state failed %d", resp.StatusCode) } state := resp.GetState() if state == nil { return nil, fmt.Errorf("state reply missing state on success") } grain := &CartGrain{ Id: ToCartId(state.CartId), TotalPrice: state.TotalPrice, TotalTax: state.TotalTax, TotalDiscount: state.TotalDiscount, PaymentInProgress: state.PaymentInProgress, OrderReference: state.OrderReference, PaymentStatus: state.PaymentStatus, } for _, it := range state.Items { if it == nil { continue } outlet := toPtr(it.Outlet) storeId := toPtr(it.StoreId) grain.Items = append(grain.Items, &CartItem{ Id: int(it.Id), ItemId: int(it.SourceItemId), Sku: it.Sku, Name: it.Name, Price: it.UnitPrice, Quantity: int(it.Quantity), TotalPrice: it.TotalPrice, TotalTax: it.TotalTax, OrgPrice: it.OrgPrice, TaxRate: int(it.TaxRate), Brand: it.Brand, Category: it.Category, Category2: it.Category2, Category3: it.Category3, Category4: it.Category4, Category5: it.Category5, Image: it.Image, ArticleType: it.ArticleType, SellerId: it.SellerId, SellerName: it.SellerName, Disclaimer: it.Disclaimer, Outlet: outlet, StoreId: storeId, Stock: StockStatus(it.Stock), }) } for _, d := range state.Deliveries { if d == nil { continue } intIds := make([]int, 0, len(d.ItemIds)) for _, id := range d.ItemIds { intIds = append(intIds, int(id)) } grain.Deliveries = append(grain.Deliveries, &CartDelivery{ Id: int(d.Id), Provider: d.Provider, Price: d.Price, Items: intIds, PickupPoint: d.PickupPoint, }) } return grain, nil } // Close closes the underlying gRPC connection if this adapter created it. func (g *RemoteGrainGRPC) Close() error { if g.conn != nil { return g.conn.Close() } return nil } // Debug helper to log operations (optional). func (g *RemoteGrainGRPC) logf(format string, args ...interface{}) { log.Printf("[remote-grain-grpc host=%s id=%s] %s", g.Host, g.Id.String(), fmt.Sprintf(format, args...)) }