120 lines
3.3 KiB
Go
120 lines
3.3 KiB
Go
package actor
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"time"
|
|
|
|
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/reflection"
|
|
)
|
|
|
|
// ControlServer implements the ControlPlane gRPC services.
|
|
// It delegates to a grain pool and cluster operations to a synced pool.
|
|
type ControlServer[V any] struct {
|
|
messages.UnimplementedControlPlaneServer
|
|
|
|
pool GrainPool[V]
|
|
}
|
|
|
|
func (s *ControlServer[V]) AnnounceOwnership(ctx context.Context, req *messages.OwnershipAnnounce) (*messages.OwnerChangeAck, error) {
|
|
err := s.pool.HandleOwnershipChange(req.Host, req.Ids)
|
|
if err != nil {
|
|
return &messages.OwnerChangeAck{
|
|
Accepted: false,
|
|
Message: "owner change failed",
|
|
}, err
|
|
}
|
|
|
|
log.Printf("Ack count: %d", len(req.Ids))
|
|
return &messages.OwnerChangeAck{
|
|
Accepted: true,
|
|
Message: "ownership announced",
|
|
}, nil
|
|
}
|
|
|
|
func (s *ControlServer[V]) AnnounceExpiry(ctx context.Context, req *messages.ExpiryAnnounce) (*messages.OwnerChangeAck, error) {
|
|
err := s.pool.HandleRemoteExpiry(req.Host, req.Ids)
|
|
return &messages.OwnerChangeAck{
|
|
Accepted: err == nil,
|
|
Message: "expiry acknowledged",
|
|
}, nil
|
|
}
|
|
|
|
// ControlPlane: Ping
|
|
func (s *ControlServer[V]) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) {
|
|
// log.Printf("got ping")
|
|
return &messages.PingReply{
|
|
Host: s.pool.Hostname(),
|
|
UnixTime: time.Now().Unix(),
|
|
}, nil
|
|
}
|
|
|
|
// ControlPlane: Negotiate (merge host views)
|
|
func (s *ControlServer[V]) Negotiate(ctx context.Context, req *messages.NegotiateRequest) (*messages.NegotiateReply, error) {
|
|
|
|
s.pool.Negotiate(req.KnownHosts)
|
|
return &messages.NegotiateReply{Hosts: req.GetKnownHosts()}, nil
|
|
}
|
|
|
|
// ControlPlane: GetCartIds (locally owned carts only)
|
|
func (s *ControlServer[V]) GetLocalActorIds(ctx context.Context, _ *messages.Empty) (*messages.ActorIdsReply, error) {
|
|
return &messages.ActorIdsReply{Ids: s.pool.GetLocalIds()}, nil
|
|
}
|
|
|
|
// ControlPlane: Closing (peer shutdown notification)
|
|
func (s *ControlServer[V]) Closing(ctx context.Context, req *messages.ClosingNotice) (*messages.OwnerChangeAck, error) {
|
|
if req.GetHost() != "" {
|
|
s.pool.RemoveHost(req.GetHost())
|
|
}
|
|
return &messages.OwnerChangeAck{
|
|
Accepted: true,
|
|
Message: "removed host",
|
|
}, nil
|
|
}
|
|
|
|
type ServerConfig struct {
|
|
Addr string
|
|
Options []grpc.ServerOption
|
|
}
|
|
|
|
func NewServerConfig(addr string, options ...grpc.ServerOption) ServerConfig {
|
|
return ServerConfig{
|
|
Addr: addr,
|
|
Options: options,
|
|
}
|
|
}
|
|
|
|
func DefaultServerConfig() ServerConfig {
|
|
return NewServerConfig(":1337")
|
|
}
|
|
|
|
// StartGRPCServer configures and starts the unified gRPC server on the given address.
|
|
// It registers both the CartActor and ControlPlane services.
|
|
func NewControlServer[V any](config ServerConfig, pool GrainPool[V]) (*grpc.Server, error) {
|
|
lis, err := net.Listen("tcp", config.Addr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to listen: %w", err)
|
|
}
|
|
|
|
grpcServer := grpc.NewServer(config.Options...)
|
|
server := &ControlServer[V]{
|
|
pool: pool,
|
|
}
|
|
|
|
messages.RegisterControlPlaneServer(grpcServer, server)
|
|
reflection.Register(grpcServer)
|
|
|
|
log.Printf("gRPC server listening as %s on %s", pool.Hostname(), config.Addr)
|
|
go func() {
|
|
if err := grpcServer.Serve(lis); err != nil {
|
|
log.Fatalf("failed to serve gRPC: %v", err)
|
|
}
|
|
}()
|
|
|
|
return grpcServer, nil
|
|
}
|