diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 7226698..e4d8192 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -151,7 +151,12 @@ func main() { log.Fatalf("Error creating inventory service: %v\n", err) } - syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient, inventoryService, rdb) + inventoryReservationService, err := inventory.NewRedisCartReservationService(rdb) + if err != nil { + log.Fatalf("Error creating inventory reservation service: %v\n", err) + } + + syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient, inventoryService, inventoryReservationService) app := &App{ pool: pool, diff --git a/cmd/cart/pool-server.go b/cmd/cart/pool-server.go index 7f0e092..cd03c9e 100644 --- a/cmd/cart/pool-server.go +++ b/cmd/cart/pool-server.go @@ -19,7 +19,6 @@ import ( "github.com/matst80/go-redis-inventory/pkg/inventory" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/redis/go-redis/v9" "go.opentelemetry.io/contrib/bridges/otelslog" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "google.golang.org/protobuf/types/known/anypb" @@ -50,12 +49,13 @@ type PoolServer struct { reservationService inventory.CartReservationService } -func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService, inventoryRedisClient *redis.Client) *PoolServer { +func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService, inventoryReservationService inventory.CartReservationService) *PoolServer { srv := &PoolServer{ - GrainPool: pool, - pod_name: pod_name, - klarnaClient: klarnaClient, - inventoryService: inventoryService, + GrainPool: pool, + pod_name: pod_name, + klarnaClient: klarnaClient, + inventoryService: inventoryService, + reservationService: inventoryReservationService, } return srv @@ -80,6 +80,10 @@ func (s *PoolServer) AddSkuToCartHandler(w http.ResponseWriter, r *http.Request, if err != nil { return err } + err = s.HandleReservations(r.Context(), id, msg) + if err != nil { + return err + } data, err := s.ApplyLocal(r.Context(), id, msg) if err != nil { return err @@ -254,7 +258,14 @@ func (s *PoolServer) AddMultipleItemHandler(w http.ResponseWriter, r *http.Reque return err } - reply, err := s.ApplyLocal(r.Context(), id, getMultipleAddMessages(r.Context(), setCartItems.Items, setCartItems.Country)...) + msgs := getMultipleAddMessages(r.Context(), setCartItems.Items, setCartItems.Country) + + err = s.HandleReservations(r.Context(), id, msgs...) + if err != nil { + return err + } + + reply, err := s.ApplyLocal(r.Context(), id, msgs...) if err != nil { return err } @@ -274,11 +285,16 @@ func (s *PoolServer) GetReservationTime(item *messages.AddItem) time.Duration { //return nil } -func (s *PoolServer) HandleReservations(ctx context.Context, cartId cart.CartId, msgs ...*messages.AddItem) error { +func (s *PoolServer) HandleReservations(ctx context.Context, cartId cart.CartId, msgs ...proto.Message) error { if s.reservationService == nil { return nil } - for _, item := range msgs { + for _, msg := range msgs { + item, ok := msg.(*messages.AddItem) + if !ok { + log.Printf("not an AddItem message, skipping reservation, was of type: %T", msg) + continue + } timeout := s.GetReservationTime(item) if timeout == 0 { continue @@ -290,6 +306,7 @@ func (s *PoolServer) HandleReservations(ctx context.Context, cartId cart.CartId, locationId = inventory.LocationID(*item.StoreId) } span.AddEvent("reserving item", trace.WithAttributes(attribute.String("sku", item.Sku), attribute.String("locationId", string(locationId)))) + log.Printf("reserving item %s at location %s for cart %s", item.Sku, string(locationId), cartId.String()) end := time.Now().Add(timeout) err := s.reservationService.ReserveForCart(ctx, inventory.CartReserveRequest{ CartID: inventory.CartID(cartId.String()), @@ -320,8 +337,9 @@ func (s *PoolServer) AddSkuRequestHandler(w http.ResponseWriter, r *http.Request if err != nil { return err } - if s.reservationService != nil { - + err = s.HandleReservations(r.Context(), id, msg) + if err != nil { + return err } reply, err := s.ApplyLocal(r.Context(), id, msg) if err != nil { diff --git a/deployment/deployment.yaml b/deployment/deployment.yaml index 3e17716..7f86486 100644 --- a/deployment/deployment.yaml +++ b/deployment/deployment.yaml @@ -462,3 +462,14 @@ spec: value: "redis.home:6379" - name: REDIS_PASSWORD value: "slaskredis" +--- +kind: Service +apiVersion: v1 +metadata: + name: inventory +spec: + selector: + app: cart-inventory + ports: + - name: web + port: 8080 \ No newline at end of file diff --git a/pkg/cart/mutation_add_item.go b/pkg/cart/mutation_add_item.go index 5b9b2a6..1d85024 100644 --- a/pkg/cart/mutation_add_item.go +++ b/pkg/cart/mutation_add_item.go @@ -38,16 +38,24 @@ func (c *CartMutationContext) AddItem(g *CartGrain, m *messages.AddItem) error { if !sameStore { continue } - existing.Quantity += int(m.Quantity) + if m.ReservationEndTime != nil { + t := m.ReservationEndTime.AsTime() + if existing.ReservationEndTime == nil || existing.ReservationEndTime.Before(m.ReservationEndTime.AsTime()) { + existing.ReservationEndTime = &t + existing.Quantity += int(m.Quantity) + } else { + existing.ReservationEndTime = &t + } + + } else { + existing.Quantity += int(m.Quantity) + } existing.Stock = uint16(m.Stock) // If existing had nil store but new has one, adopt it. if existing.StoreId == nil && m.StoreId != nil { existing.StoreId = m.StoreId } - if m.ReservationEndTime != nil { - t := m.ReservationEndTime.AsTime() - existing.ReservationEndTime = &t - } + return nil } diff --git a/pkg/cart/mutation_change_quantity.go b/pkg/cart/mutation_change_quantity.go index 9f30db2..8682e9f 100644 --- a/pkg/cart/mutation_change_quantity.go +++ b/pkg/cart/mutation_change_quantity.go @@ -48,8 +48,15 @@ func (c *CartMutationContext) ChangeQuantity(g *CartGrain, m *messages.ChangeQua g.UpdateTotals() return nil } - - g.Items[foundIndex].Quantity = int(m.Quantity) - g.UpdateTotals() + item := g.Items[foundIndex] + if item == nil { + return fmt.Errorf("ChangeQuantity: item id %d not found", m.Id) + } + if item.ReservationEndTime != nil { + return fmt.Errorf("ChangeQuantity: cannot change quantity of reserved item id %d", m.Id) + } else { + item.Quantity = int(m.Quantity) + g.UpdateTotals() + } return nil } diff --git a/pkg/cart/mutation_initialize_checkout.go b/pkg/cart/mutation_initialize_checkout.go index 0e3a14f..83ebff7 100644 --- a/pkg/cart/mutation_initialize_checkout.go +++ b/pkg/cart/mutation_initialize_checkout.go @@ -2,6 +2,7 @@ package cart import ( "fmt" + "time" messages "git.k6n.net/go-cart-actor/pkg/messages" ) @@ -36,6 +37,14 @@ func InitializeCheckout(g *CartGrain, m *messages.InitializeCheckout) error { if m.OrderId == "" { return fmt.Errorf("InitializeCheckout: missing orderId") } + now := time.Now() + for _, item := range g.Items { + if item.ReservationEndTime != nil { + if now.After(*item.ReservationEndTime) { + return fmt.Errorf("InitializeCheckout: item id %d reservation has expired", item.Id) + } + } + } g.OrderReference = m.OrderId g.PaymentStatus = m.Status