diff --git a/cmd/cart/main.go b/cmd/cart/main.go index d81d5a1..e1e8601 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "fmt" "log" @@ -206,9 +207,15 @@ func main() { done := make(chan bool, 1) signal.Notify(sigs, syscall.SIGTERM) + otelShutdown, err := setupOTelSDK(context.Background()) + if err != nil { + log.Fatalf("Unable to start otel %v", err) + } + go func() { sig := <-sigs fmt.Println("Shutting down due to signal:", sig) + otelShutdown(context.Background()) diskStorage.Close() pool.Close() diff --git a/cmd/cart/otel.go b/cmd/cart/otel.go new file mode 100644 index 0000000..d36c4f9 --- /dev/null +++ b/cmd/cart/otel.go @@ -0,0 +1,122 @@ +package main + +import ( + "context" + "errors" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/stdout/stdoutlog" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/log/global" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/log" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/trace" +) + +// setupOTelSDK bootstraps the OpenTelemetry pipeline. +// If it does not return an error, make sure to call shutdown for proper cleanup. +func setupOTelSDK(ctx context.Context) (func(context.Context) error, error) { + var shutdownFuncs []func(context.Context) error + var err error + + // shutdown calls cleanup functions registered via shutdownFuncs. + // The errors from the calls are joined. + // Each registered cleanup will be invoked once. + shutdown := func(ctx context.Context) error { + var err error + for _, fn := range shutdownFuncs { + err = errors.Join(err, fn(ctx)) + } + shutdownFuncs = nil + return err + } + + // handleErr calls shutdown for cleanup and makes sure that all errors are returned. + handleErr := func(inErr error) { + err = errors.Join(inErr, shutdown(ctx)) + } + + // Set up propagator. + prop := newPropagator() + otel.SetTextMapPropagator(prop) + + // Set up trace provider. + tracerProvider, err := newTracerProvider() + if err != nil { + handleErr(err) + return shutdown, err + } + shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown) + otel.SetTracerProvider(tracerProvider) + + // Set up meter provider. + meterProvider, err := newMeterProvider() + if err != nil { + handleErr(err) + return shutdown, err + } + shutdownFuncs = append(shutdownFuncs, meterProvider.Shutdown) + otel.SetMeterProvider(meterProvider) + + // Set up logger provider. + loggerProvider, err := newLoggerProvider() + if err != nil { + handleErr(err) + return shutdown, err + } + shutdownFuncs = append(shutdownFuncs, loggerProvider.Shutdown) + global.SetLoggerProvider(loggerProvider) + + return shutdown, err +} + +func newPropagator() propagation.TextMapPropagator { + return propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ) +} + +func newTracerProvider() (*trace.TracerProvider, error) { + traceExporter, err := stdouttrace.New( + stdouttrace.WithPrettyPrint()) + if err != nil { + return nil, err + } + + tracerProvider := trace.NewTracerProvider( + trace.WithBatcher(traceExporter, + // Default is 5s. Set to 1s for demonstrative purposes. + trace.WithBatchTimeout(time.Second)), + ) + return tracerProvider, nil +} + +func newMeterProvider() (*metric.MeterProvider, error) { + metricExporter, err := stdoutmetric.New() + if err != nil { + return nil, err + } + + meterProvider := metric.NewMeterProvider( + metric.WithReader(metric.NewPeriodicReader(metricExporter, + // Default is 1m. Set to 3s for demonstrative purposes. + metric.WithInterval(3*time.Second))), + ) + return meterProvider, nil +} + +func newLoggerProvider() (*log.LoggerProvider, error) { + logExporter, err := stdoutlog.New() + if err != nil { + return nil, err + } + + loggerProvider := log.NewLoggerProvider( + log.WithProcessor(log.NewBatchProcessor(logExporter)), + ) + return loggerProvider, nil +} diff --git a/cmd/cart/pool-server.go b/cmd/cart/pool-server.go index 011d3a0..2127935 100644 --- a/cmd/cart/pool-server.go +++ b/cmd/cart/pool-server.go @@ -17,6 +17,9 @@ import ( "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/contrib/bridges/otelslog" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" ) var ( @@ -449,7 +452,10 @@ func CartIdHandler(fn func(cartId cart.CartId, w http.ResponseWriter, r *http.Re func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error) func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error { return func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error { if ownerHost, ok := s.OwnerHost(uint64(cartId)); ok { + ctx, span := tracer.Start(r.Context(), "proxy") + defer span.End() handled, err := ownerHost.Proxy(uint64(cartId), w, r) + logger.InfoContext(ctx, fmt.Sprintf("owner host %s", ownerHost.Name())) grainLookups.Inc() if err == nil && handled { return nil @@ -461,6 +467,15 @@ func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request } } +var ( + tracer = otel.Tracer(name) + + // meter = otel.Meter(name) + logger = otelslog.NewLogger(name) + +// rollCnt metric.Int64Counter +) + type AddVoucherRequest struct { VoucherCode string `json:"code"` } @@ -555,20 +570,26 @@ func (s *PoolServer) Serve() *http.ServeMux { w.WriteHeader(http.StatusOK) }) - mux.HandleFunc("GET /", CookieCartIdHandler(s.ProxyHandler(s.GetCartHandler))) - mux.HandleFunc("GET /add/{sku}", CookieCartIdHandler(s.ProxyHandler(s.AddSkuToCartHandler))) - mux.HandleFunc("POST /add", CookieCartIdHandler(s.ProxyHandler(s.AddMultipleItemHandler))) - mux.HandleFunc("POST /", CookieCartIdHandler(s.ProxyHandler(s.AddSkuRequestHandler))) - mux.HandleFunc("POST /set", CookieCartIdHandler(s.ProxyHandler(s.SetCartItemsHandler))) - mux.HandleFunc("DELETE /{itemId}", CookieCartIdHandler(s.ProxyHandler(s.DeleteItemHandler))) - mux.HandleFunc("PUT /", CookieCartIdHandler(s.ProxyHandler(s.QuantityChangeHandler))) - mux.HandleFunc("DELETE /", CookieCartIdHandler(s.ProxyHandler(s.RemoveCartCookie))) - mux.HandleFunc("POST /delivery", CookieCartIdHandler(s.ProxyHandler(s.SetDeliveryHandler))) - mux.HandleFunc("DELETE /delivery/{deliveryId}", CookieCartIdHandler(s.ProxyHandler(s.RemoveDeliveryHandler))) - mux.HandleFunc("PUT /delivery/{deliveryId}/pickupPoint", CookieCartIdHandler(s.ProxyHandler(s.SetPickupPointHandler))) - mux.HandleFunc("PUT /voucher", CookieCartIdHandler(s.ProxyHandler(s.AddVoucherHandler))) - mux.HandleFunc("PUT /subscription-details", CookieCartIdHandler(s.ProxyHandler(s.SubscriptionDetailsHandler))) - mux.HandleFunc("DELETE /voucher/{voucherId}", CookieCartIdHandler(s.ProxyHandler(s.RemoveVoucherHandler))) + handleFunc := func(pattern string, handlerFunc func(http.ResponseWriter, *http.Request)) { + // Configure the "http.route" for the HTTP instrumentation. + handler := otelhttp.WithRouteTag(pattern, http.HandlerFunc(handlerFunc)) + mux.Handle(pattern, handler) + } + + handleFunc("GET /", CookieCartIdHandler(s.ProxyHandler(s.GetCartHandler))) + handleFunc("GET /add/{sku}", CookieCartIdHandler(s.ProxyHandler(s.AddSkuToCartHandler))) + handleFunc("POST /add", CookieCartIdHandler(s.ProxyHandler(s.AddMultipleItemHandler))) + handleFunc("POST /", CookieCartIdHandler(s.ProxyHandler(s.AddSkuRequestHandler))) + handleFunc("POST /set", CookieCartIdHandler(s.ProxyHandler(s.SetCartItemsHandler))) + handleFunc("DELETE /{itemId}", CookieCartIdHandler(s.ProxyHandler(s.DeleteItemHandler))) + handleFunc("PUT /", CookieCartIdHandler(s.ProxyHandler(s.QuantityChangeHandler))) + handleFunc("DELETE /", CookieCartIdHandler(s.ProxyHandler(s.RemoveCartCookie))) + handleFunc("POST /delivery", CookieCartIdHandler(s.ProxyHandler(s.SetDeliveryHandler))) + handleFunc("DELETE /delivery/{deliveryId}", CookieCartIdHandler(s.ProxyHandler(s.RemoveDeliveryHandler))) + handleFunc("PUT /delivery/{deliveryId}/pickupPoint", CookieCartIdHandler(s.ProxyHandler(s.SetPickupPointHandler))) + handleFunc("PUT /voucher", CookieCartIdHandler(s.ProxyHandler(s.AddVoucherHandler))) + handleFunc("PUT /subscription-details", CookieCartIdHandler(s.ProxyHandler(s.SubscriptionDetailsHandler))) + handleFunc("DELETE /voucher/{voucherId}", CookieCartIdHandler(s.ProxyHandler(s.RemoveVoucherHandler))) //mux.HandleFunc("GET /checkout", CookieCartIdHandler(s.ProxyHandler(s.HandleCheckout))) //mux.HandleFunc("GET /confirmation/{orderId}", CookieCartIdHandler(s.ProxyHandler(s.HandleConfirmation))) diff --git a/go.mod b/go.mod index 1d419b1..81ae66f 100644 --- a/go.mod +++ b/go.mod @@ -25,9 +25,11 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dprotaso/go-yit v0.0.0-20220510233725-9ba8df137936 // indirect github.com/emicklei/go-restful/v3 v3.13.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/getkin/kin-openapi v0.132.0 // indirect github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.22.1 // indirect github.com/go-openapi/jsonreference v0.21.2 // indirect github.com/go-openapi/swag v0.25.1 // indirect @@ -66,6 +68,19 @@ require ( github.com/spf13/pflag v1.0.10 // indirect github.com/vmware-labs/yaml-jsonpath v0.3.2 // indirect github.com/x448/float16 v0.8.4 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/contrib/bridges/otelslog v0.13.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect + go.opentelemetry.io/otel v1.38.0 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.14.0 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.38.0 // indirect + go.opentelemetry.io/otel/log v0.14.0 // indirect + go.opentelemetry.io/otel/metric v1.38.0 // indirect + go.opentelemetry.io/otel/sdk v1.38.0 // indirect + go.opentelemetry.io/otel/sdk/log v0.14.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect + go.opentelemetry.io/otel/trace v1.38.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/mod v0.29.0 // indirect diff --git a/go.sum b/go.sum index abe6863..2176606 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,8 @@ github.com/dprotaso/go-yit v0.0.0-20220510233725-9ba8df137936 h1:PRxIJD8XjimM5aT github.com/dprotaso/go-yit v0.0.0-20220510233725-9ba8df137936/go.mod h1:ttYvX5qlB+mlV1okblJqcSMtR4c52UKxDiX9GRBS8+Q= github.com/emicklei/go-restful/v3 v3.13.0 h1:C4Bl2xDndpU6nJ4bc1jXd+uTmYPVUwkD6bFY/oTyCes= github.com/emicklei/go-restful/v3 v3.13.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -31,6 +33,7 @@ github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sa github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= github.com/getkin/kin-openapi v0.132.0 h1:3ISeLMsQzcb5v26yeJrBcdTCEQTag36ZjaGk7MIRUwk= github.com/getkin/kin-openapi v0.132.0/go.mod h1:3OlG51PCYNsPByuiMB0t4fjnNlIDnaEDsjiKUV8nL58= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -204,12 +207,26 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/bridges/otelslog v0.13.0 h1:bwnLpizECbPr1RrQ27waeY2SPIPeccCx/xLuoYADZ9s= +go.opentelemetry.io/contrib/bridges/otelslog v0.13.0/go.mod h1:3nWlOiiqA9UtUnrcNk82mYasNxD8ehOspL0gOfEo6Y4= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 h1:RbKq8BG0FI8OiXhBfcRtqqHcZcka+gU3cskNuf05R18= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0/go.mod h1:h06DGIukJOevXaj/xrNjhi/2098RZzcLTbc0jDAUbsg= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.14.0 h1:B/g+qde6Mkzxbry5ZZag0l7QrQBCtVm7lVjaLgmpje8= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.14.0/go.mod h1:mOJK8eMmgW6ocDJn6Bn11CcZ05gi3P8GylBXEkZtbgA= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0 h1:wm/Q0GAAykXv83wzcKzGGqAnnfLFyFe7RslekZuv+VI= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0/go.mod h1:ra3Pa40+oKjvYh+ZD3EdxFZZB0xdMfuileHAm4nNN7w= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.38.0 h1:kJxSDN4SgWWTjG/hPp3O7LCGLcHXFlvS2/FFOrwL+SE= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.38.0/go.mod h1:mgIOzS7iZeKJdeB8/NYHrJ48fdGc71Llo5bJ1J4DWUE= +go.opentelemetry.io/otel/log v0.14.0 h1:2rzJ+pOAZ8qmZ3DDHg73NEKzSZkhkGIua9gXtxNGgrM= +go.opentelemetry.io/otel/log v0.14.0/go.mod h1:5jRG92fEAgx0SU/vFPxmJvhIuDU9E1SUnEQrMlJpOno= go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/log v0.14.0 h1:JU/U3O7N6fsAXj0+CXz21Czg532dW2V4gG1HE/e8Zrg= +go.opentelemetry.io/otel/sdk/log v0.14.0/go.mod h1:imQvII+0ZylXfKU7/wtOND8Hn4OpT3YUoIgqJVksUkM= go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= diff --git a/pkg/inventory/redis_service.go b/pkg/inventory/redis_service.go index 24874e9..5ed8635 100644 --- a/pkg/inventory/redis_service.go +++ b/pkg/inventory/redis_service.go @@ -88,19 +88,49 @@ var ( ErrMissingReservation = errors.New("missing reservation") ) +func makeKeysAndArgs(req ...ReserveRequest) ([]string, []string) { + keys := make([]string, len(req)) + args := make([]string, len(req)) + for i, r := range req { + if r.Quantity <= 0 { + return nil, nil + } + keys[i] = getInventoryKey(r.SKU, r.LocationID) + args[i] = strconv.Itoa(int(r.Quantity)) + } + return keys, args +} + +func (s *RedisInventoryService) ReservationCheck(req ...ReserveRequest) error { + if len(req) == 0 { + return ErrMissingReservation + } + + keys, args := makeKeysAndArgs(req...) + if keys == nil || args == nil { + return ErrInvalidQuantity + } + + cmd := reserveScript.Run(s.ctx, s.client, keys, args) + if err := cmd.Err(); err != nil { + return err + } + if val, err := cmd.Int(); err != nil { + return err + } else if val != 1 { + return ErrInsufficientInventory + } + return nil +} + func (s *RedisInventoryService) ReserveInventory(req ...ReserveRequest) error { if len(req) == 0 { return ErrMissingReservation } - keys := make([]string, len(req)) - args := make([]string, len(req)) - for i, r := range req { - if r.Quantity <= 0 { - return ErrInvalidQuantity - } - keys[i] = getInventoryKey(r.SKU, r.LocationID) - args[i] = strconv.Itoa(int(r.Quantity)) + keys, args := makeKeysAndArgs(req...) + if keys == nil || args == nil { + return ErrInvalidQuantity } cmd := reserveScript.Run(s.ctx, s.client, keys, args) if err := cmd.Err(); err != nil { @@ -114,6 +144,47 @@ func (s *RedisInventoryService) ReserveInventory(req ...ReserveRequest) error { return nil } +var reservationCheck = redis.NewScript(` +-- Get the number of keys passed +local num_keys = #KEYS + +-- Ensure the number of keys matches the number of quantities +if num_keys ~= #ARGV then + return {err = "Script requires the same number of keys and quantities."} +end + +local new_values = {} +local payload = {} + +-- --- +-- 1. CHECK PHASE +-- --- +-- Loop through all keys to check their values first +for i = 1, num_keys do + local key = KEYS[i] + local quantity_to_check = tonumber(ARGV[i]) + + -- Fail if the quantity is not a valid number + if not quantity_to_check then + return {err = "Invalid quantity provided for key: " .. key} + end + + -- Get the current value stored at the key + local current_val = tonumber(redis.call('GET', key)) + + -- Check the condition + -- Fail if: + -- 1. The key doesn't exist (current_val is nil) + -- 2. The value is not > the required quantity + if not current_val or current_val <= quantity_to_check then + -- Return 0 to indicate the operation failed and no changes were made + return 0 + end +end + +return 1 +`) + var reserveScript = redis.NewScript(` -- Get the number of keys passed local num_keys = #KEYS