From 5f48a845b14f742a6055d69520e9d05dc17187bd Mon Sep 17 00:00:00 2001 From: matst80 Date: Mon, 11 Nov 2024 17:28:49 +0100 Subject: [PATCH] add some locks --- cart-grain.go | 9 +++++++++ grain-pool.go | 8 ++++++++ 2 files changed, 17 insertions(+) diff --git a/cart-grain.go b/cart-grain.go index ca97cc4..447f032 100644 --- a/cart-grain.go +++ b/cart-grain.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "slices" + "sync" "time" messages "git.tornberg.me/go-cart-actor/proto" @@ -41,6 +42,7 @@ type CartDelivery struct { } type CartGrain struct { + mu sync.RWMutex lastItemId int lastDeliveryId int storageMessages []Message @@ -111,6 +113,8 @@ func (c *CartGrain) AddItem(sku string, qty int) ([]byte, error) { func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage { ret := make([]StorableMessage, 0) + c.mu.RLock() + defer c.mu.RUnlock() for _, message := range c.storageMessages { if *message.TimeStamp > since { ret = append(ret, message) @@ -189,6 +193,7 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, erro } } if !found { + c.mu.Lock() c.lastItemId++ c.Items = append(c.Items, &CartItem{ Id: c.lastItemId, @@ -199,6 +204,7 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, erro Image: msg.Image, }) c.TotalPrice += msg.Price * int64(msg.Quantity) + c.mu.Unlock() } } case ChangeQuantityType: @@ -269,8 +275,11 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, erro if err != nil { return nil, err } + if !isReplay { + c.mu.Lock() c.storageMessages = append(c.storageMessages, *message) + c.mu.Unlock() } return json.Marshal(c) } diff --git a/grain-pool.go b/grain-pool.go index 1a8827c..9a26804 100644 --- a/grain-pool.go +++ b/grain-pool.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -36,6 +37,7 @@ type Ttl struct { } type GrainLocalPool struct { + mu sync.RWMutex grains map[CartId]*CartGrain expiry []Ttl spawn func(id CartId) (*CartGrain, error) @@ -62,6 +64,8 @@ func NewGrainLocalPool(size int, ttl time.Duration, spawn func(id CartId) (*Cart } func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) { + p.mu.Lock() + defer p.mu.Unlock() for id := range availableWithLastChangeUnix { if _, ok := p.grains[id]; !ok { p.grains[id] = nil @@ -76,6 +80,8 @@ func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int func (p *GrainLocalPool) Purge() { lastChangeTime := time.Now().Add(-p.Ttl) keepChanged := lastChangeTime.Unix() + p.mu.Lock() + defer p.mu.Unlock() for i := 0; i < len(p.expiry); i++ { item := p.expiry[i] if item.Expires.Before(time.Now()) { @@ -100,6 +106,8 @@ func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain { func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) { var err error + p.mu.RLock() + defer p.mu.RUnlock() grain, ok := p.grains[id] grainLookups.Inc() if grain == nil || !ok {