add some locks
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m52s
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m52s
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
messages "git.tornberg.me/go-cart-actor/proto"
|
messages "git.tornberg.me/go-cart-actor/proto"
|
||||||
@@ -41,6 +42,7 @@ type CartDelivery struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CartGrain struct {
|
type CartGrain struct {
|
||||||
|
mu sync.RWMutex
|
||||||
lastItemId int
|
lastItemId int
|
||||||
lastDeliveryId int
|
lastDeliveryId int
|
||||||
storageMessages []Message
|
storageMessages []Message
|
||||||
@@ -111,6 +113,8 @@ func (c *CartGrain) AddItem(sku string, qty int) ([]byte, error) {
|
|||||||
func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage {
|
func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage {
|
||||||
|
|
||||||
ret := make([]StorableMessage, 0)
|
ret := make([]StorableMessage, 0)
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
for _, message := range c.storageMessages {
|
for _, message := range c.storageMessages {
|
||||||
if *message.TimeStamp > since {
|
if *message.TimeStamp > since {
|
||||||
ret = append(ret, message)
|
ret = append(ret, message)
|
||||||
@@ -189,6 +193,7 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, erro
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
|
c.mu.Lock()
|
||||||
c.lastItemId++
|
c.lastItemId++
|
||||||
c.Items = append(c.Items, &CartItem{
|
c.Items = append(c.Items, &CartItem{
|
||||||
Id: c.lastItemId,
|
Id: c.lastItemId,
|
||||||
@@ -199,6 +204,7 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, erro
|
|||||||
Image: msg.Image,
|
Image: msg.Image,
|
||||||
})
|
})
|
||||||
c.TotalPrice += msg.Price * int64(msg.Quantity)
|
c.TotalPrice += msg.Price * int64(msg.Quantity)
|
||||||
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case ChangeQuantityType:
|
case ChangeQuantityType:
|
||||||
@@ -269,8 +275,11 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, erro
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !isReplay {
|
if !isReplay {
|
||||||
|
c.mu.Lock()
|
||||||
c.storageMessages = append(c.storageMessages, *message)
|
c.storageMessages = append(c.storageMessages, *message)
|
||||||
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
return json.Marshal(c)
|
return json.Marshal(c)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
@@ -36,6 +37,7 @@ type Ttl struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type GrainLocalPool struct {
|
type GrainLocalPool struct {
|
||||||
|
mu sync.RWMutex
|
||||||
grains map[CartId]*CartGrain
|
grains map[CartId]*CartGrain
|
||||||
expiry []Ttl
|
expiry []Ttl
|
||||||
spawn func(id CartId) (*CartGrain, error)
|
spawn func(id CartId) (*CartGrain, error)
|
||||||
@@ -62,6 +64,8 @@ func NewGrainLocalPool(size int, ttl time.Duration, spawn func(id CartId) (*Cart
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) {
|
func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
for id := range availableWithLastChangeUnix {
|
for id := range availableWithLastChangeUnix {
|
||||||
if _, ok := p.grains[id]; !ok {
|
if _, ok := p.grains[id]; !ok {
|
||||||
p.grains[id] = nil
|
p.grains[id] = nil
|
||||||
@@ -76,6 +80,8 @@ func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int
|
|||||||
func (p *GrainLocalPool) Purge() {
|
func (p *GrainLocalPool) Purge() {
|
||||||
lastChangeTime := time.Now().Add(-p.Ttl)
|
lastChangeTime := time.Now().Add(-p.Ttl)
|
||||||
keepChanged := lastChangeTime.Unix()
|
keepChanged := lastChangeTime.Unix()
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
for i := 0; i < len(p.expiry); i++ {
|
for i := 0; i < len(p.expiry); i++ {
|
||||||
item := p.expiry[i]
|
item := p.expiry[i]
|
||||||
if item.Expires.Before(time.Now()) {
|
if item.Expires.Before(time.Now()) {
|
||||||
@@ -100,6 +106,8 @@ func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain {
|
|||||||
|
|
||||||
func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) {
|
func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) {
|
||||||
var err error
|
var err error
|
||||||
|
p.mu.RLock()
|
||||||
|
defer p.mu.RUnlock()
|
||||||
grain, ok := p.grains[id]
|
grain, ok := p.grains[id]
|
||||||
grainLookups.Inc()
|
grainLookups.Inc()
|
||||||
if grain == nil || !ok {
|
if grain == nil || !ok {
|
||||||
|
|||||||
Reference in New Issue
Block a user