package main import ( "context" "errors" "fmt" "log" "net" "time" proto "git.tornberg.me/go-cart-actor/proto" // underlying generated package name is 'messages' "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // ----------------------------------------------------------------------------- // Metrics // ----------------------------------------------------------------------------- var ( grpcMutateDuration = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "cart_grpc_mutate_duration_seconds", Help: "Duration of CartActor.Mutate RPCs", Buckets: prometheus.DefBuckets, }) grpcMutateErrors = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_grpc_mutate_errors_total", Help: "Total number of failed CartActor.Mutate RPCs", }) grpcStateDuration = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "cart_grpc_get_state_duration_seconds", Help: "Duration of CartActor.GetState RPCs", Buckets: prometheus.DefBuckets, }) grpcControlDuration = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "cart_grpc_control_duration_seconds", Help: "Duration of ControlPlane RPCs", Buckets: prometheus.DefBuckets, }) grpcControlErrors = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_grpc_control_errors_total", Help: "Total number of failed ControlPlane RPCs", }) ) // timeTrack wraps a closure and records duration into the supplied histogram. func timeTrack(hist prometheus.Observer, fn func() error) (err error) { start := time.Now() defer func() { hist.Observe(time.Since(start).Seconds()) }() return fn() } // ----------------------------------------------------------------------------- // CartActor Service Implementation // ----------------------------------------------------------------------------- type cartActorService struct { proto.UnimplementedCartActorServer pool GrainPool } func newCartActorService(pool GrainPool) *cartActorService { return &cartActorService{pool: pool} } func (s *cartActorService) Mutate(ctx context.Context, req *proto.MutationRequest) (*proto.MutationReply, error) { var reply *proto.MutationReply err := timeTrack(grpcMutateDuration, func() error { if req == nil { return status.Error(codes.InvalidArgument, "request is nil") } if req.CartId == "" { return status.Error(codes.InvalidArgument, "cart_id is empty") } mt := uint16(req.Type.Number()) handler, ok := Handlers[mt] if !ok { return status.Errorf(codes.InvalidArgument, "unknown mutation type %d", mt) } content, err := handler.Read(req.Payload) if err != nil { return status.Errorf(codes.InvalidArgument, "decode payload: %v", err) } ts := req.ClientTimestamp if ts == 0 { ts = time.Now().Unix() } msg := Message{ Type: mt, TimeStamp: &ts, Content: content, } frame, err := s.pool.Process(ToCartId(req.CartId), msg) if err != nil { return err } reply = &proto.MutationReply{ StatusCode: int32(frame.StatusCode), Payload: frame.Payload, } return nil }) if err != nil { grpcMutateErrors.Inc() return nil, err } return reply, nil } func (s *cartActorService) GetState(ctx context.Context, req *proto.StateRequest) (*proto.StateReply, error) { var reply *proto.StateReply err := timeTrack(grpcStateDuration, func() error { if req == nil || req.CartId == "" { return status.Error(codes.InvalidArgument, "cart_id is empty") } frame, err := s.pool.Get(ToCartId(req.CartId)) if err != nil { return err } reply = &proto.StateReply{ StatusCode: int32(frame.StatusCode), Payload: frame.Payload, } return nil }) if err != nil { return nil, err } return reply, nil } // ----------------------------------------------------------------------------- // ControlPlane Service Implementation // ----------------------------------------------------------------------------- // controlPlaneService directly leverages SyncedPool internals (same package). // NOTE: This is a transitional adapter; once the legacy frame-based code is // removed, related fields/methods in SyncedPool can be slimmed. type controlPlaneService struct { proto.UnimplementedControlPlaneServer pool *SyncedPool } func newControlPlaneService(pool *SyncedPool) *controlPlaneService { return &controlPlaneService{pool: pool} } func (s *controlPlaneService) Ping(ctx context.Context, _ *proto.Empty) (*proto.PingReply, error) { var reply *proto.PingReply err := timeTrack(grpcControlDuration, func() error { reply = &proto.PingReply{ Host: s.pool.Hostname, UnixTime: time.Now().Unix(), } return nil }) if err != nil { grpcControlErrors.Inc() return nil, err } return reply, nil } func (s *controlPlaneService) Negotiate(ctx context.Context, req *proto.NegotiateRequest) (*proto.NegotiateReply, error) { var reply *proto.NegotiateReply err := timeTrack(grpcControlDuration, func() error { if req == nil { return status.Error(codes.InvalidArgument, "request is nil") } // Add unknown hosts for _, host := range req.KnownHosts { if host == "" || host == s.pool.Hostname { continue } if !s.pool.IsKnown(host) { go s.pool.AddRemote(host) } } // Build healthy host list hosts := make([]string, 0) for _, r := range s.pool.GetHealthyRemotes() { hosts = append(hosts, r.Host) } hosts = append(hosts, s.pool.Hostname) reply = &proto.NegotiateReply{ Hosts: hosts, } return nil }) if err != nil { grpcControlErrors.Inc() return nil, err } return reply, nil } func (s *controlPlaneService) GetCartIds(ctx context.Context, _ *proto.Empty) (*proto.CartIdsReply, error) { var reply *proto.CartIdsReply err := timeTrack(grpcControlDuration, func() error { s.pool.mu.RLock() defer s.pool.mu.RUnlock() ids := make([]string, 0, len(s.pool.local.grains)) for id, g := range s.pool.local.grains { if g == nil { continue } if id.String() == "" { continue } ids = append(ids, id.String()) } reply = &proto.CartIdsReply{ CartIds: ids, } return nil }) if err != nil { grpcControlErrors.Inc() return nil, err } return reply, nil } func (s *controlPlaneService) ConfirmOwner(ctx context.Context, req *proto.OwnerChangeRequest) (*proto.OwnerChangeAck, error) { var reply *proto.OwnerChangeAck err := timeTrack(grpcControlDuration, func() error { if req == nil || req.CartId == "" || req.NewHost == "" { return status.Error(codes.InvalidArgument, "cart_id or new_host missing") } id := ToCartId(req.CartId) newHost := req.NewHost // Mirror GrainOwnerChangeHandler semantics log.Printf("gRPC ConfirmOwner: cart %s newHost=%s", id, newHost) for _, r := range s.pool.remoteHosts { if r.Host == newHost && r.IsHealthy() { go s.pool.SpawnRemoteGrain(id, newHost) break } } go s.pool.AddRemote(newHost) reply = &proto.OwnerChangeAck{ Accepted: true, Message: "ok", } return nil }) if err != nil { grpcControlErrors.Inc() return nil, err } return reply, nil } func (s *controlPlaneService) Closing(ctx context.Context, notice *proto.ClosingNotice) (*proto.OwnerChangeAck, error) { var reply *proto.OwnerChangeAck err := timeTrack(grpcControlDuration, func() error { if notice == nil || notice.Host == "" { return status.Error(codes.InvalidArgument, "host missing") } host := notice.Host s.pool.mu.RLock() _, exists := s.pool.remoteHosts[host] s.pool.mu.RUnlock() if exists { go s.pool.RemoveHost(host) } reply = &proto.OwnerChangeAck{ Accepted: true, Message: "removed", } return nil }) if err != nil { grpcControlErrors.Inc() return nil, err } return reply, nil } // ----------------------------------------------------------------------------- // Server Bootstrap // ----------------------------------------------------------------------------- type GRPCServer struct { server *grpc.Server lis net.Listener addr string } // StartGRPCServer sets up a gRPC server hosting both CartActor and ControlPlane services. // addr example: ":1337" (for combined) OR run two servers if you want separate ports. // For the migration we can host both on the same listener to reduce open ports. func StartGRPCServer(addr string, pool GrainPool, synced *SyncedPool, opts ...grpc.ServerOption) (*GRPCServer, error) { if pool == nil { return nil, errors.New("nil grain pool") } if synced == nil { return nil, errors.New("nil synced pool") } lis, err := net.Listen("tcp", addr) if err != nil { return nil, fmt.Errorf("listen %s: %w", addr, err) } grpcServer := grpc.NewServer(opts...) proto.RegisterCartActorServer(grpcServer, newCartActorService(pool)) proto.RegisterControlPlaneServer(grpcServer, newControlPlaneService(synced)) go func() { log.Printf("gRPC server listening on %s", addr) if serveErr := grpcServer.Serve(lis); serveErr != nil { log.Printf("gRPC server stopped: %v", serveErr) } }() return &GRPCServer{ server: grpcServer, lis: lis, addr: addr, }, nil } // GracefulStop stops the server gracefully. func (s *GRPCServer) GracefulStop() { if s == nil || s.server == nil { return } s.server.GracefulStop() } // Addr returns the bound address. func (s *GRPCServer) Addr() string { if s == nil { return "" } return s.addr } // ----------------------------------------------------------------------------- // Client Dial Helpers (used later by refactored remote grain + control plane) // ----------------------------------------------------------------------------- // DialRemote establishes (or reuses externally) a gRPC client connection. func DialRemote(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { dialOpts := []grpc.DialOption{ grpc.WithInsecure(), // NOTE: Intentional for initial migration; replace with TLS / mTLS later. grpc.WithBlock(), } dialOpts = append(dialOpts, opts...) ctxDial, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() conn, err := grpc.DialContext(ctxDial, target, dialOpts...) if err != nil { return nil, err } return conn, nil } // ----------------------------------------------------------------------------- // Utility for converting internal errors to gRPC status (if needed later). // ----------------------------------------------------------------------------- func grpcError(err error) error { if err == nil { return nil } // Extend mapping if we add richer error types. return status.Error(codes.Internal, err.Error()) }