feature/pubsub #7

Merged
mats merged 67 commits from feature/pubsub into main 2025-11-28 17:45:22 +01:00
2 changed files with 56 additions and 6 deletions
Showing only changes of commit 95426acd4a - Show all commits

View File

@@ -43,9 +43,10 @@ var (
type PoolServer struct {
actor.GrainPool[*cart.CartGrain]
pod_name string
klarnaClient *KlarnaClient
inventoryService inventory.InventoryService
pod_name string
klarnaClient *KlarnaClient
inventoryService inventory.InventoryService
reservationService inventory.CartReservationService
}
func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService, inventoryRedisClient *redis.Client) *PoolServer {
@@ -266,6 +267,48 @@ type AddRequest struct {
StoreId *string `json:"storeId"`
}
func (s *PoolServer) GetReservationTime(item *messages.AddItem) time.Duration {
return time.Minute * 15
//return nil
}
func (s *PoolServer) HandleReservations(ctx context.Context, cartId cart.CartId, msgs ...*messages.AddItem) error {
if s.reservationService == nil {
return nil
}
for _, item := range msgs {
timeout := s.GetReservationTime(item)
if timeout == 0 {
continue
}
span := trace.SpanFromContext(ctx)
locationId := inventory.LocationID("se")
if item.StoreId != nil {
locationId = inventory.LocationID(*item.StoreId)
}
span.AddEvent("reserving item", trace.WithAttributes(attribute.String("sku", item.Sku), attribute.String("locationId", string(locationId))))
end := time.Now().Add(timeout)
err := s.reservationService.ReserveForCart(ctx, inventory.CartReserveRequest{
CartID: inventory.CartID(cartId.String()),
InventoryReference: &inventory.InventoryReference{
LocationID: locationId,
SKU: inventory.SKU(item.Sku),
},
Quantity: uint32(item.Quantity),
TTL: 15 * time.Minute,
})
if err != nil {
return err
}
logger.InfoContext(ctx, "reserved item", "sku", item.Sku, "location", string(locationId), "expires at", end.String())
span.End()
item.ReservationEndsTime = end
}
return nil
}
func (s *PoolServer) AddSkuRequestHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
addRequest := AddRequest{Quantity: 1}
err := json.NewDecoder(r.Body).Decode(&addRequest)
@@ -275,6 +318,9 @@ func (s *PoolServer) AddSkuRequestHandler(w http.ResponseWriter, r *http.Request
msg, err := GetItemAddMessage(r.Context(), addRequest.Sku, int(addRequest.Quantity), addRequest.Country, addRequest.StoreId)
if err != nil {
return err
}
if s.reservationService != nil {
}
reply, err := s.ApplyLocal(r.Context(), id, msg)
if err != nil {
@@ -331,9 +377,11 @@ func getInventoryRequests(items []*cart.CartItem) []inventory.ReserveRequest {
continue
}
requests = append(requests, inventory.ReserveRequest{
SKU: inventory.SKU(item.Sku),
LocationID: getLocationId(item),
Quantity: uint32(item.Quantity),
InventoryReference: &inventory.InventoryReference{
SKU: inventory.SKU(item.Sku),
LocationID: getLocationId(item),
},
Quantity: uint32(item.Quantity),
})
}
return requests

View File

@@ -3,6 +3,7 @@ package messages;
option go_package = "git.k6n.net/go-cart-actor/proto;messages";
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
message ClearCartRequest {}
@@ -32,6 +33,7 @@ message AddItem {
optional string storeId = 22;
optional uint32 parentId = 23;
string cgm = 25;
optional google.protobuf.Timestamp reservationEndTime = 26
}
message RemoveItem { uint32 Id = 1; }