168 lines
3.7 KiB
Go
168 lines
3.7 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
)
|
|
|
|
var (
|
|
poolGrains = promauto.NewGauge(prometheus.GaugeOpts{
|
|
Name: "cart_grains_in_pool",
|
|
Help: "The total number of grains in the pool",
|
|
})
|
|
poolSize = promauto.NewGauge(prometheus.GaugeOpts{
|
|
Name: "cart_pool_size",
|
|
Help: "The total number of mutations",
|
|
})
|
|
poolUsage = promauto.NewGauge(prometheus.GaugeOpts{
|
|
Name: "cart_grain_pool_usage",
|
|
Help: "The current usage of the grain pool",
|
|
})
|
|
)
|
|
|
|
type GrainPool interface {
|
|
Process(id CartId, messages ...Message) (*FrameWithPayload, error)
|
|
Get(id CartId) (*FrameWithPayload, error)
|
|
}
|
|
|
|
type Ttl struct {
|
|
Expires time.Time
|
|
Grain *CartGrain
|
|
}
|
|
|
|
type GrainLocalPool struct {
|
|
mu sync.RWMutex
|
|
grains map[CartId]*CartGrain
|
|
expiry []Ttl
|
|
spawn func(id CartId) (*CartGrain, error)
|
|
Ttl time.Duration
|
|
PoolSize int
|
|
}
|
|
|
|
func NewGrainLocalPool(size int, ttl time.Duration, spawn func(id CartId) (*CartGrain, error)) *GrainLocalPool {
|
|
|
|
ret := &GrainLocalPool{
|
|
spawn: spawn,
|
|
grains: make(map[CartId]*CartGrain),
|
|
expiry: make([]Ttl, 0),
|
|
Ttl: ttl,
|
|
PoolSize: size,
|
|
}
|
|
|
|
cartPurge := time.NewTicker(time.Minute)
|
|
go func() {
|
|
<-cartPurge.C
|
|
ret.Purge()
|
|
}()
|
|
return ret
|
|
}
|
|
|
|
func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
for id := range availableWithLastChangeUnix {
|
|
if _, ok := p.grains[id]; !ok {
|
|
p.grains[id] = nil
|
|
p.expiry = append(p.expiry, Ttl{
|
|
Expires: time.Now().Add(p.Ttl),
|
|
Grain: nil,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *GrainLocalPool) Purge() {
|
|
lastChangeTime := time.Now().Add(-p.Ttl)
|
|
keepChanged := lastChangeTime.Unix()
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
for i := 0; i < len(p.expiry); i++ {
|
|
item := p.expiry[i]
|
|
if item.Expires.Before(time.Now()) {
|
|
if item.Grain.GetLastChange() > keepChanged {
|
|
log.Printf("Expired item %s changed, keeping", item.Grain.GetId())
|
|
if i < len(p.expiry)-1 {
|
|
p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
|
|
p.expiry = append(p.expiry, item)
|
|
} else {
|
|
p.expiry = append(p.expiry[:i], item)
|
|
}
|
|
|
|
} else {
|
|
log.Printf("Item %s expired", item.Grain.GetId())
|
|
delete(p.grains, item.Grain.GetId())
|
|
if i < len(p.expiry)-1 {
|
|
p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
|
|
} else {
|
|
p.expiry = p.expiry[:i]
|
|
}
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain {
|
|
return p.grains
|
|
}
|
|
|
|
func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) {
|
|
var err error
|
|
// p.mu.RLock()
|
|
// defer p.mu.RUnlock()
|
|
grain, ok := p.grains[id]
|
|
grainLookups.Inc()
|
|
if grain == nil || !ok {
|
|
if len(p.grains) >= p.PoolSize {
|
|
if p.expiry[0].Expires.Before(time.Now()) {
|
|
delete(p.grains, p.expiry[0].Grain.GetId())
|
|
p.expiry = p.expiry[1:]
|
|
} else {
|
|
return nil, fmt.Errorf("pool is full")
|
|
}
|
|
}
|
|
grain, err = p.spawn(id)
|
|
|
|
p.grains[id] = grain
|
|
}
|
|
go func() {
|
|
l := float64(len(p.grains))
|
|
ps := float64(p.PoolSize)
|
|
poolUsage.Set(l / ps)
|
|
poolGrains.Set(l)
|
|
poolSize.Set(ps)
|
|
}()
|
|
return grain, err
|
|
}
|
|
|
|
func (p *GrainLocalPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) {
|
|
grain, err := p.GetGrain(id)
|
|
var result *FrameWithPayload
|
|
if err == nil && grain != nil {
|
|
for _, message := range messages {
|
|
result, err = grain.HandleMessage(&message, false)
|
|
}
|
|
}
|
|
return result, err
|
|
}
|
|
|
|
func (p *GrainLocalPool) Get(id CartId) (*FrameWithPayload, error) {
|
|
grain, err := p.GetGrain(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
data, err := json.Marshal(grain)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ret := MakeFrameWithPayload(0, 200, data)
|
|
return &ret, nil
|
|
}
|