package actor import ( "context" "fmt" "log" "net" "time" messages "git.tornberg.me/go-cart-actor/pkg/messages" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) // ControlServer implements the ControlPlane gRPC services. // It delegates to a grain pool and cluster operations to a synced pool. type ControlServer[V any] struct { messages.UnimplementedControlPlaneServer pool GrainPool[V] } func (s *ControlServer[V]) AnnounceOwnership(ctx context.Context, req *messages.OwnershipAnnounce) (*messages.OwnerChangeAck, error) { err := s.pool.HandleOwnershipChange(req.Host, req.Ids) if err != nil { return &messages.OwnerChangeAck{ Accepted: false, Message: "owner change failed", }, err } log.Printf("Ack count: %d", len(req.Ids)) return &messages.OwnerChangeAck{ Accepted: true, Message: "ownership announced", }, nil } func (s *ControlServer[V]) AnnounceExpiry(ctx context.Context, req *messages.ExpiryAnnounce) (*messages.OwnerChangeAck, error) { err := s.pool.HandleRemoteExpiry(req.Host, req.Ids) return &messages.OwnerChangeAck{ Accepted: err == nil, Message: "expiry acknowledged", }, nil } // ControlPlane: Ping func (s *ControlServer[V]) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) { return &messages.PingReply{ Host: s.pool.Hostname(), UnixTime: time.Now().Unix(), }, nil } // ControlPlane: Negotiate (merge host views) func (s *ControlServer[V]) Negotiate(ctx context.Context, req *messages.NegotiateRequest) (*messages.NegotiateReply, error) { s.pool.Negotiate(req.KnownHosts) return &messages.NegotiateReply{Hosts: req.GetKnownHosts()}, nil } // ControlPlane: GetCartIds (locally owned carts only) func (s *ControlServer[V]) GetLocalActorIds(ctx context.Context, _ *messages.Empty) (*messages.ActorIdsReply, error) { return &messages.ActorIdsReply{Ids: s.pool.GetLocalIds()}, nil } // ControlPlane: Closing (peer shutdown notification) func (s *ControlServer[V]) Closing(ctx context.Context, req *messages.ClosingNotice) (*messages.OwnerChangeAck, error) { if req.GetHost() != "" { s.pool.RemoveHost(req.GetHost()) } return &messages.OwnerChangeAck{ Accepted: true, Message: "removed host", }, nil } // StartGRPCServer configures and starts the unified gRPC server on the given address. // It registers both the CartActor and ControlPlane services. func NewControlServer[V any](addr string, pool GrainPool[V]) (*grpc.Server, error) { lis, err := net.Listen("tcp", addr) if err != nil { return nil, fmt.Errorf("failed to listen: %w", err) } grpcServer := grpc.NewServer() server := &ControlServer[V]{ pool: pool, } messages.RegisterControlPlaneServer(grpcServer, server) reflection.Register(grpcServer) log.Printf("gRPC server listening on %s", addr) go func() { if err := grpcServer.Serve(lis); err != nil { log.Fatalf("failed to serve gRPC: %v", err) } }() return grpcServer, nil }