Files
go-cart-actor/grpc_server.go
2025-10-10 06:45:23 +00:00

380 lines
10 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"log"
"net"
"time"
proto "git.tornberg.me/go-cart-actor/proto" // underlying generated package name is 'messages'
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// -----------------------------------------------------------------------------
// Metrics
// -----------------------------------------------------------------------------
var (
grpcMutateDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "cart_grpc_mutate_duration_seconds",
Help: "Duration of CartActor.Mutate RPCs",
Buckets: prometheus.DefBuckets,
})
grpcMutateErrors = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_grpc_mutate_errors_total",
Help: "Total number of failed CartActor.Mutate RPCs",
})
grpcStateDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "cart_grpc_get_state_duration_seconds",
Help: "Duration of CartActor.GetState RPCs",
Buckets: prometheus.DefBuckets,
})
grpcControlDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "cart_grpc_control_duration_seconds",
Help: "Duration of ControlPlane RPCs",
Buckets: prometheus.DefBuckets,
})
grpcControlErrors = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_grpc_control_errors_total",
Help: "Total number of failed ControlPlane RPCs",
})
)
// timeTrack wraps a closure and records duration into the supplied histogram.
func timeTrack(hist prometheus.Observer, fn func() error) (err error) {
start := time.Now()
defer func() {
hist.Observe(time.Since(start).Seconds())
}()
return fn()
}
// -----------------------------------------------------------------------------
// CartActor Service Implementation
// -----------------------------------------------------------------------------
type cartActorService struct {
proto.UnimplementedCartActorServer
pool GrainPool
}
func newCartActorService(pool GrainPool) *cartActorService {
return &cartActorService{pool: pool}
}
func (s *cartActorService) Mutate(ctx context.Context, req *proto.MutationRequest) (*proto.MutationReply, error) {
var reply *proto.MutationReply
err := timeTrack(grpcMutateDuration, func() error {
if req == nil {
return status.Error(codes.InvalidArgument, "request is nil")
}
if req.CartId == "" {
return status.Error(codes.InvalidArgument, "cart_id is empty")
}
mt := uint16(req.Type.Number())
handler, ok := Handlers[mt]
if !ok {
return status.Errorf(codes.InvalidArgument, "unknown mutation type %d", mt)
}
content, err := handler.Read(req.Payload)
if err != nil {
return status.Errorf(codes.InvalidArgument, "decode payload: %v", err)
}
ts := req.ClientTimestamp
if ts == 0 {
ts = time.Now().Unix()
}
msg := Message{
Type: mt,
TimeStamp: &ts,
Content: content,
}
frame, err := s.pool.Process(ToCartId(req.CartId), msg)
if err != nil {
return err
}
reply = &proto.MutationReply{
StatusCode: int32(frame.StatusCode),
Payload: frame.Payload,
}
return nil
})
if err != nil {
grpcMutateErrors.Inc()
return nil, err
}
return reply, nil
}
func (s *cartActorService) GetState(ctx context.Context, req *proto.StateRequest) (*proto.StateReply, error) {
var reply *proto.StateReply
err := timeTrack(grpcStateDuration, func() error {
if req == nil || req.CartId == "" {
return status.Error(codes.InvalidArgument, "cart_id is empty")
}
frame, err := s.pool.Get(ToCartId(req.CartId))
if err != nil {
return err
}
reply = &proto.StateReply{
StatusCode: int32(frame.StatusCode),
Payload: frame.Payload,
}
return nil
})
if err != nil {
return nil, err
}
return reply, nil
}
// -----------------------------------------------------------------------------
// ControlPlane Service Implementation
// -----------------------------------------------------------------------------
// controlPlaneService directly leverages SyncedPool internals (same package).
// NOTE: This is a transitional adapter; once the legacy frame-based code is
// removed, related fields/methods in SyncedPool can be slimmed.
type controlPlaneService struct {
proto.UnimplementedControlPlaneServer
pool *SyncedPool
}
func newControlPlaneService(pool *SyncedPool) *controlPlaneService {
return &controlPlaneService{pool: pool}
}
func (s *controlPlaneService) Ping(ctx context.Context, _ *proto.Empty) (*proto.PingReply, error) {
var reply *proto.PingReply
err := timeTrack(grpcControlDuration, func() error {
reply = &proto.PingReply{
Host: s.pool.Hostname,
UnixTime: time.Now().Unix(),
}
return nil
})
if err != nil {
grpcControlErrors.Inc()
return nil, err
}
return reply, nil
}
func (s *controlPlaneService) Negotiate(ctx context.Context, req *proto.NegotiateRequest) (*proto.NegotiateReply, error) {
var reply *proto.NegotiateReply
err := timeTrack(grpcControlDuration, func() error {
if req == nil {
return status.Error(codes.InvalidArgument, "request is nil")
}
// Add unknown hosts
for _, host := range req.KnownHosts {
if host == "" || host == s.pool.Hostname {
continue
}
if !s.pool.IsKnown(host) {
go s.pool.AddRemote(host)
}
}
// Build healthy host list
hosts := make([]string, 0)
for _, r := range s.pool.GetHealthyRemotes() {
hosts = append(hosts, r.Host)
}
hosts = append(hosts, s.pool.Hostname)
reply = &proto.NegotiateReply{
Hosts: hosts,
}
return nil
})
if err != nil {
grpcControlErrors.Inc()
return nil, err
}
return reply, nil
}
func (s *controlPlaneService) GetCartIds(ctx context.Context, _ *proto.Empty) (*proto.CartIdsReply, error) {
var reply *proto.CartIdsReply
err := timeTrack(grpcControlDuration, func() error {
s.pool.mu.RLock()
defer s.pool.mu.RUnlock()
ids := make([]string, 0, len(s.pool.local.grains))
for id, g := range s.pool.local.grains {
if g == nil {
continue
}
if id.String() == "" {
continue
}
ids = append(ids, id.String())
}
reply = &proto.CartIdsReply{
CartIds: ids,
}
return nil
})
if err != nil {
grpcControlErrors.Inc()
return nil, err
}
return reply, nil
}
func (s *controlPlaneService) ConfirmOwner(ctx context.Context, req *proto.OwnerChangeRequest) (*proto.OwnerChangeAck, error) {
var reply *proto.OwnerChangeAck
err := timeTrack(grpcControlDuration, func() error {
if req == nil || req.CartId == "" || req.NewHost == "" {
return status.Error(codes.InvalidArgument, "cart_id or new_host missing")
}
id := ToCartId(req.CartId)
newHost := req.NewHost
// Mirror GrainOwnerChangeHandler semantics
log.Printf("gRPC ConfirmOwner: cart %s newHost=%s", id, newHost)
for _, r := range s.pool.remoteHosts {
if r.Host == newHost && r.IsHealthy() {
go s.pool.SpawnRemoteGrain(id, newHost)
break
}
}
go s.pool.AddRemote(newHost)
reply = &proto.OwnerChangeAck{
Accepted: true,
Message: "ok",
}
return nil
})
if err != nil {
grpcControlErrors.Inc()
return nil, err
}
return reply, nil
}
func (s *controlPlaneService) Closing(ctx context.Context, notice *proto.ClosingNotice) (*proto.OwnerChangeAck, error) {
var reply *proto.OwnerChangeAck
err := timeTrack(grpcControlDuration, func() error {
if notice == nil || notice.Host == "" {
return status.Error(codes.InvalidArgument, "host missing")
}
host := notice.Host
s.pool.mu.RLock()
_, exists := s.pool.remoteHosts[host]
s.pool.mu.RUnlock()
if exists {
go s.pool.RemoveHost(host)
}
reply = &proto.OwnerChangeAck{
Accepted: true,
Message: "removed",
}
return nil
})
if err != nil {
grpcControlErrors.Inc()
return nil, err
}
return reply, nil
}
// -----------------------------------------------------------------------------
// Server Bootstrap
// -----------------------------------------------------------------------------
type GRPCServer struct {
server *grpc.Server
lis net.Listener
addr string
}
// StartGRPCServer sets up a gRPC server hosting both CartActor and ControlPlane services.
// addr example: ":1337" (for combined) OR run two servers if you want separate ports.
// For the migration we can host both on the same listener to reduce open ports.
func StartGRPCServer(addr string, pool GrainPool, synced *SyncedPool, opts ...grpc.ServerOption) (*GRPCServer, error) {
if pool == nil {
return nil, errors.New("nil grain pool")
}
if synced == nil {
return nil, errors.New("nil synced pool")
}
lis, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("listen %s: %w", addr, err)
}
grpcServer := grpc.NewServer(opts...)
proto.RegisterCartActorServer(grpcServer, newCartActorService(pool))
proto.RegisterControlPlaneServer(grpcServer, newControlPlaneService(synced))
go func() {
log.Printf("gRPC server listening on %s", addr)
if serveErr := grpcServer.Serve(lis); serveErr != nil {
log.Printf("gRPC server stopped: %v", serveErr)
}
}()
return &GRPCServer{
server: grpcServer,
lis: lis,
addr: addr,
}, nil
}
// GracefulStop stops the server gracefully.
func (s *GRPCServer) GracefulStop() {
if s == nil || s.server == nil {
return
}
s.server.GracefulStop()
}
// Addr returns the bound address.
func (s *GRPCServer) Addr() string {
if s == nil {
return ""
}
return s.addr
}
// -----------------------------------------------------------------------------
// Client Dial Helpers (used later by refactored remote grain + control plane)
// -----------------------------------------------------------------------------
// DialRemote establishes (or reuses externally) a gRPC client connection.
func DialRemote(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
dialOpts := []grpc.DialOption{
grpc.WithInsecure(), // NOTE: Intentional for initial migration; replace with TLS / mTLS later.
grpc.WithBlock(),
}
dialOpts = append(dialOpts, opts...)
ctxDial, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctxDial, target, dialOpts...)
if err != nil {
return nil, err
}
return conn, nil
}
// -----------------------------------------------------------------------------
// Utility for converting internal errors to gRPC status (if needed later).
// -----------------------------------------------------------------------------
func grpcError(err error) error {
if err == nil {
return nil
}
// Extend mapping if we add richer error types.
return status.Error(codes.Internal, err.Error())
}