even more refactoring
Some checks failed
Build and Publish / BuildAndDeploy (push) Successful in 3m7s
Build and Publish / BuildAndDeployAmd64 (push) Has been cancelled

This commit is contained in:
matst80
2025-10-10 11:46:19 +00:00
parent 12d87036f6
commit 716f1121aa
32 changed files with 3857 additions and 953 deletions

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"log"
"net"
"time"
messages "git.tornberg.me/go-cart-actor/proto"
"google.golang.org/grpc"
@@ -17,7 +18,7 @@ type cartActorGRPCServer struct {
messages.UnimplementedCartActorServer
messages.UnimplementedControlPlaneServer
pool GrainPool // For cart state mutations and queries
pool GrainPool // For cart state mutations and queries
syncedPool *SyncedPool // For cluster membership and control
}
@@ -29,62 +30,125 @@ func NewCartActorGRPCServer(pool GrainPool, syncedPool *SyncedPool) *cartActorGR
}
}
// Mutate applies a mutation from an envelope to the corresponding cart grain.
func (s *cartActorGRPCServer) Mutate(ctx context.Context, envelope *messages.MutationEnvelope) (*messages.MutationReply, error) {
if envelope.GetCartId() == "" {
return &messages.MutationReply{
StatusCode: 400,
Result: &messages.MutationReply_Error{Error: "cart_id is required"},
}, nil
}
cartID := ToCartId(envelope.GetCartId())
var mutation interface{}
switch m := envelope.Mutation.(type) {
case *messages.MutationEnvelope_AddRequest:
mutation = m.AddRequest
case *messages.MutationEnvelope_AddItem:
mutation = m.AddItem
case *messages.MutationEnvelope_RemoveItem:
mutation = m.RemoveItem
case *messages.MutationEnvelope_RemoveDelivery:
mutation = m.RemoveDelivery
case *messages.MutationEnvelope_ChangeQuantity:
mutation = m.ChangeQuantity
case *messages.MutationEnvelope_SetDelivery:
mutation = m.SetDelivery
case *messages.MutationEnvelope_SetPickupPoint:
mutation = m.SetPickupPoint
case *messages.MutationEnvelope_CreateCheckoutOrder:
mutation = m.CreateCheckoutOrder
case *messages.MutationEnvelope_SetCartItems:
mutation = m.SetCartItems
case *messages.MutationEnvelope_OrderCompleted:
mutation = m.OrderCompleted
default:
return &messages.MutationReply{
StatusCode: 400,
Result: &messages.MutationReply_Error{Error: fmt.Sprintf("unsupported mutation type: %T", m)},
}, nil
}
// Delegate the mutation to the grain pool.
// The pool is responsible for routing it to the correct grain (local or remote).
grain, err := s.pool.Process(cartID, mutation)
// applyMutation routes a single cart mutation to the target grain (used by per-mutation RPC handlers).
func (s *cartActorGRPCServer) applyMutation(cartID string, mutation interface{}) *messages.CartMutationReply {
grain, err := s.pool.Apply(ToCartId(cartID), mutation)
if err != nil {
return &messages.MutationReply{
StatusCode: 500,
Result: &messages.MutationReply_Error{Error: err.Error()},
return &messages.CartMutationReply{
StatusCode: 500,
Result: &messages.CartMutationReply_Error{Error: err.Error()},
ServerTimestamp: time.Now().Unix(),
}
}
cartState := ToCartState(grain)
return &messages.CartMutationReply{
StatusCode: 200,
Result: &messages.CartMutationReply_State{State: cartState},
ServerTimestamp: time.Now().Unix(),
}
}
func (s *cartActorGRPCServer) AddRequest(ctx context.Context, req *messages.AddRequestRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
// Map the internal grain state to the protobuf representation.
cartState := ToCartState(grain)
func (s *cartActorGRPCServer) AddItem(ctx context.Context, req *messages.AddItemRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
return &messages.MutationReply{
StatusCode: 200,
Result: &messages.MutationReply_State{State: cartState},
}, nil
func (s *cartActorGRPCServer) RemoveItem(ctx context.Context, req *messages.RemoveItemRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
func (s *cartActorGRPCServer) RemoveDelivery(ctx context.Context, req *messages.RemoveDeliveryRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
func (s *cartActorGRPCServer) ChangeQuantity(ctx context.Context, req *messages.ChangeQuantityRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
func (s *cartActorGRPCServer) SetDelivery(ctx context.Context, req *messages.SetDeliveryRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
func (s *cartActorGRPCServer) SetPickupPoint(ctx context.Context, req *messages.SetPickupPointRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
/*
Checkout RPC removed. Checkout is handled at the HTTP layer (PoolServer.HandleCheckout).
*/
func (s *cartActorGRPCServer) SetCartItems(ctx context.Context, req *messages.SetCartItemsRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
func (s *cartActorGRPCServer) OrderCompleted(ctx context.Context, req *messages.OrderCompletedRequest) (*messages.CartMutationReply, error) {
if req.GetCartId() == "" {
return &messages.CartMutationReply{
StatusCode: 400,
Result: &messages.CartMutationReply_Error{Error: "cart_id is required"},
ServerTimestamp: time.Now().Unix(),
}, nil
}
return s.applyMutation(req.GetCartId(), req.GetPayload()), nil
}
// GetState retrieves the current state of a cart grain.
@@ -136,4 +200,4 @@ func StartGRPCServer(addr string, pool GrainPool, syncedPool *SyncedPool) (*grpc
}()
return grpcServer, nil
}
}