diff --git a/cmd/cart/k8s-host-discovery.go b/cmd/cart/k8s-host-discovery.go index b654bb8..1e8f165 100644 --- a/cmd/cart/k8s-host-discovery.go +++ b/cmd/cart/k8s-host-discovery.go @@ -3,8 +3,6 @@ package main import ( "log" - "git.k6n.net/go-cart-actor/pkg/actor" - "git.k6n.net/go-cart-actor/pkg/cart" "git.k6n.net/go-cart-actor/pkg/discovery" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -32,7 +30,7 @@ func GetDiscovery() discovery.Discovery { }) } -func UseDiscovery(pool actor.GrainPool[*cart.CartGrain]) { +func UseDiscovery(pool discovery.DiscoveryTarget) { go func(hw discovery.Discovery) { if hw == nil { diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 52c9f72..33ad4ff 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -178,7 +178,7 @@ func main() { return nil }, - SpawnHost: func(host string) (actor.Host, error) { + SpawnHost: func(host string) (actor.Host[cart.CartGrain], error) { return proxy.NewRemoteHost[cart.CartGrain](host) }, TTL: 5 * time.Minute, @@ -201,7 +201,7 @@ func main() { mux := http.NewServeMux() debugMux := http.NewServeMux() - grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool) + grpcSrv, err := actor.NewControlServer[cart.CartGrain](controlPlaneConfig, pool) if err != nil { log.Fatalf("Error starting control plane gRPC server: %v\n", err) } diff --git a/cmd/cart/pool-server.go b/cmd/cart/pool-server.go index 9ff2015..89f1eb2 100644 --- a/cmd/cart/pool-server.go +++ b/cmd/cart/pool-server.go @@ -41,13 +41,13 @@ var ( ) type PoolServer struct { - actor.GrainPool[*cart.CartGrain] + actor.GrainPool[cart.CartGrain] pod_name string inventoryService inventory.InventoryService reservationService inventory.CartReservationService } -func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, inventoryService inventory.InventoryService, inventoryReservationService inventory.CartReservationService) *PoolServer { +func NewPoolServer(pool actor.GrainPool[cart.CartGrain], pod_name string, inventoryService inventory.InventoryService, inventoryReservationService inventory.CartReservationService) *PoolServer { srv := &PoolServer{ GrainPool: pool, pod_name: pod_name, @@ -58,7 +58,7 @@ func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, inven return srv } -func (s *PoolServer) ApplyLocal(ctx context.Context, id cart.CartId, mutation ...proto.Message) (*actor.MutationResult[*cart.CartGrain], error) { +func (s *PoolServer) ApplyLocal(ctx context.Context, id cart.CartId, mutation ...proto.Message) (*actor.MutationResult[cart.CartGrain], error) { return s.Apply(ctx, uint64(id), mutation...) } @@ -460,10 +460,7 @@ func (s *PoolServer) InternalApplyMutationHandler(w http.ResponseWriter, r *http func (s *PoolServer) GetAnywhere(ctx context.Context, cartId cart.CartId) (*cart.CartGrain, error) { id := uint64(cartId) if host, isOnOtherHost := s.OwnerHost(id); isOnOtherHost { - grain := &cart.CartGrain{} - - err := host.Get(ctx, id, grain) - return grain, err + return host.Get(ctx, id) } return s.Get(ctx, id) } diff --git a/cmd/checkout/adyen-handlers.go b/cmd/checkout/adyen-handlers.go index dc6a550..9dd44a6 100644 --- a/cmd/checkout/adyen-handlers.go +++ b/cmd/checkout/adyen-handlers.go @@ -57,13 +57,13 @@ type SessionRequest struct { // } -func getCheckoutIdFromNotificationItem(item webhook.NotificationRequestItem) (uint64, error) { +func getCheckoutIdFromNotificationItem(item webhook.NotificationRequestItem) (*cart.CartId, error) { cartId, ok := cart.ParseCartId(item.MerchantReference) if !ok { log.Printf("The notification does not have a valid cartId: %s", item.MerchantReference) - return 0, errors.New("invalid cart id") + return nil, errors.New("invalid cart id") } - return uint64(cartId), nil + return &cartId, nil } func (s *CheckoutPoolServer) AdyenHookHandler(w http.ResponseWriter, r *http.Request) { @@ -105,7 +105,7 @@ func (s *CheckoutPoolServer) AdyenHookHandler(w http.ResponseWriter, r *http.Req // If successful, apply payment completed //if isSuccess { - if err := s.applyAnywhere(r.Context(), checkoutId, + if err := s.ApplyAnywhere(r.Context(), *checkoutId, &messages.PaymentEvent{ PaymentId: item.PspReference, Success: isSuccess, @@ -154,7 +154,7 @@ func (s *CheckoutPoolServer) AdyenHookHandler(w http.ResponseWriter, r *http.Req Message: item.Reason, }) } - if err := s.applyAnywhere(r.Context(), checkoutId, msgs...); err != nil { + if err := s.ApplyAnywhere(r.Context(), *checkoutId, msgs...); err != nil { log.Printf("error applying authorization event: %v", err) http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -165,7 +165,7 @@ func (s *CheckoutPoolServer) AdyenHookHandler(w http.ResponseWriter, r *http.Req pspReference := item.PspReference uid := uuid.New().String() - ref := cart.CartId(checkoutId).String() + ref := checkoutId.String() req := service.ModificationsApi.CaptureAuthorisedPaymentInput(pspReference).IdempotencyKey(uid).PaymentCaptureRequest(adyenCheckout.PaymentCaptureRequest{ Amount: adyenCheckout.Amount(item.Amount), MerchantAccount: "ElgigantenECOM", @@ -176,7 +176,7 @@ func (s *CheckoutPoolServer) AdyenHookHandler(w http.ResponseWriter, r *http.Req log.Printf("Error capturing payment: %v", err) } else { log.Printf("Payment captured successfully: %+v", res) - s.Apply(r.Context(), checkoutId, &messages.OrderCreated{ + s.ApplyAnywhere(r.Context(), *checkoutId, &messages.OrderCreated{ OrderId: res.PaymentPspReference, Status: item.EventCode, }) @@ -191,7 +191,7 @@ func (s *CheckoutPoolServer) AdyenHookHandler(w http.ResponseWriter, r *http.Req log.Printf("Could not get checkout id: %v", err) } else { - if err := s.applyAnywhere(r.Context(), checkoutId, &messages.PaymentEvent{ + if err := s.ApplyAnywhere(r.Context(), *checkoutId, &messages.PaymentEvent{ PaymentId: item.PspReference, Success: isSuccess, Name: item.EventCode, diff --git a/cmd/checkout/k8s-host-discovery.go b/cmd/checkout/k8s-host-discovery.go index 34170e5..c26a763 100644 --- a/cmd/checkout/k8s-host-discovery.go +++ b/cmd/checkout/k8s-host-discovery.go @@ -3,8 +3,6 @@ package main import ( "log" - "git.k6n.net/go-cart-actor/pkg/actor" - "git.k6n.net/go-cart-actor/pkg/checkout" "git.k6n.net/go-cart-actor/pkg/discovery" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -32,7 +30,7 @@ func GetDiscovery() discovery.Discovery { }) } -func UseDiscovery(pool actor.GrainPool[*checkout.CheckoutGrain]) { +func UseDiscovery(pool discovery.DiscoveryTarget) { go func(hw discovery.Discovery) { if hw == nil { diff --git a/cmd/checkout/klarna-handlers.go b/cmd/checkout/klarna-handlers.go index a336f85..a99aa86 100644 --- a/cmd/checkout/klarna-handlers.go +++ b/cmd/checkout/klarna-handlers.go @@ -195,7 +195,7 @@ func (s *CheckoutPoolServer) KlarnaPushHandler(w http.ResponseWriter, r *http.Re }) } - s.applyAnywhere(r.Context(), uint64(grain.Id), &messages.PaymentCompleted{ + s.ApplyAnywhere(r.Context(), grain.Id, &messages.PaymentCompleted{ PaymentId: orderId, Status: "completed", ProcessorReference: &order.ID, @@ -267,7 +267,7 @@ func (a *CheckoutPoolServer) getGrainFromKlarnaOrder(ctx context.Context, order if !ok { return nil, fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1) } - grain, err := a.getAnywhere(ctx, uint64(cartId)) + grain, err := a.GetAnywhere(ctx, cartId) if err != nil { return nil, fmt.Errorf("failed to get cart grain: %w", err) } diff --git a/cmd/checkout/main.go b/cmd/checkout/main.go index 6849632..3f22a8c 100644 --- a/cmd/checkout/main.go +++ b/cmd/checkout/main.go @@ -89,7 +89,7 @@ func main() { Destroy: func(grain actor.Grain[checkout.CheckoutGrain]) error { return nil }, - SpawnHost: func(host string) (actor.Host, error) { + SpawnHost: func(host string) (actor.Host[checkout.CheckoutGrain], error) { return proxy.NewRemoteHost[checkout.CheckoutGrain](host) }, TTL: 1 * time.Hour, // Longer TTL for checkout @@ -121,7 +121,7 @@ func main() { log.Fatalf("no connection to amqp defined") } - grpcSrv, err := actor.NewControlServer[*checkout.CheckoutGrain](controlPlaneConfig, pool) + grpcSrv, err := actor.NewControlServer[checkout.CheckoutGrain](controlPlaneConfig, pool) if err != nil { log.Fatalf("Error starting control plane gRPC server: %v\n", err) } diff --git a/cmd/checkout/pool-server.go b/cmd/checkout/pool-server.go index 69c46ac..c27fd2a 100644 --- a/cmd/checkout/pool-server.go +++ b/cmd/checkout/pool-server.go @@ -46,7 +46,7 @@ var ( ) type CheckoutPoolServer struct { - actor.GrainPool[*checkout.CheckoutGrain] + actor.GrainPool[checkout.CheckoutGrain] pod_name string klarnaClient *KlarnaClient adyenClient *adyen.APIClient @@ -54,7 +54,7 @@ type CheckoutPoolServer struct { inventoryService *inventory.RedisInventoryService } -func NewCheckoutPoolServer(pool actor.GrainPool[*checkout.CheckoutGrain], pod_name string, klarnaClient *KlarnaClient, cartClient *CartClient, adyenClient *adyen.APIClient) *CheckoutPoolServer { +func NewCheckoutPoolServer(pool actor.GrainPool[checkout.CheckoutGrain], pod_name string, klarnaClient *KlarnaClient, cartClient *CartClient, adyenClient *adyen.APIClient) *CheckoutPoolServer { srv := &CheckoutPoolServer{ GrainPool: pool, pod_name: pod_name, @@ -66,7 +66,7 @@ func NewCheckoutPoolServer(pool actor.GrainPool[*checkout.CheckoutGrain], pod_na return srv } -func (s *CheckoutPoolServer) ApplyLocal(ctx context.Context, id checkout.CheckoutId, mutation ...proto.Message) (*actor.MutationResult[*checkout.CheckoutGrain], error) { +func (s *CheckoutPoolServer) ApplyLocal(ctx context.Context, id checkout.CheckoutId, mutation ...proto.Message) (*actor.MutationResult[checkout.CheckoutGrain], error) { return s.Apply(ctx, uint64(id), mutation...) } @@ -90,7 +90,7 @@ func (s *CheckoutPoolServer) SetDeliveryHandler(w http.ResponseWriter, r *http.R return err } - return s.WriteResult(w, result.Result) + return s.WriteResult(w, result) } func (s *CheckoutPoolServer) RemoveDeliveryHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error { @@ -108,7 +108,7 @@ func (s *CheckoutPoolServer) RemoveDeliveryHandler(w http.ResponseWriter, r *htt return err } - return s.WriteResult(w, result.Result) + return s.WriteResult(w, result) } func (s *CheckoutPoolServer) SetPickupPointHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error { @@ -122,7 +122,7 @@ func (s *CheckoutPoolServer) SetPickupPointHandler(w http.ResponseWriter, r *htt return err } - return s.WriteResult(w, result.Result) + return s.WriteResult(w, result) } func (s *CheckoutPoolServer) InitializeCheckoutHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error { @@ -136,7 +136,7 @@ func (s *CheckoutPoolServer) InitializeCheckoutHandler(w http.ResponseWriter, r return err } - return s.WriteResult(w, result.Result) + return s.WriteResult(w, result) } func (s *CheckoutPoolServer) InventoryReservedHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error { @@ -150,7 +150,7 @@ func (s *CheckoutPoolServer) InventoryReservedHandler(w http.ResponseWriter, r * return err } - return s.WriteResult(w, result.Result) + return s.WriteResult(w, result) } func (s *CheckoutPoolServer) OrderCreatedHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error { @@ -164,7 +164,7 @@ func (s *CheckoutPoolServer) OrderCreatedHandler(w http.ResponseWriter, r *http. return err } - return s.WriteResult(w, result.Result) + return s.WriteResult(w, result) } func (s *CheckoutPoolServer) ConfirmationViewedHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error { @@ -178,7 +178,7 @@ func (s *CheckoutPoolServer) ConfirmationViewedHandler(w http.ResponseWriter, r return err } - return s.WriteResult(w, result.Result) + return s.WriteResult(w, result) } func (s *CheckoutPoolServer) ContactDetailsUpdatedHandler(w http.ResponseWriter, r *http.Request, id checkout.CheckoutId) error { @@ -192,7 +192,7 @@ func (s *CheckoutPoolServer) ContactDetailsUpdatedHandler(w http.ResponseWriter, return err } - return s.WriteResult(w, result.Result) + return s.WriteResult(w, result) } func (s *CheckoutPoolServer) StartCheckoutHandler(w http.ResponseWriter, r *http.Request) { @@ -256,7 +256,7 @@ func (s *CheckoutPoolServer) StartCheckoutHandler(w http.ResponseWriter, r *http // Set checkout cookie setCheckoutCookie(w, checkoutId, r.TLS != nil) - if err := s.WriteResult(w, result.Result); err != nil { + if err := s.WriteResult(w, &result.Result); err != nil { logger.Error("failed to write result", "error", err) } } @@ -299,7 +299,7 @@ func (s *CheckoutPoolServer) CreateOrUpdateCheckout(r *http.Request, grain *chec } } -func (s *CheckoutPoolServer) ApplyKlarnaPaymentStarted(ctx context.Context, klarnaOrder *CheckoutOrder, id checkout.CheckoutId) (*actor.MutationResult[*checkout.CheckoutGrain], error) { +func (s *CheckoutPoolServer) ApplyKlarnaPaymentStarted(ctx context.Context, klarnaOrder *CheckoutOrder, id checkout.CheckoutId) (*actor.MutationResult[checkout.CheckoutGrain], error) { method := "checkout" return s.ApplyLocal(ctx, id, &messages.PaymentStarted{ PaymentId: klarnaOrder.ID, @@ -329,28 +329,22 @@ func init() { } } -func (s *CheckoutPoolServer) applyAnywhere(ctx context.Context, id uint64, msgs ...proto.Message) error { - host, found := s.OwnerHost(id) - if !found { - _, err := s.Apply(ctx, id, msgs...) - return err +func (s *CheckoutPoolServer) GetAnywhere(ctx context.Context, checkoutId cart.CartId) (*checkout.CheckoutGrain, error) { + id := uint64(checkoutId) + if host, isOnOtherHost := s.OwnerHost(id); isOnOtherHost { + return host.Get(ctx, id) } - _, err := host.Apply(ctx, id, msgs...) - return err + return s.Get(ctx, id) } -func (s *CheckoutPoolServer) getAnywhere(ctx context.Context, id uint64) (*checkout.CheckoutGrain, error) { - host, found := s.OwnerHost(id) - if !found { - grain, err := s.Get(ctx, id) - return grain, err +func (s *CheckoutPoolServer) ApplyAnywhere(ctx context.Context, checkoutId cart.CartId, msgs ...proto.Message) error { + id := uint64(checkoutId) + if host, isOnOtherHost := s.OwnerHost(id); isOnOtherHost { + _, err := host.Apply(ctx, id, msgs...) + return err } - ret := &checkout.CheckoutGrain{} - err := host.Get(ctx, id, ret) - if err != nil { - return nil, err - } - return ret, nil + _, err := s.Apply(ctx, id, msgs...) + return err } type StartPayment struct { diff --git a/pkg/actor/base62-id.go b/pkg/actor/base62-id.go new file mode 100644 index 0000000..a5ea1d3 --- /dev/null +++ b/pkg/actor/base62-id.go @@ -0,0 +1,131 @@ +package actor + +import ( + "crypto/rand" + "encoding/json" + "fmt" +) + +type GrainId uint64 + +const base62Alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + +// Reverse lookup (0xFF marks invalid) +var base62Rev [256]byte + +func init() { + for i := range base62Rev { + base62Rev[i] = 0xFF + } + for i := 0; i < len(base62Alphabet); i++ { + base62Rev[base62Alphabet[i]] = byte(i) + } +} + +// String returns the canonical base62 encoding of the 64-bit id. +func (id GrainId) String() string { + return encodeBase62(uint64(id)) +} + +// MarshalJSON encodes the cart id as a JSON string. +func (id GrainId) MarshalJSON() ([]byte, error) { + return json.Marshal(id.String()) +} + +// UnmarshalJSON decodes a cart id from a JSON string containing base62 text. +func (id *GrainId) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + parsed, ok := ParseGrainId(s) + if !ok { + return fmt.Errorf("invalid cart id: %q", s) + } + *id = parsed + return nil +} + +// NewGrainId generates a new cryptographically random non-zero 64-bit id. +func NewGrainId() (GrainId, error) { + var b [8]byte + if _, err := rand.Read(b[:]); err != nil { + return 0, fmt.Errorf("NewGrainId: %w", err) + } + u := (uint64(b[0]) << 56) | + (uint64(b[1]) << 48) | + (uint64(b[2]) << 40) | + (uint64(b[3]) << 32) | + (uint64(b[4]) << 24) | + (uint64(b[5]) << 16) | + (uint64(b[6]) << 8) | + uint64(b[7]) + if u == 0 { + // Extremely unlikely; regenerate once to avoid "0" identifier if desired. + return NewGrainId() + } + return GrainId(u), nil +} + +// MustNewGrainId panics if generation fails. +func MustNewGrainId() GrainId { + id, err := NewGrainId() + if err != nil { + panic(err) + } + return id +} + +// ParseGrainId parses a base62 string into a GrainId. +// Returns (0,false) for invalid input. +func ParseGrainId(s string) (GrainId, bool) { + // Accept length 1..11 (11 sufficient for 64 bits). Reject >11 immediately. + // Provide a slightly looser upper bound (<=16) only if you anticipate future + // extensions; here we stay strict. + if len(s) == 0 || len(s) > 11 { + return 0, false + } + u, ok := decodeBase62(s) + if !ok { + return 0, false + } + return GrainId(u), true +} + +// MustParseGrainId panics on invalid base62 input. +func MustParseGrainId(s string) GrainId { + id, ok := ParseGrainId(s) + if !ok { + panic(fmt.Sprintf("invalid cart id: %q", s)) + } + return id +} + +// encodeBase62 converts a uint64 to base62 (shortest form). +func encodeBase62(u uint64) string { + if u == 0 { + return "0" + } + var buf [11]byte + i := len(buf) + for u > 0 { + i-- + buf[i] = base62Alphabet[u%62] + u /= 62 + } + return string(buf[i:]) +} + +// decodeBase62 converts base62 text to uint64. +func decodeBase62(s string) (uint64, bool) { + var v uint64 + for i := 0; i < len(s); i++ { + c := s[i] + d := base62Rev[c] + if d == 0xFF { + return 0, false + } + v = v*62 + uint64(d) + } + return v, true +} diff --git a/pkg/actor/grain_pool.go b/pkg/actor/grain_pool.go index 182e46f..2928f0a 100644 --- a/pkg/actor/grain_pool.go +++ b/pkg/actor/grain_pool.go @@ -15,29 +15,29 @@ type MutationResult[V any] struct { type GrainPool[V any] interface { Apply(ctx context.Context, id uint64, mutation ...proto.Message) (*MutationResult[V], error) - Get(ctx context.Context, id uint64) (V, error) - OwnerHost(id uint64) (Host, bool) + Get(ctx context.Context, id uint64) (*V, error) + OwnerHost(id uint64) (Host[V], bool) Hostname() string TakeOwnership(id uint64) HandleOwnershipChange(host string, ids []uint64) error HandleRemoteExpiry(host string, ids []uint64) error Negotiate(otherHosts []string) GetLocalIds() []uint64 + IsHealthy() bool + Close() + IsKnown(string) bool RemoveHost(host string) AddRemoteHost(host string) - IsHealthy() bool - IsKnown(string) bool - Close() } // Host abstracts a remote node capable of proxying cart requests. -type Host interface { +type Host[V any] interface { AnnounceExpiry(ids []uint64) Negotiate(otherHosts []string) ([]string, error) Name() string Proxy(id uint64, w http.ResponseWriter, r *http.Request, customBody io.Reader) (bool, error) - Apply(ctx context.Context, id uint64, mutation ...proto.Message) (bool, error) - Get(ctx context.Context, id uint64, grain any) error + Apply(ctx context.Context, id uint64, mutation ...proto.Message) (*MutationResult[V], error) + Get(ctx context.Context, id uint64) (*V, error) GetActorIds() []uint64 Close() error Ping() bool diff --git a/pkg/actor/grpc_server.go b/pkg/actor/grpc_server.go index 7c5eacd..6dd225f 100644 --- a/pkg/actor/grpc_server.go +++ b/pkg/actor/grpc_server.go @@ -107,19 +107,27 @@ func (s *ControlServer[V]) AnnounceOwnership(ctx context.Context, req *messages. }, nil } +func toAny[V any](grain V) (*anypb.Any, error) { + data, err := json.Marshal(grain) + if err != nil { + return nil, err + } + return &anypb.Any{ + Value: data, + }, nil +} + func (s *ControlServer[V]) Get(ctx context.Context, req *messages.GetRequest) (*messages.GetReply, error) { grain, err := s.pool.Get(ctx, req.Id) if err != nil { return nil, err } - data, err := json.Marshal(grain) + grainAny, err := toAny(grain) if err != nil { return nil, err } return &messages.GetReply{ - Grain: &anypb.Any{ - Value: data, - }, + Grain: grainAny, }, nil } @@ -163,16 +171,40 @@ func (s *ControlServer[V]) Apply(ctx context.Context, in *messages.ApplyRequest) for i, anyMsg := range in.Messages { msg, err := anyMsg.UnmarshalNew() if err != nil { - return &messages.ApplyResult{Accepted: false}, fmt.Errorf("failed to unmarshal message: %w", err) + return nil, fmt.Errorf("failed to unmarshal message: %w", err) } msgs[i] = msg } - _, err := s.pool.Apply(ctx, in.Id, msgs...) + r, err := s.pool.Apply(ctx, in.Id, msgs...) if err != nil { - return &messages.ApplyResult{Accepted: false}, err + return nil, err + } + grainAny, err := toAny(r) + if err != nil { + return nil, err + } + mutList := make([]*messages.MutationResult, len(in.Messages)) + for i, msg := range r.Mutations { + mut, err := anypb.New(msg.Mutation) + if err != nil { + return nil, err + } + var errString *string + if msg.Error != nil { + s := msg.Error.Error() + errString = &s + } + mutList[i] = &messages.MutationResult{ + Type: msg.Type, + Message: mut, + Error: errString, + } } - return &messages.ApplyResult{Accepted: true}, nil + return &messages.ApplyResult{ + State: grainAny, + Mutations: mutList, + }, nil } // ControlPlane: Negotiate (merge host views) diff --git a/pkg/actor/grpc_server_test.go b/pkg/actor/grpc_server_test.go index 75318c8..1b33aaf 100644 --- a/pkg/actor/grpc_server_test.go +++ b/pkg/actor/grpc_server_test.go @@ -17,14 +17,14 @@ type mockGrainPool struct { applied []proto.Message } -func (m *mockGrainPool) Apply(ctx context.Context, id uint64, mutations ...proto.Message) (*MutationResult[*mockGrain], error) { +func (m *mockGrainPool) Apply(ctx context.Context, id uint64, mutations ...proto.Message) (*MutationResult[mockGrain], error) { m.applied = mutations // Simulate successful application - return &MutationResult[*mockGrain]{ - Result: &mockGrain{}, + return &MutationResult[mockGrain]{ + Result: mockGrain{}, Mutations: []ApplyResult{ - {Error: nil}, // Assume success - {Error: nil}, + {Type: "AddItem", Mutation: &cart_messages.AddItem{ItemId: 1, Quantity: 2}, Error: nil}, + {Type: "RemoveItem", Mutation: &cart_messages.RemoveItem{Id: 1}, Error: nil}, }, }, nil } @@ -33,7 +33,7 @@ func (m *mockGrainPool) Get(ctx context.Context, id uint64) (*mockGrain, error) return &mockGrain{}, nil } -func (m *mockGrainPool) OwnerHost(id uint64) (Host, bool) { +func (m *mockGrainPool) OwnerHost(id uint64) (Host[mockGrain], bool) { return nil, false } @@ -58,7 +58,7 @@ func TestApplyRequestWithMutations(t *testing.T) { pool := &mockGrainPool{} // Create gRPC server - server, err := NewControlServer[*mockGrain](DefaultServerConfig(), pool) + server, err := NewControlServer[mockGrain](DefaultServerConfig(), pool) if err != nil { t.Fatalf("failed to create server: %v", err) } @@ -88,8 +88,16 @@ func TestApplyRequestWithMutations(t *testing.T) { } // Verify response - if !resp.Accepted { - t.Errorf("expected Accepted=true, got false") + if resp.State == nil { + t.Errorf("expected State to be non-nil") + } + if len(resp.Mutations) != 2 { + t.Errorf("expected 2 mutation results, got %d", len(resp.Mutations)) + } + for i, mut := range resp.Mutations { + if mut.Error != nil { + t.Errorf("expected no error in mutation %d, got %s", i, *mut.Error) + } } // Verify mutations were extracted and applied @@ -103,3 +111,40 @@ func TestApplyRequestWithMutations(t *testing.T) { t.Errorf("expected RemoveItem with Id=1, got %v", pool.applied[1]) } } + +func TestGetRequest(t *testing.T) { + // Setup mock pool + pool := &mockGrainPool{} + + // Create gRPC server + server, err := NewControlServer[mockGrain](DefaultServerConfig(), pool) + if err != nil { + t.Fatalf("failed to create server: %v", err) + } + defer server.GracefulStop() + + // Create client connection + conn, err := grpc.Dial("localhost:1337", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer conn.Close() + + client := control_plane_messages.NewControlPlaneClient(conn) + + // Prepare GetRequest + req := &control_plane_messages.GetRequest{ + Id: 123, + } + + // Call Get + resp, err := client.Get(context.Background(), req) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + // Verify response + if resp.Grain == nil { + t.Errorf("expected Grain to be non-nil") + } +} diff --git a/pkg/actor/simple_grain_pool.go b/pkg/actor/simple_grain_pool.go index ca1b277..6fe942f 100644 --- a/pkg/actor/simple_grain_pool.go +++ b/pkg/actor/simple_grain_pool.go @@ -18,7 +18,7 @@ type SimpleGrainPool[V any] struct { mutationRegistry MutationRegistry spawn func(ctx context.Context, id uint64) (Grain[V], error) destroy func(grain Grain[V]) error - spawnHost func(host string) (Host, error) + spawnHost func(host string) (Host[V], error) listeners []LogListener storage LogStorage[V] ttl time.Duration @@ -27,8 +27,8 @@ type SimpleGrainPool[V any] struct { // Cluster coordination -------------------------------------------------- hostname string remoteMu sync.RWMutex - remoteOwners map[uint64]Host - remoteHosts map[string]Host + remoteOwners map[uint64]Host[V] + remoteHosts map[string]Host[V] //discardedHostHandler *DiscardedHostHandler // House-keeping --------------------------------------------------------- @@ -38,7 +38,7 @@ type SimpleGrainPool[V any] struct { type GrainPoolConfig[V any] struct { Hostname string Spawn func(ctx context.Context, id uint64) (Grain[V], error) - SpawnHost func(host string) (Host, error) + SpawnHost func(host string) (Host[V], error) Destroy func(grain Grain[V]) error TTL time.Duration PoolSize int @@ -57,8 +57,8 @@ func NewSimpleGrainPool[V any](config GrainPoolConfig[V]) (*SimpleGrainPool[V], ttl: config.TTL, poolSize: config.PoolSize, hostname: config.Hostname, - remoteOwners: make(map[uint64]Host), - remoteHosts: make(map[string]Host), + remoteOwners: make(map[uint64]Host[V]), + remoteHosts: make(map[string]Host[V]), } p.purgeTicker = time.NewTicker(time.Minute) @@ -99,7 +99,7 @@ func (p *SimpleGrainPool[V]) purge() { } } p.localMu.Unlock() - p.forAllHosts(func(remote Host) { + p.forAllHosts(func(remote Host[V]) { remote.AnnounceExpiry(purgedIds) }) @@ -136,7 +136,6 @@ func (p *SimpleGrainPool[V]) HandleRemoteExpiry(host string, ids []uint64) error } func (p *SimpleGrainPool[V]) HandleOwnershipChange(host string, ids []uint64) error { - log.Printf("host %s now owns %d cart ids", host, len(ids)) p.remoteMu.RLock() remoteHost, exists := p.remoteHosts[host] p.remoteMu.RUnlock() @@ -168,7 +167,7 @@ func (p *SimpleGrainPool[V]) AddRemoteHost(host string) { p.AddRemote(host) } -func (p *SimpleGrainPool[V]) AddRemote(host string) (Host, error) { +func (p *SimpleGrainPool[V]) AddRemote(host string) (Host[V], error) { if host == "" { return nil, fmt.Errorf("host is empty") } @@ -200,7 +199,7 @@ func (p *SimpleGrainPool[V]) AddRemote(host string) (Host, error) { return remote, nil } -func (p *SimpleGrainPool[V]) initializeRemote(remote Host) { +func (p *SimpleGrainPool[V]) initializeRemote(remote Host[V]) { remotesIds := remote.GetActorIds() @@ -268,7 +267,7 @@ func (p *SimpleGrainPool[V]) IsKnown(host string) bool { return ok } -func (p *SimpleGrainPool[V]) pingLoop(remote Host) { +func (p *SimpleGrainPool[V]) pingLoop(remote Host[V]) { remote.Ping() ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() @@ -316,14 +315,14 @@ func (p *SimpleGrainPool[V]) SendNegotiation() { p.remoteMu.RLock() hosts := make([]string, 0, len(p.remoteHosts)+1) hosts = append(hosts, p.hostname) - remotes := make([]Host, 0, len(p.remoteHosts)) + remotes := make([]Host[V], 0, len(p.remoteHosts)) for h, r := range p.remoteHosts { hosts = append(hosts, h) remotes = append(remotes, r) } p.remoteMu.RUnlock() - p.forAllHosts(func(remote Host) { + p.forAllHosts(func(remote Host[V]) { knownByRemote, err := remote.Negotiate(hosts) if err != nil { @@ -338,7 +337,7 @@ func (p *SimpleGrainPool[V]) SendNegotiation() { }) } -func (p *SimpleGrainPool[V]) forAllHosts(fn func(Host)) { +func (p *SimpleGrainPool[V]) forAllHosts(fn func(Host[V])) { p.remoteMu.RLock() rh := maps.Clone(p.remoteHosts) p.remoteMu.RUnlock() @@ -366,7 +365,7 @@ func (p *SimpleGrainPool[V]) broadcastOwnership(ids []uint64) { return } - p.forAllHosts(func(rh Host) { + p.forAllHosts(func(rh Host[V]) { rh.AnnounceOwnership(p.hostname, ids) }) log.Printf("%s taking ownership of %d ids", p.hostname, len(ids)) @@ -385,10 +384,11 @@ func (p *SimpleGrainPool[V]) getOrClaimGrain(ctx context.Context, id uint64) (Gr if err != nil { return nil, err } + go p.broadcastOwnership([]uint64{id}) p.localMu.Lock() p.grains[id] = grain p.localMu.Unlock() - go p.broadcastOwnership([]uint64{id}) + return grain, nil } @@ -396,7 +396,7 @@ func (p *SimpleGrainPool[V]) getOrClaimGrain(ctx context.Context, id uint64) (Gr // var ErrNotOwner = fmt.Errorf("not owner") // Apply applies a mutation to a grain. -func (p *SimpleGrainPool[V]) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (*MutationResult[*V], error) { +func (p *SimpleGrainPool[V]) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (*MutationResult[V], error) { grain, err := p.getOrClaimGrain(ctx, id) if err != nil { return nil, err @@ -421,8 +421,8 @@ func (p *SimpleGrainPool[V]) Apply(ctx context.Context, id uint64, mutation ...p return nil, err } - return &MutationResult[*V]{ - Result: result, + return &MutationResult[V]{ + Result: *result, Mutations: mutations, }, nil } @@ -437,7 +437,7 @@ func (p *SimpleGrainPool[V]) Get(ctx context.Context, id uint64) (*V, error) { } // OwnerHost reports the remote owner (if any) for the supplied cart id. -func (p *SimpleGrainPool[V]) OwnerHost(id uint64) (Host, bool) { +func (p *SimpleGrainPool[V]) OwnerHost(id uint64) (Host[V], bool) { p.remoteMu.RLock() defer p.remoteMu.RUnlock() owner, ok := p.remoteOwners[id] @@ -452,7 +452,7 @@ func (p *SimpleGrainPool[V]) Hostname() string { // Close notifies remotes that this host is shutting down. func (p *SimpleGrainPool[V]) Close() { - p.forAllHosts(func(rh Host) { + p.forAllHosts(func(rh Host[V]) { rh.Close() }) diff --git a/pkg/cart/cart-mutation-helper.go b/pkg/cart/cart-mutation-helper.go index c17bb9f..e0965a8 100644 --- a/pkg/cart/cart-mutation-helper.go +++ b/pkg/cart/cart-mutation-helper.go @@ -23,9 +23,7 @@ func (c *CartMutationContext) ReserveItem(ctx context.Context, cartId CartId, sk if quantity <= 0 || c.reservationService == nil { return nil, nil } - if sku != "919641" { - return nil, nil - } + l := inventory.LocationID("se") if locationId != nil { l = inventory.LocationID(*locationId) @@ -49,6 +47,13 @@ func (c *CartMutationContext) ReserveItem(ctx context.Context, cartId CartId, sk } +func (c *CartMutationContext) UseReservations(item *CartItem) bool { + if item.ReservationEndTime != nil { + return true + } + return item.Cgm == "55010" +} + func (c *CartMutationContext) ReleaseItem(ctx context.Context, cartId CartId, sku string, locationId *string) error { if c.reservationService == nil { return nil @@ -60,12 +65,6 @@ func (c *CartMutationContext) ReleaseItem(ctx context.Context, cartId CartId, sk return c.reservationService.ReleaseForCart(ctx, inventory.SKU(sku), l, inventory.CartID(cartId.String())) } -func Create[T any]() func() *T { - return func() *T { - return new(T) - } -} - func NewCartMultationRegistry(context *CartMutationContext) actor.MutationRegistry { reg := actor.NewMutationRegistry() diff --git a/pkg/cart/cart_id.go b/pkg/cart/cart_id.go index e57c2b2..bb6d8d5 100644 --- a/pkg/cart/cart_id.go +++ b/pkg/cart/cart_id.go @@ -4,6 +4,8 @@ import ( "crypto/rand" "encoding/json" "fmt" + + "git.k6n.net/go-cart-actor/pkg/actor" ) // cart_id.go @@ -34,7 +36,7 @@ import ( // // --------------------------------------------------------------------------- -type CartId uint64 +type CartId actor.GrainId const base62Alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" diff --git a/pkg/cart/mutation_add_item.go b/pkg/cart/mutation_add_item.go index 655942a..f929792 100644 --- a/pkg/cart/mutation_add_item.go +++ b/pkg/cart/mutation_add_item.go @@ -45,14 +45,16 @@ func (c *CartMutationContext) AddItem(g *CartGrain, m *cart_messages.AddItem) er if !sameStore { continue } - if err := c.ReleaseItem(ctx, g.Id, existing.Sku, existing.StoreId); err != nil { - log.Printf("failed to release item %d: %v", existing.Id, err) + if c.UseReservations(existing) { + if err := c.ReleaseItem(ctx, g.Id, existing.Sku, existing.StoreId); err != nil { + log.Printf("failed to release item %d: %v", existing.Id, err) + } + endTime, err := c.ReserveItem(ctx, g.Id, existing.Sku, existing.StoreId, existing.Quantity+uint16(m.Quantity)) + if err != nil { + return err + } + existing.ReservationEndTime = endTime } - endTime, err := c.ReserveItem(ctx, g.Id, existing.Sku, existing.StoreId, existing.Quantity+uint16(m.Quantity)) - if err != nil { - return err - } - existing.ReservationEndTime = endTime existing.Quantity += uint16(m.Quantity) existing.Stock = uint16(m.Stock) // If existing had nil store but new has one, adopt it. @@ -79,16 +81,6 @@ func (c *CartMutationContext) AddItem(g *CartGrain, m *cart_messages.AddItem) er needsReservation = m.ReservationEndTime.AsTime().Before(time.Now()) } - if needsReservation { - endTime, err := c.ReserveItem(ctx, g.Id, m.Sku, m.StoreId, uint16(m.Quantity)) - if err != nil { - return err - } - if endTime != nil { - m.ReservationEndTime = timestamppb.New(*endTime) - } - } - cartItem := &CartItem{ Id: g.lastItemId, ItemId: uint32(m.ItemId), @@ -123,10 +115,19 @@ func (c *CartMutationContext) AddItem(g *CartGrain, m *cart_messages.AddItem) er StoreId: m.StoreId, } - if m.ReservationEndTime != nil { - t := m.ReservationEndTime.AsTime() - cartItem.ReservationEndTime = &t + + if needsReservation && c.UseReservations(cartItem) { + endTime, err := c.ReserveItem(ctx, g.Id, m.Sku, m.StoreId, uint16(m.Quantity)) + if err != nil { + return err + } + if endTime != nil { + m.ReservationEndTime = timestamppb.New(*endTime) + t := m.ReservationEndTime.AsTime() + cartItem.ReservationEndTime = &t + } } + g.Items = append(g.Items, cartItem) g.UpdateTotals() return nil diff --git a/pkg/cart/mutation_change_quantity.go b/pkg/cart/mutation_change_quantity.go index f0ea9c2..892216f 100644 --- a/pkg/cart/mutation_change_quantity.go +++ b/pkg/cart/mutation_change_quantity.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "time" messages "git.k6n.net/go-cart-actor/proto/cart" ) @@ -49,9 +50,11 @@ func (c *CartMutationContext) ChangeQuantity(g *CartGrain, m *messages.ChangeQua if m.Quantity <= 0 { // Remove the item itemToRemove := g.Items[foundIndex] - err := c.ReleaseItem(ctx, g.Id, itemToRemove.Sku, itemToRemove.StoreId) - if err != nil { - log.Printf("unable to release reservation for %s in location: %v", itemToRemove.Sku, itemToRemove.StoreId) + if itemToRemove.ReservationEndTime != nil && itemToRemove.ReservationEndTime.Before(time.Now()) { + err := c.ReleaseItem(ctx, g.Id, itemToRemove.Sku, itemToRemove.StoreId) + if err != nil { + log.Printf("unable to release reservation for %s in location: %v", itemToRemove.Sku, itemToRemove.StoreId) + } } g.Items = append(g.Items[:foundIndex], g.Items[foundIndex+1:]...) g.UpdateTotals() @@ -61,18 +64,20 @@ func (c *CartMutationContext) ChangeQuantity(g *CartGrain, m *messages.ChangeQua if item == nil { return fmt.Errorf("ChangeQuantity: item id %d not found", m.Id) } - if item.ReservationEndTime != nil { - err := c.ReleaseItem(ctx, g.Id, item.Sku, item.StoreId) - if err != nil { - log.Printf("unable to release reservation for %s in location: %v", item.Sku, item.StoreId) - } + if c.UseReservations(item) { + if item.ReservationEndTime != nil { + err := c.ReleaseItem(ctx, g.Id, item.Sku, item.StoreId) + if err != nil { + log.Printf("unable to release reservation for %s in location: %v", item.Sku, item.StoreId) + } + } + endTime, err := c.ReserveItem(ctx, g.Id, item.Sku, item.StoreId, uint16(m.Quantity)) + if err != nil { + return err + } + item.ReservationEndTime = endTime } - endTime, err := c.ReserveItem(ctx, g.Id, item.Sku, item.StoreId, uint16(m.Quantity)) - if err != nil { - return err - } - item.ReservationEndTime = endTime item.Quantity = uint16(m.Quantity) g.UpdateTotals() diff --git a/pkg/discovery/types.go b/pkg/discovery/types.go index 9eaeb48..0fbe1df 100644 --- a/pkg/discovery/types.go +++ b/pkg/discovery/types.go @@ -9,3 +9,9 @@ type Discovery interface { Discover() ([]string, error) Watch() (<-chan HostChange, error) } + +type DiscoveryTarget interface { + IsKnown(string) bool + RemoveHost(host string) + AddRemoteHost(host string) +} diff --git a/pkg/proxy/remotehost.go b/pkg/proxy/remotehost.go index 738eb22..a8aa9ec 100644 --- a/pkg/proxy/remotehost.go +++ b/pkg/proxy/remotehost.go @@ -11,6 +11,7 @@ import ( "net/http" "time" + "git.k6n.net/go-cart-actor/pkg/actor" messages "git.k6n.net/go-cart-actor/proto/control" "go.opentelemetry.io/contrib/bridges/otelslog" "go.opentelemetry.io/otel" @@ -130,25 +131,29 @@ func (h *RemoteHost[V]) Ping() bool { return true } -func (h *RemoteHost[V]) Get(ctx context.Context, id uint64, grain any) error { +func (h *RemoteHost[V]) Get(ctx context.Context, id uint64) (*V, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() reply, error := h.controlClient.Get(ctx, &messages.GetRequest{Id: id}) if error != nil { - return error + return nil, error } - return json.Unmarshal(reply.Grain.Value, grain) + var grain V + err := json.Unmarshal(reply.Grain.Value, &grain) + if err != nil { + return nil, fmt.Errorf("failed to unpack state: %w", err) + } + return &grain, nil } -func (h *RemoteHost[V]) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (bool, error) { +func (h *RemoteHost[V]) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (*actor.MutationResult[V], error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - toSend := make([]*anypb.Any, len(mutation)) for i, msg := range mutation { anyMsg, err := anypb.New(msg) if err != nil { - return false, fmt.Errorf("failed to pack message: %w", err) + return nil, fmt.Errorf("failed to pack message: %w", err) } toSend[i] = anyMsg } @@ -160,10 +165,34 @@ func (h *RemoteHost[V]) Apply(ctx context.Context, id uint64, mutation ...proto. if err != nil { h.missedPings++ log.Printf("Apply %s failed: %v", h.host, err) - return false, err + return nil, err } h.missedPings = 0 - return resp.Accepted, nil + var grain V + err = json.Unmarshal(resp.State.Value, &grain) + if err != nil { + return nil, fmt.Errorf("failed to unpack state: %w", err) + } + var mutationList []actor.ApplyResult + for _, msg := range resp.Mutations { + + mutation, err := anypb.UnmarshalNew(msg.Message, proto.UnmarshalOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to unpack mutation: %w", err) + } + if msg.Error != nil { + err = errors.New(*msg.Error) + } + mutationList = append(mutationList, actor.ApplyResult{ + Mutation: mutation, + Error: err, + }) + } + res := &actor.MutationResult[V]{ + Result: grain, + Mutations: mutationList, + } + return res, nil } func (h *RemoteHost[V]) Negotiate(knownHosts []string) ([]string, error) { diff --git a/proto/control/control_plane.pb.go b/proto/control/control_plane.pb.go index 5c97e10..69879c7 100644 --- a/proto/control/control_plane.pb.go +++ b/proto/control/control_plane.pb.go @@ -592,16 +592,77 @@ func (x *GetReply) GetGrain() *anypb.Any { return nil } +type MutationResult struct { + state protoimpl.MessageState `protogen:"open.v1"` + Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` + Message *anypb.Any `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + Error *string `protobuf:"bytes,3,opt,name=error,proto3,oneof" json:"error,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MutationResult) Reset() { + *x = MutationResult{} + mi := &file_control_plane_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MutationResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MutationResult) ProtoMessage() {} + +func (x *MutationResult) ProtoReflect() protoreflect.Message { + mi := &file_control_plane_proto_msgTypes[12] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MutationResult.ProtoReflect.Descriptor instead. +func (*MutationResult) Descriptor() ([]byte, []int) { + return file_control_plane_proto_rawDescGZIP(), []int{12} +} + +func (x *MutationResult) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *MutationResult) GetMessage() *anypb.Any { + if x != nil { + return x.Message + } + return nil +} + +func (x *MutationResult) GetError() string { + if x != nil && x.Error != nil { + return *x.Error + } + return "" +} + type ApplyResult struct { state protoimpl.MessageState `protogen:"open.v1"` - Accepted bool `protobuf:"varint,1,opt,name=accepted,proto3" json:"accepted,omitempty"` + State *anypb.Any `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"` + Mutations []*MutationResult `protobuf:"bytes,2,rep,name=mutations,proto3" json:"mutations,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *ApplyResult) Reset() { *x = ApplyResult{} - mi := &file_control_plane_proto_msgTypes[12] + mi := &file_control_plane_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -613,7 +674,7 @@ func (x *ApplyResult) String() string { func (*ApplyResult) ProtoMessage() {} func (x *ApplyResult) ProtoReflect() protoreflect.Message { - mi := &file_control_plane_proto_msgTypes[12] + mi := &file_control_plane_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -626,14 +687,21 @@ func (x *ApplyResult) ProtoReflect() protoreflect.Message { // Deprecated: Use ApplyResult.ProtoReflect.Descriptor instead. func (*ApplyResult) Descriptor() ([]byte, []int) { - return file_control_plane_proto_rawDescGZIP(), []int{12} + return file_control_plane_proto_rawDescGZIP(), []int{13} } -func (x *ApplyResult) GetAccepted() bool { +func (x *ApplyResult) GetState() *anypb.Any { if x != nil { - return x.Accepted + return x.State } - return false + return nil +} + +func (x *ApplyResult) GetMutations() []*MutationResult { + if x != nil { + return x.Mutations + } + return nil } var File_control_plane_proto protoreflect.FileDescriptor @@ -680,60 +748,73 @@ var file_control_plane_proto_rawDesc = string([]byte{ 0x36, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x2a, 0x0a, 0x05, 0x67, 0x72, 0x61, 0x69, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, - 0x52, 0x05, 0x67, 0x72, 0x61, 0x69, 0x6e, 0x22, 0x29, 0x0a, 0x0b, 0x41, 0x70, 0x70, 0x6c, 0x79, - 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, - 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, - 0x65, 0x64, 0x32, 0xd6, 0x05, 0x0a, 0x0c, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x50, 0x6c, - 0x61, 0x6e, 0x65, 0x12, 0x48, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x1d, 0x2e, 0x63, 0x6f, - 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x73, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x21, 0x2e, 0x63, 0x6f, 0x6e, - 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x73, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x5d, 0x0a, - 0x09, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x12, 0x28, 0x2e, 0x63, 0x6f, 0x6e, - 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x73, 0x2e, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, - 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4e, 0x65, - 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x58, 0x0a, 0x10, - 0x47, 0x65, 0x74, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x64, 0x73, - 0x12, 0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, - 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, - 0x25, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x64, - 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x66, 0x0a, 0x11, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, - 0x63, 0x65, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x12, 0x29, 0x2e, 0x63, 0x6f, - 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x41, 0x6e, - 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x1a, 0x26, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, - 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, - 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, 0x52, - 0x0a, 0x05, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x12, 0x24, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, - 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, - 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, - 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x73, 0x75, - 0x6c, 0x74, 0x12, 0x60, 0x0a, 0x0e, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x45, 0x78, - 0x70, 0x69, 0x72, 0x79, 0x12, 0x26, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, - 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x45, 0x78, - 0x70, 0x69, 0x72, 0x79, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x1a, 0x26, 0x2e, 0x63, + 0x52, 0x05, 0x67, 0x72, 0x61, 0x69, 0x6e, 0x22, 0x79, 0x0a, 0x0e, 0x4d, 0x75, 0x74, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x2e, 0x0a, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x19, 0x0a, + 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x88, 0x01, 0x01, 0x42, 0x08, 0x0a, 0x06, 0x5f, 0x65, 0x72, 0x72, + 0x6f, 0x72, 0x22, 0x7f, 0x0a, 0x0b, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x73, 0x75, 0x6c, + 0x74, 0x12, 0x2a, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x44, 0x0a, + 0x09, 0x6d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x26, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, + 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x09, 0x6d, 0x75, 0x74, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x32, 0xd6, 0x05, 0x0a, 0x0c, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x50, + 0x6c, 0x61, 0x6e, 0x65, 0x12, 0x48, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, - 0x65, 0x41, 0x63, 0x6b, 0x12, 0x58, 0x0a, 0x07, 0x43, 0x6c, 0x6f, 0x73, 0x69, 0x6e, 0x67, 0x12, - 0x25, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x69, 0x6e, 0x67, - 0x4e, 0x6f, 0x74, 0x69, 0x63, 0x65, 0x1a, 0x26, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, + 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x21, 0x2e, 0x63, 0x6f, + 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x73, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x5d, + 0x0a, 0x09, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x12, 0x28, 0x2e, 0x63, 0x6f, + 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, + 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4e, + 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x58, 0x0a, + 0x10, 0x47, 0x65, 0x74, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x64, + 0x73, 0x12, 0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, + 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, + 0x1a, 0x25, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, + 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, + 0x64, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x66, 0x0a, 0x11, 0x41, 0x6e, 0x6e, 0x6f, 0x75, + 0x6e, 0x63, 0x65, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x12, 0x29, 0x2e, 0x63, + 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x41, + 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x1a, 0x26, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, + 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, + 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, + 0x52, 0x0a, 0x05, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x12, 0x24, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, + 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x73, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, + 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x73, + 0x75, 0x6c, 0x74, 0x12, 0x60, 0x0a, 0x0e, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x45, + 0x78, 0x70, 0x69, 0x72, 0x79, 0x12, 0x26, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, + 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x45, + 0x78, 0x70, 0x69, 0x72, 0x79, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x1a, 0x26, 0x2e, + 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, + 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, 0x58, 0x0a, 0x07, 0x43, 0x6c, 0x6f, 0x73, 0x69, 0x6e, 0x67, + 0x12, 0x25, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, + 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x69, 0x6e, + 0x67, 0x4e, 0x6f, 0x74, 0x69, 0x63, 0x65, 0x1a, 0x26, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, + 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, + 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, + 0x4b, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x22, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, - 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, 0x4b, - 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x22, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, - 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x47, - 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x63, 0x6f, 0x6e, 0x74, - 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x73, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x42, 0x40, 0x5a, 0x3e, 0x67, - 0x69, 0x74, 0x2e, 0x6b, 0x36, 0x6e, 0x2e, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x6f, 0x2d, 0x63, 0x61, - 0x72, 0x74, 0x2d, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x63, - 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x3b, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, - 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x63, 0x6f, 0x6e, + 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x73, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x42, 0x40, 0x5a, 0x3e, + 0x67, 0x69, 0x74, 0x2e, 0x6b, 0x36, 0x6e, 0x2e, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x6f, 0x2d, 0x63, + 0x61, 0x72, 0x74, 0x2d, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, + 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x3b, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, + 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, }) var ( @@ -748,7 +829,7 @@ func file_control_plane_proto_rawDescGZIP() []byte { return file_control_plane_proto_rawDescData } -var file_control_plane_proto_msgTypes = make([]protoimpl.MessageInfo, 13) +var file_control_plane_proto_msgTypes = make([]protoimpl.MessageInfo, 14) var file_control_plane_proto_goTypes = []any{ (*Empty)(nil), // 0: control_plane_messages.Empty (*PingReply)(nil), // 1: control_plane_messages.PingReply @@ -762,33 +843,37 @@ var file_control_plane_proto_goTypes = []any{ (*ApplyRequest)(nil), // 9: control_plane_messages.ApplyRequest (*GetRequest)(nil), // 10: control_plane_messages.GetRequest (*GetReply)(nil), // 11: control_plane_messages.GetReply - (*ApplyResult)(nil), // 12: control_plane_messages.ApplyResult - (*anypb.Any)(nil), // 13: google.protobuf.Any + (*MutationResult)(nil), // 12: control_plane_messages.MutationResult + (*ApplyResult)(nil), // 13: control_plane_messages.ApplyResult + (*anypb.Any)(nil), // 14: google.protobuf.Any } var file_control_plane_proto_depIdxs = []int32{ - 13, // 0: control_plane_messages.ApplyRequest.messages:type_name -> google.protobuf.Any - 13, // 1: control_plane_messages.GetReply.grain:type_name -> google.protobuf.Any - 0, // 2: control_plane_messages.ControlPlane.Ping:input_type -> control_plane_messages.Empty - 2, // 3: control_plane_messages.ControlPlane.Negotiate:input_type -> control_plane_messages.NegotiateRequest - 0, // 4: control_plane_messages.ControlPlane.GetLocalActorIds:input_type -> control_plane_messages.Empty - 7, // 5: control_plane_messages.ControlPlane.AnnounceOwnership:input_type -> control_plane_messages.OwnershipAnnounce - 9, // 6: control_plane_messages.ControlPlane.Apply:input_type -> control_plane_messages.ApplyRequest - 8, // 7: control_plane_messages.ControlPlane.AnnounceExpiry:input_type -> control_plane_messages.ExpiryAnnounce - 6, // 8: control_plane_messages.ControlPlane.Closing:input_type -> control_plane_messages.ClosingNotice - 10, // 9: control_plane_messages.ControlPlane.Get:input_type -> control_plane_messages.GetRequest - 1, // 10: control_plane_messages.ControlPlane.Ping:output_type -> control_plane_messages.PingReply - 3, // 11: control_plane_messages.ControlPlane.Negotiate:output_type -> control_plane_messages.NegotiateReply - 4, // 12: control_plane_messages.ControlPlane.GetLocalActorIds:output_type -> control_plane_messages.ActorIdsReply - 5, // 13: control_plane_messages.ControlPlane.AnnounceOwnership:output_type -> control_plane_messages.OwnerChangeAck - 12, // 14: control_plane_messages.ControlPlane.Apply:output_type -> control_plane_messages.ApplyResult - 5, // 15: control_plane_messages.ControlPlane.AnnounceExpiry:output_type -> control_plane_messages.OwnerChangeAck - 5, // 16: control_plane_messages.ControlPlane.Closing:output_type -> control_plane_messages.OwnerChangeAck - 11, // 17: control_plane_messages.ControlPlane.Get:output_type -> control_plane_messages.GetReply - 10, // [10:18] is the sub-list for method output_type - 2, // [2:10] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 14, // 0: control_plane_messages.ApplyRequest.messages:type_name -> google.protobuf.Any + 14, // 1: control_plane_messages.GetReply.grain:type_name -> google.protobuf.Any + 14, // 2: control_plane_messages.MutationResult.message:type_name -> google.protobuf.Any + 14, // 3: control_plane_messages.ApplyResult.state:type_name -> google.protobuf.Any + 12, // 4: control_plane_messages.ApplyResult.mutations:type_name -> control_plane_messages.MutationResult + 0, // 5: control_plane_messages.ControlPlane.Ping:input_type -> control_plane_messages.Empty + 2, // 6: control_plane_messages.ControlPlane.Negotiate:input_type -> control_plane_messages.NegotiateRequest + 0, // 7: control_plane_messages.ControlPlane.GetLocalActorIds:input_type -> control_plane_messages.Empty + 7, // 8: control_plane_messages.ControlPlane.AnnounceOwnership:input_type -> control_plane_messages.OwnershipAnnounce + 9, // 9: control_plane_messages.ControlPlane.Apply:input_type -> control_plane_messages.ApplyRequest + 8, // 10: control_plane_messages.ControlPlane.AnnounceExpiry:input_type -> control_plane_messages.ExpiryAnnounce + 6, // 11: control_plane_messages.ControlPlane.Closing:input_type -> control_plane_messages.ClosingNotice + 10, // 12: control_plane_messages.ControlPlane.Get:input_type -> control_plane_messages.GetRequest + 1, // 13: control_plane_messages.ControlPlane.Ping:output_type -> control_plane_messages.PingReply + 3, // 14: control_plane_messages.ControlPlane.Negotiate:output_type -> control_plane_messages.NegotiateReply + 4, // 15: control_plane_messages.ControlPlane.GetLocalActorIds:output_type -> control_plane_messages.ActorIdsReply + 5, // 16: control_plane_messages.ControlPlane.AnnounceOwnership:output_type -> control_plane_messages.OwnerChangeAck + 13, // 17: control_plane_messages.ControlPlane.Apply:output_type -> control_plane_messages.ApplyResult + 5, // 18: control_plane_messages.ControlPlane.AnnounceExpiry:output_type -> control_plane_messages.OwnerChangeAck + 5, // 19: control_plane_messages.ControlPlane.Closing:output_type -> control_plane_messages.OwnerChangeAck + 11, // 20: control_plane_messages.ControlPlane.Get:output_type -> control_plane_messages.GetReply + 13, // [13:21] is the sub-list for method output_type + 5, // [5:13] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_control_plane_proto_init() } @@ -796,13 +881,14 @@ func file_control_plane_proto_init() { if File_control_plane_proto != nil { return } + file_control_plane_proto_msgTypes[12].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_control_plane_proto_rawDesc), len(file_control_plane_proto_rawDesc)), NumEnums: 0, - NumMessages: 13, + NumMessages: 14, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/control_plane.proto b/proto/control_plane.proto index bf8568b..9f4b444 100644 --- a/proto/control_plane.proto +++ b/proto/control_plane.proto @@ -81,8 +81,15 @@ message GetReply { google.protobuf.Any grain = 1; } +message MutationResult { + string type = 1; + google.protobuf.Any message = 2; + optional string error = 3; +} + message ApplyResult { - bool accepted = 1; + google.protobuf.Any state = 1; + repeated MutationResult mutations = 2; } // ControlPlane defines cluster coordination and ownership operations.