225 lines
6.8 KiB
Go
225 lines
6.8 KiB
Go
package actor
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"time"
|
|
|
|
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
|
"go.opentelemetry.io/contrib/bridges/otelslog"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/metric"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/reflection"
|
|
)
|
|
|
|
// ControlServer implements the ControlPlane gRPC services.
|
|
// It delegates to a grain pool and cluster operations to a synced pool.
|
|
type ControlServer[V any] struct {
|
|
messages.UnimplementedControlPlaneServer
|
|
pool GrainPool[V]
|
|
}
|
|
|
|
const name = "grpc_server"
|
|
|
|
var (
|
|
tracer = otel.Tracer(name)
|
|
meter = otel.Meter(name)
|
|
logger = otelslog.NewLogger(name)
|
|
pingCalls metric.Int64Counter
|
|
negotiateCalls metric.Int64Counter
|
|
getLocalActorIdsCalls metric.Int64Counter
|
|
announceOwnershipCalls metric.Int64Counter
|
|
announceExpiryCalls metric.Int64Counter
|
|
closingCalls metric.Int64Counter
|
|
)
|
|
|
|
func init() {
|
|
var err error
|
|
pingCalls, err = meter.Int64Counter("grpc.ping_calls",
|
|
metric.WithDescription("Number of ping calls"),
|
|
metric.WithUnit("{calls}"))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
negotiateCalls, err = meter.Int64Counter("grpc.negotiate_calls",
|
|
metric.WithDescription("Number of negotiate calls"),
|
|
metric.WithUnit("{calls}"))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
getLocalActorIdsCalls, err = meter.Int64Counter("grpc.get_local_actor_ids_calls",
|
|
metric.WithDescription("Number of get local actor ids calls"),
|
|
metric.WithUnit("{calls}"))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
announceOwnershipCalls, err = meter.Int64Counter("grpc.announce_ownership_calls",
|
|
metric.WithDescription("Number of announce ownership calls"),
|
|
metric.WithUnit("{calls}"))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
announceExpiryCalls, err = meter.Int64Counter("grpc.announce_expiry_calls",
|
|
metric.WithDescription("Number of announce expiry calls"),
|
|
metric.WithUnit("{calls}"))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
closingCalls, err = meter.Int64Counter("grpc.closing_calls",
|
|
metric.WithDescription("Number of closing calls"),
|
|
metric.WithUnit("{calls}"))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (s *ControlServer[V]) AnnounceOwnership(ctx context.Context, req *messages.OwnershipAnnounce) (*messages.OwnerChangeAck, error) {
|
|
ctx, span := tracer.Start(ctx, "grpc_announce_ownership")
|
|
defer span.End()
|
|
span.SetAttributes(
|
|
attribute.String("host", req.Host),
|
|
attribute.Int("id_count", len(req.Ids)),
|
|
)
|
|
logger.InfoContext(ctx, "announce ownership", "host", req.Host, "id_count", len(req.Ids))
|
|
announceOwnershipCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", req.Host)))
|
|
|
|
err := s.pool.HandleOwnershipChange(req.Host, req.Ids)
|
|
if err != nil {
|
|
span.RecordError(err)
|
|
return &messages.OwnerChangeAck{
|
|
Accepted: false,
|
|
Message: "owner change failed",
|
|
}, err
|
|
}
|
|
|
|
log.Printf("Ack count: %d", len(req.Ids))
|
|
return &messages.OwnerChangeAck{
|
|
Accepted: true,
|
|
Message: "ownership announced",
|
|
}, nil
|
|
}
|
|
|
|
func (s *ControlServer[V]) AnnounceExpiry(ctx context.Context, req *messages.ExpiryAnnounce) (*messages.OwnerChangeAck, error) {
|
|
ctx, span := tracer.Start(ctx, "grpc_announce_expiry")
|
|
defer span.End()
|
|
span.SetAttributes(
|
|
attribute.String("host", req.Host),
|
|
attribute.Int("id_count", len(req.Ids)),
|
|
)
|
|
logger.InfoContext(ctx, "announce expiry", "host", req.Host, "id_count", len(req.Ids))
|
|
announceExpiryCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", req.Host)))
|
|
|
|
err := s.pool.HandleRemoteExpiry(req.Host, req.Ids)
|
|
if err != nil {
|
|
span.RecordError(err)
|
|
}
|
|
return &messages.OwnerChangeAck{
|
|
Accepted: err == nil,
|
|
Message: "expiry acknowledged",
|
|
}, nil
|
|
}
|
|
|
|
// ControlPlane: Ping
|
|
func (s *ControlServer[V]) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) {
|
|
ctx, span := tracer.Start(ctx, "grpc_ping")
|
|
defer span.End()
|
|
host := s.pool.Hostname()
|
|
span.SetAttributes(attribute.String("host", host))
|
|
logger.InfoContext(ctx, "ping received", "host", host)
|
|
pingCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", host)))
|
|
|
|
// log.Printf("got ping")
|
|
return &messages.PingReply{
|
|
Host: host,
|
|
UnixTime: time.Now().Unix(),
|
|
}, nil
|
|
}
|
|
|
|
// ControlPlane: Negotiate (merge host views)
|
|
func (s *ControlServer[V]) Negotiate(ctx context.Context, req *messages.NegotiateRequest) (*messages.NegotiateReply, error) {
|
|
ctx, span := tracer.Start(ctx, "grpc_negotiate")
|
|
defer span.End()
|
|
span.SetAttributes(attribute.Int("known_hosts_count", len(req.KnownHosts)))
|
|
logger.InfoContext(ctx, "negotiate", "known_hosts_count", len(req.KnownHosts))
|
|
negotiateCalls.Add(ctx, 1)
|
|
|
|
s.pool.Negotiate(req.KnownHosts)
|
|
return &messages.NegotiateReply{Hosts: req.GetKnownHosts()}, nil
|
|
}
|
|
|
|
// ControlPlane: GetCartIds (locally owned carts only)
|
|
func (s *ControlServer[V]) GetLocalActorIds(ctx context.Context, _ *messages.Empty) (*messages.ActorIdsReply, error) {
|
|
ctx, span := tracer.Start(ctx, "grpc_get_local_actor_ids")
|
|
defer span.End()
|
|
ids := s.pool.GetLocalIds()
|
|
span.SetAttributes(attribute.Int("id_count", len(ids)))
|
|
logger.InfoContext(ctx, "get local actor ids", "id_count", len(ids))
|
|
getLocalActorIdsCalls.Add(ctx, 1)
|
|
|
|
return &messages.ActorIdsReply{Ids: ids}, nil
|
|
}
|
|
|
|
// ControlPlane: Closing (peer shutdown notification)
|
|
func (s *ControlServer[V]) Closing(ctx context.Context, req *messages.ClosingNotice) (*messages.OwnerChangeAck, error) {
|
|
ctx, span := tracer.Start(ctx, "grpc_closing")
|
|
defer span.End()
|
|
host := req.GetHost()
|
|
span.SetAttributes(attribute.String("host", host))
|
|
logger.InfoContext(ctx, "closing notice", "host", host)
|
|
closingCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", host)))
|
|
|
|
if host != "" {
|
|
s.pool.RemoveHost(host)
|
|
}
|
|
return &messages.OwnerChangeAck{
|
|
Accepted: true,
|
|
Message: "removed host",
|
|
}, nil
|
|
}
|
|
|
|
type ServerConfig struct {
|
|
Addr string
|
|
Options []grpc.ServerOption
|
|
}
|
|
|
|
func NewServerConfig(addr string, options ...grpc.ServerOption) ServerConfig {
|
|
return ServerConfig{
|
|
Addr: addr,
|
|
Options: options,
|
|
}
|
|
}
|
|
|
|
func DefaultServerConfig() ServerConfig {
|
|
return NewServerConfig(":1337")
|
|
}
|
|
|
|
// StartGRPCServer configures and starts the unified gRPC server on the given address.
|
|
// It registers both the CartActor and ControlPlane services.
|
|
func NewControlServer[V any](config ServerConfig, pool GrainPool[V]) (*grpc.Server, error) {
|
|
lis, err := net.Listen("tcp", config.Addr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to listen: %w", err)
|
|
}
|
|
|
|
grpcServer := grpc.NewServer(config.Options...)
|
|
server := &ControlServer[V]{
|
|
pool: pool,
|
|
}
|
|
|
|
messages.RegisterControlPlaneServer(grpcServer, server)
|
|
reflection.Register(grpcServer)
|
|
|
|
log.Printf("gRPC server listening as %s on %s", pool.Hostname(), config.Addr)
|
|
go func() {
|
|
if err := grpcServer.Serve(lis); err != nil {
|
|
log.Fatalf("failed to serve gRPC: %v", err)
|
|
}
|
|
}()
|
|
|
|
return grpcServer, nil
|
|
}
|