package actor import ( "context" "fmt" "log" "net" "time" messages "git.k6n.net/go-cart-actor/pkg/messages" "go.opentelemetry.io/contrib/bridges/otelslog" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) // ControlServer implements the ControlPlane gRPC services. // It delegates to a grain pool and cluster operations to a synced pool. type ControlServer[V any] struct { messages.UnimplementedControlPlaneServer pool GrainPool[V] } const name = "grpc_server" var ( tracer = otel.Tracer(name) meter = otel.Meter(name) logger = otelslog.NewLogger(name) pingCalls metric.Int64Counter negotiateCalls metric.Int64Counter getLocalActorIdsCalls metric.Int64Counter announceOwnershipCalls metric.Int64Counter announceExpiryCalls metric.Int64Counter closingCalls metric.Int64Counter ) func init() { var err error pingCalls, err = meter.Int64Counter("grpc.ping_calls", metric.WithDescription("Number of ping calls"), metric.WithUnit("{calls}")) if err != nil { panic(err) } negotiateCalls, err = meter.Int64Counter("grpc.negotiate_calls", metric.WithDescription("Number of negotiate calls"), metric.WithUnit("{calls}")) if err != nil { panic(err) } getLocalActorIdsCalls, err = meter.Int64Counter("grpc.get_local_actor_ids_calls", metric.WithDescription("Number of get local actor ids calls"), metric.WithUnit("{calls}")) if err != nil { panic(err) } announceOwnershipCalls, err = meter.Int64Counter("grpc.announce_ownership_calls", metric.WithDescription("Number of announce ownership calls"), metric.WithUnit("{calls}")) if err != nil { panic(err) } announceExpiryCalls, err = meter.Int64Counter("grpc.announce_expiry_calls", metric.WithDescription("Number of announce expiry calls"), metric.WithUnit("{calls}")) if err != nil { panic(err) } closingCalls, err = meter.Int64Counter("grpc.closing_calls", metric.WithDescription("Number of closing calls"), metric.WithUnit("{calls}")) if err != nil { panic(err) } } func (s *ControlServer[V]) AnnounceOwnership(ctx context.Context, req *messages.OwnershipAnnounce) (*messages.OwnerChangeAck, error) { ctx, span := tracer.Start(ctx, "grpc_announce_ownership") defer span.End() span.SetAttributes( attribute.String("component", "controlplane"), attribute.String("host", req.Host), attribute.Int("id_count", len(req.Ids)), ) logger.InfoContext(ctx, "announce ownership", "host", req.Host, "id_count", len(req.Ids)) announceOwnershipCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", req.Host))) err := s.pool.HandleOwnershipChange(req.Host, req.Ids) if err != nil { span.RecordError(err) return &messages.OwnerChangeAck{ Accepted: false, Message: "owner change failed", }, err } log.Printf("Ack count: %d", len(req.Ids)) return &messages.OwnerChangeAck{ Accepted: true, Message: "ownership announced", }, nil } func (s *ControlServer[V]) AnnounceExpiry(ctx context.Context, req *messages.ExpiryAnnounce) (*messages.OwnerChangeAck, error) { ctx, span := tracer.Start(ctx, "grpc_announce_expiry") defer span.End() span.SetAttributes( attribute.String("component", "controlplane"), attribute.String("host", req.Host), attribute.Int("id_count", len(req.Ids)), ) logger.InfoContext(ctx, "announce expiry", "host", req.Host, "id_count", len(req.Ids)) announceExpiryCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", req.Host))) err := s.pool.HandleRemoteExpiry(req.Host, req.Ids) if err != nil { span.RecordError(err) } return &messages.OwnerChangeAck{ Accepted: err == nil, Message: "expiry acknowledged", }, nil } // ControlPlane: Ping func (s *ControlServer[V]) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) { host := s.pool.Hostname() pingCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", host))) // log.Printf("got ping") return &messages.PingReply{ Host: host, UnixTime: time.Now().Unix(), }, nil } // ControlPlane: Negotiate (merge host views) func (s *ControlServer[V]) Negotiate(ctx context.Context, req *messages.NegotiateRequest) (*messages.NegotiateReply, error) { ctx, span := tracer.Start(ctx, "grpc_negotiate") defer span.End() span.SetAttributes( attribute.String("component", "controlplane"), attribute.Int("known_hosts_count", len(req.KnownHosts)), ) logger.InfoContext(ctx, "negotiate", "known_hosts_count", len(req.KnownHosts)) negotiateCalls.Add(ctx, 1) s.pool.Negotiate(req.KnownHosts) return &messages.NegotiateReply{Hosts: req.GetKnownHosts()}, nil } // ControlPlane: GetCartIds (locally owned carts only) func (s *ControlServer[V]) GetLocalActorIds(ctx context.Context, _ *messages.Empty) (*messages.ActorIdsReply, error) { ctx, span := tracer.Start(ctx, "grpc_get_local_actor_ids") defer span.End() ids := s.pool.GetLocalIds() span.SetAttributes( attribute.String("component", "controlplane"), attribute.Int("id_count", len(ids)), ) logger.InfoContext(ctx, "get local actor ids", "id_count", len(ids)) getLocalActorIdsCalls.Add(ctx, 1) return &messages.ActorIdsReply{Ids: ids}, nil } // ControlPlane: Closing (peer shutdown notification) func (s *ControlServer[V]) Closing(ctx context.Context, req *messages.ClosingNotice) (*messages.OwnerChangeAck, error) { ctx, span := tracer.Start(ctx, "grpc_closing") defer span.End() host := req.GetHost() span.SetAttributes( attribute.String("component", "controlplane"), attribute.String("host", host), ) logger.InfoContext(ctx, "closing notice", "host", host) closingCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", host))) if host != "" { s.pool.RemoveHost(host) } return &messages.OwnerChangeAck{ Accepted: true, Message: "removed host", }, nil } type ServerConfig struct { Addr string Options []grpc.ServerOption } func NewServerConfig(addr string, options ...grpc.ServerOption) ServerConfig { return ServerConfig{ Addr: addr, Options: options, } } func DefaultServerConfig() ServerConfig { return NewServerConfig(":1337") } // StartGRPCServer configures and starts the unified gRPC server on the given address. // It registers both the CartActor and ControlPlane services. func NewControlServer[V any](config ServerConfig, pool GrainPool[V]) (*grpc.Server, error) { lis, err := net.Listen("tcp", config.Addr) if err != nil { return nil, fmt.Errorf("failed to listen: %w", err) } grpcServer := grpc.NewServer(config.Options...) server := &ControlServer[V]{ pool: pool, } messages.RegisterControlPlaneServer(grpcServer, server) reflection.Register(grpcServer) log.Printf("gRPC server listening as %s on %s", pool.Hostname(), config.Addr) go func() { if err := grpcServer.Serve(lis); err != nil { log.Fatalf("failed to serve gRPC: %v", err) } }() return grpcServer, nil }