capture and split notification if it has multiple hosts
Some checks failed
Build and Publish / BuildAndDeployAmd64 (push) Successful in 46s
Build and Publish / BuildAndDeployArm64 (push) Has been cancelled

This commit is contained in:
matst80
2025-11-28 16:44:03 +01:00
parent 330093bdec
commit 90a525bd98
7 changed files with 160 additions and 1579 deletions

View File

@@ -3,10 +3,11 @@ package main
import (
"encoding/json"
"fmt"
"net/http"
"git.k6n.net/go-cart-actor/pkg/cart"
"github.com/adyen/adyen-go-api-library/v14/src/checkout"
"github.com/adyen/adyen-go-api-library/v14/src/common"
"github.com/adyen/adyen-go-api-library/v21/src/checkout"
"github.com/adyen/adyen-go-api-library/v21/src/common"
)
// CheckoutMeta carries the external / URL metadata required to build a
@@ -123,6 +124,18 @@ func BuildCheckoutOrderPayload(grain *cart.CartGrain, meta *CheckoutMeta) ([]byt
return payload, order, nil
}
func GetCheckoutMetaFromRequest(r *http.Request) *CheckoutMeta {
host := getOriginalHost(r)
country := getCountryFromHost(host)
return &CheckoutMeta{
ClientIp: getClientIp(r),
SiteUrl: fmt.Sprintf("https://%s", host),
Country: country,
Currency: getCurrency(country),
Locale: getLocale(country),
}
}
func BuildAdyenCheckoutSession(grain *cart.CartGrain, meta *CheckoutMeta) (*checkout.CreateCheckoutSessionRequest, error) {
if grain == nil {
return nil, fmt.Errorf("nil grain")

View File

@@ -19,8 +19,8 @@ import (
"git.k6n.net/go-cart-actor/pkg/promotions"
"git.k6n.net/go-cart-actor/pkg/proxy"
"git.k6n.net/go-cart-actor/pkg/voucher"
"github.com/adyen/adyen-go-api-library/v14/src/adyen"
"github.com/adyen/adyen-go-api-library/v14/src/common"
"github.com/adyen/adyen-go-api-library/v21/src/adyen"
"github.com/adyen/adyen-go-api-library/v21/src/common"
"github.com/matst80/go-redis-inventory/pkg/inventory"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
@@ -15,10 +14,13 @@ import (
"git.k6n.net/go-cart-actor/pkg/actor"
"git.k6n.net/go-cart-actor/pkg/cart"
messages "git.k6n.net/go-cart-actor/pkg/messages"
"git.k6n.net/go-cart-actor/pkg/proxy"
"git.k6n.net/go-cart-actor/pkg/voucher"
"github.com/adyen/adyen-go-api-library/v14/src/adyen"
"github.com/adyen/adyen-go-api-library/v14/src/hmacvalidator"
"github.com/adyen/adyen-go-api-library/v14/src/webhook"
"github.com/adyen/adyen-go-api-library/v21/src/adyen"
"github.com/adyen/adyen-go-api-library/v21/src/checkout"
"github.com/adyen/adyen-go-api-library/v21/src/hmacvalidator"
"github.com/adyen/adyen-go-api-library/v21/src/webhook"
"github.com/google/uuid"
"github.com/matst80/go-redis-inventory/pkg/inventory"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -375,15 +377,7 @@ func getClientIp(r *http.Request) string {
}
func (s *PoolServer) CreateOrUpdateCheckout(r *http.Request, id cart.CartId) (*CheckoutOrder, error) {
host := getOriginalHost(r)
country := getCountryFromHost(host)
meta := &CheckoutMeta{
ClientIp: getClientIp(r),
SiteUrl: fmt.Sprintf("https://%s", host),
Country: country,
Currency: getCurrency(country),
Locale: getLocale(country),
}
meta := GetCheckoutMetaFromRequest(r)
// Get current grain state (may be local or remote)
grain, err := s.Get(r.Context(), uint64(id))
@@ -534,7 +528,7 @@ func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request
span.SetAttributes(hostAttr)
logger.InfoContext(ctx, "cart proxyed", "result", ownerHost.Name())
proxyCalls.Add(ctx, 1, metric.WithAttributes(hostAttr))
handled, err := ownerHost.Proxy(uint64(cartId), w, r)
handled, err := ownerHost.Proxy(uint64(cartId), w, r, nil)
grainLookups.Inc()
if err == nil && handled {
@@ -735,15 +729,8 @@ func (s *PoolServer) AdyenSessionHandler(w http.ResponseWriter, r *http.Request,
if err != nil {
return err
}
host := getOriginalHost(r)
country := getCountryFromHost(host)
meta := &CheckoutMeta{
ClientIp: getClientIp(r),
SiteUrl: fmt.Sprintf("https://%s", host),
Country: country,
Currency: getCurrency(country),
Locale: getLocale(country),
}
meta := GetCheckoutMetaFromRequest(r)
sessionData, err := BuildAdyenCheckoutSession(grain, meta)
if err != nil {
return err
@@ -751,12 +738,103 @@ func (s *PoolServer) AdyenSessionHandler(w http.ResponseWriter, r *http.Request,
service := s.adyenClient.Checkout()
req := service.PaymentsApi.SessionsInput().CreateCheckoutSessionRequest(*sessionData)
res, _, err := service.PaymentsApi.Sessions(r.Context(), req)
// apply checkout started
if err != nil {
return err
}
return s.WriteResult(w, res)
}
func (s *PoolServer) AdyenHookHandler(w http.ResponseWriter, r *http.Request) {
var notificationRequest webhook.Webhook
service := s.adyenClient.Checkout()
if err := json.NewDecoder(r.Body).Decode(&notificationRequest); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
cartHostMap := make(map[actor.Host][]webhook.NotificationItem)
for _, notificationItem := range *notificationRequest.NotificationItems {
item := notificationItem.NotificationRequestItem
log.Printf("Recieved notification event code: %s, %v", item.EventCode, item)
cartId, ok := cart.ParseCartId(item.OriginalReference)
if !ok {
log.Printf("invalid cart id %s", item.OriginalReference)
http.Error(w, "Invalid cart id", http.StatusBadRequest)
return
}
if host, ok := s.OwnerHost(uint64(cartId)); ok {
cartHostMap[host] = append(cartHostMap[host], notificationItem)
continue
}
isValid := hmacvalidator.ValidateHmac(item, hmacKey)
if !isValid {
log.Printf("notification hmac not valid %s, %v", item.EventCode, item)
http.Error(w, "Invalid HMAC", http.StatusUnauthorized)
return
} else {
switch item.EventCode {
case "AUTHORISATION":
grain, err := s.Get(r.Context(), uint64(cartId))
if err != nil {
log.Printf("Error getting cart: %v", err)
http.Error(w, "Cart not found", http.StatusBadRequest)
return
}
meta := GetCheckoutMetaFromRequest(r)
pspReference := item.PspReference
uid := uuid.New().String()
ref := uuid.New().String()
req := service.ModificationsApi.CaptureAuthorisedPaymentInput(pspReference).IdempotencyKey(uid).PaymentCaptureRequest(checkout.PaymentCaptureRequest{
Amount: checkout.Amount{
Currency: meta.Currency,
Value: grain.TotalPrice.IncVat,
},
MerchantAccount: "ElgigantenECOM",
Reference: &ref,
})
res, _, err := service.ModificationsApi.CaptureAuthorisedPayment(r.Context(), req)
if err != nil {
log.Printf("Error capturing payment: %v", err)
} else {
log.Printf("Payment captured successfully: %v", res)
s.Apply(r.Context(), uint64(cartId), &messages.OrderCreated{
OrderId: res.PaymentPspReference,
Status: item.EventCode,
})
}
}
}
}
var failed bool = false
var lastMock *proxy.MockResponseWriter
for host, items := range cartHostMap {
notificationRequest.NotificationItems = &items
bodyBytes, err := json.Marshal(notificationRequest)
if err != nil {
log.Printf("error marshaling notification: %v", err)
continue
}
customBody := bytes.NewReader(bodyBytes)
mockW := proxy.NewMockResponseWriter()
handled, err := host.Proxy(0, mockW, r, customBody)
if err != nil {
log.Printf("proxy failed for %s: %v", host.Name(), err)
failed = true
lastMock = mockW
} else if handled {
log.Printf("notification proxied to %s", host.Name())
}
}
if failed {
w.WriteHeader(lastMock.StatusCode)
w.Write(lastMock.Body.Bytes())
} else {
w.WriteHeader(http.StatusAccepted)
}
}
func (s *PoolServer) Serve(mux *http.ServeMux) {
// mux.HandleFunc("OPTIONS /cart", func(w http.ResponseWriter, r *http.Request) {
@@ -780,34 +858,7 @@ func (s *PoolServer) Serve(mux *http.ServeMux) {
}))
}
handleFunc("/adyen_hook", func(w http.ResponseWriter, r *http.Request) {
var notificationRequest webhook.Webhook
if err := json.NewDecoder(r.Body).Decode(&notificationRequest); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
for _, notification := range notificationRequest.GetNotificationItems() {
isValid := hmacvalidator.ValidateHmac(*notification, hmacKey)
if !isValid {
http.Error(w, "Invalid HMAC", http.StatusUnauthorized)
return
} else {
cartId, ok := cart.ParseCartId(notification.OriginalReference)
log.Printf("Recieved notification event code: %s, %v", notification.EventCode, notification)
if ok {
host, ok := s.OwnerHost(uint64(cartId))
if ok {
log.Printf("Not owner of %d, owner: %s", cartId, host.Name())
//host.Apply(r.Context(), cartId)
} else {
log.Printf("I'm the owner of %d", cartId)
}
}
}
}
w.WriteHeader(http.StatusAccepted)
})
handleFunc("/adyen_hook", s.AdyenHookHandler)
handleFunc("GET /cart", CookieCartIdHandler(s.ProxyHandler(s.GetCartHandler)))
handleFunc("GET /cart/add/{sku}", CookieCartIdHandler(s.ProxyHandler(s.AddSkuToCartHandler)))

4
go.mod
View File

@@ -3,7 +3,7 @@ module git.k6n.net/go-cart-actor
go 1.25.4
require (
github.com/gogo/protobuf v1.3.2
github.com/adyen/adyen-go-api-library/v21 v21.1.0
github.com/google/uuid v1.6.0
github.com/matst80/go-redis-inventory v0.0.0-20251126173508-51b30de2d86e
github.com/matst80/slask-finder v0.0.0-20251125182907-9e57f193127a
@@ -31,7 +31,6 @@ require (
require (
github.com/RoaringBitmap/roaring/v2 v2.14.4 // indirect
github.com/adyen/adyen-go-api-library/v14 v14.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.24.4 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
@@ -59,6 +58,7 @@ require (
github.com/go-openapi/swag/stringutils v0.25.4 // indirect
github.com/go-openapi/swag/typeutils v0.25.4 // indirect
github.com/go-openapi/swag/yamlutils v0.25.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/gnostic-models v0.7.1 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/gorilla/schema v1.4.1 // indirect

1522
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,7 @@ package actor
import (
"context"
"io"
"net/http"
"google.golang.org/protobuf/proto"
@@ -34,7 +35,7 @@ type Host interface {
AnnounceExpiry(ids []uint64)
Negotiate(otherHosts []string) ([]string, error)
Name() string
Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error)
Proxy(id uint64, w http.ResponseWriter, r *http.Request, customBody io.Reader) (bool, error)
Apply(ctx context.Context, id uint64, mutation ...proto.Message) (bool, error)
GetActorIds() []uint64
Close() error

View File

@@ -1,6 +1,7 @@
package proxy
import (
"bytes"
"context"
"errors"
"fmt"
@@ -39,6 +40,33 @@ var (
logger = otelslog.NewLogger(name)
)
// MockResponseWriter implements http.ResponseWriter to capture responses for proxy calls.
type MockResponseWriter struct {
StatusCode int
HeaderMap http.Header
Body *bytes.Buffer
}
func NewMockResponseWriter() *MockResponseWriter {
return &MockResponseWriter{
StatusCode: 200,
HeaderMap: make(http.Header),
Body: &bytes.Buffer{},
}
}
func (m *MockResponseWriter) Header() http.Header {
return m.HeaderMap
}
func (m *MockResponseWriter) Write(data []byte) (int, error) {
return m.Body.Write(data)
}
func (m *MockResponseWriter) WriteHeader(statusCode int) {
m.StatusCode = statusCode
}
func NewRemoteHost(host string) (*RemoteHost, error) {
target := fmt.Sprintf("%s:1337", host)
@@ -182,7 +210,7 @@ func (h *RemoteHost) AnnounceExpiry(uids []uint64) {
h.missedPings = 0
}
func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error) {
func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request, customBody io.Reader) (bool, error) {
target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI())
ctx, span := tracer.Start(r.Context(), "remote_proxy")
@@ -196,7 +224,11 @@ func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (b
)
logger.InfoContext(ctx, "proxying request", "cartid", id, "host", h.host, "method", r.Method)
req, err := http.NewRequestWithContext(ctx, r.Method, target, r.Body)
var bdy io.Reader = r.Body
if customBody != nil {
bdy = customBody
}
req, err := http.NewRequestWithContext(ctx, r.Method, target, bdy)
if err != nil {
span.RecordError(err)
http.Error(w, "proxy build error", http.StatusBadGateway)