diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 589c206..c6e66ec 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -183,7 +183,7 @@ func main() { return nil }, SpawnHost: func(host string) (actor.Host, error) { - return proxy.NewRemoteHost(host) + return proxy.NewRemoteHost[cart.CartGrain](host) }, TTL: 5 * time.Minute, PoolSize: 2 * 65535, diff --git a/pkg/actor/grpc_server.go b/pkg/actor/grpc_server.go index 91b63e1..5a2ad45 100644 --- a/pkg/actor/grpc_server.go +++ b/pkg/actor/grpc_server.go @@ -2,6 +2,7 @@ package actor import ( "context" + "encoding/json" "fmt" "log" "net" @@ -15,6 +16,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/reflection" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" ) // ControlServer implements the ControlPlane gRPC services. @@ -105,6 +107,22 @@ func (s *ControlServer[V]) AnnounceOwnership(ctx context.Context, req *messages. }, nil } +func (s *ControlServer[V]) Get(ctx context.Context, req *messages.GetRequest) (*messages.GetReply, error) { + grain, err := s.pool.Get(ctx, req.Id) + if err != nil { + return nil, err + } + data, err := json.Marshal(grain) + if err != nil { + return nil, err + } + return &messages.GetReply{ + Grain: &anypb.Any{ + Value: data, + }, + }, nil +} + func (s *ControlServer[V]) AnnounceExpiry(ctx context.Context, req *messages.ExpiryAnnounce) (*messages.OwnerChangeAck, error) { ctx, span := tracer.Start(ctx, "grpc_announce_expiry") defer span.End() diff --git a/pkg/messages/control_plane.pb.go b/pkg/messages/control_plane.pb.go index 4bcde6c..c3d6f0c 100644 --- a/pkg/messages/control_plane.pb.go +++ b/pkg/messages/control_plane.pb.go @@ -504,6 +504,94 @@ func (x *ApplyRequest) GetMessages() []*anypb.Any { return nil } +type GetRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetRequest) Reset() { + *x = GetRequest{} + mi := &file_control_plane_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRequest) ProtoMessage() {} + +func (x *GetRequest) ProtoReflect() protoreflect.Message { + mi := &file_control_plane_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRequest.ProtoReflect.Descriptor instead. +func (*GetRequest) Descriptor() ([]byte, []int) { + return file_control_plane_proto_rawDescGZIP(), []int{10} +} + +func (x *GetRequest) GetId() uint64 { + if x != nil { + return x.Id + } + return 0 +} + +type GetReply struct { + state protoimpl.MessageState `protogen:"open.v1"` + Grain *anypb.Any `protobuf:"bytes,1,opt,name=grain,proto3" json:"grain,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetReply) Reset() { + *x = GetReply{} + mi := &file_control_plane_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetReply) ProtoMessage() {} + +func (x *GetReply) ProtoReflect() protoreflect.Message { + mi := &file_control_plane_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetReply.ProtoReflect.Descriptor instead. +func (*GetReply) Descriptor() ([]byte, []int) { + return file_control_plane_proto_rawDescGZIP(), []int{11} +} + +func (x *GetReply) GetGrain() *anypb.Any { + if x != nil { + return x.Grain + } + return nil +} + type ApplyResult struct { state protoimpl.MessageState `protogen:"open.v1"` Accepted bool `protobuf:"varint,1,opt,name=accepted,proto3" json:"accepted,omitempty"` @@ -513,7 +601,7 @@ type ApplyResult struct { func (x *ApplyResult) Reset() { *x = ApplyResult{} - mi := &file_control_plane_proto_msgTypes[10] + mi := &file_control_plane_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -525,7 +613,7 @@ func (x *ApplyResult) String() string { func (*ApplyResult) ProtoMessage() {} func (x *ApplyResult) ProtoReflect() protoreflect.Message { - mi := &file_control_plane_proto_msgTypes[10] + mi := &file_control_plane_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -538,7 +626,7 @@ func (x *ApplyResult) ProtoReflect() protoreflect.Message { // Deprecated: Use ApplyResult.ProtoReflect.Descriptor instead. func (*ApplyResult) Descriptor() ([]byte, []int) { - return file_control_plane_proto_rawDescGZIP(), []int{10} + return file_control_plane_proto_rawDescGZIP(), []int{12} } func (x *ApplyResult) GetAccepted() bool { @@ -586,41 +674,50 @@ var file_control_plane_proto_rawDesc = string([]byte{ 0x64, 0x12, 0x30, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x73, 0x22, 0x29, 0x0a, 0x0b, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x73, 0x75, - 0x6c, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x65, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x65, 0x64, 0x32, 0xc5, - 0x03, 0x0a, 0x0c, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x50, 0x6c, 0x61, 0x6e, 0x65, 0x12, - 0x2c, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x73, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x73, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x41, 0x0a, - 0x09, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x12, 0x1a, 0x2e, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x73, 0x2e, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, - 0x12, 0x3c, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x41, 0x63, 0x74, 0x6f, - 0x72, 0x49, 0x64, 0x73, 0x12, 0x0f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, - 0x2e, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x64, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x4a, - 0x0a, 0x11, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x73, - 0x68, 0x69, 0x70, 0x12, 0x1b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, - 0x77, 0x6e, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, - 0x1a, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, - 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x70, - 0x70, 0x6c, 0x79, 0x12, 0x16, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x41, - 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x73, 0x75, - 0x6c, 0x74, 0x12, 0x44, 0x0a, 0x0e, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x45, 0x78, - 0x70, 0x69, 0x72, 0x79, 0x12, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, - 0x45, 0x78, 0x70, 0x69, 0x72, 0x79, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x1a, 0x18, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, - 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, 0x3c, 0x0a, 0x07, 0x43, 0x6c, 0x6f, 0x73, - 0x69, 0x6e, 0x67, 0x12, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x43, - 0x6c, 0x6f, 0x73, 0x69, 0x6e, 0x67, 0x4e, 0x6f, 0x74, 0x69, 0x63, 0x65, 0x1a, 0x18, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, - 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x2e, 0x6b, 0x36, - 0x6e, 0x2e, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x6f, 0x2d, 0x63, 0x61, 0x72, 0x74, 0x2d, 0x61, 0x63, - 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x67, 0x65, 0x73, 0x22, 0x1c, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x02, 0x69, + 0x64, 0x22, 0x36, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x2a, 0x0a, + 0x05, 0x67, 0x72, 0x61, 0x69, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, + 0x6e, 0x79, 0x52, 0x05, 0x67, 0x72, 0x61, 0x69, 0x6e, 0x22, 0x29, 0x0a, 0x0b, 0x41, 0x70, 0x70, + 0x6c, 0x79, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x63, 0x63, 0x65, + 0x70, 0x74, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x61, 0x63, 0x63, 0x65, + 0x70, 0x74, 0x65, 0x64, 0x32, 0xf6, 0x03, 0x0a, 0x0c, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, + 0x50, 0x6c, 0x61, 0x6e, 0x65, 0x12, 0x2c, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0f, 0x2e, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x13, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, + 0x70, 0x6c, 0x79, 0x12, 0x41, 0x0a, 0x09, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, + 0x12, 0x1a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4e, 0x65, 0x67, 0x6f, + 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, + 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x3c, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x4c, 0x6f, 0x63, + 0x61, 0x6c, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x64, 0x73, 0x12, 0x0f, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x64, 0x73, 0x52, + 0x65, 0x70, 0x6c, 0x79, 0x12, 0x4a, 0x0a, 0x11, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, + 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x12, 0x1b, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x41, 0x6e, + 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x1a, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, + 0x12, 0x36, 0x0a, 0x05, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x12, 0x16, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x73, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x15, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x41, 0x70, 0x70, + 0x6c, 0x79, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x44, 0x0a, 0x0e, 0x41, 0x6e, 0x6e, 0x6f, + 0x75, 0x6e, 0x63, 0x65, 0x45, 0x78, 0x70, 0x69, 0x72, 0x79, 0x12, 0x18, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x45, 0x78, 0x70, 0x69, 0x72, 0x79, 0x41, 0x6e, 0x6e, 0x6f, + 0x75, 0x6e, 0x63, 0x65, 0x1a, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, + 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, 0x3c, + 0x0a, 0x07, 0x43, 0x6c, 0x6f, 0x73, 0x69, 0x6e, 0x67, 0x12, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x73, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x69, 0x6e, 0x67, 0x4e, 0x6f, 0x74, 0x69, + 0x63, 0x65, 0x1a, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, + 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, 0x2f, 0x0a, 0x03, + 0x47, 0x65, 0x74, 0x12, 0x14, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x47, + 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x73, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x42, 0x2a, 0x5a, + 0x28, 0x67, 0x69, 0x74, 0x2e, 0x6b, 0x36, 0x6e, 0x2e, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x6f, 0x2d, + 0x63, 0x61, 0x72, 0x74, 0x2d, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x3b, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, }) var ( @@ -635,7 +732,7 @@ func file_control_plane_proto_rawDescGZIP() []byte { return file_control_plane_proto_rawDescData } -var file_control_plane_proto_msgTypes = make([]protoimpl.MessageInfo, 11) +var file_control_plane_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_control_plane_proto_goTypes = []any{ (*Empty)(nil), // 0: messages.Empty (*PingReply)(nil), // 1: messages.PingReply @@ -647,30 +744,35 @@ var file_control_plane_proto_goTypes = []any{ (*OwnershipAnnounce)(nil), // 7: messages.OwnershipAnnounce (*ExpiryAnnounce)(nil), // 8: messages.ExpiryAnnounce (*ApplyRequest)(nil), // 9: messages.ApplyRequest - (*ApplyResult)(nil), // 10: messages.ApplyResult - (*anypb.Any)(nil), // 11: google.protobuf.Any + (*GetRequest)(nil), // 10: messages.GetRequest + (*GetReply)(nil), // 11: messages.GetReply + (*ApplyResult)(nil), // 12: messages.ApplyResult + (*anypb.Any)(nil), // 13: google.protobuf.Any } var file_control_plane_proto_depIdxs = []int32{ - 11, // 0: messages.ApplyRequest.messages:type_name -> google.protobuf.Any - 0, // 1: messages.ControlPlane.Ping:input_type -> messages.Empty - 2, // 2: messages.ControlPlane.Negotiate:input_type -> messages.NegotiateRequest - 0, // 3: messages.ControlPlane.GetLocalActorIds:input_type -> messages.Empty - 7, // 4: messages.ControlPlane.AnnounceOwnership:input_type -> messages.OwnershipAnnounce - 9, // 5: messages.ControlPlane.Apply:input_type -> messages.ApplyRequest - 8, // 6: messages.ControlPlane.AnnounceExpiry:input_type -> messages.ExpiryAnnounce - 6, // 7: messages.ControlPlane.Closing:input_type -> messages.ClosingNotice - 1, // 8: messages.ControlPlane.Ping:output_type -> messages.PingReply - 3, // 9: messages.ControlPlane.Negotiate:output_type -> messages.NegotiateReply - 4, // 10: messages.ControlPlane.GetLocalActorIds:output_type -> messages.ActorIdsReply - 5, // 11: messages.ControlPlane.AnnounceOwnership:output_type -> messages.OwnerChangeAck - 10, // 12: messages.ControlPlane.Apply:output_type -> messages.ApplyResult - 5, // 13: messages.ControlPlane.AnnounceExpiry:output_type -> messages.OwnerChangeAck - 5, // 14: messages.ControlPlane.Closing:output_type -> messages.OwnerChangeAck - 8, // [8:15] is the sub-list for method output_type - 1, // [1:8] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 13, // 0: messages.ApplyRequest.messages:type_name -> google.protobuf.Any + 13, // 1: messages.GetReply.grain:type_name -> google.protobuf.Any + 0, // 2: messages.ControlPlane.Ping:input_type -> messages.Empty + 2, // 3: messages.ControlPlane.Negotiate:input_type -> messages.NegotiateRequest + 0, // 4: messages.ControlPlane.GetLocalActorIds:input_type -> messages.Empty + 7, // 5: messages.ControlPlane.AnnounceOwnership:input_type -> messages.OwnershipAnnounce + 9, // 6: messages.ControlPlane.Apply:input_type -> messages.ApplyRequest + 8, // 7: messages.ControlPlane.AnnounceExpiry:input_type -> messages.ExpiryAnnounce + 6, // 8: messages.ControlPlane.Closing:input_type -> messages.ClosingNotice + 10, // 9: messages.ControlPlane.Get:input_type -> messages.GetRequest + 1, // 10: messages.ControlPlane.Ping:output_type -> messages.PingReply + 3, // 11: messages.ControlPlane.Negotiate:output_type -> messages.NegotiateReply + 4, // 12: messages.ControlPlane.GetLocalActorIds:output_type -> messages.ActorIdsReply + 5, // 13: messages.ControlPlane.AnnounceOwnership:output_type -> messages.OwnerChangeAck + 12, // 14: messages.ControlPlane.Apply:output_type -> messages.ApplyResult + 5, // 15: messages.ControlPlane.AnnounceExpiry:output_type -> messages.OwnerChangeAck + 5, // 16: messages.ControlPlane.Closing:output_type -> messages.OwnerChangeAck + 11, // 17: messages.ControlPlane.Get:output_type -> messages.GetReply + 10, // [10:18] is the sub-list for method output_type + 2, // [2:10] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_control_plane_proto_init() } @@ -684,7 +786,7 @@ func file_control_plane_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_control_plane_proto_rawDesc), len(file_control_plane_proto_rawDesc)), NumEnums: 0, - NumMessages: 11, + NumMessages: 13, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/messages/control_plane_grpc.pb.go b/pkg/messages/control_plane_grpc.pb.go index f328d33..c3fd3ad 100644 --- a/pkg/messages/control_plane_grpc.pb.go +++ b/pkg/messages/control_plane_grpc.pb.go @@ -26,6 +26,7 @@ const ( ControlPlane_Apply_FullMethodName = "/messages.ControlPlane/Apply" ControlPlane_AnnounceExpiry_FullMethodName = "/messages.ControlPlane/AnnounceExpiry" ControlPlane_Closing_FullMethodName = "/messages.ControlPlane/Closing" + ControlPlane_Get_FullMethodName = "/messages.ControlPlane/Get" ) // ControlPlaneClient is the client API for ControlPlane service. @@ -47,6 +48,7 @@ type ControlPlaneClient interface { AnnounceExpiry(ctx context.Context, in *ExpiryAnnounce, opts ...grpc.CallOption) (*OwnerChangeAck, error) // Closing announces graceful shutdown so peers can proactively adjust. Closing(ctx context.Context, in *ClosingNotice, opts ...grpc.CallOption) (*OwnerChangeAck, error) + Get(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetReply, error) } type controlPlaneClient struct { @@ -127,6 +129,16 @@ func (c *controlPlaneClient) Closing(ctx context.Context, in *ClosingNotice, opt return out, nil } +func (c *controlPlaneClient) Get(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetReply, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetReply) + err := c.cc.Invoke(ctx, ControlPlane_Get_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // ControlPlaneServer is the server API for ControlPlane service. // All implementations must embed UnimplementedControlPlaneServer // for forward compatibility. @@ -146,6 +158,7 @@ type ControlPlaneServer interface { AnnounceExpiry(context.Context, *ExpiryAnnounce) (*OwnerChangeAck, error) // Closing announces graceful shutdown so peers can proactively adjust. Closing(context.Context, *ClosingNotice) (*OwnerChangeAck, error) + Get(context.Context, *GetRequest) (*GetReply, error) mustEmbedUnimplementedControlPlaneServer() } @@ -177,6 +190,9 @@ func (UnimplementedControlPlaneServer) AnnounceExpiry(context.Context, *ExpiryAn func (UnimplementedControlPlaneServer) Closing(context.Context, *ClosingNotice) (*OwnerChangeAck, error) { return nil, status.Errorf(codes.Unimplemented, "method Closing not implemented") } +func (UnimplementedControlPlaneServer) Get(context.Context, *GetRequest) (*GetReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method Get not implemented") +} func (UnimplementedControlPlaneServer) mustEmbedUnimplementedControlPlaneServer() {} func (UnimplementedControlPlaneServer) testEmbeddedByValue() {} @@ -324,6 +340,24 @@ func _ControlPlane_Closing_Handler(srv interface{}, ctx context.Context, dec fun return interceptor(ctx, in, info, handler) } +func _ControlPlane_Get_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ControlPlaneServer).Get(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ControlPlane_Get_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ControlPlaneServer).Get(ctx, req.(*GetRequest)) + } + return interceptor(ctx, in, info, handler) +} + // ControlPlane_ServiceDesc is the grpc.ServiceDesc for ControlPlane service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -359,6 +393,10 @@ var ControlPlane_ServiceDesc = grpc.ServiceDesc{ MethodName: "Closing", Handler: _ControlPlane_Closing_Handler, }, + { + MethodName: "Get", + Handler: _ControlPlane_Get_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "control_plane.proto", diff --git a/pkg/proxy/remotehost.go b/pkg/proxy/remotehost.go index bc42e2b..f0864c5 100644 --- a/pkg/proxy/remotehost.go +++ b/pkg/proxy/remotehost.go @@ -3,6 +3,7 @@ package proxy import ( "bytes" "context" + "encoding/json" "errors" "fmt" "io" @@ -22,7 +23,7 @@ import ( // RemoteHost mirrors the lightweight controller used for remote node // interaction. -type RemoteHost struct { +type RemoteHost[V any] struct { host string httpBase string conn *grpc.ClientConn @@ -67,7 +68,7 @@ func (m *MockResponseWriter) WriteHeader(statusCode int) { m.StatusCode = statusCode } -func NewRemoteHost(host string) (*RemoteHost, error) { +func NewRemoteHost[V any](host string) (*RemoteHost[V], error) { target := fmt.Sprintf("%s:1337", host) @@ -88,7 +89,7 @@ func NewRemoteHost(host string) (*RemoteHost, error) { } client := &http.Client{Transport: transport, Timeout: 10 * time.Second} - return &RemoteHost{ + return &RemoteHost[V]{ host: host, httpBase: fmt.Sprintf("http://%s:8080", host), conn: conn, @@ -99,18 +100,18 @@ func NewRemoteHost(host string) (*RemoteHost, error) { }, nil } -func (h *RemoteHost) Name() string { +func (h *RemoteHost[V]) Name() string { return h.host } -func (h *RemoteHost) Close() error { +func (h *RemoteHost[V]) Close() error { if h.conn != nil { h.conn.Close() } return nil } -func (h *RemoteHost) Ping() bool { +func (h *RemoteHost[V]) Ping() bool { var err error = errors.ErrUnsupported for err != nil { ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -130,7 +131,17 @@ func (h *RemoteHost) Ping() bool { return true } -func (h *RemoteHost) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (bool, error) { +func (h *RemoteHost[V]) Get(ctx context.Context, id uint64, grain *V) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + reply, error := h.controlClient.Get(ctx, &messages.GetRequest{Id: id}) + if error != nil { + return error + } + return json.Unmarshal(reply.Grain.Value, grain) +} + +func (h *RemoteHost[V]) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (bool, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -156,7 +167,7 @@ func (h *RemoteHost) Apply(ctx context.Context, id uint64, mutation ...proto.Mes return resp.Accepted, nil } -func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) { +func (h *RemoteHost[V]) Negotiate(knownHosts []string) ([]string, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -172,7 +183,7 @@ func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) { return resp.Hosts, nil } -func (h *RemoteHost) GetActorIds() []uint64 { +func (h *RemoteHost[V]) GetActorIds() []uint64 { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() reply, err := h.controlClient.GetLocalActorIds(ctx, &messages.Empty{}) @@ -184,7 +195,7 @@ func (h *RemoteHost) GetActorIds() []uint64 { return reply.GetIds() } -func (h *RemoteHost) AnnounceOwnership(ownerHost string, uids []uint64) { +func (h *RemoteHost[V]) AnnounceOwnership(ownerHost string, uids []uint64) { _, err := h.controlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{ Host: ownerHost, Ids: uids, @@ -197,7 +208,7 @@ func (h *RemoteHost) AnnounceOwnership(ownerHost string, uids []uint64) { h.missedPings = 0 } -func (h *RemoteHost) AnnounceExpiry(uids []uint64) { +func (h *RemoteHost[V]) AnnounceExpiry(uids []uint64) { _, err := h.controlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{ Host: h.host, Ids: uids, @@ -210,7 +221,7 @@ func (h *RemoteHost) AnnounceExpiry(uids []uint64) { h.missedPings = 0 } -func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request, customBody io.Reader) (bool, error) { +func (h *RemoteHost[V]) Proxy(id uint64, w http.ResponseWriter, r *http.Request, customBody io.Reader) (bool, error) { target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI()) ctx, span := tracer.Start(r.Context(), "remote_proxy") @@ -267,6 +278,6 @@ func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request, cu } -func (r *RemoteHost) IsHealthy() bool { +func (r *RemoteHost[V]) IsHealthy() bool { return r.missedPings < 3 } diff --git a/proto/control_plane.proto b/proto/control_plane.proto index 399d064..80b7b1a 100644 --- a/proto/control_plane.proto +++ b/proto/control_plane.proto @@ -73,6 +73,14 @@ message ApplyRequest { repeated google.protobuf.Any messages = 2; } +message GetRequest { + uint64 id = 1; +} + +message GetReply { + google.protobuf.Any grain = 1; +} + message ApplyResult { bool accepted = 1; } @@ -99,6 +107,7 @@ service ControlPlane { // Closing announces graceful shutdown so peers can proactively adjust. rpc Closing(ClosingNotice) returns (OwnerChangeAck); + rpc Get(GetRequest) returns (GetReply); } // -----------------------------------------------------------------------------