diff --git a/.gitignore b/.gitignore index 3c6764d..be98a30 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ __debug* go-cart-actor data/*.prot -data/*.go* \ No newline at end of file +data/*.go* +data/se/* \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index dc7bb9d..2d591fa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -66,6 +66,13 @@ RUN --mount=type=cache,target=/go/build-cache \ -X main.BuildDate=${BUILD_DATE}" \ -o /out/go-cart-backoffice ./cmd/backoffice +RUN --mount=type=cache,target=/go/build-cache \ + go build -trimpath -ldflags="-s -w \ + -X main.Version=${VERSION} \ + -X main.GitCommit=${GIT_COMMIT} \ + -X main.BuildDate=${BUILD_DATE}" \ + -o /out/go-cart-inventory ./cmd/inventory + ############################ # Runtime Stage ############################ @@ -75,6 +82,7 @@ WORKDIR / COPY --from=build /out/go-cart-actor /go-cart-actor COPY --from=build /out/go-cart-backoffice /go-cart-backoffice +COPY --from=build /out/go-cart-inventory /go-cart-inventory # Document (not expose forcibly) typical ports: 8080 (HTTP), 1337 (gRPC) EXPOSE 8080 1337 diff --git a/cmd/inventory/main.go b/cmd/inventory/main.go index c1172b9..db5bf1a 100644 --- a/cmd/inventory/main.go +++ b/cmd/inventory/main.go @@ -2,20 +2,81 @@ package main import ( "context" + "encoding/json" "log" - "time" + "net/http" + "os" + "strings" + "sync" - "git.tornberg.me/go-cart-actor/pkg/inventory" + "git.tornberg.me/mats/go-redis-inventory/pkg/inventory" + "github.com/matst80/slask-finder/pkg/index" + "github.com/matst80/slask-finder/pkg/messaging" "github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9/maintnotifications" + + amqp "github.com/rabbitmq/amqp091-go" ) +type Server struct { + service *inventory.RedisInventoryService +} + +func (srv *Server) livezHandler(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) +} + +func (srv *Server) readyzHandler(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) +} + +func (srv *Server) getInventoryHandler(w http.ResponseWriter, r *http.Request) { + // Parse path: /inventory/{sku}/{location} + path := r.URL.Path + parts := strings.Split(strings.Trim(path, "/"), "/") + if len(parts) != 3 || parts[0] != "inventory" { + http.Error(w, "Invalid path", http.StatusBadRequest) + return + } + sku := inventory.SKU(parts[1]) + locationID := inventory.LocationID(parts[2]) + + quantity, err := srv.service.GetInventory(sku, locationID) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + response := map[string]int64{"quantity": quantity} + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +var country = "se" +var redisAddress = "10.10.3.18:6379" +var redisPassword = "slaskredis" + +func init() { + // Override redis config from environment variables if set + if addr, ok := os.LookupEnv("REDIS_ADDRESS"); ok { + redisAddress = addr + } + if password, ok := os.LookupEnv("REDIS_PASSWORD"); ok { + redisPassword = password + } + if ctry, ok := os.LookupEnv("COUNTRY"); ok { + country = ctry + } +} + func main() { var ctx = context.Background() rdb := redis.NewClient(&redis.Options{ - Addr: "10.10.3.18:6379", - Password: "slaskredis", // no password set - DB: 0, // use default DB + Addr: redisAddress, + Password: redisPassword, // no password set + DB: 0, // use default DB MaintNotificationsConfig: &maintnotifications.Config{ Mode: maintnotifications.ModeDisabled, }, @@ -26,51 +87,56 @@ func main() { return } - // Start inventory change listener - listener := inventory.NewInventoryChangeListener(rdb, ctx, func(changes []inventory.InventoryChange) { - for _, change := range changes { - log.Printf("Inventory change detected: SKU %s at location %s now has %d", change.SKU, change.StockLocationID, change.Value) + server := &Server{service: s} + + // Set up HTTP routes + http.HandleFunc("/livez", server.livezHandler) + http.HandleFunc("/readyz", server.readyzHandler) + http.HandleFunc("/inventory/", server.getInventoryHandler) + + stockhandler := &StockHandler{ + MainStockLocationID: inventory.LocationID(country), + rdb: rdb, + ctx: ctx, + svc: *s, + } + + amqpUrl, ok := os.LookupEnv("RABBIT_HOST") + if ok { + conn, err := amqp.DialConfig(amqpUrl, amqp.Config{ + Properties: amqp.NewConnectionProperties(), + }) + //a.conn = conn + if err != nil { + log.Fatalf("Failed to connect to RabbitMQ: %v", err) + } + ch, err := conn.Channel() + if err != nil { + log.Fatalf("Failed to open a channel: %v", err) + } + // items listener + err = messaging.ListenToTopic(ch, country, "item_added", func(d amqp.Delivery) error { + var items []*index.DataItem + err := json.Unmarshal(d.Body, &items) + if err == nil { + log.Printf("Got upserts %d, message count %d", len(items), d.MessageCount) + wg := &sync.WaitGroup{} + for _, item := range items { + stockhandler.HandleItem(item, wg) + } + wg.Wait() + log.Print("Batch done...") + } else { + log.Printf("Failed to unmarshal upsert message %v", err) + } + return err + }) + if err != nil { + log.Fatalf("Failed to listen to item_added topic: %v", err) } - }) - log.Println("Starting inventory change listener...") - go listener.Start() - listener.WaitReady() - log.Println("Listener is ready, proceeding with inventory operations...") - - rdb.Pipelined(ctx, func(p redis.Pipeliner) error { - s.UpdateInventory(p, "1", "1", 10) - s.UpdateInventory(p, "2", "2", 20) - s.UpdateInventory(p, "3", "3", 30) - s.UpdateInventory(p, "4", "4", 40) - return nil - }) - err = s.ReserveInventory( - inventory.ReserveRequest{ - SKU: "1", - LocationID: "1", - Quantity: 3, - }, - inventory.ReserveRequest{ - SKU: "2", - LocationID: "2", - Quantity: 15, - }, - inventory.ReserveRequest{ - SKU: "3", - LocationID: "3", - Quantity: 25, - }, - ) - if err != nil { - log.Printf("Unable to reserve inventory: %v", err) } - v, err := s.GetInventory("1", "1") - if err != nil { - log.Printf("Unable to get inventory: %v", err) - return - } - log.Printf("Inventory after reservation: %v", v) - // Wait a bit for listener to process messages - time.Sleep(2 * time.Second) + // Start HTTP server + log.Println("Starting HTTP server on :8080") + log.Fatal(http.ListenAndServe(":8080", nil)) } diff --git a/cmd/inventory/stockhandler.go b/cmd/inventory/stockhandler.go new file mode 100644 index 0000000..05ac067 --- /dev/null +++ b/cmd/inventory/stockhandler.go @@ -0,0 +1,48 @@ +package main + +import ( + "context" + "log" + "strconv" + "strings" + "sync" + + "git.tornberg.me/mats/go-redis-inventory/pkg/inventory" + "github.com/matst80/slask-finder/pkg/types" + "github.com/redis/go-redis/v9" +) + +type StockHandler struct { + rdb *redis.Client + ctx context.Context + svc inventory.RedisInventoryService + MainStockLocationID inventory.LocationID +} + +func (s *StockHandler) HandleItem(item types.Item, wg *sync.WaitGroup) { + wg.Go(func() { + pipe := s.rdb.Pipeline() + centralStockString, ok := item.GetStringFieldValue(3) + if !ok { + centralStockString = "0" + } + centralStockString = strings.Replace(centralStockString, "+", "", -1) + centralStockString = strings.Replace(centralStockString, "<", "", -1) + centralStockString = strings.Replace(centralStockString, ">", "", -1) + centralStock, err := strconv.ParseInt(centralStockString, 10, 64) + + if err != nil { + log.Printf("unable to parse central stock for item %s: %v", item.GetSku(), err) + centralStock = 0 + } else { + s.svc.UpdateInventory(pipe, inventory.SKU(item.GetSku()), s.MainStockLocationID, int64(centralStock)) + } + for id, value := range item.GetStock() { + s.svc.UpdateInventory(pipe, inventory.SKU(item.GetSku()), inventory.LocationID(id), int64(value)) + } + _, err = pipe.Exec(s.ctx) + if err != nil { + log.Printf("unable to update stock: %v", err) + } + }) +} diff --git a/deployment/deployment.yaml b/deployment/deployment.yaml index 46c2ab6..16dc40f 100644 --- a/deployment/deployment.yaml +++ b/deployment/deployment.yaml @@ -385,3 +385,74 @@ spec: name: cart-backoffice port: number: 8080 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: cart-inventory + arch: amd64 + name: cart-inventory-x86 +spec: + replicas: 1 + selector: + matchLabels: + app: cart-inventory + arch: amd64 + template: + metadata: + labels: + app: cart-inventory + arch: amd64 + spec: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: kubernetes.io/arch + operator: NotIn + values: + - arm64 + imagePullSecrets: + - name: regcred + serviceAccountName: default + containers: + - image: registry.knatofs.se/go-cart-actor-amd64:latest + name: cart-inventory-amd64 + imagePullPolicy: Always + command: ["/go-cart-inventory"] + lifecycle: + preStop: + exec: + command: ["sleep", "15"] + ports: + - containerPort: 8080 + name: web + livenessProbe: + httpGet: + path: /livez + port: web + failureThreshold: 1 + periodSeconds: 30 + readinessProbe: + httpGet: + path: /readyz + port: web + failureThreshold: 2 + initialDelaySeconds: 2 + periodSeconds: 30 + resources: + limits: + memory: "256Mi" + cpu: "500m" + requests: + memory: "50Mi" + cpu: "500m" + env: + - name: TZ + value: "Europe/Stockholm" + - name: REDIS_ADDRESS + value: "10.10.3.18:6379" + - name: REDIS_PASSWORD + value: "slaskredis" diff --git a/go.mod b/go.mod index 475bbb0..b884be2 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( ) require ( + git.tornberg.me/mats/go-redis-inventory v0.0.0-20251110193851-19d7ad0de6e5 // indirect github.com/RoaringBitmap/roaring/v2 v2.13.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.24.1 // indirect diff --git a/go.sum b/go.sum index 4b16d14..38e517a 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +git.tornberg.me/mats/go-redis-inventory v0.0.0-20251110193851-19d7ad0de6e5 h1:54ZKuqppO6reMmnWOYJaFMlPJK947xnPrv3zDbSuknQ= +git.tornberg.me/mats/go-redis-inventory v0.0.0-20251110193851-19d7ad0de6e5/go.mod h1:jrDU55O7sdN2RJr99upmig/FAla/mW1Cdju7834TXug= github.com/RoaringBitmap/roaring/v2 v2.13.0 h1:38BxJ6lGPcBLykIRCyYtViB/By3+a/iS9znKsiBbhNc= github.com/RoaringBitmap/roaring/v2 v2.13.0/go.mod h1:Mpi+oQ+3oCU7g1aF75Ib/XYCTqjTGpHI0f8djSZVY3I= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/pkg/inventory/listener.go b/pkg/inventory/listener.go deleted file mode 100644 index 41b32d2..0000000 --- a/pkg/inventory/listener.go +++ /dev/null @@ -1,85 +0,0 @@ -package inventory - -import ( - "context" - "encoding/json" - "fmt" - "log" - "strings" - - "github.com/redis/go-redis/v9" -) - -type InventoryChange struct { - SKU string `json:"sku"` - StockLocationID string `json:"stock_location_id"` - Value int64 `json:"value"` -} - -type InventoryChangeHandler func(changes []InventoryChange) - -type InventoryChangeListener struct { - client *redis.Client - ctx context.Context - handler InventoryChangeHandler - ready chan struct{} -} - -func NewInventoryChangeListener(client *redis.Client, ctx context.Context, handler InventoryChangeHandler) *InventoryChangeListener { - return &InventoryChangeListener{ - client: client, - ctx: ctx, - handler: handler, - ready: make(chan struct{}), - } -} - -func (l *InventoryChangeListener) WaitReady() { - <-l.ready -} - -func (l *InventoryChangeListener) Start() error { - pubsub := l.client.Subscribe(l.ctx, "inventory_changed") - defer pubsub.Close() - - // Signal that we're ready to receive messages - close(l.ready) - - ch := pubsub.Channel() - - for msg := range ch { - var payload map[string]int64 - if err := json.Unmarshal([]byte(msg.Payload), &payload); err != nil { - log.Printf("Failed to unmarshal inventory change message: %v", err) - continue - } - - changes := make([]InventoryChange, 0, len(payload)) - for key, newValue := range payload { - // Parse key: "inventory:sku:location" - parts := strings.Split(key, ":") - if len(parts) != 3 || parts[0] != "inventory" { - log.Printf("Invalid inventory key format: %s", key) - continue - } - sku := parts[1] - locationID := parts[2] - changes = append(changes, InventoryChange{ - SKU: sku, - StockLocationID: locationID, - Value: newValue, - }) - } - - if l.handler != nil { - l.handler(changes) - } else { - // Default logging - for _, change := range changes { - fmt.Printf("Inventory changed: SKU %s at location %s -> %d\n", change.SKU, change.StockLocationID, change.Value) - } - } - } - - return nil -} diff --git a/pkg/inventory/redis_service.go b/pkg/inventory/redis_service.go deleted file mode 100644 index 51bd872..0000000 --- a/pkg/inventory/redis_service.go +++ /dev/null @@ -1,251 +0,0 @@ -package inventory - -import ( - "context" - "errors" - "fmt" - "strconv" - - "github.com/redis/go-redis/v9" -) - -type RedisInventoryService struct { - client *redis.Client - ctx context.Context - luaScripts map[string]*redis.Script -} - -func NewRedisInventoryService(client *redis.Client, ctx context.Context) (*RedisInventoryService, error) { - rdb := client - - // Ping Redis to check connection - _, err := rdb.Ping(ctx).Result() - if err != nil { - return nil, err - } - - return &RedisInventoryService{ - client: rdb, - ctx: ctx, - luaScripts: make(map[string]*redis.Script), - }, nil -} - -func (s *RedisInventoryService) LoadLuaScript(key string) error { - // Get the script from Redis - script, err := s.client.Get(s.ctx, key).Result() - if err != nil { - return err - } - - // Load the script into the luaScripts cache - s.luaScripts[key] = redis.NewScript(script) - return nil -} - -func (s *RedisInventoryService) AddWarehouse(warehouse *Warehouse) error { - // Convert warehouse to Redis-friendly format - data := map[string]interface{}{ - "id": string(warehouse.ID), - "name": warehouse.Name, - "inventory": warehouse.Inventory, - } - - // Store in Redis with a key pattern like "warehouse:" - key := "warehouse:" + string(warehouse.ID) - _, err := s.client.HMSet(s.ctx, key, data).Result() - return err -} - -func (s *RedisInventoryService) GetInventory(sku SKU, locationID LocationID) (int64, error) { - - cmd := s.client.Get(s.ctx, getInventoryKey(sku, locationID)) - if err := cmd.Err(); err != nil { - return 0, err - } - - i, err := cmd.Int64() - if err != nil { - return 0, err - } - - return i, nil -} - -func getInventoryKey(sku SKU, locationID LocationID) string { - return fmt.Sprintf("inventory:%s:%s", sku, locationID) -} - -func (s *RedisInventoryService) UpdateInventory(rdb redis.Pipeliner, sku SKU, locationID LocationID, quantity int64) error { - key := getInventoryKey(sku, locationID) - cmd := rdb.Set(s.ctx, key, quantity, 0) - return cmd.Err() -} - -var ( - ErrInsufficientInventory = errors.New("insufficient inventory") - ErrInvalidQuantity = errors.New("invalid quantity") - ErrMissingReservation = errors.New("missing reservation") -) - -func makeKeysAndArgs(req ...ReserveRequest) ([]string, []string) { - keys := make([]string, len(req)) - args := make([]string, len(req)) - for i, r := range req { - if r.Quantity <= 0 { - return nil, nil - } - keys[i] = getInventoryKey(r.SKU, r.LocationID) - args[i] = strconv.Itoa(int(r.Quantity)) - } - return keys, args -} - -func (s *RedisInventoryService) ReservationCheck(req ...ReserveRequest) error { - if len(req) == 0 { - return ErrMissingReservation - } - - keys, args := makeKeysAndArgs(req...) - if keys == nil || args == nil { - return ErrInvalidQuantity - } - - cmd := reservationCheck.Run(s.ctx, s.client, keys, args) - if err := cmd.Err(); err != nil { - return err - } - if val, err := cmd.Int(); err != nil { - return err - } else if val != 1 { - return ErrInsufficientInventory - } - return nil -} - -func (s *RedisInventoryService) ReserveInventory(req ...ReserveRequest) error { - if len(req) == 0 { - return ErrMissingReservation - } - - keys, args := makeKeysAndArgs(req...) - if keys == nil || args == nil { - return ErrInvalidQuantity - } - cmd := reserveScript.Run(s.ctx, s.client, keys, args) - if err := cmd.Err(); err != nil { - return err - } - if val, err := cmd.Int(); err != nil { - return err - } else if val != 1 { - return ErrInsufficientInventory - } - return nil -} - -var reservationCheck = redis.NewScript(` --- Get the number of keys passed -local num_keys = #KEYS - --- Ensure the number of keys matches the number of quantities -if num_keys ~= #ARGV then - return {err = "Script requires the same number of keys and quantities."} -end - -local new_values = {} -local payload = {} - --- --- --- 1. CHECK PHASE --- --- --- Loop through all keys to check their values first -for i = 1, num_keys do - local key = KEYS[i] - local quantity_to_check = tonumber(ARGV[i]) - - -- Fail if the quantity is not a valid number - if not quantity_to_check then - return {err = "Invalid quantity provided for key: " .. key} - end - - -- Get the current value stored at the key - local current_val = tonumber(redis.call('GET', key)) - - -- Check the condition - -- Fail if: - -- 1. The key doesn't exist (current_val is nil) - -- 2. The value is not > the required quantity - if not current_val or current_val <= quantity_to_check then - -- Return 0 to indicate the operation failed and no changes were made - return 0 - end -end - -return 1 -`) - -var reserveScript = redis.NewScript(` --- Get the number of keys passed -local num_keys = #KEYS - --- Ensure the number of keys matches the number of quantities -if num_keys ~= #ARGV then - return {err = "Script requires the same number of keys and quantities."} -end - -local new_values = {} -local payload = {} - --- --- --- 1. CHECK PHASE --- --- --- Loop through all keys to check their values first -for i = 1, num_keys do - local key = KEYS[i] - local quantity_to_check = tonumber(ARGV[i]) - - -- Fail if the quantity is not a valid number - if not quantity_to_check then - return {err = "Invalid quantity provided for key: " .. key} - end - - -- Get the current value stored at the key - local current_val = tonumber(redis.call('GET', key)) - - -- Check the condition - -- Fail if: - -- 1. The key doesn't exist (current_val is nil) - -- 2. The value is not > the required quantity - if not current_val or current_val <= quantity_to_check then - -- Return 0 to indicate the operation failed and no changes were made - return 0 - end - - -- If the check passes, store the new value - local new_val = current_val - quantity_to_check - table.insert(new_values, new_val) - - -- Add this key and its *new* value to our payload map - payload[key] = new_val -end - --- --- --- 2. UPDATE PHASE --- --- --- If the script reaches this point, all checks passed. --- Now, loop again and apply all the updates. -for i = 1, num_keys do - local key = KEYS[i] - local new_val = new_values[i] - - -- Set the key to its new calculated value - redis.call('SET', key, new_val) -end -local message_payload = cjson.encode(payload) - --- Publish the JSON-encoded message to the specified channel -redis.call('PUBLISH', "inventory_changed", message_payload) --- Return 1 to indicate the operation was successful -return 1 -`) diff --git a/pkg/inventory/types.go b/pkg/inventory/types.go deleted file mode 100644 index 4ae6be5..0000000 --- a/pkg/inventory/types.go +++ /dev/null @@ -1,30 +0,0 @@ -package inventory - -import "time" - -type SKU string - -type LocationID string - -type InventoryItem struct { - SKU SKU - Quantity int - LastUpdate time.Time -} - -type Warehouse struct { - ID LocationID - Name string - Inventory []InventoryItem -} - -type InventoryService interface { - GetInventory(sku SKU, locationID LocationID) (int64, error) - ReserveInventory(req ...ReserveRequest) error -} - -type ReserveRequest struct { - SKU SKU - LocationID LocationID - Quantity uint32 -}