Merge branch 'feature/pubsub' of git-ssh.tornberg.me:mats/go-cart-actor into feature/pubsub

This commit is contained in:
matst80
2025-11-27 12:14:11 +01:00
6 changed files with 78 additions and 20 deletions

View File

@@ -151,7 +151,12 @@ func main() {
log.Fatalf("Error creating inventory service: %v\n", err) log.Fatalf("Error creating inventory service: %v\n", err)
} }
syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient, inventoryService, rdb) inventoryReservationService, err := inventory.NewRedisCartReservationService(rdb)
if err != nil {
log.Fatalf("Error creating inventory reservation service: %v\n", err)
}
syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient, inventoryService, inventoryReservationService)
app := &App{ app := &App{
pool: pool, pool: pool,

View File

@@ -19,7 +19,6 @@ import (
"github.com/matst80/go-redis-inventory/pkg/inventory" "github.com/matst80/go-redis-inventory/pkg/inventory"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/contrib/bridges/otelslog" "go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/anypb"
@@ -50,12 +49,13 @@ type PoolServer struct {
reservationService inventory.CartReservationService reservationService inventory.CartReservationService
} }
func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService, inventoryRedisClient *redis.Client) *PoolServer { func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService, inventoryReservationService inventory.CartReservationService) *PoolServer {
srv := &PoolServer{ srv := &PoolServer{
GrainPool: pool, GrainPool: pool,
pod_name: pod_name, pod_name: pod_name,
klarnaClient: klarnaClient, klarnaClient: klarnaClient,
inventoryService: inventoryService, inventoryService: inventoryService,
reservationService: inventoryReservationService,
} }
return srv return srv
@@ -80,6 +80,10 @@ func (s *PoolServer) AddSkuToCartHandler(w http.ResponseWriter, r *http.Request,
if err != nil { if err != nil {
return err return err
} }
err = s.HandleReservations(r.Context(), id, msg)
if err != nil {
return err
}
data, err := s.ApplyLocal(r.Context(), id, msg) data, err := s.ApplyLocal(r.Context(), id, msg)
if err != nil { if err != nil {
return err return err
@@ -254,7 +258,14 @@ func (s *PoolServer) AddMultipleItemHandler(w http.ResponseWriter, r *http.Reque
return err return err
} }
reply, err := s.ApplyLocal(r.Context(), id, getMultipleAddMessages(r.Context(), setCartItems.Items, setCartItems.Country)...) msgs := getMultipleAddMessages(r.Context(), setCartItems.Items, setCartItems.Country)
err = s.HandleReservations(r.Context(), id, msgs...)
if err != nil {
return err
}
reply, err := s.ApplyLocal(r.Context(), id, msgs...)
if err != nil { if err != nil {
return err return err
} }
@@ -274,11 +285,16 @@ func (s *PoolServer) GetReservationTime(item *messages.AddItem) time.Duration {
//return nil //return nil
} }
func (s *PoolServer) HandleReservations(ctx context.Context, cartId cart.CartId, msgs ...*messages.AddItem) error { func (s *PoolServer) HandleReservations(ctx context.Context, cartId cart.CartId, msgs ...proto.Message) error {
if s.reservationService == nil { if s.reservationService == nil {
return nil return nil
} }
for _, item := range msgs { for _, msg := range msgs {
item, ok := msg.(*messages.AddItem)
if !ok {
log.Printf("not an AddItem message, skipping reservation, was of type: %T", msg)
continue
}
timeout := s.GetReservationTime(item) timeout := s.GetReservationTime(item)
if timeout == 0 { if timeout == 0 {
continue continue
@@ -290,6 +306,7 @@ func (s *PoolServer) HandleReservations(ctx context.Context, cartId cart.CartId,
locationId = inventory.LocationID(*item.StoreId) locationId = inventory.LocationID(*item.StoreId)
} }
span.AddEvent("reserving item", trace.WithAttributes(attribute.String("sku", item.Sku), attribute.String("locationId", string(locationId)))) span.AddEvent("reserving item", trace.WithAttributes(attribute.String("sku", item.Sku), attribute.String("locationId", string(locationId))))
log.Printf("reserving item %s at location %s for cart %s", item.Sku, string(locationId), cartId.String())
end := time.Now().Add(timeout) end := time.Now().Add(timeout)
err := s.reservationService.ReserveForCart(ctx, inventory.CartReserveRequest{ err := s.reservationService.ReserveForCart(ctx, inventory.CartReserveRequest{
CartID: inventory.CartID(cartId.String()), CartID: inventory.CartID(cartId.String()),
@@ -320,8 +337,9 @@ func (s *PoolServer) AddSkuRequestHandler(w http.ResponseWriter, r *http.Request
if err != nil { if err != nil {
return err return err
} }
if s.reservationService != nil { err = s.HandleReservations(r.Context(), id, msg)
if err != nil {
return err
} }
reply, err := s.ApplyLocal(r.Context(), id, msg) reply, err := s.ApplyLocal(r.Context(), id, msg)
if err != nil { if err != nil {

View File

@@ -462,3 +462,14 @@ spec:
value: "redis.home:6379" value: "redis.home:6379"
- name: REDIS_PASSWORD - name: REDIS_PASSWORD
value: "slaskredis" value: "slaskredis"
---
kind: Service
apiVersion: v1
metadata:
name: inventory
spec:
selector:
app: cart-inventory
ports:
- name: web
port: 8080

View File

@@ -38,16 +38,24 @@ func (c *CartMutationContext) AddItem(g *CartGrain, m *messages.AddItem) error {
if !sameStore { if !sameStore {
continue continue
} }
existing.Quantity += int(m.Quantity) if m.ReservationEndTime != nil {
t := m.ReservationEndTime.AsTime()
if existing.ReservationEndTime == nil || existing.ReservationEndTime.Before(m.ReservationEndTime.AsTime()) {
existing.ReservationEndTime = &t
existing.Quantity += int(m.Quantity)
} else {
existing.ReservationEndTime = &t
}
} else {
existing.Quantity += int(m.Quantity)
}
existing.Stock = uint16(m.Stock) existing.Stock = uint16(m.Stock)
// If existing had nil store but new has one, adopt it. // If existing had nil store but new has one, adopt it.
if existing.StoreId == nil && m.StoreId != nil { if existing.StoreId == nil && m.StoreId != nil {
existing.StoreId = m.StoreId existing.StoreId = m.StoreId
} }
if m.ReservationEndTime != nil {
t := m.ReservationEndTime.AsTime()
existing.ReservationEndTime = &t
}
return nil return nil
} }

View File

@@ -48,8 +48,15 @@ func (c *CartMutationContext) ChangeQuantity(g *CartGrain, m *messages.ChangeQua
g.UpdateTotals() g.UpdateTotals()
return nil return nil
} }
item := g.Items[foundIndex]
g.Items[foundIndex].Quantity = int(m.Quantity) if item == nil {
g.UpdateTotals() return fmt.Errorf("ChangeQuantity: item id %d not found", m.Id)
}
if item.ReservationEndTime != nil {
return fmt.Errorf("ChangeQuantity: cannot change quantity of reserved item id %d", m.Id)
} else {
item.Quantity = int(m.Quantity)
g.UpdateTotals()
}
return nil return nil
} }

View File

@@ -2,6 +2,7 @@ package cart
import ( import (
"fmt" "fmt"
"time"
messages "git.k6n.net/go-cart-actor/pkg/messages" messages "git.k6n.net/go-cart-actor/pkg/messages"
) )
@@ -36,6 +37,14 @@ func InitializeCheckout(g *CartGrain, m *messages.InitializeCheckout) error {
if m.OrderId == "" { if m.OrderId == "" {
return fmt.Errorf("InitializeCheckout: missing orderId") return fmt.Errorf("InitializeCheckout: missing orderId")
} }
now := time.Now()
for _, item := range g.Items {
if item.ReservationEndTime != nil {
if now.After(*item.ReservationEndTime) {
return fmt.Errorf("InitializeCheckout: item id %d reservation has expired", item.Id)
}
}
}
g.OrderReference = m.OrderId g.OrderReference = m.OrderId
g.PaymentStatus = m.Status g.PaymentStatus = m.Status