package main import ( "fmt" "log" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) // grain-pool.go // // Migration Note: // This file has been migrated to use uint64 cart keys internally (derived // from the new CartID base62 representation). For backward compatibility, // a deprecated legacy map keyed by CartId is maintained so existing code // that directly indexes pool.grains with a CartId continues to compile // until the full refactor across SyncedPool / remoteIndex is completed. // // Authoritative storage: grains (map[uint64]*CartGrain) // Legacy compatibility: grainsLegacy (map[CartId]*CartGrain) - kept in sync. // // Once all external usages are updated to rely on helper accessors, // grainsLegacy can be removed. // // --------------------------------------------------------------------------- var ( poolGrains = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cart_grains_in_pool", Help: "The total number of grains in the pool", }) poolSize = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cart_pool_size", Help: "The total number of mutations", }) poolUsage = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cart_grain_pool_usage", Help: "The current usage of the grain pool", }) ) // GrainPool interface remains legacy-compatible. type GrainPool interface { Apply(id CartId, mutation interface{}) (*CartGrain, error) Get(id CartId) (*CartGrain, error) } // Ttl keeps expiry info type Ttl struct { Expires time.Time Grain *CartGrain } // GrainLocalPool now stores grains keyed by uint64 (CartKey). type GrainLocalPool struct { mu sync.RWMutex grains map[uint64]*CartGrain // authoritative only expiry []Ttl spawn func(id CartId) (*CartGrain, error) Ttl time.Duration PoolSize int } // NewGrainLocalPool constructs a new pool. func NewGrainLocalPool(size int, ttl time.Duration, spawn func(id CartId) (*CartGrain, error)) *GrainLocalPool { ret := &GrainLocalPool{ spawn: spawn, grains: make(map[uint64]*CartGrain), expiry: make([]Ttl, 0), Ttl: ttl, PoolSize: size, } cartPurge := time.NewTicker(time.Minute) go func() { for range cartPurge.C { ret.Purge() } }() return ret } // keyFromCartId derives the uint64 key from a legacy CartId deterministically. func keyFromCartId(id CartId) uint64 { return LegacyToCartKey(id) } // storeGrain indexes a grain in both maps. func (p *GrainLocalPool) storeGrain(id CartId, g *CartGrain) { k := keyFromCartId(id) p.grains[k] = g } // deleteGrain removes a grain from both maps. func (p *GrainLocalPool) deleteGrain(id CartId) { k := keyFromCartId(id) delete(p.grains, k) } // SetAvailable pre-populates placeholder entries (legacy signature). func (p *GrainLocalPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) { p.mu.Lock() defer p.mu.Unlock() for id := range availableWithLastChangeUnix { k := keyFromCartId(id) if _, ok := p.grains[k]; !ok { p.grains[k] = nil p.expiry = append(p.expiry, Ttl{ Expires: time.Now().Add(p.Ttl), Grain: nil, }) } } } // Purge removes expired grains. func (p *GrainLocalPool) Purge() { lastChangeTime := time.Now().Add(-p.Ttl) keepChanged := lastChangeTime.Unix() p.mu.Lock() defer p.mu.Unlock() for i := 0; i < len(p.expiry); i++ { item := p.expiry[i] if item.Grain == nil { continue } if item.Expires.Before(time.Now()) { if item.Grain.GetLastChange() > keepChanged { log.Printf("Expired item %s changed, keeping", item.Grain.GetId()) if i < len(p.expiry)-1 { p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) p.expiry = append(p.expiry, item) } else { // move last to end (noop) p.expiry = append(p.expiry[:i], item) } } else { log.Printf("Item %s expired", item.Grain.GetId()) p.deleteGrain(item.Grain.GetId()) if i < len(p.expiry)-1 { p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) } else { p.expiry = p.expiry[:i] } } } else { break } } } // GetGrains returns a legacy view of grains (copy) for compatibility. func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain { p.mu.RLock() defer p.mu.RUnlock() out := make(map[CartId]*CartGrain, len(p.grains)) for _, g := range p.grains { if g != nil { out[g.GetId()] = g } } return out } // statsUpdate updates Prometheus gauges asynchronously. func (p *GrainLocalPool) statsUpdate() { go func(size int) { l := float64(size) ps := float64(p.PoolSize) poolUsage.Set(l / ps) poolGrains.Set(l) poolSize.Set(ps) }(len(p.grains)) } // GetGrain retrieves or spawns a grain (legacy id signature). func (p *GrainLocalPool) GetGrain(id CartId) (*CartGrain, error) { grainLookups.Inc() k := keyFromCartId(id) p.mu.RLock() grain, ok := p.grains[k] p.mu.RUnlock() var err error if grain == nil || !ok { p.mu.Lock() // Re-check under write lock grain, ok = p.grains[k] if grain == nil || !ok { // Capacity check if len(p.grains) >= p.PoolSize && len(p.expiry) > 0 { if p.expiry[0].Expires.Before(time.Now()) && p.expiry[0].Grain != nil { oldId := p.expiry[0].Grain.GetId() p.deleteGrain(oldId) p.expiry = p.expiry[1:] } else { p.mu.Unlock() return nil, fmt.Errorf("pool is full") } } grain, err = p.spawn(id) if err == nil { p.storeGrain(id, grain) } } p.mu.Unlock() p.statsUpdate() } return grain, err } // Apply applies a mutation (legacy compatibility). func (p *GrainLocalPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) { grain, err := p.GetGrain(id) if err != nil || grain == nil { return nil, err } return grain.Apply(mutation, false) } // Get returns current state (legacy wrapper). func (p *GrainLocalPool) Get(id CartId) (*CartGrain, error) { return p.GetGrain(id) } // DebugGrainCount returns counts for debugging. func (p *GrainLocalPool) DebugGrainCount() (authoritative int) { p.mu.RLock() defer p.mu.RUnlock() return len(p.grains) } // UnsafePointerToLegacyMap exposes the legacy map pointer (for transitional // tests that still poke the field directly). DO NOT rely on this long-term. func (p *GrainLocalPool) UnsafePointerToLegacyMap() uintptr { // Legacy map removed; retained only to satisfy any transitional callers. return 0 }