more changes
This commit is contained in:
454
grpc_server.go
454
grpc_server.go
@@ -2,378 +2,138 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
proto "git.tornberg.me/go-cart-actor/proto" // underlying generated package name is 'messages'
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
messages "git.tornberg.me/go-cart-actor/proto"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/grpc/reflection"
|
||||
)
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// Metrics
|
||||
// -----------------------------------------------------------------------------
|
||||
// cartActorGRPCServer implements the CartActor and ControlPlane gRPC services.
|
||||
// It delegates cart operations to a grain pool and cluster operations to a synced pool.
|
||||
type cartActorGRPCServer struct {
|
||||
messages.UnimplementedCartActorServer
|
||||
messages.UnimplementedControlPlaneServer
|
||||
|
||||
var (
|
||||
grpcMutateDuration = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||
Name: "cart_grpc_mutate_duration_seconds",
|
||||
Help: "Duration of CartActor.Mutate RPCs",
|
||||
Buckets: prometheus.DefBuckets,
|
||||
})
|
||||
grpcMutateErrors = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "cart_grpc_mutate_errors_total",
|
||||
Help: "Total number of failed CartActor.Mutate RPCs",
|
||||
})
|
||||
grpcStateDuration = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||
Name: "cart_grpc_get_state_duration_seconds",
|
||||
Help: "Duration of CartActor.GetState RPCs",
|
||||
Buckets: prometheus.DefBuckets,
|
||||
})
|
||||
grpcControlDuration = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||
Name: "cart_grpc_control_duration_seconds",
|
||||
Help: "Duration of ControlPlane RPCs",
|
||||
Buckets: prometheus.DefBuckets,
|
||||
})
|
||||
grpcControlErrors = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "cart_grpc_control_errors_total",
|
||||
Help: "Total number of failed ControlPlane RPCs",
|
||||
})
|
||||
)
|
||||
|
||||
// timeTrack wraps a closure and records duration into the supplied histogram.
|
||||
func timeTrack(hist prometheus.Observer, fn func() error) (err error) {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
hist.Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
return fn()
|
||||
pool GrainPool // For cart state mutations and queries
|
||||
syncedPool *SyncedPool // For cluster membership and control
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// CartActor Service Implementation
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
type cartActorService struct {
|
||||
proto.UnimplementedCartActorServer
|
||||
pool GrainPool
|
||||
// NewCartActorGRPCServer creates and initializes the server.
|
||||
func NewCartActorGRPCServer(pool GrainPool, syncedPool *SyncedPool) *cartActorGRPCServer {
|
||||
return &cartActorGRPCServer{
|
||||
pool: pool,
|
||||
syncedPool: syncedPool,
|
||||
}
|
||||
}
|
||||
|
||||
func newCartActorService(pool GrainPool) *cartActorService {
|
||||
return &cartActorService{pool: pool}
|
||||
}
|
||||
// Mutate applies a mutation from an envelope to the corresponding cart grain.
|
||||
func (s *cartActorGRPCServer) Mutate(ctx context.Context, envelope *messages.MutationEnvelope) (*messages.MutationReply, error) {
|
||||
if envelope.GetCartId() == "" {
|
||||
return &messages.MutationReply{
|
||||
StatusCode: 400,
|
||||
Result: &messages.MutationReply_Error{Error: "cart_id is required"},
|
||||
}, nil
|
||||
}
|
||||
cartID := ToCartId(envelope.GetCartId())
|
||||
|
||||
func (s *cartActorService) Mutate(ctx context.Context, req *proto.MutationRequest) (*proto.MutationReply, error) {
|
||||
var reply *proto.MutationReply
|
||||
err := timeTrack(grpcMutateDuration, func() error {
|
||||
if req == nil {
|
||||
return status.Error(codes.InvalidArgument, "request is nil")
|
||||
}
|
||||
if req.CartId == "" {
|
||||
return status.Error(codes.InvalidArgument, "cart_id is empty")
|
||||
}
|
||||
mt := uint16(req.Type.Number())
|
||||
handler, ok := Handlers[mt]
|
||||
if !ok {
|
||||
return status.Errorf(codes.InvalidArgument, "unknown mutation type %d", mt)
|
||||
}
|
||||
content, err := handler.Read(req.Payload)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.InvalidArgument, "decode payload: %v", err)
|
||||
}
|
||||
var mutation interface{}
|
||||
switch m := envelope.Mutation.(type) {
|
||||
case *messages.MutationEnvelope_AddRequest:
|
||||
mutation = m.AddRequest
|
||||
case *messages.MutationEnvelope_AddItem:
|
||||
mutation = m.AddItem
|
||||
case *messages.MutationEnvelope_RemoveItem:
|
||||
mutation = m.RemoveItem
|
||||
case *messages.MutationEnvelope_RemoveDelivery:
|
||||
mutation = m.RemoveDelivery
|
||||
case *messages.MutationEnvelope_ChangeQuantity:
|
||||
mutation = m.ChangeQuantity
|
||||
case *messages.MutationEnvelope_SetDelivery:
|
||||
mutation = m.SetDelivery
|
||||
case *messages.MutationEnvelope_SetPickupPoint:
|
||||
mutation = m.SetPickupPoint
|
||||
case *messages.MutationEnvelope_CreateCheckoutOrder:
|
||||
mutation = m.CreateCheckoutOrder
|
||||
case *messages.MutationEnvelope_SetCartItems:
|
||||
mutation = m.SetCartItems
|
||||
case *messages.MutationEnvelope_OrderCompleted:
|
||||
mutation = m.OrderCompleted
|
||||
default:
|
||||
return &messages.MutationReply{
|
||||
StatusCode: 400,
|
||||
Result: &messages.MutationReply_Error{Error: fmt.Sprintf("unsupported mutation type: %T", m)},
|
||||
}, nil
|
||||
}
|
||||
|
||||
ts := req.ClientTimestamp
|
||||
if ts == 0 {
|
||||
ts = time.Now().Unix()
|
||||
}
|
||||
msg := Message{
|
||||
Type: mt,
|
||||
TimeStamp: &ts,
|
||||
Content: content,
|
||||
}
|
||||
|
||||
frame, err := s.pool.Process(ToCartId(req.CartId), msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reply = &proto.MutationReply{
|
||||
StatusCode: int32(frame.StatusCode),
|
||||
Payload: frame.Payload,
|
||||
}
|
||||
return nil
|
||||
})
|
||||
// Delegate the mutation to the grain pool.
|
||||
// The pool is responsible for routing it to the correct grain (local or remote).
|
||||
grain, err := s.pool.Process(cartID, mutation)
|
||||
if err != nil {
|
||||
grpcMutateErrors.Inc()
|
||||
return nil, err
|
||||
}
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
func (s *cartActorService) GetState(ctx context.Context, req *proto.StateRequest) (*proto.StateReply, error) {
|
||||
var reply *proto.StateReply
|
||||
err := timeTrack(grpcStateDuration, func() error {
|
||||
if req == nil || req.CartId == "" {
|
||||
return status.Error(codes.InvalidArgument, "cart_id is empty")
|
||||
}
|
||||
frame, err := s.pool.Get(ToCartId(req.CartId))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reply = &proto.StateReply{
|
||||
StatusCode: int32(frame.StatusCode),
|
||||
Payload: frame.Payload,
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// ControlPlane Service Implementation
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
// controlPlaneService directly leverages SyncedPool internals (same package).
|
||||
// NOTE: This is a transitional adapter; once the legacy frame-based code is
|
||||
// removed, related fields/methods in SyncedPool can be slimmed.
|
||||
type controlPlaneService struct {
|
||||
proto.UnimplementedControlPlaneServer
|
||||
pool *SyncedPool
|
||||
}
|
||||
|
||||
func newControlPlaneService(pool *SyncedPool) *controlPlaneService {
|
||||
return &controlPlaneService{pool: pool}
|
||||
}
|
||||
|
||||
func (s *controlPlaneService) Ping(ctx context.Context, _ *proto.Empty) (*proto.PingReply, error) {
|
||||
var reply *proto.PingReply
|
||||
err := timeTrack(grpcControlDuration, func() error {
|
||||
reply = &proto.PingReply{
|
||||
Host: s.pool.Hostname,
|
||||
UnixTime: time.Now().Unix(),
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
grpcControlErrors.Inc()
|
||||
return nil, err
|
||||
}
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
func (s *controlPlaneService) Negotiate(ctx context.Context, req *proto.NegotiateRequest) (*proto.NegotiateReply, error) {
|
||||
var reply *proto.NegotiateReply
|
||||
err := timeTrack(grpcControlDuration, func() error {
|
||||
if req == nil {
|
||||
return status.Error(codes.InvalidArgument, "request is nil")
|
||||
}
|
||||
// Add unknown hosts
|
||||
for _, host := range req.KnownHosts {
|
||||
if host == "" || host == s.pool.Hostname {
|
||||
continue
|
||||
}
|
||||
if !s.pool.IsKnown(host) {
|
||||
go s.pool.AddRemote(host)
|
||||
}
|
||||
}
|
||||
// Build healthy host list
|
||||
hosts := make([]string, 0)
|
||||
for _, r := range s.pool.GetHealthyRemotes() {
|
||||
hosts = append(hosts, r.Host)
|
||||
}
|
||||
hosts = append(hosts, s.pool.Hostname)
|
||||
reply = &proto.NegotiateReply{
|
||||
Hosts: hosts,
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
grpcControlErrors.Inc()
|
||||
return nil, err
|
||||
}
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
func (s *controlPlaneService) GetCartIds(ctx context.Context, _ *proto.Empty) (*proto.CartIdsReply, error) {
|
||||
var reply *proto.CartIdsReply
|
||||
err := timeTrack(grpcControlDuration, func() error {
|
||||
s.pool.mu.RLock()
|
||||
defer s.pool.mu.RUnlock()
|
||||
ids := make([]string, 0, len(s.pool.local.grains))
|
||||
for id, g := range s.pool.local.grains {
|
||||
if g == nil {
|
||||
continue
|
||||
}
|
||||
if id.String() == "" {
|
||||
continue
|
||||
}
|
||||
ids = append(ids, id.String())
|
||||
}
|
||||
reply = &proto.CartIdsReply{
|
||||
CartIds: ids,
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
grpcControlErrors.Inc()
|
||||
return nil, err
|
||||
}
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
func (s *controlPlaneService) ConfirmOwner(ctx context.Context, req *proto.OwnerChangeRequest) (*proto.OwnerChangeAck, error) {
|
||||
var reply *proto.OwnerChangeAck
|
||||
err := timeTrack(grpcControlDuration, func() error {
|
||||
if req == nil || req.CartId == "" || req.NewHost == "" {
|
||||
return status.Error(codes.InvalidArgument, "cart_id or new_host missing")
|
||||
}
|
||||
id := ToCartId(req.CartId)
|
||||
newHost := req.NewHost
|
||||
|
||||
// Mirror GrainOwnerChangeHandler semantics
|
||||
log.Printf("gRPC ConfirmOwner: cart %s newHost=%s", id, newHost)
|
||||
for _, r := range s.pool.remoteHosts {
|
||||
if r.Host == newHost && r.IsHealthy() {
|
||||
go s.pool.SpawnRemoteGrain(id, newHost)
|
||||
break
|
||||
}
|
||||
}
|
||||
go s.pool.AddRemote(newHost)
|
||||
|
||||
reply = &proto.OwnerChangeAck{
|
||||
Accepted: true,
|
||||
Message: "ok",
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
grpcControlErrors.Inc()
|
||||
return nil, err
|
||||
}
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
func (s *controlPlaneService) Closing(ctx context.Context, notice *proto.ClosingNotice) (*proto.OwnerChangeAck, error) {
|
||||
var reply *proto.OwnerChangeAck
|
||||
err := timeTrack(grpcControlDuration, func() error {
|
||||
if notice == nil || notice.Host == "" {
|
||||
return status.Error(codes.InvalidArgument, "host missing")
|
||||
}
|
||||
host := notice.Host
|
||||
s.pool.mu.RLock()
|
||||
_, exists := s.pool.remoteHosts[host]
|
||||
s.pool.mu.RUnlock()
|
||||
if exists {
|
||||
go s.pool.RemoveHost(host)
|
||||
}
|
||||
reply = &proto.OwnerChangeAck{
|
||||
Accepted: true,
|
||||
Message: "removed",
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
grpcControlErrors.Inc()
|
||||
return nil, err
|
||||
}
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// Server Bootstrap
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
type GRPCServer struct {
|
||||
server *grpc.Server
|
||||
lis net.Listener
|
||||
addr string
|
||||
}
|
||||
|
||||
// StartGRPCServer sets up a gRPC server hosting both CartActor and ControlPlane services.
|
||||
// addr example: ":1337" (for combined) OR run two servers if you want separate ports.
|
||||
// For the migration we can host both on the same listener to reduce open ports.
|
||||
func StartGRPCServer(addr string, pool GrainPool, synced *SyncedPool, opts ...grpc.ServerOption) (*GRPCServer, error) {
|
||||
if pool == nil {
|
||||
return nil, errors.New("nil grain pool")
|
||||
}
|
||||
if synced == nil {
|
||||
return nil, errors.New("nil synced pool")
|
||||
return &messages.MutationReply{
|
||||
StatusCode: 500,
|
||||
Result: &messages.MutationReply_Error{Error: err.Error()},
|
||||
}, nil
|
||||
}
|
||||
|
||||
lis, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("listen %s: %w", addr, err)
|
||||
}
|
||||
// Map the internal grain state to the protobuf representation.
|
||||
cartState := ToCartState(grain)
|
||||
|
||||
grpcServer := grpc.NewServer(opts...)
|
||||
proto.RegisterCartActorServer(grpcServer, newCartActorService(pool))
|
||||
proto.RegisterControlPlaneServer(grpcServer, newControlPlaneService(synced))
|
||||
|
||||
go func() {
|
||||
log.Printf("gRPC server listening on %s", addr)
|
||||
if serveErr := grpcServer.Serve(lis); serveErr != nil {
|
||||
log.Printf("gRPC server stopped: %v", serveErr)
|
||||
}
|
||||
}()
|
||||
|
||||
return &GRPCServer{
|
||||
server: grpcServer,
|
||||
lis: lis,
|
||||
addr: addr,
|
||||
return &messages.MutationReply{
|
||||
StatusCode: 200,
|
||||
Result: &messages.MutationReply_State{State: cartState},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GracefulStop stops the server gracefully.
|
||||
func (s *GRPCServer) GracefulStop() {
|
||||
if s == nil || s.server == nil {
|
||||
return
|
||||
// GetState retrieves the current state of a cart grain.
|
||||
func (s *cartActorGRPCServer) GetState(ctx context.Context, req *messages.StateRequest) (*messages.StateReply, error) {
|
||||
if req.GetCartId() == "" {
|
||||
return &messages.StateReply{
|
||||
StatusCode: 400,
|
||||
Result: &messages.StateReply_Error{Error: "cart_id is required"},
|
||||
}, nil
|
||||
}
|
||||
s.server.GracefulStop()
|
||||
}
|
||||
cartID := ToCartId(req.GetCartId())
|
||||
|
||||
// Addr returns the bound address.
|
||||
func (s *GRPCServer) Addr() string {
|
||||
if s == nil {
|
||||
return ""
|
||||
}
|
||||
return s.addr
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// Client Dial Helpers (used later by refactored remote grain + control plane)
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
// DialRemote establishes (or reuses externally) a gRPC client connection.
|
||||
func DialRemote(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||
dialOpts := []grpc.DialOption{
|
||||
grpc.WithInsecure(), // NOTE: Intentional for initial migration; replace with TLS / mTLS later.
|
||||
grpc.WithBlock(),
|
||||
}
|
||||
dialOpts = append(dialOpts, opts...)
|
||||
ctxDial, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
conn, err := grpc.DialContext(ctxDial, target, dialOpts...)
|
||||
grain, err := s.pool.Get(cartID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return &messages.StateReply{
|
||||
StatusCode: 500,
|
||||
Result: &messages.StateReply_Error{Error: err.Error()},
|
||||
}, nil
|
||||
}
|
||||
return conn, nil
|
||||
|
||||
cartState := ToCartState(grain)
|
||||
|
||||
return &messages.StateReply{
|
||||
StatusCode: 200,
|
||||
Result: &messages.StateReply_State{State: cartState},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// Utility for converting internal errors to gRPC status (if needed later).
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
func grpcError(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
// StartGRPCServer configures and starts the unified gRPC server on the given address.
|
||||
// It registers both the CartActor and ControlPlane services.
|
||||
func StartGRPCServer(addr string, pool GrainPool, syncedPool *SyncedPool) (*grpc.Server, error) {
|
||||
lis, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to listen: %w", err)
|
||||
}
|
||||
// Extend mapping if we add richer error types.
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
grpcServer := grpc.NewServer()
|
||||
server := NewCartActorGRPCServer(pool, syncedPool)
|
||||
|
||||
messages.RegisterCartActorServer(grpcServer, server)
|
||||
messages.RegisterControlPlaneServer(grpcServer, server)
|
||||
reflection.Register(grpcServer)
|
||||
|
||||
log.Printf("gRPC server listening on %s", addr)
|
||||
go func() {
|
||||
if err := grpcServer.Serve(lis); err != nil {
|
||||
log.Fatalf("failed to serve gRPC: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return grpcServer, nil
|
||||
}
|
||||
Reference in New Issue
Block a user