Files
go-cart-actor/pkg/actor/grpc_server.go
matst80 ea35871676
Some checks failed
Build and Publish / Metadata (push) Successful in 4s
Build and Publish / BuildAndDeployArm64 (push) Successful in 3m40s
Build and Publish / BuildAndDeployAmd64 (push) Has been cancelled
test
2025-10-12 23:18:55 +02:00

103 lines
2.9 KiB
Go

package actor
import (
"context"
"fmt"
"log"
"net"
"time"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
// ControlServer implements the ControlPlane gRPC service.
// Each RPC handler defined on this type forwards its work to the GrainPool
// stored in pool; V is the type argument passed through to GrainPool.
type ControlServer[V any] struct {
// Embedded generated base type. Presumably it supplies default handlers for
// any ControlPlane RPC not overridden here — confirm against the generated
// messages package.
messages.UnimplementedControlPlaneServer
// pool is the grain pool every handler delegates to.
pool GrainPool[V]
}
// AnnounceOwnership forwards a peer's ownership announcement (host and ids)
// to the pool. If the pool reports an error, a reply with Accepted=false is
// returned together with that error. Otherwise the number of announced ids is
// logged and a reply with Accepted=true is returned.
func (s *ControlServer[V]) AnnounceOwnership(ctx context.Context, req *messages.OwnershipAnnounce) (*messages.OwnerChangeAck, error) {
	if changeErr := s.pool.HandleOwnershipChange(req.Host, req.Ids); changeErr != nil {
		rejected := &messages.OwnerChangeAck{
			Accepted: false,
			Message:  "owner change failed",
		}
		return rejected, changeErr
	}

	announced := len(req.Ids)
	log.Printf("Ack count: %d", announced)

	accepted := &messages.OwnerChangeAck{
		Accepted: true,
		Message:  "ownership announced",
	}
	return accepted, nil
}
// AnnounceExpiry forwards a peer's expiry announcement (host and ids) to the
// pool. The RPC itself always completes with a nil error. A failure reported
// by the pool is logged and signalled to the caller through Accepted=false
// and a failure message, mirroring the reply shape of AnnounceOwnership.
func (s *ControlServer[V]) AnnounceExpiry(ctx context.Context, req *messages.ExpiryAnnounce) (*messages.OwnerChangeAck, error) {
	if err := s.pool.HandleRemoteExpiry(req.Host, req.Ids); err != nil {
		// Log here: the error is not propagated as an RPC error, so this is
		// the only place it becomes visible on the receiving node.
		log.Printf("remote expiry from %v failed: %v", req.Host, err)
		return &messages.OwnerChangeAck{
			Accepted: false,
			Message:  "expiry failed",
		}, nil
	}
	return &messages.OwnerChangeAck{
		Accepted: true,
		Message:  "expiry acknowledged",
	}, nil
}
// Ping answers a probe with the value of the pool's Hostname and the current
// time expressed as Unix seconds. The request payload is ignored.
func (s *ControlServer[V]) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) {
	reply := new(messages.PingReply)
	reply.Host = s.pool.Hostname()
	reply.UnixTime = time.Now().Unix()
	return reply, nil
}
// Negotiate hands the caller's known hosts to the pool and replies with the
// host list taken from the request.
// NOTE(review): the reply echoes the request's hosts rather than anything
// produced by the pool — confirm this is the intended "merged" view.
func (s *ControlServer[V]) Negotiate(ctx context.Context, req *messages.NegotiateRequest) (*messages.NegotiateReply, error) {
	s.pool.Negotiate(req.KnownHosts)

	reply := new(messages.NegotiateReply)
	reply.Hosts = req.GetKnownHosts()
	return reply, nil
}
// GetLocalActorIds replies with the ids returned by the pool's GetLocalIds.
// The request payload is ignored.
func (s *ControlServer[V]) GetLocalActorIds(ctx context.Context, _ *messages.Empty) (*messages.ActorIdsReply, error) {
	localIDs := s.pool.GetLocalIds()
	reply := &messages.ActorIdsReply{Ids: localIDs}
	return reply, nil
}
// Closing handles a peer's shutdown notice. When the notice names a non-empty
// host, that host is removed from the pool. The reply is always Accepted=true
// with the message "removed host", even if no host was named.
func (s *ControlServer[V]) Closing(ctx context.Context, req *messages.ClosingNotice) (*messages.OwnerChangeAck, error) {
	if host := req.GetHost(); host != "" {
		s.pool.RemoveHost(host)
	}

	ack := &messages.OwnerChangeAck{
		Accepted: true,
		Message:  "removed host",
	}
	return ack, nil
}
// NewControlServer creates a gRPC server exposing the ControlPlane service
// (plus server reflection) backed by pool, starts it on addr, and returns it.
//
// The listener is opened synchronously, so a bind failure is returned as an
// error. Serving happens on a background goroutine; the caller owns the
// returned *grpc.Server and is responsible for stopping it.
func NewControlServer[V any](addr string, pool GrainPool[V]) (*grpc.Server, error) {
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, fmt.Errorf("failed to listen: %w", err)
	}

	grpcServer := grpc.NewServer()
	server := &ControlServer[V]{
		pool: pool,
	}

	// Services must be registered before Serve is called; grpc-go rejects
	// registration on a server that is already serving.
	messages.RegisterControlPlaneServer(grpcServer, server)
	reflection.Register(grpcServer)

	log.Printf("gRPC server listening on %s", addr)

	// Serve blocks until the server is stopped, so run it in the background
	// to let the caller receive the server handle.
	go func() {
		if err := grpcServer.Serve(lis); err != nil {
			// Kept fatal to preserve the original failure semantics.
			log.Fatalf("failed to serve gRPC: %v", err)
		}
	}()

	return grpcServer, nil
}