diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index c6d5b2f..0f6a86c 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -18,7 +18,7 @@ import ( "git.k6n.net/go-cart-actor/pkg/actor" "git.k6n.net/go-cart-actor/pkg/cart" - "github.com/gogo/protobuf/proto" + "google.golang.org/protobuf/proto" ) type FileServer struct { diff --git a/cmd/cart/pool-server.go b/cmd/cart/pool-server.go index 7d00110..78fcdb2 100644 --- a/cmd/cart/pool-server.go +++ b/cmd/cart/pool-server.go @@ -19,12 +19,12 @@ import ( "github.com/adyen/adyen-go-api-library/v14/src/adyen" "github.com/adyen/adyen-go-api-library/v14/src/hmacvalidator" "github.com/adyen/adyen-go-api-library/v14/src/webhook" - "github.com/gogo/protobuf/proto" "github.com/matst80/go-redis-inventory/pkg/inventory" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "go.opentelemetry.io/contrib/bridges/otelslog" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "go.opentelemetry.io/otel" diff --git a/pkg/actor/disk_storage.go b/pkg/actor/disk_storage.go index 50021ed..75e7376 100644 --- a/pkg/actor/disk_storage.go +++ b/pkg/actor/disk_storage.go @@ -10,7 +10,7 @@ import ( "sync" "time" - "github.com/gogo/protobuf/proto" + "google.golang.org/protobuf/proto" ) type QueueEvent struct { diff --git a/pkg/actor/grain_pool.go b/pkg/actor/grain_pool.go index 221a2e1..9da140d 100644 --- a/pkg/actor/grain_pool.go +++ b/pkg/actor/grain_pool.go @@ -4,7 +4,7 @@ import ( "context" "net/http" - "github.com/gogo/protobuf/proto" + "google.golang.org/protobuf/proto" ) type MutationResult[V any] struct { diff --git a/pkg/actor/grpc_server.go b/pkg/actor/grpc_server.go index 01a2152..91b63e1 100644 --- a/pkg/actor/grpc_server.go +++ b/pkg/actor/grpc_server.go @@ -8,13 +8,13 @@ import ( "time" messages "git.k6n.net/go-cart-actor/pkg/messages" - "github.com/gogo/protobuf/proto" "go.opentelemetry.io/contrib/bridges/otelslog" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "google.golang.org/grpc" "google.golang.org/grpc/reflection" + "google.golang.org/protobuf/proto" ) // ControlServer implements the ControlPlane gRPC services. @@ -142,56 +142,10 @@ func (s *ControlServer[V]) Ping(ctx context.Context, _ *messages.Empty) (*messag func (s *ControlServer[V]) Apply(ctx context.Context, in *messages.ApplyRequest) (*messages.ApplyResult, error) { msgs := make([]proto.Message, len(in.Messages)) - for i, mut := range in.Messages { - var msg proto.Message - if m := mut.GetClearCart(); m != nil { - msg = m - } else if m := mut.GetAddItem(); m != nil { - msg = m - } else if m := mut.GetRemoveItem(); m != nil { - msg = m - } else if m := mut.GetChangeQuantity(); m != nil { - msg = m - } else if m := mut.GetSetDelivery(); m != nil { - msg = m - } else if m := mut.GetSetPickupPoint(); m != nil { - msg = m - } else if m := mut.GetRemoveDelivery(); m != nil { - msg = m - } else if m := mut.GetSetUserId(); m != nil { - msg = m - } else if m := mut.GetLineItemMarking(); m != nil { - msg = m - } else if m := mut.GetRemoveLineItemMarking(); m != nil { - msg = m - } else if m := mut.GetSubscriptionAdded(); m != nil { - msg = m - } else if m := mut.GetPaymentDeclined(); m != nil { - msg = m - } else if m := mut.GetConfirmationViewed(); m != nil { - msg = m - } else if m := mut.GetCreateCheckoutOrder(); m != nil { - msg = m - } else if m := mut.GetOrderCreated(); m != nil { - msg = m - } else if m := mut.GetNoop(); m != nil { - msg = m - } else if m := mut.GetInitializeCheckout(); m != nil { - msg = m - } else if m := mut.GetInventoryReserved(); m != nil { - msg = m - } else if m := mut.GetAddVoucher(); m != nil { - msg = m - } else if m := mut.GetRemoveVoucher(); m != nil { - msg = m - } else if m := mut.GetUpsertSubscriptionDetails(); m != nil { - msg = m - } else if m := mut.GetPreConditionFailed(); m != nil { - msg = m - } else if m := mut.GetAddGiftcard(); m != nil { - msg = m - } else if m := mut.GetRemoveGiftcard(); m != nil { - msg = m + for i, anyMsg := range in.Messages { + msg, err := anyMsg.UnmarshalNew() + if err != nil { + return &messages.ApplyResult{Accepted: false}, fmt.Errorf("failed to unmarshal message: %w", err) } msgs[i] = msg } diff --git a/pkg/actor/grpc_server_test.go b/pkg/actor/grpc_server_test.go new file mode 100644 index 0000000..cf83f83 --- /dev/null +++ b/pkg/actor/grpc_server_test.go @@ -0,0 +1,104 @@ +package actor + +import ( + "context" + "testing" + + "git.k6n.net/go-cart-actor/pkg/messages" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" +) + +// MockGrainPool for testing +type mockGrainPool struct { + applied []proto.Message +} + +func (m *mockGrainPool) Apply(ctx context.Context, id uint64, mutations ...proto.Message) (*MutationResult[*mockGrain], error) { + m.applied = mutations + // Simulate successful application + return &MutationResult[*mockGrain]{ + Result: &mockGrain{}, + Mutations: []ApplyResult{ + {Error: nil}, // Assume success + {Error: nil}, + }, + }, nil +} + +func (m *mockGrainPool) Get(ctx context.Context, id uint64) (*mockGrain, error) { + return &mockGrain{}, nil +} + +func (m *mockGrainPool) OwnerHost(id uint64) (Host, bool) { + return nil, false +} + +func (m *mockGrainPool) TakeOwnership(id uint64) {} + +func (m *mockGrainPool) Hostname() string { return "test-host" } + +func (m *mockGrainPool) HandleOwnershipChange(host string, ids []uint64) error { return nil } +func (m *mockGrainPool) HandleRemoteExpiry(host string, ids []uint64) error { return nil } +func (m *mockGrainPool) Negotiate(hosts []string) {} +func (m *mockGrainPool) GetLocalIds() []uint64 { return []uint64{} } +func (m *mockGrainPool) RemoveHost(host string) {} +func (m *mockGrainPool) AddRemoteHost(host string) {} +func (m *mockGrainPool) IsHealthy() bool { return true } +func (m *mockGrainPool) IsKnown(host string) bool { return false } +func (m *mockGrainPool) Close() {} + +type mockGrain struct{} + +func TestApplyRequestWithMutations(t *testing.T) { + // Setup mock pool + pool := &mockGrainPool{} + + // Create gRPC server + server, err := NewControlServer[*mockGrain](DefaultServerConfig(), pool) + if err != nil { + t.Fatalf("failed to create server: %v", err) + } + defer server.GracefulStop() + + // Create client connection + conn, err := grpc.Dial("localhost:1337", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer conn.Close() + + client := messages.NewControlPlaneClient(conn) + + // Prepare ApplyRequest with multiple Any messages + addItemAny, _ := anypb.New(&messages.AddItem{ItemId: 1, Quantity: 2}) + removeItemAny, _ := anypb.New(&messages.RemoveItem{Id: 1}) + req := &messages.ApplyRequest{ + Id: 123, + Messages: []*anypb.Any{addItemAny, removeItemAny}, + } + + // Call Apply + resp, err := client.Apply(context.Background(), req) + if err != nil { + t.Fatalf("Apply failed: %v", err) + } + + // Verify response + if !resp.Accepted { + t.Errorf("expected Accepted=true, got false") + } + + // Verify mutations were extracted and applied + if len(pool.applied) != 2 { + t.Errorf("expected 2 mutations applied, got %d", len(pool.applied)) + } + if addItem, ok := pool.applied[0].(*messages.AddItem); !ok || addItem.ItemId != 1 { + t.Errorf("expected AddItem with ItemId=1, got %v", pool.applied[0]) + } + if removeItem, ok := pool.applied[1].(*messages.RemoveItem); !ok || removeItem.Id != 1 { + t.Errorf("expected RemoveItem with Id=1, got %v", pool.applied[1]) + } +} diff --git a/pkg/actor/mutation_registry.go b/pkg/actor/mutation_registry.go index a53e98d..3b92cd2 100644 --- a/pkg/actor/mutation_registry.go +++ b/pkg/actor/mutation_registry.go @@ -7,8 +7,8 @@ import ( "reflect" "sync" - "github.com/gogo/protobuf/proto" "go.opentelemetry.io/otel/attribute" + "google.golang.org/protobuf/proto" ) type ApplyResult struct { diff --git a/pkg/actor/simple_grain_pool.go b/pkg/actor/simple_grain_pool.go index 4f673b6..ca1b277 100644 --- a/pkg/actor/simple_grain_pool.go +++ b/pkg/actor/simple_grain_pool.go @@ -8,7 +8,7 @@ import ( "sync" "time" - "github.com/gogo/protobuf/proto" + "google.golang.org/protobuf/proto" ) type SimpleGrainPool[V any] struct { diff --git a/pkg/actor/state.go b/pkg/actor/state.go index b1e70ac..17483b7 100644 --- a/pkg/actor/state.go +++ b/pkg/actor/state.go @@ -7,7 +7,7 @@ import ( "io" "time" - "github.com/gogo/protobuf/proto" + "google.golang.org/protobuf/proto" ) type StateStorage struct { diff --git a/pkg/cart/mutation_test.go b/pkg/cart/mutation_test.go index 1a9ccfc..5f129a2 100644 --- a/pkg/cart/mutation_test.go +++ b/pkg/cart/mutation_test.go @@ -10,8 +10,8 @@ import ( "testing" "time" - "github.com/gogo/protobuf/proto" "github.com/matst80/go-redis-inventory/pkg/inventory" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "git.k6n.net/go-cart-actor/pkg/actor" diff --git a/pkg/messages/control_plane.pb.go b/pkg/messages/control_plane.pb.go index d474c27..854a86a 100644 --- a/pkg/messages/control_plane.pb.go +++ b/pkg/messages/control_plane.pb.go @@ -9,6 +9,7 @@ package messages import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + anypb "google.golang.org/protobuf/types/known/anypb" reflect "reflect" sync "sync" unsafe "unsafe" @@ -454,7 +455,7 @@ func (x *ExpiryAnnounce) GetIds() []uint64 { type ApplyRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - Messages []*Mutation `protobuf:"bytes,2,rep,name=messages,proto3" json:"messages,omitempty"` + Messages []*anypb.Any `protobuf:"bytes,2,rep,name=messages,proto3" json:"messages,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -496,7 +497,7 @@ func (x *ApplyRequest) GetId() uint64 { return 0 } -func (x *ApplyRequest) GetMessages() []*Mutation { +func (x *ApplyRequest) GetMessages() []*anypb.Any { if x != nil { return x.Messages } @@ -552,73 +553,75 @@ var File_control_plane_proto protoreflect.FileDescriptor var file_control_plane_proto_rawDesc = string([]byte{ 0x0a, 0x13, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x1a, - 0x0e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, - 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x3c, 0x0a, 0x09, 0x50, 0x69, 0x6e, 0x67, - 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x75, 0x6e, 0x69, - 0x78, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x75, 0x6e, - 0x69, 0x78, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x33, 0x0a, 0x10, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, - 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x6b, 0x6e, - 0x6f, 0x77, 0x6e, 0x5f, 0x68, 0x6f, 0x73, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, - 0x0a, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x48, 0x6f, 0x73, 0x74, 0x73, 0x22, 0x26, 0x0a, 0x0e, 0x4e, - 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x14, 0x0a, - 0x05, 0x68, 0x6f, 0x73, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x68, 0x6f, - 0x73, 0x74, 0x73, 0x22, 0x21, 0x0a, 0x0d, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x64, 0x73, 0x52, - 0x65, 0x70, 0x6c, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x69, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, - 0x04, 0x52, 0x03, 0x69, 0x64, 0x73, 0x22, 0x46, 0x0a, 0x0e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, - 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x63, 0x63, 0x65, - 0x70, 0x74, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x61, 0x63, 0x63, 0x65, - 0x70, 0x74, 0x65, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x23, - 0x0a, 0x0d, 0x43, 0x6c, 0x6f, 0x73, 0x69, 0x6e, 0x67, 0x4e, 0x6f, 0x74, 0x69, 0x63, 0x65, 0x12, - 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, - 0x6f, 0x73, 0x74, 0x22, 0x39, 0x0a, 0x11, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, - 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, - 0x69, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x04, 0x52, 0x03, 0x69, 0x64, 0x73, 0x22, 0x36, - 0x0a, 0x0e, 0x45, 0x78, 0x70, 0x69, 0x72, 0x79, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, + 0x0e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, + 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x22, 0x3c, 0x0a, 0x09, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x69, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, - 0x04, 0x52, 0x03, 0x69, 0x64, 0x73, 0x22, 0x4e, 0x0a, 0x0c, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x04, 0x52, 0x02, 0x69, 0x64, 0x12, 0x2e, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x73, 0x2e, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0x29, 0x0a, 0x0b, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x65, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x65, - 0x64, 0x32, 0xc5, 0x03, 0x0a, 0x0c, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x50, 0x6c, 0x61, - 0x6e, 0x65, 0x12, 0x2c, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0f, 0x2e, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x13, 0x2e, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x70, 0x6c, 0x79, - 0x12, 0x41, 0x0a, 0x09, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x12, 0x1a, 0x2e, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, - 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, - 0x70, 0x6c, 0x79, 0x12, 0x3c, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x41, - 0x63, 0x74, 0x6f, 0x72, 0x49, 0x64, 0x73, 0x12, 0x0f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x73, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x73, 0x2e, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x64, 0x73, 0x52, 0x65, 0x70, 0x6c, - 0x79, 0x12, 0x4a, 0x0a, 0x11, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x4f, 0x77, 0x6e, - 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x12, 0x1b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x41, 0x6e, 0x6e, 0x6f, 0x75, - 0x6e, 0x63, 0x65, 0x1a, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, - 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, 0x36, 0x0a, - 0x05, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x12, 0x16, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x73, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x44, 0x0a, 0x0e, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, - 0x65, 0x45, 0x78, 0x70, 0x69, 0x72, 0x79, 0x12, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x73, 0x2e, 0x45, 0x78, 0x70, 0x69, 0x72, 0x79, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, - 0x65, 0x1a, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, - 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, 0x3c, 0x0a, 0x07, 0x43, - 0x6c, 0x6f, 0x73, 0x69, 0x6e, 0x67, 0x12, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x73, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x69, 0x6e, 0x67, 0x4e, 0x6f, 0x74, 0x69, 0x63, 0x65, 0x1a, - 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, - 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, - 0x2e, 0x6b, 0x36, 0x6e, 0x2e, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x6f, 0x2d, 0x63, 0x61, 0x72, 0x74, - 0x2d, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x68, 0x6f, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x75, 0x6e, 0x69, 0x78, 0x5f, 0x74, 0x69, 0x6d, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x75, 0x6e, 0x69, 0x78, 0x54, 0x69, 0x6d, + 0x65, 0x22, 0x33, 0x0a, 0x10, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x5f, 0x68, + 0x6f, 0x73, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0a, 0x6b, 0x6e, 0x6f, 0x77, + 0x6e, 0x48, 0x6f, 0x73, 0x74, 0x73, 0x22, 0x26, 0x0a, 0x0e, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, + 0x61, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x68, 0x6f, 0x73, 0x74, + 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x68, 0x6f, 0x73, 0x74, 0x73, 0x22, 0x21, + 0x0a, 0x0d, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x64, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, + 0x10, 0x0a, 0x03, 0x69, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x04, 0x52, 0x03, 0x69, 0x64, + 0x73, 0x22, 0x46, 0x0a, 0x0e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, + 0x41, 0x63, 0x6b, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x65, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x65, 0x64, 0x12, + 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x23, 0x0a, 0x0d, 0x43, 0x6c, 0x6f, + 0x73, 0x69, 0x6e, 0x67, 0x4e, 0x6f, 0x74, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, + 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x22, 0x39, + 0x0a, 0x11, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x41, 0x6e, 0x6e, 0x6f, 0x75, + 0x6e, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x69, 0x64, 0x73, 0x18, 0x02, + 0x20, 0x03, 0x28, 0x04, 0x52, 0x03, 0x69, 0x64, 0x73, 0x22, 0x36, 0x0a, 0x0e, 0x45, 0x78, 0x70, + 0x69, 0x72, 0x79, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x68, + 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, + 0x10, 0x0a, 0x03, 0x69, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x04, 0x52, 0x03, 0x69, 0x64, + 0x73, 0x22, 0x50, 0x0a, 0x0c, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x02, 0x69, + 0x64, 0x12, 0x30, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x02, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x73, 0x22, 0x29, 0x0a, 0x0b, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x65, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x65, 0x64, 0x32, 0xc5, + 0x03, 0x0a, 0x0c, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x50, 0x6c, 0x61, 0x6e, 0x65, 0x12, + 0x2c, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x73, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x73, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x41, 0x0a, + 0x09, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x12, 0x1a, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x73, 0x2e, 0x4e, 0x65, 0x67, 0x6f, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, + 0x12, 0x3c, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x41, 0x63, 0x74, 0x6f, + 0x72, 0x49, 0x64, 0x73, 0x12, 0x0f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, + 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, + 0x2e, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x64, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x4a, + 0x0a, 0x11, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x73, + 0x68, 0x69, 0x70, 0x12, 0x1b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, + 0x77, 0x6e, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, + 0x1a, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, + 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x70, + 0x70, 0x6c, 0x79, 0x12, 0x16, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x41, + 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x52, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x12, 0x44, 0x0a, 0x0e, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x45, 0x78, + 0x70, 0x69, 0x72, 0x79, 0x12, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, + 0x45, 0x78, 0x70, 0x69, 0x72, 0x79, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x1a, 0x18, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, + 0x68, 0x61, 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x12, 0x3c, 0x0a, 0x07, 0x43, 0x6c, 0x6f, 0x73, + 0x69, 0x6e, 0x67, 0x12, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x43, + 0x6c, 0x6f, 0x73, 0x69, 0x6e, 0x67, 0x4e, 0x6f, 0x74, 0x69, 0x63, 0x65, 0x1a, 0x18, 0x2e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x43, 0x68, 0x61, + 0x6e, 0x67, 0x65, 0x41, 0x63, 0x6b, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x2e, 0x6b, 0x36, + 0x6e, 0x2e, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x6f, 0x2d, 0x63, 0x61, 0x72, 0x74, 0x2d, 0x61, 0x63, + 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, }) var ( @@ -646,10 +649,10 @@ var file_control_plane_proto_goTypes = []any{ (*ExpiryAnnounce)(nil), // 8: messages.ExpiryAnnounce (*ApplyRequest)(nil), // 9: messages.ApplyRequest (*ApplyResult)(nil), // 10: messages.ApplyResult - (*Mutation)(nil), // 11: messages.Mutation + (*anypb.Any)(nil), // 11: google.protobuf.Any } var file_control_plane_proto_depIdxs = []int32{ - 11, // 0: messages.ApplyRequest.messages:type_name -> messages.Mutation + 11, // 0: messages.ApplyRequest.messages:type_name -> google.protobuf.Any 0, // 1: messages.ControlPlane.Ping:input_type -> messages.Empty 2, // 2: messages.ControlPlane.Negotiate:input_type -> messages.NegotiateRequest 0, // 3: messages.ControlPlane.GetLocalActorIds:input_type -> messages.Empty diff --git a/pkg/proxy/remotehost.go b/pkg/proxy/remotehost.go index 960c606..cf1a85b 100644 --- a/pkg/proxy/remotehost.go +++ b/pkg/proxy/remotehost.go @@ -9,13 +9,14 @@ import ( "net/http" "time" - messages "git.k6n.net/go-cart-actor/pkg/messages" - "github.com/gogo/protobuf/proto" + "git.k6n.net/go-cart-actor/pkg/messages" "go.opentelemetry.io/contrib/bridges/otelslog" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" ) // RemoteHost mirrors the lightweight controller used for remote node @@ -105,60 +106,13 @@ func (h *RemoteHost) Apply(ctx context.Context, id uint64, mutation ...proto.Mes ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - toSend := make([]*messages.Mutation, len(mutation)) + toSend := make([]*anypb.Any, len(mutation)) for i, msg := range mutation { - switch m := msg.(type) { - case *messages.ClearCartRequest: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_ClearCart{ClearCart: m}} - case *messages.AddItem: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_AddItem{AddItem: m}} - case *messages.RemoveItem: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_RemoveItem{RemoveItem: m}} - case *messages.ChangeQuantity: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_ChangeQuantity{ChangeQuantity: m}} - case *messages.SetDelivery: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_SetDelivery{SetDelivery: m}} - case *messages.SetPickupPoint: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_SetPickupPoint{SetPickupPoint: m}} - case *messages.RemoveDelivery: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_RemoveDelivery{RemoveDelivery: m}} - case *messages.SetUserId: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_SetUserId{SetUserId: m}} - case *messages.LineItemMarking: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_LineItemMarking{LineItemMarking: m}} - case *messages.RemoveLineItemMarking: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_RemoveLineItemMarking{RemoveLineItemMarking: m}} - case *messages.SubscriptionAdded: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_SubscriptionAdded{SubscriptionAdded: m}} - case *messages.PaymentDeclined: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_PaymentDeclined{PaymentDeclined: m}} - case *messages.ConfirmationViewed: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_ConfirmationViewed{ConfirmationViewed: m}} - case *messages.CreateCheckoutOrder: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_CreateCheckoutOrder{CreateCheckoutOrder: m}} - case *messages.OrderCreated: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_OrderCreated{OrderCreated: m}} - case *messages.Noop: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_Noop{Noop: m}} - case *messages.InitializeCheckout: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_InitializeCheckout{InitializeCheckout: m}} - case *messages.InventoryReserved: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_InventoryReserved{InventoryReserved: m}} - case *messages.AddVoucher: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_AddVoucher{AddVoucher: m}} - case *messages.RemoveVoucher: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_RemoveVoucher{RemoveVoucher: m}} - case *messages.UpsertSubscriptionDetails: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_UpsertSubscriptionDetails{UpsertSubscriptionDetails: m}} - case *messages.PreConditionFailed: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_PreConditionFailed{PreConditionFailed: m}} - case *messages.AddGiftcard: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_AddGiftcard{AddGiftcard: m}} - case *messages.RemoveGiftcard: - toSend[i] = &messages.Mutation{Type: &messages.Mutation_RemoveGiftcard{RemoveGiftcard: m}} - default: - toSend[i] = nil + anyMsg, err := anypb.New(msg) + if err != nil { + return false, fmt.Errorf("failed to pack message: %w", err) } + toSend[i] = anyMsg } resp, err := h.controlClient.Apply(ctx, &messages.ApplyRequest{ diff --git a/proto/control_plane.proto b/proto/control_plane.proto index 16b4aa8..cdeb587 100644 --- a/proto/control_plane.proto +++ b/proto/control_plane.proto @@ -5,6 +5,7 @@ package messages; option go_package = "git.k6n.net/go-cart-actor/proto;messages"; import "messages.proto"; +import "google/protobuf/any.proto"; // ----------------------------------------------------------------------------- // Control Plane gRPC API @@ -70,7 +71,7 @@ message ExpiryAnnounce { message ApplyRequest { uint64 id = 1; - repeated Mutation messages = 2; + repeated google.protobuf.Any messages = 2; } message ApplyResult {