281 lines
7.4 KiB
Go
281 lines
7.4 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
)
|
|
|
|
// grain-pool.go
//
// Migration Note:
// This file has been migrated to use uint64 cart keys internally (derived
// from the new CartID base62 representation). The exported API still accepts
// the legacy CartId type; each method converts it with keyFromCartId before
// touching the map.
//
// Authoritative storage: grains (map[uint64]*CartGrain)
// Legacy compatibility:  the former grainsLegacy (map[CartId]*CartGrain) map
// has been removed. Code that needs a CartId-keyed view should call
// GetGrains, which builds a copy on demand.
//
// ---------------------------------------------------------------------------
|
|
|
|
var (
|
|
poolGrains = promauto.NewGauge(prometheus.GaugeOpts{
|
|
Name: "cart_grains_in_pool",
|
|
Help: "The total number of grains in the pool",
|
|
})
|
|
poolSize = promauto.NewGauge(prometheus.GaugeOpts{
|
|
Name: "cart_pool_size",
|
|
Help: "The total number of mutations",
|
|
})
|
|
poolUsage = promauto.NewGauge(prometheus.GaugeOpts{
|
|
Name: "cart_grain_pool_usage",
|
|
Help: "The current usage of the grain pool",
|
|
})
|
|
)
|
|
|
|
// GrainPool interface remains legacy-compatible.
|
|
type GrainPool interface {
|
|
Apply(id CartId, mutation interface{}) (*CartGrain, error)
|
|
Get(id CartId) (*CartGrain, error)
|
|
// OwnerHost returns the primary owner host for a given cart id.
|
|
OwnerHost(id CartId) string
|
|
// Hostname returns the hostname of the local pool implementation.
|
|
Hostname() string
|
|
}
|
|
|
|
// Ttl keeps expiry info
|
|
type Ttl struct {
|
|
Expires time.Time
|
|
Grain *CartGrain
|
|
}
|
|
|
|
// GrainLocalPool now stores grains keyed by uint64 (CartKey).
|
|
type GrainLocalPool struct {
|
|
mu sync.RWMutex
|
|
grains map[uint64]*CartGrain // authoritative only
|
|
expiry []Ttl
|
|
spawn func(id CartId) (*CartGrain, error)
|
|
Ttl time.Duration
|
|
PoolSize int
|
|
}
|
|
|
|
// NewGrainLocalPool constructs a new pool.
|
|
func NewGrainLocalPool(size int, ttl time.Duration, spawn func(id CartId) (*CartGrain, error)) *GrainLocalPool {
|
|
ret := &GrainLocalPool{
|
|
spawn: spawn,
|
|
grains: make(map[uint64]*CartGrain),
|
|
expiry: make([]Ttl, 0),
|
|
Ttl: ttl,
|
|
PoolSize: size,
|
|
}
|
|
cartPurge := time.NewTicker(time.Minute)
|
|
go func() {
|
|
for range cartPurge.C {
|
|
ret.Purge()
|
|
}
|
|
}()
|
|
return ret
|
|
}
|
|
|
|
// keyFromCartId derives the uint64 key from a legacy CartId deterministically.
|
|
func keyFromCartId(id CartId) uint64 {
|
|
return LegacyToCartKey(id)
|
|
}
|
|
|
|
// storeGrain indexes a grain in both maps.
|
|
func (p *GrainLocalPool) storeGrain(id CartId, g *CartGrain) {
|
|
k := keyFromCartId(id)
|
|
p.grains[k] = g
|
|
}
|
|
|
|
// deleteGrain removes a grain from both maps.
|
|
func (p *GrainLocalPool) deleteGrain(id CartId) {
|
|
k := keyFromCartId(id)
|
|
delete(p.grains, k)
|
|
}
|
|
|
|
// SetAvailable pre-populates placeholder entries (legacy signature).
|
|
func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
for id := range availableWithLastChangeUnix {
|
|
k := keyFromCartId(id)
|
|
if _, ok := p.grains[k]; !ok {
|
|
p.grains[k] = nil
|
|
p.expiry = append(p.expiry, Ttl{
|
|
Expires: time.Now().Add(p.Ttl),
|
|
Grain: nil,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Purge removes expired grains.
|
|
func (p *GrainLocalPool) Purge() {
|
|
lastChangeTime := time.Now().Add(-p.Ttl)
|
|
keepChanged := lastChangeTime.Unix()
|
|
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
for i := 0; i < len(p.expiry); i++ {
|
|
item := p.expiry[i]
|
|
if item.Grain == nil {
|
|
continue
|
|
}
|
|
if item.Expires.Before(time.Now()) {
|
|
if item.Grain.GetLastChange() > keepChanged {
|
|
log.Printf("Expired item %s changed, keeping", item.Grain.GetId())
|
|
if i < len(p.expiry)-1 {
|
|
p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
|
|
p.expiry = append(p.expiry, item)
|
|
} else {
|
|
// move last to end (noop)
|
|
p.expiry = append(p.expiry[:i], item)
|
|
}
|
|
} else {
|
|
log.Printf("Item %s expired", item.Grain.GetId())
|
|
p.deleteGrain(item.Grain.GetId())
|
|
if i < len(p.expiry)-1 {
|
|
p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
|
|
} else {
|
|
p.expiry = p.expiry[:i]
|
|
}
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// RefreshExpiry resets the expiry timestamp for a living grain to now + TTL.
|
|
// Called after successful mutations to implement a sliding inactivity window.
|
|
func (p *GrainLocalPool) RefreshExpiry(id CartId) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
for i := range p.expiry {
|
|
g := p.expiry[i].Grain
|
|
if g != nil && g.Id == id {
|
|
p.expiry[i].Expires = time.Now().Add(p.Ttl)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetGrains returns a legacy view of grains (copy) for compatibility.
|
|
func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain {
|
|
p.mu.RLock()
|
|
defer p.mu.RUnlock()
|
|
out := make(map[CartId]*CartGrain, len(p.grains))
|
|
for _, g := range p.grains {
|
|
if g != nil {
|
|
out[g.GetId()] = g
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
// statsUpdate updates Prometheus gauges asynchronously.
|
|
func (p *GrainLocalPool) statsUpdate() {
|
|
go func(size int) {
|
|
l := float64(size)
|
|
ps := float64(p.PoolSize)
|
|
poolUsage.Set(l / ps)
|
|
poolGrains.Set(l)
|
|
poolSize.Set(ps)
|
|
}(len(p.grains))
|
|
}
|
|
|
|
// GetGrain retrieves or spawns a grain (legacy id signature).
|
|
func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) {
|
|
grainLookups.Inc()
|
|
k := keyFromCartId(id)
|
|
|
|
p.mu.RLock()
|
|
grain, ok := p.grains[k]
|
|
p.mu.RUnlock()
|
|
|
|
var err error
|
|
if grain == nil || !ok {
|
|
p.mu.Lock()
|
|
// Re-check under write lock
|
|
grain, ok = p.grains[k]
|
|
if grain == nil || !ok {
|
|
// Capacity check
|
|
if len(p.grains) >= p.PoolSize && len(p.expiry) > 0 {
|
|
if p.expiry[0].Expires.Before(time.Now()) && p.expiry[0].Grain != nil {
|
|
oldId := p.expiry[0].Grain.GetId()
|
|
p.deleteGrain(oldId)
|
|
p.expiry = p.expiry[1:]
|
|
} else {
|
|
p.mu.Unlock()
|
|
return nil, fmt.Errorf("pool is full")
|
|
}
|
|
}
|
|
grain, err = p.spawn(id)
|
|
if err == nil {
|
|
p.storeGrain(id, grain)
|
|
}
|
|
}
|
|
p.mu.Unlock()
|
|
p.statsUpdate()
|
|
}
|
|
|
|
return grain, err
|
|
}
|
|
|
|
// Apply applies a mutation (legacy compatibility).
|
|
func (p *GrainLocalPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) {
|
|
grain, err := p.GetGrain(id)
|
|
if err != nil || grain == nil {
|
|
return nil, err
|
|
}
|
|
result, applyErr := grain.Apply(mutation, false)
|
|
// Sliding TTL: refresh expiry on successful non-replay mutation (Apply always non-replay here)
|
|
if applyErr == nil && result != nil {
|
|
p.RefreshExpiry(id)
|
|
}
|
|
return result, applyErr
|
|
}
|
|
|
|
// Get returns current state (legacy wrapper).
|
|
func (p *GrainLocalPool) Get(id CartId) (*CartGrain, error) {
|
|
return p.GetGrain(id)
|
|
}
|
|
|
|
// DebugGrainCount returns counts for debugging.
|
|
func (p *GrainLocalPool) DebugGrainCount() (authoritative int) {
|
|
p.mu.RLock()
|
|
defer p.mu.RUnlock()
|
|
return len(p.grains)
|
|
}
|
|
|
|
// UnsafePointerToLegacyMap exposes the legacy map pointer (for transitional
|
|
// tests that still poke the field directly). DO NOT rely on this long-term.
|
|
func (p *GrainLocalPool) UnsafePointerToLegacyMap() uintptr {
|
|
// Legacy map removed; retained only to satisfy any transitional callers.
|
|
return 0
|
|
}
|
|
|
|
// OwnerHost implements the extended GrainPool interface for the standalone
|
|
// local pool. Since the local pool has no concept of multi-host ownership,
|
|
// it returns an empty string. Callers can treat empty as "local host".
|
|
func (p *GrainLocalPool) OwnerHost(id CartId) string {
|
|
return ""
|
|
}
|
|
|
|
// Hostname returns a blank string because GrainLocalPool does not track a node
|
|
// identity. (SyncedPool will return the real hostname.)
|
|
func (p *GrainLocalPool) Hostname() string {
|
|
return ""
|
|
}
|