add function to apply mutations over grpc
This commit is contained in:
@@ -35,6 +35,7 @@ type Host interface {
|
||||
Negotiate(otherHosts []string) ([]string, error)
|
||||
Name() string
|
||||
Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error)
|
||||
Apply(ctx context.Context, id uint64, mutation ...proto.Message) (bool, error)
|
||||
GetActorIds() []uint64
|
||||
Close() error
|
||||
Ping() bool
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
messages "git.k6n.net/go-cart-actor/pkg/messages"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"go.opentelemetry.io/contrib/bridges/otelslog"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
@@ -139,6 +140,69 @@ func (s *ControlServer[V]) Ping(ctx context.Context, _ *messages.Empty) (*messag
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *ControlServer[V]) Apply(ctx context.Context, in *messages.ApplyRequest) (*messages.ApplyResult, error) {
|
||||
msgs := make([]proto.Message, len(in.Messages))
|
||||
for i, mut := range in.Messages {
|
||||
var msg proto.Message
|
||||
if m := mut.GetClearCart(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetAddItem(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetRemoveItem(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetChangeQuantity(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetSetDelivery(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetSetPickupPoint(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetRemoveDelivery(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetSetUserId(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetLineItemMarking(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetRemoveLineItemMarking(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetSubscriptionAdded(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetPaymentDeclined(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetConfirmationViewed(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetCreateCheckoutOrder(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetOrderCreated(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetNoop(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetInitializeCheckout(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetInventoryReserved(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetAddVoucher(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetRemoveVoucher(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetUpsertSubscriptionDetails(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetPreConditionFailed(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetAddGiftcard(); m != nil {
|
||||
msg = m
|
||||
} else if m := mut.GetRemoveGiftcard(); m != nil {
|
||||
msg = m
|
||||
}
|
||||
msgs[i] = msg
|
||||
}
|
||||
_, err := s.pool.Apply(ctx, in.Id, msgs...)
|
||||
if err != nil {
|
||||
return &messages.ApplyResult{Accepted: false}, err
|
||||
}
|
||||
|
||||
return &messages.ApplyResult{Accepted: true}, nil
|
||||
}
|
||||
|
||||
// ControlPlane: Negotiate (merge host views)
|
||||
func (s *ControlServer[V]) Negotiate(ctx context.Context, req *messages.NegotiateRequest) (*messages.NegotiateReply, error) {
|
||||
ctx, span := tracer.Start(ctx, "grpc_negotiate")
|
||||
|
||||
@@ -212,9 +212,9 @@ func (r *ProtoMutationRegistry) Apply(ctx context.Context, grain any, msg ...pro
|
||||
}
|
||||
|
||||
for _, m := range msg {
|
||||
// Ignore nil mutation elements (untyped or typed nil pointers) silently; they carry no data.
|
||||
// Error if any mutation element is nil.
|
||||
if m == nil {
|
||||
continue
|
||||
return results, fmt.Errorf("nil mutation message")
|
||||
}
|
||||
// Typed nil: interface holds concrete proto message type whose pointer value is nil.
|
||||
rv := reflect.ValueOf(m)
|
||||
@@ -251,6 +251,12 @@ func (r *ProtoMutationRegistry) Apply(ctx context.Context, grain any, msg ...pro
|
||||
}
|
||||
}
|
||||
}
|
||||
// Return error for unregistered mutations
|
||||
for _, res := range results {
|
||||
if res.Error == ErrMutationNotRegistered {
|
||||
return results, res.Error
|
||||
}
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@ package actor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
"slices"
|
||||
"testing"
|
||||
@@ -90,7 +89,7 @@ func TestRegisteredMutationBasics(t *testing.T) {
|
||||
}
|
||||
|
||||
// Apply nil grain
|
||||
if _, err := reg.Apply(nil, add); err == nil {
|
||||
if _, err := reg.Apply(context.Background(), nil, add); err == nil {
|
||||
t.Fatalf("expected error for nil grain")
|
||||
}
|
||||
|
||||
@@ -100,7 +99,8 @@ func TestRegisteredMutationBasics(t *testing.T) {
|
||||
}
|
||||
|
||||
// Apply unregistered message
|
||||
if _, err := reg.Apply(context.Background(), state, &messages.Noop{}); !errors.Is(err, ErrMutationNotRegistered) {
|
||||
_, err := reg.Apply(context.Background(), state, &messages.Noop{})
|
||||
if err != ErrMutationNotRegistered {
|
||||
t.Fatalf("expected ErrMutationNotRegistered, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user