package main import ( "encoding/json" "fmt" "log" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) var ( poolGrains = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cart_grains_in_pool", Help: "The total number of grains in the pool", }) poolSize = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cart_pool_size", Help: "The total number of mutations", }) poolUsage = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cart_grain_pool_usage", Help: "The current usage of the grain pool", }) ) type GrainPool interface { Process(id CartId, messages ...Message) ([]byte, error) Get(id CartId) ([]byte, error) } type Ttl struct { Expires time.Time Grain *CartGrain } type GrainLocalPool struct { grains map[CartId]*CartGrain expiry []Ttl spawn func(id CartId) (*CartGrain, error) Ttl time.Duration PoolSize int } func NewGrainLocalPool(size int, ttl time.Duration, spawn func(id CartId) (*CartGrain, error)) *GrainLocalPool { ret := &GrainLocalPool{ spawn: spawn, grains: make(map[CartId]*CartGrain), expiry: make([]Ttl, 0), Ttl: ttl, PoolSize: size, } cartPurge := time.NewTicker(time.Minute) go func() { <-cartPurge.C ret.Purge() }() return ret } func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) { for id := range availableWithLastChangeUnix { if _, ok := p.grains[id]; !ok { p.grains[id] = nil p.expiry = append(p.expiry, Ttl{ Expires: time.Now().Add(p.Ttl), Grain: nil, }) } } } func (p *GrainLocalPool) Purge() { lastChangeTime := time.Now().Add(-p.Ttl) keepChanged := lastChangeTime.Unix() for i := 0; i < len(p.expiry); i++ { item := p.expiry[i] if item.Expires.Before(time.Now()) { if item.Grain.GetLastChange() > keepChanged { log.Printf("Expired item %s changed, keeping", item.Grain.GetId()) p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) p.expiry = append(p.expiry, item) } else { log.Printf("Item %s expired", item.Grain.GetId()) delete(p.grains, item.Grain.GetId()) p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) } } else { break } } } func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain { return p.grains } func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) { var err error grain, ok := p.grains[id] grainLookups.Inc() if grain == nil || !ok { if len(p.grains) >= p.PoolSize { if p.expiry[0].Expires.Before(time.Now()) { delete(p.grains, p.expiry[0].Grain.GetId()) p.expiry = p.expiry[1:] } else { return nil, fmt.Errorf("pool is full") } } grain, err = p.spawn(id) p.grains[id] = grain } go func() { l := float64(len(p.grains)) ps := float64(p.PoolSize) poolUsage.Set(l / ps) poolGrains.Set(l) poolSize.Set(ps) }() return grain, err } func (p *GrainLocalPool) Process(id CartId, messages ...Message) ([]byte, error) { grain, err := p.GetGrain(id) if err == nil && grain != nil { for _, message := range messages { _, err = grain.HandleMessage(&message, false) } } if err != nil { return nil, err } return json.Marshal(grain) } func (p *GrainLocalPool) Get(id CartId) ([]byte, error) { grain, err := p.GetGrain(id) if err != nil { return nil, err } return json.Marshal(grain) }