Merge branch 'main' of https://git.tornberg.me/mats/go-cart-actor
Some checks failed
Build and Publish / Metadata (push) Successful in 11s
Build and Publish / BuildAndDeployAmd64 (push) Successful in 1m1s
Build and Publish / BuildAndDeployArm64 (push) Has been cancelled

This commit is contained in:
2025-11-13 18:23:09 +01:00
11 changed files with 163 additions and 57 deletions

View File

@@ -10,6 +10,7 @@ import (
type AmqpOrderHandler struct {
conn *amqp.Connection
queue *amqp.Queue
}
func NewAmqpOrderHandler(conn *amqp.Connection) *AmqpOrderHandler {
@@ -18,26 +19,25 @@ func NewAmqpOrderHandler(conn *amqp.Connection) *AmqpOrderHandler {
}
}
func (h *AmqpOrderHandler) DefineTopics() error {
func (h *AmqpOrderHandler) DefineQueue() error {
ch, err := h.conn.Channel()
if err != nil {
return fmt.Errorf("failed to open a channel: %w", err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
"orders", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
queue, err := ch.QueueDeclare(
"order-queue", // name
false,
false,
false,
false,
nil,
)
if err != nil {
return fmt.Errorf("failed to declare an exchange: %w", err)
}
h.queue = &queue
return nil
}
@@ -51,11 +51,12 @@ func (h *AmqpOrderHandler) OrderCompleted(body []byte) error {
defer cancel()
return ch.PublishWithContext(ctx,
"orders", // exchange
"new", // routing key
"", // exchange
h.queue.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
//DeliveryMode: amqp.,
ContentType: "application/json",
Body: body,
})

View File

@@ -70,7 +70,7 @@ func BuildCheckoutOrderPayload(grain *cart.CartGrain, meta *CheckoutMeta) ([]byt
Name: it.Meta.Name,
Quantity: it.Quantity,
UnitPrice: int(it.Price.IncVat),
TaxRate: 2500, // TODO: derive if variable tax rates are introduced
TaxRate: it.Tax, // TODO: derive if variable tax rates are introduced
QuantityUnit: "st",
TotalAmount: int(it.TotalPrice.IncVat),
TotalTaxAmount: int(it.TotalPrice.TotalVat()),

View File

@@ -9,10 +9,37 @@ import (
"git.tornberg.me/go-cart-actor/pkg/actor"
"git.tornberg.me/go-cart-actor/pkg/cart"
"git.tornberg.me/mats/go-redis-inventory/pkg/inventory"
amqp "github.com/rabbitmq/amqp091-go"
)
func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux) {
var tpl = `<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>s10r testing - checkout</title>
</head>
<body>
%s
</body>
</html>
`
func (a *App) getGrainFromOrder(order *CheckoutOrder) (*cart.CartGrain, error) {
cartId, ok := cart.ParseCartId(order.MerchantReference1)
if !ok {
return nil, fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1)
}
grain, err := a.pool.Get(uint64(cartId))
if err != nil {
return nil, fmt.Errorf("failed to get cart grain: %w", err)
}
return grain, nil
}
func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux, inventoryService inventory.InventoryService) {
conn, err := amqp.Dial(amqpUrl)
if err != nil {
log.Fatalf("failed to connect to RabbitMQ: %v", err)
@@ -27,10 +54,10 @@ func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux) {
amqpListener.DefineTopics()
a.pool.AddListener(amqpListener)
orderHandler := NewAmqpOrderHandler(conn)
orderHandler.DefineTopics()
mux.HandleFunc("POST /push", func(w http.ResponseWriter, r *http.Request) {
orderHandler.DefineQueue()
mux.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {
log.Printf("Klarna order confirmation push, method: %s", r.Method)
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
@@ -104,6 +131,7 @@ func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux) {
})
mux.HandleFunc("/notification", func(w http.ResponseWriter, r *http.Request) {
log.Printf("Klarna order notification, method: %s", r.Method)
logger.InfoContext(r.Context(), "Klarna order notification received", "method", r.Method)
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
@@ -114,7 +142,23 @@ func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux) {
w.WriteHeader(http.StatusBadRequest)
}
log.Printf("Klarna order notification: %s", order.ID)
logger.InfoContext(r.Context(), "Klarna order notification received", "order_id", order.ID)
grain, err := a.getGrainFromOrder(order)
if err != nil {
logger.ErrorContext(r.Context(), "Unable to get grain from klarna order", "error", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
}
if inventoryService != nil {
inventoryRequests := getInventoryRequests(grain.Items)
err = inventoryService.ReserveInventory(inventoryRequests...)
if err != nil {
logger.WarnContext(r.Context(), "placeorder inventory reservation failed")
w.WriteHeader(http.StatusNotAcceptable)
return
}
}
w.WriteHeader(http.StatusOK)
})
mux.HandleFunc("POST /validate", func(w http.ResponseWriter, r *http.Request) {
@@ -128,7 +172,22 @@ func (a *App) HandleCheckoutRequests(amqpUrl string, mux *http.ServeMux) {
if err != nil {
w.WriteHeader(http.StatusBadRequest)
}
log.Printf("Klarna order validation: %s", order.ID)
logger.InfoContext(r.Context(), "Klarna order validation received", "order_id", order.ID, "cart_id", order.MerchantReference1)
grain, err := a.getGrainFromOrder(order)
if err != nil {
logger.ErrorContext(r.Context(), "Unable to get grain from klarna order", "error", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
}
if inventoryService != nil {
inventoryRequests := getInventoryRequests(grain.Items)
_, err = inventoryService.ReservationCheck(inventoryRequests...)
if err != nil {
logger.WarnContext(r.Context(), "placeorder inventory check failed")
w.WriteHeader(http.StatusNotAcceptable)
return
}
}
w.WriteHeader(http.StatusOK)
})

View File

@@ -19,9 +19,11 @@ import (
"git.tornberg.me/go-cart-actor/pkg/promotions"
"git.tornberg.me/go-cart-actor/pkg/proxy"
"git.tornberg.me/go-cart-actor/pkg/voucher"
"git.tornberg.me/mats/go-redis-inventory/pkg/inventory"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)
@@ -45,20 +47,8 @@ type App struct {
var podIp = os.Getenv("POD_IP")
var name = os.Getenv("POD_NAME")
var amqpUrl = os.Getenv("AMQP_URL")
var tpl = `<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>s10r testing - checkout</title>
</head>
<body>
%s
</body>
</html>
`
var redisAddress = os.Getenv("REDIS_ADDRESS")
var redisPassword = os.Getenv("REDIS_PASSWORD")
func getCountryFromHost(host string) string {
if strings.Contains(strings.ToLower(host), "-no") {
@@ -133,8 +123,17 @@ func main() {
}
klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))
rdb := redis.NewClient(&redis.Options{
Addr: redisAddress,
Password: redisPassword,
DB: 0,
})
inventoryService, err := inventory.NewRedisInventoryService(rdb, context.Background())
if err != nil {
log.Fatalf("Error creating inventory service: %v\n", err)
}
syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient)
syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient, inventoryService)
app := &App{
pool: pool,
@@ -148,7 +147,7 @@ func main() {
if amqpUrl == "" {
log.Printf("no connection to amqp defined")
} else {
app.HandleCheckoutRequests(amqpUrl, mux)
app.HandleCheckoutRequests(amqpUrl, mux, inventoryService)
}
grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool)

View File

@@ -15,6 +15,7 @@ import (
"git.tornberg.me/go-cart-actor/pkg/cart"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
"git.tornberg.me/go-cart-actor/pkg/voucher"
"git.tornberg.me/mats/go-redis-inventory/pkg/inventory"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -42,13 +43,15 @@ type PoolServer struct {
actor.GrainPool[*cart.CartGrain]
pod_name string
klarnaClient *KlarnaClient
inventoryService inventory.InventoryService
}
func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient) *PoolServer {
func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient, inventoryService inventory.InventoryService) *PoolServer {
return &PoolServer{
GrainPool: pool,
pod_name: pod_name,
klarnaClient: klarnaClient,
inventoryService: inventoryService,
}
}
@@ -310,15 +313,37 @@ func getLocale(country string) string {
return "sv-se"
}
func getLocationId(item *cart.CartItem) inventory.LocationID {
if item.StoreId == nil || *item.StoreId == "" {
return "se"
}
return inventory.LocationID(*item.StoreId)
}
func getInventoryRequests(items []*cart.CartItem) []inventory.ReserveRequest {
var requests []inventory.ReserveRequest
for _, item := range items {
if item == nil {
continue
}
requests = append(requests, inventory.ReserveRequest{
SKU: inventory.SKU(item.Sku),
LocationID: getLocationId(item),
Quantity: uint32(item.Quantity),
})
}
return requests
}
func (s *PoolServer) CreateOrUpdateCheckout(ctx context.Context, host string, id cart.CartId) (*CheckoutOrder, error) {
country := getCountryFromHost(host)
meta := &CheckoutMeta{
Terms: fmt.Sprintf("https://%s/terms", host),
Checkout: fmt.Sprintf("https://%s/checkout?order_id={checkout.order.id}", host),
Confirmation: fmt.Sprintf("https://%s/confirmation/{checkout.order.id}", host),
Validation: fmt.Sprintf("https://%s/validate", host),
Push: fmt.Sprintf("https://%s/push?order_id={checkout.order.id}", host),
Notification: "https://cart.tornberg.me/notification",
Validation: "https://cart.tornberg.me/validate",
Push: "https://cart.tornberg.me/push?order_id={checkout.order.id}",
Country: country,
Currency: getCurrency(country),
Locale: getLocale(country),
@@ -329,6 +354,14 @@ func (s *PoolServer) CreateOrUpdateCheckout(ctx context.Context, host string, id
if err != nil {
return nil, err
}
if s.inventoryService != nil {
inventoryRequests := getInventoryRequests(grain.Items)
failingRequest, err := s.inventoryService.ReservationCheck(inventoryRequests...)
if err != nil {
logger.WarnContext(ctx, "inventory check failed", string(failingRequest.SKU), string(failingRequest.LocationID))
return nil, err
}
}
// Build pure checkout payload
payload, _, err := BuildCheckoutOrderPayload(grain, meta)
@@ -553,6 +586,7 @@ func (s *PoolServer) CheckoutHandler(fn func(order *CheckoutOrder, w http.Respon
if orderId == "" {
order, err := s.CreateOrUpdateCheckout(r.Context(), r.Host, cartId)
if err != nil {
logger.Error("unable to create klarna session: %v", err)
return err
}
s.ApplyCheckoutStarted(order, cartId)

View File

@@ -103,6 +103,7 @@ func main() {
amqpUrl, ok := os.LookupEnv("RABBIT_HOST")
if ok {
log.Printf("Connecting to rabbitmq")
conn, err := amqp.DialConfig(amqpUrl, amqp.Config{
Properties: amqp.NewConnectionProperties(),
})

View File

@@ -177,6 +177,10 @@ spec:
secretKeyRef:
name: klarna-api-credentials
key: username
- name: REDIS_ADDRESS
value: "10.10.3.18:6379"
- name: REDIS_PASSWORD
value: "slaskredis"
- name: OTEL_RESOURCE_ATTRIBUTES
value: "service.name=cart,service.version=0.1.2"
- name: OTEL_EXPORTER_OTLP_ENDPOINT
@@ -280,6 +284,10 @@ spec:
env:
- name: TZ
value: "Europe/Stockholm"
- name: REDIS_ADDRESS
value: "redis.home:6379"
- name: REDIS_PASSWORD
value: "slaskredis"
- name: OTEL_RESOURCE_ATTRIBUTES
value: "service.name=cart,service.version=0.1.2"
- name: OTEL_EXPORTER_OTLP_ENDPOINT
@@ -452,7 +460,9 @@ spec:
env:
- name: TZ
value: "Europe/Stockholm"
- name: RABBIT_HOST
value: amqp://admin:12bananer@rabbitmq.s10n:5672/
- name: REDIS_ADDRESS
value: "10.10.3.18:6379"
value: "redis.home:6379"
- name: REDIS_PASSWORD
value: "slaskredis"

2
go.mod
View File

@@ -3,6 +3,7 @@ module git.tornberg.me/go-cart-actor
go 1.25.3
require (
git.tornberg.me/mats/go-redis-inventory v0.0.0-20251110193851-19d7ad0de6e5
github.com/gogo/protobuf v1.3.2
github.com/google/uuid v1.6.0
github.com/matst80/slask-finder v0.0.0-20251023104024-f788e5a51d68
@@ -29,7 +30,6 @@ require (
)
require (
git.tornberg.me/mats/go-redis-inventory v0.0.0-20251110193851-19d7ad0de6e5 // indirect
github.com/RoaringBitmap/roaring/v2 v2.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.24.1 // indirect

View File

@@ -37,6 +37,7 @@ type CartItem struct {
Price Price `json:"price"`
TotalPrice Price `json:"totalPrice"`
OrgPrice *Price `json:"orgPrice,omitempty"`
Tax int
Stock StockStatus `json:"stock"`
Quantity int `json:"qty"`
Discount *Price `json:"discount,omitempty"`
@@ -256,7 +257,7 @@ func (c *CartGrain) UpdateTotals() {
diff.Add(*item.OrgPrice)
diff.Subtract(item.Price)
diff.Multiply(int64(item.Quantity))
rowTotal.Subtract(*diff)
//rowTotal.Subtract(*diff)
item.Discount = diff
if diff.IncVat > 0 {
c.TotalDiscount.Add(*diff)

View File

@@ -34,8 +34,8 @@ func TestCartGrainUpdateTotalsBasic(t *testing.T) {
}
// Discount: current implementation computes (OrgPrice - Price) ignoring quantity -> 1500-1250=250
if c.TotalDiscount.IncVat != 250 {
t.Fatalf("TotalDiscount expected 250 got %d", c.TotalDiscount.IncVat)
if c.TotalDiscount.IncVat != 500 {
t.Fatalf("TotalDiscount expected 500 got %d", c.TotalDiscount.IncVat)
}
}

View File

@@ -63,6 +63,7 @@ func AddItem(g *CartGrain, m *messages.AddItem) error {
ItemId: uint32(m.ItemId),
Quantity: int(m.Quantity),
Sku: m.Sku,
Tax: int(taxRate * 100),
Meta: &ItemMeta{
Name: m.Name,
Image: m.Image,