package main import ( "bytes" "context" "fmt" "log" "time" proto "git.tornberg.me/go-cart-actor/proto" // generated package name is 'messages'; aliased as proto for consistency "google.golang.org/grpc" ) // RemoteGrainGRPC is the gRPC-backed implementation of a remote grain. // It mirrors the previous RemoteGrain (TCP/frame based) while using the // new CartActor gRPC service. It implements the Grain interface so that // SyncedPool can remain largely unchanged when swapping transport layers. type RemoteGrainGRPC struct { Id CartId Host string client proto.CartActorClient // Optional: keep the underlying conn so higher-level code can close if needed conn *grpc.ClientConn // Per-call timeout settings (tunable) mutateTimeout time.Duration stateTimeout time.Duration } // NewRemoteGrainGRPC constructs a remote grain adapter from an existing gRPC client. func NewRemoteGrainGRPC(id CartId, host string, client proto.CartActorClient) *RemoteGrainGRPC { return &RemoteGrainGRPC{ Id: id, Host: host, client: client, mutateTimeout: 800 * time.Millisecond, stateTimeout: 400 * time.Millisecond, } } // NewRemoteGrainGRPCWithConn dials the target and creates the gRPC client. // target should be host:port (where the CartActor service is exposed). func NewRemoteGrainGRPCWithConn(id CartId, host string, target string, dialOpts ...grpc.DialOption) (*RemoteGrainGRPC, error) { // NOTE: insecure for initial migration; should be replaced with TLS later. baseOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()} baseOpts = append(baseOpts, dialOpts...) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() conn, err := grpc.DialContext(ctx, target, baseOpts...) if err != nil { return nil, err } client := proto.NewCartActorClient(conn) return &RemoteGrainGRPC{ Id: id, Host: host, client: client, conn: conn, mutateTimeout: 800 * time.Millisecond, stateTimeout: 400 * time.Millisecond, }, nil } func (g *RemoteGrainGRPC) GetId() CartId { return g.Id } // HandleMessage serializes the underlying mutation proto (without legacy message header) // and invokes the CartActor.Mutate RPC. It wraps the reply into a FrameWithPayload // for compatibility with existing higher-level code paths. func (g *RemoteGrainGRPC) HandleMessage(message *Message, isReplay bool) (*FrameWithPayload, error) { if message == nil { return nil, fmt.Errorf("nil message") } if isReplay { // Remote replay not expected; ignore to keep parity with old implementation. return nil, fmt.Errorf("replay not supported for remote grains") } handler, err := GetMessageHandler(message.Type) if err != nil { return nil, err } // Ensure timestamp set (legacy behavior) if message.TimeStamp == nil { ts := time.Now().Unix() message.TimeStamp = &ts } // Marshal underlying proto payload only (no StorableMessageHeader) var buf bytes.Buffer err = handler.Write(message, &buf) if err != nil { return nil, fmt.Errorf("encode mutation payload: %w", err) } req := &proto.MutationRequest{ CartId: g.Id.String(), Type: proto.MutationType(message.Type), // numeric mapping preserved Payload: buf.Bytes(), ClientTimestamp: *message.TimeStamp, } ctx, cancel := context.WithTimeout(context.Background(), g.mutateTimeout) defer cancel() resp, err := g.client.Mutate(ctx, req) if err != nil { return nil, err } frame := MakeFrameWithPayload(RemoteHandleMutationReply, StatusCode(resp.StatusCode), resp.Payload) return &frame, nil } // GetCurrentState calls CartActor.GetState and returns a FrameWithPayload // shaped like the legacy RemoteGetStateReply. func (g *RemoteGrainGRPC) GetCurrentState() (*FrameWithPayload, error) { ctx, cancel := context.WithTimeout(context.Background(), g.stateTimeout) defer cancel() resp, err := g.client.GetState(ctx, &proto.StateRequest{ CartId: g.Id.String(), }) if err != nil { return nil, err } frame := MakeFrameWithPayload(RemoteGetStateReply, StatusCode(resp.StatusCode), resp.Payload) return &frame, nil } // Close closes the underlying gRPC connection if this adapter created it. func (g *RemoteGrainGRPC) Close() error { if g.conn != nil { return g.conn.Close() } return nil } // Debug helper to log operations (optional). func (g *RemoteGrainGRPC) logf(format string, args ...interface{}) { log.Printf("[remote-grain-grpc host=%s id=%s] %s", g.Host, g.Id.String(), fmt.Sprintf(format, args...)) }