342 lines
10 KiB
Go
342 lines
10 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
proto "git.tornberg.me/go-cart-actor/proto" // generated package name is 'messages'; aliased as proto for consistency
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// RemoteGrainGRPC is the gRPC-backed implementation of a remote grain.
|
|
// It mirrors the previous RemoteGrain (TCP/frame based) while using the
|
|
// new CartActor gRPC service. It implements the Grain interface so that
|
|
// SyncedPool can remain largely unchanged when swapping transport layers.
|
|
type RemoteGrainGRPC struct {
|
|
Id CartId
|
|
Host string
|
|
client proto.CartActorClient
|
|
// Optional: keep the underlying conn so higher-level code can close if needed
|
|
conn *grpc.ClientConn
|
|
|
|
// Per-call timeout settings (tunable)
|
|
mutateTimeout time.Duration
|
|
stateTimeout time.Duration
|
|
}
|
|
|
|
// NewRemoteGrainGRPC constructs a remote grain adapter from an existing gRPC client.
|
|
func NewRemoteGrainGRPC(id CartId, host string, client proto.CartActorClient) *RemoteGrainGRPC {
|
|
return &RemoteGrainGRPC{
|
|
Id: id,
|
|
Host: host,
|
|
client: client,
|
|
mutateTimeout: 800 * time.Millisecond,
|
|
stateTimeout: 400 * time.Millisecond,
|
|
}
|
|
}
|
|
|
|
// NewRemoteGrainGRPCWithConn dials the target and creates the gRPC client.
|
|
// target should be host:port (where the CartActor service is exposed).
|
|
func NewRemoteGrainGRPCWithConn(id CartId, host string, target string, dialOpts ...grpc.DialOption) (*RemoteGrainGRPC, error) {
|
|
// NOTE: insecure for initial migration; should be replaced with TLS later.
|
|
baseOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
|
|
baseOpts = append(baseOpts, dialOpts...)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
conn, err := grpc.DialContext(ctx, target, baseOpts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
client := proto.NewCartActorClient(conn)
|
|
return &RemoteGrainGRPC{
|
|
Id: id,
|
|
Host: host,
|
|
client: client,
|
|
conn: conn,
|
|
mutateTimeout: 800 * time.Millisecond,
|
|
stateTimeout: 400 * time.Millisecond,
|
|
}, nil
|
|
}
|
|
|
|
func (g *RemoteGrainGRPC) GetId() CartId {
|
|
return g.Id
|
|
}
|
|
|
|
// Apply executes a cart mutation via per-mutation RPCs (breaking v2 API)
|
|
// and returns a *CartGrain reconstructed from the CartMutationReply state.
|
|
func (g *RemoteGrainGRPC) Apply(content interface{}, isReplay bool) (*CartGrain, error) {
|
|
if isReplay {
|
|
return nil, fmt.Errorf("replay not supported for remote grains")
|
|
}
|
|
if content == nil {
|
|
return nil, fmt.Errorf("nil mutation content")
|
|
}
|
|
|
|
ts := time.Now().Unix()
|
|
|
|
var invoke func(ctx context.Context) (*proto.CartMutationReply, error)
|
|
|
|
switch m := content.(type) {
|
|
case *proto.AddRequest:
|
|
invoke = func(ctx context.Context) (*proto.CartMutationReply, error) {
|
|
return g.client.AddRequest(ctx, &proto.AddRequestRequest{
|
|
CartId: g.Id.String(),
|
|
ClientTimestamp: ts,
|
|
Payload: m,
|
|
})
|
|
}
|
|
case *proto.AddItem:
|
|
invoke = func(ctx context.Context) (*proto.CartMutationReply, error) {
|
|
return g.client.AddItem(ctx, &proto.AddItemRequest{
|
|
CartId: g.Id.String(),
|
|
ClientTimestamp: ts,
|
|
Payload: m,
|
|
})
|
|
}
|
|
case *proto.RemoveItem:
|
|
invoke = func(ctx context.Context) (*proto.CartMutationReply, error) {
|
|
return g.client.RemoveItem(ctx, &proto.RemoveItemRequest{
|
|
CartId: g.Id.String(),
|
|
ClientTimestamp: ts,
|
|
Payload: m,
|
|
})
|
|
}
|
|
case *proto.RemoveDelivery:
|
|
invoke = func(ctx context.Context) (*proto.CartMutationReply, error) {
|
|
return g.client.RemoveDelivery(ctx, &proto.RemoveDeliveryRequest{
|
|
CartId: g.Id.String(),
|
|
ClientTimestamp: ts,
|
|
Payload: m,
|
|
})
|
|
}
|
|
case *proto.ChangeQuantity:
|
|
invoke = func(ctx context.Context) (*proto.CartMutationReply, error) {
|
|
return g.client.ChangeQuantity(ctx, &proto.ChangeQuantityRequest{
|
|
CartId: g.Id.String(),
|
|
ClientTimestamp: ts,
|
|
Payload: m,
|
|
})
|
|
}
|
|
case *proto.SetDelivery:
|
|
invoke = func(ctx context.Context) (*proto.CartMutationReply, error) {
|
|
return g.client.SetDelivery(ctx, &proto.SetDeliveryRequest{
|
|
CartId: g.Id.String(),
|
|
ClientTimestamp: ts,
|
|
Payload: m,
|
|
})
|
|
}
|
|
case *proto.SetPickupPoint:
|
|
invoke = func(ctx context.Context) (*proto.CartMutationReply, error) {
|
|
return g.client.SetPickupPoint(ctx, &proto.SetPickupPointRequest{
|
|
CartId: g.Id.String(),
|
|
ClientTimestamp: ts,
|
|
Payload: m,
|
|
})
|
|
}
|
|
case *proto.CreateCheckoutOrder:
|
|
return nil, fmt.Errorf("CreateCheckoutOrder deprecated: checkout is handled via HTTP endpoint (HandleCheckout)")
|
|
case *proto.SetCartRequest:
|
|
invoke = func(ctx context.Context) (*proto.CartMutationReply, error) {
|
|
return g.client.SetCartItems(ctx, &proto.SetCartItemsRequest{
|
|
CartId: g.Id.String(),
|
|
ClientTimestamp: ts,
|
|
Payload: m,
|
|
})
|
|
}
|
|
case *proto.OrderCreated:
|
|
invoke = func(ctx context.Context) (*proto.CartMutationReply, error) {
|
|
return g.client.OrderCompleted(ctx, &proto.OrderCompletedRequest{
|
|
CartId: g.Id.String(),
|
|
ClientTimestamp: ts,
|
|
Payload: m,
|
|
})
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("unsupported mutation type %T", content)
|
|
}
|
|
|
|
if invoke == nil {
|
|
return nil, fmt.Errorf("no invocation mapped for mutation %T", content)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), g.mutateTimeout)
|
|
defer cancel()
|
|
|
|
resp, err := invoke(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
if e := resp.GetError(); e != "" {
|
|
return nil, fmt.Errorf("remote mutation failed %d: %s", resp.StatusCode, e)
|
|
}
|
|
return nil, fmt.Errorf("remote mutation failed %d", resp.StatusCode)
|
|
}
|
|
state := resp.GetState()
|
|
if state == nil {
|
|
return nil, fmt.Errorf("mutation reply missing state on success")
|
|
}
|
|
// Reconstruct a lightweight CartGrain (only fields we expose internally)
|
|
grain := &CartGrain{
|
|
Id: ToCartId(state.Id),
|
|
TotalPrice: state.TotalPrice,
|
|
TotalTax: state.TotalTax,
|
|
TotalDiscount: state.TotalDiscount,
|
|
PaymentInProgress: state.PaymentInProgress,
|
|
OrderReference: state.OrderReference,
|
|
PaymentStatus: state.PaymentStatus,
|
|
}
|
|
// Items
|
|
for _, it := range state.Items {
|
|
if it == nil {
|
|
continue
|
|
}
|
|
outlet := toPtr(it.Outlet)
|
|
storeId := toPtr(it.StoreId)
|
|
grain.Items = append(grain.Items, &CartItem{
|
|
Id: int(it.Id),
|
|
ItemId: int(it.ItemId),
|
|
Sku: it.Sku,
|
|
Name: it.Name,
|
|
Price: it.Price,
|
|
Quantity: int(it.Qty),
|
|
TotalPrice: it.TotalPrice,
|
|
TotalTax: it.TotalTax,
|
|
OrgPrice: it.OrgPrice,
|
|
TaxRate: int(it.TaxRate),
|
|
Brand: it.Brand,
|
|
Category: it.Category,
|
|
Category2: it.Category2,
|
|
Category3: it.Category3,
|
|
Category4: it.Category4,
|
|
Category5: it.Category5,
|
|
Image: it.Image,
|
|
ArticleType: it.Type,
|
|
SellerId: it.SellerId,
|
|
SellerName: it.SellerName,
|
|
Disclaimer: it.Disclaimer,
|
|
Outlet: outlet,
|
|
StoreId: storeId,
|
|
Stock: StockStatus(it.Stock),
|
|
})
|
|
}
|
|
// Deliveries
|
|
for _, d := range state.Deliveries {
|
|
if d == nil {
|
|
continue
|
|
}
|
|
intIds := make([]int, 0, len(d.Items))
|
|
for _, id := range d.Items {
|
|
intIds = append(intIds, int(id))
|
|
}
|
|
grain.Deliveries = append(grain.Deliveries, &CartDelivery{
|
|
Id: int(d.Id),
|
|
Provider: d.Provider,
|
|
Price: d.Price,
|
|
Items: intIds,
|
|
PickupPoint: d.PickupPoint,
|
|
})
|
|
}
|
|
|
|
return grain, nil
|
|
}
|
|
|
|
// GetCurrentState retrieves the current cart state using the typed StateReply oneof.
|
|
func (g *RemoteGrainGRPC) GetCurrentState() (*CartGrain, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), g.stateTimeout)
|
|
defer cancel()
|
|
resp, err := g.client.GetState(ctx, &proto.StateRequest{CartId: g.Id.String()})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
if e := resp.GetError(); e != "" {
|
|
return nil, fmt.Errorf("remote get state failed %d: %s", resp.StatusCode, e)
|
|
}
|
|
return nil, fmt.Errorf("remote get state failed %d", resp.StatusCode)
|
|
}
|
|
state := resp.GetState()
|
|
if state == nil {
|
|
return nil, fmt.Errorf("state reply missing state on success")
|
|
}
|
|
grain := &CartGrain{
|
|
Id: ToCartId(state.Id),
|
|
TotalPrice: state.TotalPrice,
|
|
TotalTax: state.TotalTax,
|
|
TotalDiscount: state.TotalDiscount,
|
|
PaymentInProgress: state.PaymentInProgress,
|
|
OrderReference: state.OrderReference,
|
|
PaymentStatus: state.PaymentStatus,
|
|
}
|
|
for _, it := range state.Items {
|
|
if it == nil {
|
|
continue
|
|
}
|
|
outlet := toPtr(it.Outlet)
|
|
storeId := toPtr(it.StoreId)
|
|
grain.Items = append(grain.Items, &CartItem{
|
|
Id: int(it.Id),
|
|
ItemId: int(it.ItemId),
|
|
Sku: it.Sku,
|
|
Name: it.Name,
|
|
Price: it.Price,
|
|
Quantity: int(it.Qty),
|
|
TotalPrice: it.TotalPrice,
|
|
TotalTax: it.TotalTax,
|
|
OrgPrice: it.OrgPrice,
|
|
TaxRate: int(it.TaxRate),
|
|
Brand: it.Brand,
|
|
Category: it.Category,
|
|
Category2: it.Category2,
|
|
Category3: it.Category3,
|
|
Category4: it.Category4,
|
|
Category5: it.Category5,
|
|
Image: it.Image,
|
|
ArticleType: it.Type,
|
|
SellerId: it.SellerId,
|
|
SellerName: it.SellerName,
|
|
Disclaimer: it.Disclaimer,
|
|
Outlet: outlet,
|
|
StoreId: storeId,
|
|
Stock: StockStatus(it.Stock),
|
|
})
|
|
}
|
|
for _, d := range state.Deliveries {
|
|
if d == nil {
|
|
continue
|
|
}
|
|
intIds := make([]int, 0, len(d.Items))
|
|
for _, id := range d.Items {
|
|
intIds = append(intIds, int(id))
|
|
}
|
|
grain.Deliveries = append(grain.Deliveries, &CartDelivery{
|
|
Id: int(d.Id),
|
|
Provider: d.Provider,
|
|
Price: d.Price,
|
|
Items: intIds,
|
|
PickupPoint: d.PickupPoint,
|
|
})
|
|
}
|
|
|
|
return grain, nil
|
|
}
|
|
|
|
// Close closes the underlying gRPC connection if this adapter created it.
|
|
func (g *RemoteGrainGRPC) Close() error {
|
|
if g.conn != nil {
|
|
return g.conn.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Debug helper to log operations (optional).
|
|
func (g *RemoteGrainGRPC) logf(format string, args ...interface{}) {
|
|
log.Printf("[remote-grain-grpc host=%s id=%s] %s", g.Host, g.Id.String(), fmt.Sprintf(format, args...))
|
|
}
|