139 lines
4.2 KiB
Go
139 lines
4.2 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
|
|
messages "git.tornberg.me/go-cart-actor/proto"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/reflection"
|
|
)
|
|
|
|
// cartActorGRPCServer implements the CartActor and ControlPlane gRPC services.
|
|
// It delegates cart operations to a grain pool and cluster operations to a synced pool.
|
|
type cartActorGRPCServer struct {
|
|
messages.UnimplementedCartActorServer
|
|
messages.UnimplementedControlPlaneServer
|
|
|
|
pool GrainPool // For cart state mutations and queries
|
|
syncedPool *SyncedPool // For cluster membership and control
|
|
}
|
|
|
|
// NewCartActorGRPCServer creates and initializes the server.
|
|
func NewCartActorGRPCServer(pool GrainPool, syncedPool *SyncedPool) *cartActorGRPCServer {
|
|
return &cartActorGRPCServer{
|
|
pool: pool,
|
|
syncedPool: syncedPool,
|
|
}
|
|
}
|
|
|
|
// Mutate applies a mutation from an envelope to the corresponding cart grain.
|
|
func (s *cartActorGRPCServer) Mutate(ctx context.Context, envelope *messages.MutationEnvelope) (*messages.MutationReply, error) {
|
|
if envelope.GetCartId() == "" {
|
|
return &messages.MutationReply{
|
|
StatusCode: 400,
|
|
Result: &messages.MutationReply_Error{Error: "cart_id is required"},
|
|
}, nil
|
|
}
|
|
cartID := ToCartId(envelope.GetCartId())
|
|
|
|
var mutation interface{}
|
|
switch m := envelope.Mutation.(type) {
|
|
case *messages.MutationEnvelope_AddRequest:
|
|
mutation = m.AddRequest
|
|
case *messages.MutationEnvelope_AddItem:
|
|
mutation = m.AddItem
|
|
case *messages.MutationEnvelope_RemoveItem:
|
|
mutation = m.RemoveItem
|
|
case *messages.MutationEnvelope_RemoveDelivery:
|
|
mutation = m.RemoveDelivery
|
|
case *messages.MutationEnvelope_ChangeQuantity:
|
|
mutation = m.ChangeQuantity
|
|
case *messages.MutationEnvelope_SetDelivery:
|
|
mutation = m.SetDelivery
|
|
case *messages.MutationEnvelope_SetPickupPoint:
|
|
mutation = m.SetPickupPoint
|
|
case *messages.MutationEnvelope_CreateCheckoutOrder:
|
|
mutation = m.CreateCheckoutOrder
|
|
case *messages.MutationEnvelope_SetCartItems:
|
|
mutation = m.SetCartItems
|
|
case *messages.MutationEnvelope_OrderCompleted:
|
|
mutation = m.OrderCompleted
|
|
default:
|
|
return &messages.MutationReply{
|
|
StatusCode: 400,
|
|
Result: &messages.MutationReply_Error{Error: fmt.Sprintf("unsupported mutation type: %T", m)},
|
|
}, nil
|
|
}
|
|
|
|
// Delegate the mutation to the grain pool.
|
|
// The pool is responsible for routing it to the correct grain (local or remote).
|
|
grain, err := s.pool.Process(cartID, mutation)
|
|
if err != nil {
|
|
return &messages.MutationReply{
|
|
StatusCode: 500,
|
|
Result: &messages.MutationReply_Error{Error: err.Error()},
|
|
}, nil
|
|
}
|
|
|
|
// Map the internal grain state to the protobuf representation.
|
|
cartState := ToCartState(grain)
|
|
|
|
return &messages.MutationReply{
|
|
StatusCode: 200,
|
|
Result: &messages.MutationReply_State{State: cartState},
|
|
}, nil
|
|
}
|
|
|
|
// GetState retrieves the current state of a cart grain.
|
|
func (s *cartActorGRPCServer) GetState(ctx context.Context, req *messages.StateRequest) (*messages.StateReply, error) {
|
|
if req.GetCartId() == "" {
|
|
return &messages.StateReply{
|
|
StatusCode: 400,
|
|
Result: &messages.StateReply_Error{Error: "cart_id is required"},
|
|
}, nil
|
|
}
|
|
cartID := ToCartId(req.GetCartId())
|
|
|
|
grain, err := s.pool.Get(cartID)
|
|
if err != nil {
|
|
return &messages.StateReply{
|
|
StatusCode: 500,
|
|
Result: &messages.StateReply_Error{Error: err.Error()},
|
|
}, nil
|
|
}
|
|
|
|
cartState := ToCartState(grain)
|
|
|
|
return &messages.StateReply{
|
|
StatusCode: 200,
|
|
Result: &messages.StateReply_State{State: cartState},
|
|
}, nil
|
|
}
|
|
|
|
// StartGRPCServer configures and starts the unified gRPC server on the given address.
|
|
// It registers both the CartActor and ControlPlane services.
|
|
func StartGRPCServer(addr string, pool GrainPool, syncedPool *SyncedPool) (*grpc.Server, error) {
|
|
lis, err := net.Listen("tcp", addr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to listen: %w", err)
|
|
}
|
|
|
|
grpcServer := grpc.NewServer()
|
|
server := NewCartActorGRPCServer(pool, syncedPool)
|
|
|
|
messages.RegisterCartActorServer(grpcServer, server)
|
|
messages.RegisterControlPlaneServer(grpcServer, server)
|
|
reflection.Register(grpcServer)
|
|
|
|
log.Printf("gRPC server listening on %s", addr)
|
|
go func() {
|
|
if err := grpcServer.Serve(lis); err != nil {
|
|
log.Fatalf("failed to serve gRPC: %v", err)
|
|
}
|
|
}()
|
|
|
|
return grpcServer, nil
|
|
} |