380 lines
10 KiB
Go
380 lines
10 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"time"
|
|
|
|
proto "git.tornberg.me/go-cart-actor/proto" // underlying generated package name is 'messages'
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Metrics
|
|
// -----------------------------------------------------------------------------
|
|
|
|
var (
|
|
grpcMutateDuration = promauto.NewHistogram(prometheus.HistogramOpts{
|
|
Name: "cart_grpc_mutate_duration_seconds",
|
|
Help: "Duration of CartActor.Mutate RPCs",
|
|
Buckets: prometheus.DefBuckets,
|
|
})
|
|
grpcMutateErrors = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "cart_grpc_mutate_errors_total",
|
|
Help: "Total number of failed CartActor.Mutate RPCs",
|
|
})
|
|
grpcStateDuration = promauto.NewHistogram(prometheus.HistogramOpts{
|
|
Name: "cart_grpc_get_state_duration_seconds",
|
|
Help: "Duration of CartActor.GetState RPCs",
|
|
Buckets: prometheus.DefBuckets,
|
|
})
|
|
grpcControlDuration = promauto.NewHistogram(prometheus.HistogramOpts{
|
|
Name: "cart_grpc_control_duration_seconds",
|
|
Help: "Duration of ControlPlane RPCs",
|
|
Buckets: prometheus.DefBuckets,
|
|
})
|
|
grpcControlErrors = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "cart_grpc_control_errors_total",
|
|
Help: "Total number of failed ControlPlane RPCs",
|
|
})
|
|
)
|
|
|
|
// timeTrack wraps a closure and records duration into the supplied histogram.
|
|
func timeTrack(hist prometheus.Observer, fn func() error) (err error) {
|
|
start := time.Now()
|
|
defer func() {
|
|
hist.Observe(time.Since(start).Seconds())
|
|
}()
|
|
return fn()
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// CartActor Service Implementation
|
|
// -----------------------------------------------------------------------------
|
|
|
|
type cartActorService struct {
|
|
proto.UnimplementedCartActorServer
|
|
pool GrainPool
|
|
}
|
|
|
|
func newCartActorService(pool GrainPool) *cartActorService {
|
|
return &cartActorService{pool: pool}
|
|
}
|
|
|
|
func (s *cartActorService) Mutate(ctx context.Context, req *proto.MutationRequest) (*proto.MutationReply, error) {
|
|
var reply *proto.MutationReply
|
|
err := timeTrack(grpcMutateDuration, func() error {
|
|
if req == nil {
|
|
return status.Error(codes.InvalidArgument, "request is nil")
|
|
}
|
|
if req.CartId == "" {
|
|
return status.Error(codes.InvalidArgument, "cart_id is empty")
|
|
}
|
|
mt := uint16(req.Type.Number())
|
|
handler, ok := Handlers[mt]
|
|
if !ok {
|
|
return status.Errorf(codes.InvalidArgument, "unknown mutation type %d", mt)
|
|
}
|
|
content, err := handler.Read(req.Payload)
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "decode payload: %v", err)
|
|
}
|
|
|
|
ts := req.ClientTimestamp
|
|
if ts == 0 {
|
|
ts = time.Now().Unix()
|
|
}
|
|
msg := Message{
|
|
Type: mt,
|
|
TimeStamp: &ts,
|
|
Content: content,
|
|
}
|
|
|
|
frame, err := s.pool.Process(ToCartId(req.CartId), msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reply = &proto.MutationReply{
|
|
StatusCode: int32(frame.StatusCode),
|
|
Payload: frame.Payload,
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
grpcMutateErrors.Inc()
|
|
return nil, err
|
|
}
|
|
return reply, nil
|
|
}
|
|
|
|
func (s *cartActorService) GetState(ctx context.Context, req *proto.StateRequest) (*proto.StateReply, error) {
|
|
var reply *proto.StateReply
|
|
err := timeTrack(grpcStateDuration, func() error {
|
|
if req == nil || req.CartId == "" {
|
|
return status.Error(codes.InvalidArgument, "cart_id is empty")
|
|
}
|
|
frame, err := s.pool.Get(ToCartId(req.CartId))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reply = &proto.StateReply{
|
|
StatusCode: int32(frame.StatusCode),
|
|
Payload: frame.Payload,
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return reply, nil
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// ControlPlane Service Implementation
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// controlPlaneService directly leverages SyncedPool internals (same package).
|
|
// NOTE: This is a transitional adapter; once the legacy frame-based code is
|
|
// removed, related fields/methods in SyncedPool can be slimmed.
|
|
type controlPlaneService struct {
|
|
proto.UnimplementedControlPlaneServer
|
|
pool *SyncedPool
|
|
}
|
|
|
|
func newControlPlaneService(pool *SyncedPool) *controlPlaneService {
|
|
return &controlPlaneService{pool: pool}
|
|
}
|
|
|
|
func (s *controlPlaneService) Ping(ctx context.Context, _ *proto.Empty) (*proto.PingReply, error) {
|
|
var reply *proto.PingReply
|
|
err := timeTrack(grpcControlDuration, func() error {
|
|
reply = &proto.PingReply{
|
|
Host: s.pool.Hostname,
|
|
UnixTime: time.Now().Unix(),
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
grpcControlErrors.Inc()
|
|
return nil, err
|
|
}
|
|
return reply, nil
|
|
}
|
|
|
|
func (s *controlPlaneService) Negotiate(ctx context.Context, req *proto.NegotiateRequest) (*proto.NegotiateReply, error) {
|
|
var reply *proto.NegotiateReply
|
|
err := timeTrack(grpcControlDuration, func() error {
|
|
if req == nil {
|
|
return status.Error(codes.InvalidArgument, "request is nil")
|
|
}
|
|
// Add unknown hosts
|
|
for _, host := range req.KnownHosts {
|
|
if host == "" || host == s.pool.Hostname {
|
|
continue
|
|
}
|
|
if !s.pool.IsKnown(host) {
|
|
go s.pool.AddRemote(host)
|
|
}
|
|
}
|
|
// Build healthy host list
|
|
hosts := make([]string, 0)
|
|
for _, r := range s.pool.GetHealthyRemotes() {
|
|
hosts = append(hosts, r.Host)
|
|
}
|
|
hosts = append(hosts, s.pool.Hostname)
|
|
reply = &proto.NegotiateReply{
|
|
Hosts: hosts,
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
grpcControlErrors.Inc()
|
|
return nil, err
|
|
}
|
|
return reply, nil
|
|
}
|
|
|
|
func (s *controlPlaneService) GetCartIds(ctx context.Context, _ *proto.Empty) (*proto.CartIdsReply, error) {
|
|
var reply *proto.CartIdsReply
|
|
err := timeTrack(grpcControlDuration, func() error {
|
|
s.pool.mu.RLock()
|
|
defer s.pool.mu.RUnlock()
|
|
ids := make([]string, 0, len(s.pool.local.grains))
|
|
for id, g := range s.pool.local.grains {
|
|
if g == nil {
|
|
continue
|
|
}
|
|
if id.String() == "" {
|
|
continue
|
|
}
|
|
ids = append(ids, id.String())
|
|
}
|
|
reply = &proto.CartIdsReply{
|
|
CartIds: ids,
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
grpcControlErrors.Inc()
|
|
return nil, err
|
|
}
|
|
return reply, nil
|
|
}
|
|
|
|
func (s *controlPlaneService) ConfirmOwner(ctx context.Context, req *proto.OwnerChangeRequest) (*proto.OwnerChangeAck, error) {
|
|
var reply *proto.OwnerChangeAck
|
|
err := timeTrack(grpcControlDuration, func() error {
|
|
if req == nil || req.CartId == "" || req.NewHost == "" {
|
|
return status.Error(codes.InvalidArgument, "cart_id or new_host missing")
|
|
}
|
|
id := ToCartId(req.CartId)
|
|
newHost := req.NewHost
|
|
|
|
// Mirror GrainOwnerChangeHandler semantics
|
|
log.Printf("gRPC ConfirmOwner: cart %s newHost=%s", id, newHost)
|
|
for _, r := range s.pool.remoteHosts {
|
|
if r.Host == newHost && r.IsHealthy() {
|
|
go s.pool.SpawnRemoteGrain(id, newHost)
|
|
break
|
|
}
|
|
}
|
|
go s.pool.AddRemote(newHost)
|
|
|
|
reply = &proto.OwnerChangeAck{
|
|
Accepted: true,
|
|
Message: "ok",
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
grpcControlErrors.Inc()
|
|
return nil, err
|
|
}
|
|
return reply, nil
|
|
}
|
|
|
|
func (s *controlPlaneService) Closing(ctx context.Context, notice *proto.ClosingNotice) (*proto.OwnerChangeAck, error) {
|
|
var reply *proto.OwnerChangeAck
|
|
err := timeTrack(grpcControlDuration, func() error {
|
|
if notice == nil || notice.Host == "" {
|
|
return status.Error(codes.InvalidArgument, "host missing")
|
|
}
|
|
host := notice.Host
|
|
s.pool.mu.RLock()
|
|
_, exists := s.pool.remoteHosts[host]
|
|
s.pool.mu.RUnlock()
|
|
if exists {
|
|
go s.pool.RemoveHost(host)
|
|
}
|
|
reply = &proto.OwnerChangeAck{
|
|
Accepted: true,
|
|
Message: "removed",
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
grpcControlErrors.Inc()
|
|
return nil, err
|
|
}
|
|
return reply, nil
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Server Bootstrap
|
|
// -----------------------------------------------------------------------------
|
|
|
|
type GRPCServer struct {
|
|
server *grpc.Server
|
|
lis net.Listener
|
|
addr string
|
|
}
|
|
|
|
// StartGRPCServer sets up a gRPC server hosting both CartActor and ControlPlane services.
|
|
// addr example: ":1337" (for combined) OR run two servers if you want separate ports.
|
|
// For the migration we can host both on the same listener to reduce open ports.
|
|
func StartGRPCServer(addr string, pool GrainPool, synced *SyncedPool, opts ...grpc.ServerOption) (*GRPCServer, error) {
|
|
if pool == nil {
|
|
return nil, errors.New("nil grain pool")
|
|
}
|
|
if synced == nil {
|
|
return nil, errors.New("nil synced pool")
|
|
}
|
|
|
|
lis, err := net.Listen("tcp", addr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listen %s: %w", addr, err)
|
|
}
|
|
|
|
grpcServer := grpc.NewServer(opts...)
|
|
proto.RegisterCartActorServer(grpcServer, newCartActorService(pool))
|
|
proto.RegisterControlPlaneServer(grpcServer, newControlPlaneService(synced))
|
|
|
|
go func() {
|
|
log.Printf("gRPC server listening on %s", addr)
|
|
if serveErr := grpcServer.Serve(lis); serveErr != nil {
|
|
log.Printf("gRPC server stopped: %v", serveErr)
|
|
}
|
|
}()
|
|
|
|
return &GRPCServer{
|
|
server: grpcServer,
|
|
lis: lis,
|
|
addr: addr,
|
|
}, nil
|
|
}
|
|
|
|
// GracefulStop stops the server gracefully.
|
|
func (s *GRPCServer) GracefulStop() {
|
|
if s == nil || s.server == nil {
|
|
return
|
|
}
|
|
s.server.GracefulStop()
|
|
}
|
|
|
|
// Addr returns the bound address.
|
|
func (s *GRPCServer) Addr() string {
|
|
if s == nil {
|
|
return ""
|
|
}
|
|
return s.addr
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Client Dial Helpers (used later by refactored remote grain + control plane)
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// DialRemote establishes (or reuses externally) a gRPC client connection.
|
|
func DialRemote(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
|
dialOpts := []grpc.DialOption{
|
|
grpc.WithInsecure(), // NOTE: Intentional for initial migration; replace with TLS / mTLS later.
|
|
grpc.WithBlock(),
|
|
}
|
|
dialOpts = append(dialOpts, opts...)
|
|
ctxDial, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
conn, err := grpc.DialContext(ctxDial, target, dialOpts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return conn, nil
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Utility for converting internal errors to gRPC status (if needed later).
|
|
// -----------------------------------------------------------------------------
|
|
|
|
func grpcError(err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
// Extend mapping if we add richer error types.
|
|
return status.Error(codes.Internal, err.Error())
|
|
}
|