From 19d7ad0de6e5507ab2f9c05eacdb806ec301595e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mats=20T=C3=B6rnberg?= Date: Mon, 10 Nov 2025 20:38:51 +0100 Subject: [PATCH] package --- README.md | 0 go.mod | 10 ++ go.sum | 10 ++ pkg/inventory/listener.go | 85 +++++++++++ pkg/inventory/redis_service.go | 249 +++++++++++++++++++++++++++++++++ pkg/inventory/types.go | 31 ++++ 6 files changed, 385 insertions(+) create mode 100644 README.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 pkg/inventory/listener.go create mode 100644 pkg/inventory/redis_service.go create mode 100644 pkg/inventory/types.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..beb42ac --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module git.tornberg.me/mats/go-redis-inventory + +go 1.25.1 + +require github.com/redis/go-redis/v9 v9.16.0 + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e578cfc --- /dev/null +++ b/go.sum @@ -0,0 +1,10 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4= +github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= diff --git a/pkg/inventory/listener.go b/pkg/inventory/listener.go new file mode 100644 index 0000000..41b32d2 --- /dev/null +++ b/pkg/inventory/listener.go @@ -0,0 +1,85 @@ +package inventory + +import ( + "context" + "encoding/json" + "fmt" + "log" + "strings" + + "github.com/redis/go-redis/v9" +) + +type InventoryChange struct { + SKU string `json:"sku"` + StockLocationID string `json:"stock_location_id"` + Value int64 `json:"value"` +} + +type InventoryChangeHandler func(changes []InventoryChange) + +type InventoryChangeListener struct { + client *redis.Client + ctx context.Context + handler InventoryChangeHandler + ready chan struct{} +} + +func NewInventoryChangeListener(client *redis.Client, ctx context.Context, handler InventoryChangeHandler) *InventoryChangeListener { + return &InventoryChangeListener{ + client: client, + ctx: ctx, + handler: handler, + ready: make(chan struct{}), + } +} + +func (l *InventoryChangeListener) WaitReady() { + <-l.ready +} + +func (l *InventoryChangeListener) Start() error { + pubsub := l.client.Subscribe(l.ctx, "inventory_changed") + defer pubsub.Close() + + // Signal that we're ready to receive messages + close(l.ready) + + ch := pubsub.Channel() + + for msg := range ch { + var payload map[string]int64 + if err := json.Unmarshal([]byte(msg.Payload), &payload); err != nil { + log.Printf("Failed to unmarshal inventory change message: %v", err) + continue + } + + changes := make([]InventoryChange, 0, len(payload)) + for key, newValue := range payload { + // Parse key: "inventory:sku:location" + parts := strings.Split(key, ":") + if len(parts) != 3 || parts[0] != "inventory" { + log.Printf("Invalid inventory key format: %s", key) + continue + } + sku := parts[1] + locationID := parts[2] + changes = append(changes, InventoryChange{ + SKU: sku, + StockLocationID: locationID, + Value: newValue, + }) + } + + if l.handler != nil { + l.handler(changes) + } else { + // Default logging + for _, change := range changes { + fmt.Printf("Inventory changed: SKU %s at location %s -> %d\n", change.SKU, change.StockLocationID, change.Value) + } + } + } + + return nil +} diff --git a/pkg/inventory/redis_service.go b/pkg/inventory/redis_service.go new file mode 100644 index 0000000..22c41a6 --- /dev/null +++ b/pkg/inventory/redis_service.go @@ -0,0 +1,249 @@ +package inventory + +import ( + "context" + "errors" + "fmt" + "strconv" + + "github.com/redis/go-redis/v9" +) + +type RedisInventoryService struct { + client *redis.Client + ctx context.Context + luaScripts map[string]*redis.Script +} + +func NewRedisInventoryService(client *redis.Client, ctx context.Context) (*RedisInventoryService, error) { + rdb := client + + // Ping Redis to check connection + _, err := rdb.Ping(ctx).Result() + if err != nil { + return nil, err + } + + return &RedisInventoryService{ + client: rdb, + ctx: ctx, + luaScripts: make(map[string]*redis.Script), + }, nil +} + +func (s *RedisInventoryService) LoadLuaScript(key string) error { + // Get the script from Redis + script, err := s.client.Get(s.ctx, key).Result() + if err != nil { + return err + } + + // Load the script into the luaScripts cache + s.luaScripts[key] = redis.NewScript(script) + return nil +} + +func (s *RedisInventoryService) AddWarehouse(warehouse *Warehouse) error { + // Convert warehouse to Redis-friendly format + data := map[string]interface{}{ + "id": string(warehouse.ID), + "name": warehouse.Name, + "inventory": warehouse.Inventory, + } + + // Store in Redis with a key pattern like "warehouse:" + key := "warehouse:" + string(warehouse.ID) + _, err := s.client.HMSet(s.ctx, key, data).Result() + return err +} + +func (s *RedisInventoryService) GetInventory(sku SKU, locationID LocationID) (int64, error) { + + cmd := s.client.Get(s.ctx, getInventoryKey(sku, locationID)) + if err := cmd.Err(); err != nil { + return 0, err + } + + i, err := cmd.Int64() + if err != nil { + return 0, err + } + + return i, nil +} + +func getInventoryKey(sku SKU, locationID LocationID) string { + return fmt.Sprintf("inventory:%s:%s", sku, locationID) +} + +func (s *RedisInventoryService) UpdateInventory(rdb redis.Pipeliner, sku SKU, locationID LocationID, quantity int64) error { + key := getInventoryKey(sku, locationID) + cmd := rdb.Set(s.ctx, key, quantity, 0) + return cmd.Err() +} + +var ( + ErrInsufficientInventory = errors.New("insufficient inventory") + ErrInvalidQuantity = errors.New("invalid quantity") + ErrMissingReservation = errors.New("missing reservation") +) + +func makeKeysAndArgs(req ...ReserveRequest) ([]string, []string) { + keys := make([]string, len(req)) + args := make([]string, len(req)) + for i, r := range req { + if r.Quantity <= 0 { + return nil, nil + } + keys[i] = getInventoryKey(r.SKU, r.LocationID) + args[i] = strconv.Itoa(int(r.Quantity)) + } + return keys, args +} + +func (s *RedisInventoryService) ReservationCheck(req ...ReserveRequest) (*ReserveRequest, error) { + if len(req) == 0 { + return nil, ErrMissingReservation + } + + keys, args := makeKeysAndArgs(req...) + if keys == nil || args == nil { + return nil, ErrInvalidQuantity + } + + cmd := reservationCheck.Run(s.ctx, s.client, keys, args) + if err := cmd.Err(); err != nil { + return nil, err + } + val := cmd.Val().(int64) + if val > 0 { + return nil, nil // success + } + // val is negative index of failing request + failingIndex := int(-val) - 1 // 0-based + return &req[failingIndex], ErrInsufficientInventory +} + +func (s *RedisInventoryService) ReserveInventory(req ...ReserveRequest) error { + if len(req) == 0 { + return ErrMissingReservation + } + + keys, args := makeKeysAndArgs(req...) + if keys == nil || args == nil { + return ErrInvalidQuantity + } + cmd := reserveScript.Run(s.ctx, s.client, keys, args) + if err := cmd.Err(); err != nil { + return err + } + if val, err := cmd.Int(); err != nil { + return err + } else if val != 1 { + return ErrInsufficientInventory + } + return nil +} + +var reservationCheck = redis.NewScript(` +-- Get the number of keys passed +local num_keys = #KEYS + +-- Ensure the number of keys matches the number of quantities +if num_keys ~= #ARGV then + return {err = "Script requires the same number of keys and quantities."} +end + +-- --- +-- CHECK PHASE +-- --- +-- Loop through all keys to check their values first +for i = 1, num_keys do + local key = KEYS[i] + local quantity_to_check = tonumber(ARGV[i]) + + -- Fail if the quantity is not a valid number + if not quantity_to_check then + return {err = "Invalid quantity provided for key: " .. key} + end + + -- Get the current value stored at the key + local current_val = tonumber(redis.call('GET', key)) + + -- Check the condition + -- Fail if: + -- 1. The key doesn't exist (current_val is nil) + -- 2. The value is not > the required quantity + if not current_val or current_val < quantity_to_check then + -- Return negative index (1-based) to indicate which request failed + return -i + end +end + +return 1 +`) + +var reserveScript = redis.NewScript(` +-- Get the number of keys passed +local num_keys = #KEYS + +-- Ensure the number of keys matches the number of quantities +if num_keys ~= #ARGV then + return {err = "Script requires the same number of keys and quantities."} +end + +local new_values = {} +local payload = {} + +-- --- +-- 1. CHECK PHASE +-- --- +-- Loop through all keys to check their values first +for i = 1, num_keys do + local key = KEYS[i] + local quantity_to_check = tonumber(ARGV[i]) + + -- Fail if the quantity is not a valid number + if not quantity_to_check then + return {err = "Invalid quantity provided for key: " .. key} + end + + -- Get the current value stored at the key + local current_val = tonumber(redis.call('GET', key)) + + -- Check the condition + -- Fail if: + -- 1. The key doesn't exist (current_val is nil) + -- 2. The value is not > the required quantity + if not current_val or current_val < quantity_to_check then + -- Return 0 to indicate the operation failed and no changes were made + return 0 + end + + -- If the check passes, store the new value + local new_val = current_val - quantity_to_check + table.insert(new_values, new_val) + + -- Add this key and its *new* value to our payload map + payload[key] = new_val +end + +-- --- +-- 2. UPDATE PHASE +-- --- +-- If the script reaches this point, all checks passed. +-- Now, loop again and apply all the updates. +for i = 1, num_keys do + local key = KEYS[i] + local new_val = new_values[i] + + -- Set the key to its new calculated value + redis.call('SET', key, new_val) +end +local message_payload = cjson.encode(payload) + +-- Publish the JSON-encoded message to the specified channel +redis.call('PUBLISH', "inventory_changed", message_payload) +-- Return 1 to indicate the operation was successful +return 1 +`) diff --git a/pkg/inventory/types.go b/pkg/inventory/types.go new file mode 100644 index 0000000..0f12171 --- /dev/null +++ b/pkg/inventory/types.go @@ -0,0 +1,31 @@ +package inventory + +import "time" + +type SKU string + +type LocationID string + +type InventoryItem struct { + SKU SKU + Quantity int + LastUpdate time.Time +} + +type Warehouse struct { + ID LocationID + Name string + Inventory []InventoryItem +} + +type InventoryService interface { + GetInventory(sku SKU, locationID LocationID) (int64, error) + ReserveInventory(req ...ReserveRequest) error + ReservationCheck(req ...ReserveRequest) (*ReserveRequest, error) +} + +type ReserveRequest struct { + SKU SKU + LocationID LocationID + Quantity uint32 +}