Files
go-cart-actor/cmd/cart/pool-server.go
2025-12-02 23:23:59 +01:00

540 lines
17 KiB
Go

package main
import (
"context"
"encoding/json"
"io"
"log"
"net/http"
"strconv"
"sync"
"time"
"git.k6n.net/go-cart-actor/pkg/actor"
"git.k6n.net/go-cart-actor/pkg/cart"
messages "git.k6n.net/go-cart-actor/proto/cart"
"git.k6n.net/go-cart-actor/pkg/voucher"
"github.com/matst80/go-redis-inventory/pkg/inventory"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)
// grainMutations is a Prometheus counter of applied cart mutations. In this
// file it is advanced by AddSkuToCartHandler with the number of mutations in
// the apply result.
var grainMutations = promauto.NewCounter(prometheus.CounterOpts{
	Name: "cart_grain_mutations_total",
	Help: "The total number of mutations",
})

// grainLookups is a Prometheus counter that ProxyHandler increments after
// every attempt to proxy a request to the host owning the cart.
var grainLookups = promauto.NewCounter(prometheus.CounterOpts{
	Name: "cart_grain_lookups_total",
	Help: "The total number of lookups",
})
// PoolServer exposes a pool of cart grains over HTTP.
//
// The grain pool is embedded, so its methods (Get, Apply, OwnerHost, ...) are
// callable directly on the server value.
type PoolServer struct {
	actor.GrainPool[*cart.CartGrain]

	// pod_name is sent back to clients in the X-Pod-Name response header.
	pod_name string

	// inventoryService and reservationService are stored by NewPoolServer.
	// NOTE(review): neither is read by the handlers visible in this file.
	inventoryService   inventory.InventoryService
	reservationService inventory.CartReservationService
}
// NewPoolServer builds a PoolServer around the given grain pool.
//
// pod_name is reported in the X-Pod-Name response header; the two inventory
// services are stored on the server for later use.
func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, inventoryService inventory.InventoryService, inventoryReservationService inventory.CartReservationService) *PoolServer {
	return &PoolServer{
		GrainPool:          pool,
		pod_name:           pod_name,
		inventoryService:   inventoryService,
		reservationService: inventoryReservationService,
	}
}
// ApplyLocal applies the given mutations to the cart identified by id through
// the embedded grain pool and returns the pool's mutation result.
func (s *PoolServer) ApplyLocal(ctx context.Context, id cart.CartId, mutation ...proto.Message) (*actor.MutationResult[*cart.CartGrain], error) {
	grainID := uint64(id)
	return s.Apply(ctx, grainID, mutation...)
}
// GetCartHandler loads the cart grain for id from the pool and writes it to
// the response as JSON. A lookup failure is returned to the caller untouched.
func (s *PoolServer) GetCartHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
	ctx := r.Context()
	grain, err := s.Get(ctx, uint64(id))
	if err == nil {
		return s.WriteResult(w, grain)
	}
	return err
}
// AddSkuToCartHandler adds a single unit of the SKU named by the {sku} path
// value to the cart. The country is derived from the request host and no
// store id is supplied. The number of mutations in the apply result is added
// to the grainMutations counter before the result is written as JSON.
func (s *PoolServer) AddSkuToCartHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
	ctx := r.Context()

	addMsg, err := GetItemAddMessage(ctx, r.PathValue("sku"), 1, getCountryFromHost(r.Host), nil)
	if err != nil {
		return err
	}

	result, err := s.ApplyLocal(ctx, id, addMsg)
	if err != nil {
		return err
	}

	applied := len(result.Mutations)
	grainMutations.Add(float64(applied))
	return s.WriteResult(w, result)
}
// WriteResult sends result to the client as a JSON document.
//
// The JSON, no-cache, permissive CORS and X-Pod-Name headers are always set.
// A nil result produces an empty 500 response and a nil error; otherwise the
// status is 200 and the encoder's error (if any) is returned.
func (s *PoolServer) WriteResult(w http.ResponseWriter, result any) error {
	headers := w.Header()
	headers.Set("Content-Type", "application/json")
	headers.Set("Cache-Control", "no-cache")
	headers.Set("Access-Control-Allow-Origin", "*")
	headers.Set("X-Pod-Name", s.pod_name)

	if result == nil {
		w.WriteHeader(http.StatusInternalServerError)
		return nil
	}

	w.WriteHeader(http.StatusOK)
	return json.NewEncoder(w).Encode(result)
}
// DeleteItemHandler removes the line item named by the {itemId} path value
// from the cart and writes the mutation result as JSON.
//
// The id is parsed as an unsigned 32-bit integer, matching the width of
// RemoveItem.Id. Negative or out-of-range values are rejected with the parse
// error instead of being silently truncated to a different item id.
func (s *PoolServer) DeleteItemHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
	itemId, err := strconv.ParseUint(r.PathValue("itemId"), 10, 32)
	if err != nil {
		return err
	}
	data, err := s.ApplyLocal(r.Context(), id, &messages.RemoveItem{Id: uint32(itemId)})
	if err != nil {
		return err
	}
	return s.WriteResult(w, data)
}
// QuantityChangeHandler decodes a ChangeQuantity message from the JSON request
// body, applies it to the cart and writes the mutation result as JSON.
func (s *PoolServer) QuantityChangeHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
	change := &messages.ChangeQuantity{}
	if err := json.NewDecoder(r.Body).Decode(change); err != nil {
		return err
	}

	result, err := s.ApplyLocal(r.Context(), id, change)
	if err != nil {
		return err
	}
	return s.WriteResult(w, result)
}
// Item is one entry of a bulk add/set request body.
type Item struct {
	// Sku identifies the product to add.
	Sku string `json:"sku"`
	// Quantity is the number of units requested.
	Quantity int `json:"quantity"`
	// StoreId is optional and left out of the JSON encoding when nil.
	StoreId *string `json:"storeId,omitempty"`
}

// SetCartItems is the JSON body accepted by SetCartItemsHandler and
// AddMultipleItemHandler.
type SetCartItems struct {
	// Country is passed along when resolving each item.
	Country string `json:"country"`
	// Items lists the entries to resolve and add.
	Items []Item `json:"items"`
}
// getMultipleAddMessages resolves every item into an add-item message
// concurrently and returns the messages in the same order as items.
//
// Items whose resolution fails are logged and left out of the result, so the
// returned slice may be shorter than items. Each goroutine writes only to its
// own slot, which keeps the result order deterministic (it previously followed
// goroutine completion order) and makes a mutex unnecessary.
func getMultipleAddMessages(ctx context.Context, items []Item, country string) []proto.Message {
	resolved := make([]proto.Message, len(items))

	var wg sync.WaitGroup
	for i, itm := range items {
		wg.Go(func() {
			msg, err := GetItemAddMessage(ctx, itm.Sku, itm.Quantity, country, itm.StoreId)
			if err != nil {
				log.Printf("error adding item %s: %v", itm.Sku, err)
				return
			}
			resolved[i] = msg
		})
	}
	wg.Wait()

	// Compact away the slots of items that failed to resolve.
	msgs := make([]proto.Message, 0, len(items))
	for _, msg := range resolved {
		if msg != nil {
			msgs = append(msgs, msg)
		}
	}
	return msgs
}
// SetCartItemsHandler replaces the cart contents with the items in the JSON
// request body: a ClearCartRequest is applied first, followed by one add
// message for every item that could be resolved. The mutation result is
// written as JSON.
func (s *PoolServer) SetCartItemsHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
	var payload SetCartItems
	if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
		return err
	}

	ctx := r.Context()
	batch := make([]proto.Message, 0, len(payload.Items)+1)
	batch = append(batch, &messages.ClearCartRequest{})
	adds := getMultipleAddMessages(ctx, payload.Items, payload.Country)
	batch = append(batch, adds...)

	result, err := s.ApplyLocal(ctx, id, batch...)
	if err != nil {
		return err
	}
	return s.WriteResult(w, result)
}
// AddMultipleItemHandler adds every resolvable item from the JSON request body
// to the cart, keeping what is already there, and writes the mutation result
// as JSON.
func (s *PoolServer) AddMultipleItemHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
	var payload SetCartItems
	if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
		return err
	}

	ctx := r.Context()
	adds := getMultipleAddMessages(ctx, payload.Items, payload.Country)
	result, err := s.ApplyLocal(ctx, id, adds...)
	if err != nil {
		return err
	}
	return s.WriteResult(w, result)
}
// AddRequest is the JSON body accepted by AddSkuRequestHandler.
type AddRequest struct {
	// Sku identifies the product to add.
	Sku string `json:"sku"`
	// Quantity is the number of units; the handler pre-fills 1 before decoding.
	Quantity int32 `json:"quantity"`
	// Country is passed along when resolving the item.
	Country string `json:"country"`
	// StoreId is optional; nil when absent from the body.
	StoreId *string `json:"storeId"`
}
// GetReservationTime reports how long stock should be reserved for item.
// The item is currently ignored and a fixed 15 minutes is returned.
func (s *PoolServer) GetReservationTime(item *messages.AddItem) time.Duration {
	// TODO: Implement reservation time calculation, nil don't require reservation
	const fixedReservation = 15 * time.Minute
	return fixedReservation
}
// AddSkuRequestHandler adds the item described by the JSON request body to the
// cart and writes the mutation result as JSON. The quantity defaults to 1 when
// the body does not specify one.
func (s *PoolServer) AddSkuRequestHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
	req := AddRequest{Quantity: 1}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		return err
	}

	ctx := r.Context()
	addMsg, err := GetItemAddMessage(ctx, req.Sku, int(req.Quantity), req.Country, req.StoreId)
	if err != nil {
		return err
	}

	result, err := s.ApplyLocal(ctx, id, addMsg)
	if err != nil {
		return err
	}
	return s.WriteResult(w, result)
}
// func (s *PoolServer) HandleConfirmation(w http.ResponseWriter, r *http.Request, id CartId) error {
// orderId := r.PathValue("orderId")
// if orderId == "" {
// return fmt.Errorf("orderId is empty")
// }
// order, err := KlarnaInstance.GetOrder(orderId)
// if err != nil {
// return err
// }
// w.Header().Set("Content-Type", "application/json")
// w.Header().Set("X-Pod-Name", s.pod_name)
// w.Header().Set("Cache-Control", "no-cache")
// w.Header().Set("Access-Control-Allow-Origin", "*")
// w.WriteHeader(http.StatusOK)
// return json.NewEncoder(w).Encode(order)
// }
// func (s *PoolServer) HandleCheckout(w http.ResponseWriter, r *http.Request, id CartId) error {
// klarnaOrder, err := s.CreateOrUpdateCheckout(r.Host, id)
// if err != nil {
// return err
// }
// s.ApplyCheckoutStarted(klarnaOrder, id)
// w.Header().Set("Content-Type", "application/json")
// return json.NewEncoder(w).Encode(klarnaOrder)
// }
//
// Removed leftover legacy block after CookieCartIdHandler (obsolete code referencing cid/legacy)
// RemoveCartCookie expires the "cartid" cookie on the client and answers with
// 200 OK. The cart itself is not touched and cartId is unused.
func (s *PoolServer) RemoveCartCookie(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
	// Clear cart cookie (breaking change: do not issue a new legacy id here)
	expired := http.Cookie{
		Name:     "cartid",
		Value:    "",
		Path:     "/",
		Secure:   r.TLS != nil, // only mark Secure when the request arrived over TLS
		HttpOnly: true,
		Expires:  time.Unix(0, 0), // epoch: already in the past
		SameSite: http.SameSiteLaxMode,
	}
	http.SetCookie(w, &expired)

	w.WriteHeader(http.StatusOK)
	return nil
}
// ProxyHandler wraps fn so that requests for carts owned by another host are
// forwarded there, and everything else is handled locally by fn.
//
// When the pool reports a remote owner, the request is proxied inside a
// "proxy" span. If the proxy reports the request as handled without error the
// wrapper returns nil. Otherwise (not handled, or an error) the request falls
// back to fn inside an "own" span. A proxy error is recorded on the span and
// logged rather than silently discarded.
//
// Each span's context is attached to the request it covers, so work done
// further down is linked to the span. The proxy span is ended before any local
// fallback starts.
func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error) func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error {
	return func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error {
		cartAttr := attribute.String("cartid", cartId.String())

		if ownerHost, ok := s.OwnerHost(uint64(cartId)); ok {
			// Run the proxy attempt in a closure so its deferred span.End()
			// fires before the local fallback below.
			proxied := func() bool {
				ctx, span := tracer.Start(r.Context(), "proxy")
				defer span.End()

				hostName := ownerHost.Name()
				hostAttr := attribute.String("other host", hostName)
				span.SetAttributes(cartAttr, hostAttr)
				logger.InfoContext(ctx, "cart proxyed", "result", hostName)
				proxyCalls.Add(ctx, 1, metric.WithAttributes(hostAttr))

				handled, err := ownerHost.Proxy(uint64(cartId), w, r.WithContext(ctx), nil)
				grainLookups.Inc()
				if err != nil {
					span.RecordError(err)
					logger.ErrorContext(ctx, "cart proxy failed, handling locally", "host", hostName, "error", err)
					return false
				}
				return handled
			}()
			if proxied {
				return nil
			}
		}

		ctx, span := tracer.Start(r.Context(), "own")
		defer span.End()
		span.SetAttributes(cartAttr)
		return fn(w, r.WithContext(ctx), cartId)
	}
}
// OpenTelemetry instruments shared by the handlers in this file. All three are
// created under the package-level name.

// tracer starts the "proxy" and "own" spans in ProxyHandler.
var tracer = otel.Tracer(name)

// meter is the source of the proxyCalls counter created in init.
var meter = otel.Meter(name)

// logger is the structured logger bridged to OpenTelemetry.
var logger = otelslog.NewLogger(name)

// proxyCalls counts proxy attempts; it is assigned in init.
var proxyCalls metric.Int64Counter
// init creates the "proxy.calls" counter and stores it in proxyCalls. It
// panics if the meter cannot create the instrument.
func init() {
	counter, err := meter.Int64Counter(
		"proxy.calls",
		metric.WithDescription("Number of proxy calls"),
		metric.WithUnit("{calls}"),
	)
	if err != nil {
		panic(err)
	}
	proxyCalls = counter
}
// AddVoucherRequest is the JSON body accepted by AddVoucherHandler.
type AddVoucherRequest struct {
	// VoucherCode is read from the "code" property of the body.
	VoucherCode string `json:"code"`
}
// AddVoucherHandler looks up the voucher code from the JSON request body,
// applies the resulting message to the cart and writes the mutation result as
// JSON.
//
// A body that cannot be decoded is answered with 400 and the decode error
// (the error used to be ignored, so lookup ran with an empty code). Lookup and
// apply failures are answered with 500. In every failure case the error text
// is written to the response and the error is also returned. A failure while
// writing the result is returned as well, as the other handlers do.
func (s *PoolServer) AddVoucherHandler(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
	data := &AddVoucherRequest{}
	if err := json.NewDecoder(r.Body).Decode(data); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte(err.Error()))
		return err
	}
	v := voucher.Service{}
	msg, err := v.GetVoucher(data.VoucherCode)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		return err
	}
	reply, err := s.ApplyLocal(r.Context(), cartId, msg)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		return err
	}
	return s.WriteResult(w, reply)
}
// SubscriptionDetailsRequest is the JSON body accepted by
// SubscriptionDetailsHandler. Every property is optional in the encoding.
type SubscriptionDetailsRequest struct {
	// Id is optional; nil when the body carries no "id".
	Id *string `json:"id,omitempty"`
	// OfferingCode is copied verbatim into the upsert message.
	OfferingCode string `json:"offeringCode,omitempty"`
	// SigningType is copied verbatim into the upsert message.
	SigningType string `json:"signingType,omitempty"`
	// Data keeps the raw, undecoded JSON of the "data" property.
	Data json.RawMessage `json:"data,omitempty"`
}
// ToMessage converts the request into an UpsertSubscriptionDetails message.
// The raw JSON bytes of Data become the Value of an Any; no type URL is set.
func (sd *SubscriptionDetailsRequest) ToMessage() *messages.UpsertSubscriptionDetails {
	msg := &messages.UpsertSubscriptionDetails{}
	msg.Id = sd.Id
	msg.OfferingCode = sd.OfferingCode
	msg.SigningType = sd.SigningType
	msg.Data = &anypb.Any{Value: sd.Data}
	return msg
}
// SubscriptionDetailsHandler decodes subscription details from the JSON
// request body, applies them to the cart as an UpsertSubscriptionDetails
// message and writes the mutation result as JSON.
//
// A body that cannot be decoded is a client error and is answered with 400
// (previously 500). An apply failure is answered with 500. In both cases the
// error text is written to the response and the error is also returned. A
// failure while writing the result is returned as well, as the other handlers
// do.
func (s *PoolServer) SubscriptionDetailsHandler(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
	data := &SubscriptionDetailsRequest{}
	if err := json.NewDecoder(r.Body).Decode(data); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte(err.Error()))
		return err
	}
	reply, err := s.ApplyLocal(r.Context(), cartId, data.ToMessage())
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		return err
	}
	return s.WriteResult(w, reply)
}
// RemoveVoucherHandler removes the voucher named by the {voucherId} path value
// from the cart and writes the mutation result as JSON.
//
// The id is parsed as an unsigned 32-bit integer, matching the width of
// RemoveVoucher.Id, so negative or out-of-range values are rejected instead of
// being truncated to a different id. An unparsable id is a client error and is
// answered with 400 (previously 500); an apply failure is answered with 500.
// In both cases the error text is written to the response and the error is
// also returned. A failure while writing the result is returned as well.
func (s *PoolServer) RemoveVoucherHandler(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
	id, err := strconv.ParseUint(r.PathValue("voucherId"), 10, 32)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte(err.Error()))
		return err
	}
	reply, err := s.ApplyLocal(r.Context(), cartId, &messages.RemoveVoucher{Id: uint32(id)})
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		return err
	}
	return s.WriteResult(w, reply)
}
// SetUserIdHandler decodes a SetUserId message from the JSON request body,
// applies it to the cart and writes the mutation result as JSON.
func (s *PoolServer) SetUserIdHandler(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
	userMsg := &messages.SetUserId{}
	if err := json.NewDecoder(r.Body).Decode(userMsg); err != nil {
		return err
	}

	result, err := s.ApplyLocal(r.Context(), cartId, userMsg)
	if err != nil {
		return err
	}
	return s.WriteResult(w, result)
}
// LineItemMarkingHandler sets a marking on the line item named by the {itemId}
// path value. The marking details come from the JSON request body, and the
// mutation result is written as JSON.
//
// The id is parsed as an unsigned 32-bit integer, matching the width of
// LineItemMarking.Id, so negative or out-of-range values are rejected instead
// of being truncated to a different item id.
func (s *PoolServer) LineItemMarkingHandler(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
	itemId, err := strconv.ParseUint(r.PathValue("itemId"), 10, 32)
	if err != nil {
		return err
	}
	lineItemMarking := messages.LineItemMarking{Id: uint32(itemId)}
	// NOTE(review): the body is decoded on top of the pre-filled message, so
	// an id property in the body would presumably override the path id.
	// Confirm whether that is intended.
	err = json.NewDecoder(r.Body).Decode(&lineItemMarking)
	if err != nil {
		return err
	}
	reply, err := s.ApplyLocal(r.Context(), cartId, &lineItemMarking)
	if err != nil {
		return err
	}
	return s.WriteResult(w, reply)
}
// RemoveLineItemMarkingHandler clears the marking on the line item named by
// the {itemId} path value and writes the mutation result as JSON.
//
// The id is parsed as an unsigned 32-bit integer, matching the width of
// RemoveLineItemMarking.Id, so negative or out-of-range values are rejected
// instead of being truncated to a different item id.
func (s *PoolServer) RemoveLineItemMarkingHandler(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
	itemId, err := strconv.ParseUint(r.PathValue("itemId"), 10, 32)
	if err != nil {
		return err
	}
	removeLineItemMarking := messages.RemoveLineItemMarking{Id: uint32(itemId)}
	reply, err := s.ApplyLocal(r.Context(), cartId, &removeLineItemMarking)
	if err != nil {
		return err
	}
	return s.WriteResult(w, reply)
}
// InternalApplyMutationHandler applies a protobuf-encoded Mutation taken from
// the request body to the cart and writes the mutation result as JSON.
//
// Only POST is accepted; any other method gets 405 and a nil error.
func (s *PoolServer) InternalApplyMutationHandler(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return nil
	}

	payload, err := io.ReadAll(r.Body)
	if err != nil {
		return err
	}

	mutation := new(messages.Mutation)
	if err := proto.Unmarshal(payload, mutation); err != nil {
		return err
	}

	result, err := s.ApplyLocal(r.Context(), cartId, mutation)
	if err != nil {
		return err
	}
	return s.WriteResult(w, result)
}
// GetAnywhere returns the cart grain for cartId whether it lives in this pool
// or on another host. When the pool reports a remote owner, the grain is
// fetched from that host into a fresh CartGrain, which is returned together
// with the fetch error. Otherwise the local pool is queried.
func (s *PoolServer) GetAnywhere(ctx context.Context, cartId cart.CartId) (*cart.CartGrain, error) {
	id := uint64(cartId)

	host, remote := s.OwnerHost(id)
	if !remote {
		return s.Get(ctx, id)
	}

	grain := new(cart.CartGrain)
	err := host.Get(ctx, id, grain)
	return grain, err
}
// ApplyAnywhere applies msgs to the cart identified by cartId, on the owning
// host when the pool reports a remote owner and on the local pool otherwise.
// Only the error is reported; the mutation result is discarded.
func (s *PoolServer) ApplyAnywhere(ctx context.Context, cartId cart.CartId, msgs ...proto.Message) error {
	id := uint64(cartId)

	var err error
	if host, remote := s.OwnerHost(id); remote {
		_, err = host.Apply(ctx, id, msgs...)
	} else {
		_, err = s.Apply(ctx, id, msgs...)
	}
	return err
}
// Serve registers all cart routes on mux.
//
// Every route is wrapped so that the span already present in the request
// context is renamed to the route pattern and tagged with an http.route
// attribute; the same attribute is added to the otelhttp labeler.
//
// Routes under /cart resolve the cart through CookieCartIdHandler. Routes
// under /cart/byid/{id} resolve it through CartIdHandler. The two byid voucher
// routes used the cookie wrapper before, unlike every other byid route; they
// now use CartIdHandler as well.
func (s *PoolServer) Serve(mux *http.ServeMux) {
	// mux.HandleFunc("OPTIONS /cart", func(w http.ResponseWriter, r *http.Request) {
	// 	w.Header().Set("Access-Control-Allow-Origin", "*")
	// 	w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, DELETE")
	// 	w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
	// 	w.WriteHeader(http.StatusOK)
	// })

	// handleFunc registers handlerFunc under pattern with route-level tracing
	// metadata attached to each request.
	handleFunc := func(pattern string, handlerFunc func(http.ResponseWriter, *http.Request)) {
		attr := attribute.String("http.route", pattern)
		mux.HandleFunc(pattern, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			span := trace.SpanFromContext(r.Context())
			span.SetName(pattern)
			span.SetAttributes(attr)

			labeler, _ := otelhttp.LabelerFromContext(r.Context())
			labeler.Add(attr)

			handlerFunc(w, r)
		}))
	}

	// Cookie-identified cart routes.
	handleFunc("GET /cart", CookieCartIdHandler(s.ProxyHandler(s.GetCartHandler)))
	handleFunc("GET /cart/add/{sku}", CookieCartIdHandler(s.ProxyHandler(s.AddSkuToCartHandler)))
	handleFunc("POST /cart/add", CookieCartIdHandler(s.ProxyHandler(s.AddMultipleItemHandler)))
	handleFunc("POST /cart", CookieCartIdHandler(s.ProxyHandler(s.AddSkuRequestHandler)))
	handleFunc("POST /cart/set", CookieCartIdHandler(s.ProxyHandler(s.SetCartItemsHandler)))
	handleFunc("DELETE /cart/{itemId}", CookieCartIdHandler(s.ProxyHandler(s.DeleteItemHandler)))
	handleFunc("PUT /cart", CookieCartIdHandler(s.ProxyHandler(s.QuantityChangeHandler)))
	handleFunc("DELETE /cart", CookieCartIdHandler(s.ProxyHandler(s.RemoveCartCookie)))
	handleFunc("PUT /cart/voucher", CookieCartIdHandler(s.ProxyHandler(s.AddVoucherHandler)))
	handleFunc("PUT /cart/subscription-details", CookieCartIdHandler(s.ProxyHandler(s.SubscriptionDetailsHandler)))
	handleFunc("DELETE /cart/voucher/{voucherId}", CookieCartIdHandler(s.ProxyHandler(s.RemoveVoucherHandler)))
	handleFunc("PUT /cart/user", CookieCartIdHandler(s.ProxyHandler(s.SetUserIdHandler)))
	handleFunc("PUT /cart/item/{itemId}/marking", CookieCartIdHandler(s.ProxyHandler(s.LineItemMarkingHandler)))
	handleFunc("DELETE /cart/item/{itemId}/marking", CookieCartIdHandler(s.ProxyHandler(s.RemoveLineItemMarkingHandler)))

	//mux.HandleFunc("GET /cart/checkout", CookieCartIdHandler(s.ProxyHandler(s.HandleCheckout)))
	//mux.HandleFunc("GET /cart/confirmation/{orderId}", CookieCartIdHandler(s.ProxyHandler(s.HandleConfirmation)))

	// Routes that take the cart id from the {id} path value.
	handleFunc("GET /cart/byid/{id}", CartIdHandler(s.ProxyHandler(s.GetCartHandler)))
	handleFunc("POST /cart/byid/{id}", CartIdHandler(s.ProxyHandler(s.AddSkuRequestHandler)))
	handleFunc("DELETE /cart/byid/{id}/{itemId}", CartIdHandler(s.ProxyHandler(s.DeleteItemHandler)))
	handleFunc("PUT /cart/byid/{id}", CartIdHandler(s.ProxyHandler(s.QuantityChangeHandler)))
	handleFunc("PUT /cart/byid/{id}/voucher", CartIdHandler(s.ProxyHandler(s.AddVoucherHandler)))
	handleFunc("DELETE /cart/byid/{id}/voucher/{voucherId}", CartIdHandler(s.ProxyHandler(s.RemoveVoucherHandler)))
	handleFunc("PUT /cart/byid/{id}/user", CartIdHandler(s.ProxyHandler(s.SetUserIdHandler)))
	handleFunc("PUT /cart/byid/{id}/item/{itemId}/marking", CartIdHandler(s.ProxyHandler(s.LineItemMarkingHandler)))
	handleFunc("DELETE /cart/byid/{id}/item/{itemId}/marking", CartIdHandler(s.ProxyHandler(s.RemoveLineItemMarkingHandler)))
}