Files
go-cart-actor/grain-pool.go
Mats Törnberg 0ba7410162
All checks were successful
Build and Publish / Metadata (push) Successful in 6s
Build and Publish / BuildAndDeployAmd64 (push) Successful in 46s
Build and Publish / BuildAndDeployArm64 (push) Successful in 4m8s
even more refactoring
2025-10-11 18:17:31 +02:00

705 lines
17 KiB
Go

package main
import (
"context"
"fmt"
"log"
"net/http"
"reflect"
"sync"
"time"
messages "git.tornberg.me/go-cart-actor/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/apimachinery/pkg/watch"
)
// ---------------------------------------------------------------------------
// Metrics shared by the cart pool implementation.
// ---------------------------------------------------------------------------
var (
poolGrains = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_grains_in_pool",
Help: "The total number of grains in the local pool",
})
poolSize = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_pool_size",
Help: "Configured capacity of the cart pool",
})
poolUsage = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_grain_pool_usage",
Help: "Current utilisation of the cart pool",
})
negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_remote_negotiation_total",
Help: "The total number of remote host negotiations",
})
connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_connected_remotes",
Help: "Number of connected remote hosts",
})
cartMutationsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_mutations_total",
Help: "Total number of cart state mutations applied",
})
cartMutationFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_mutation_failures_total",
Help: "Total number of failed cart state mutations",
})
cartMutationLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "cart_mutation_latency_seconds",
Help: "Latency of cart mutations in seconds",
Buckets: prometheus.DefBuckets,
}, []string{"mutation"})
cartActiveGrains = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_active_grains",
Help: "Number of active (resident) local grains",
})
)
// GrainPool is the interface exposed to HTTP handlers and other subsystems.
type GrainPool interface {
Apply(id CartId, mutation interface{}) (*CartGrain, error)
Get(id CartId) (*CartGrain, error)
OwnerHost(id CartId) (Host, bool)
Hostname() string
TakeOwnership(id CartId)
IsHealthy() bool
Close()
}
// Host abstracts a remote node capable of proxying cart requests.
type Host interface {
Name() string
Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error)
}
// Ttl tracks the expiry deadline for an in-memory grain.
type Ttl struct {
Expires time.Time
Grain *CartGrain
}
// CartPool merges the responsibilities that previously belonged to
// GrainLocalPool and SyncedPool. It provides local grain storage together
// with cluster coordination, ownership negotiation and expiry signalling.
type CartPool struct {
// Local grain state -----------------------------------------------------
localMu sync.RWMutex
grains map[uint64]*CartGrain
expiry []Ttl
spawn func(id CartId) (*CartGrain, error)
ttl time.Duration
poolSize int
// Cluster coordination --------------------------------------------------
hostname string
remoteMu sync.RWMutex
remoteOwners map[CartId]*RemoteHostGRPC
remoteHosts map[string]*RemoteHostGRPC
//discardedHostHandler *DiscardedHostHandler
// House-keeping ---------------------------------------------------------
purgeTicker *time.Ticker
}
// NewCartPool constructs a unified pool. Discovery may be nil for standalone
// deployments.
func NewCartPool(size int, ttl time.Duration, hostname string, spawn func(id CartId) (*CartGrain, error), discovery Discovery) (*CartPool, error) {
p := &CartPool{
grains: make(map[uint64]*CartGrain),
expiry: make([]Ttl, 0),
spawn: spawn,
ttl: ttl,
poolSize: size,
hostname: hostname,
remoteOwners: make(map[CartId]*RemoteHostGRPC),
remoteHosts: make(map[string]*RemoteHostGRPC),
}
// p.discardedHostHandler = NewDiscardedHostHandler(1338)
// p.discardedHostHandler.SetReconnectHandler(p.AddRemote)
p.purgeTicker = time.NewTicker(time.Minute)
go func() {
for range p.purgeTicker.C {
p.Purge()
}
}()
if discovery != nil {
go p.startDiscovery(discovery)
} else {
log.Printf("No discovery configured; expecting manual AddRemote or static host injection")
}
return p, nil
}
// startDiscovery subscribes to cluster events and adds/removes hosts.
func (p *CartPool) startDiscovery(discovery Discovery) {
time.Sleep(3 * time.Second) // allow gRPC server startup
log.Printf("Starting discovery watcher")
ch, err := discovery.Watch()
if err != nil {
log.Printf("Discovery error: %v", err)
return
}
for evt := range ch {
if evt.Host == "" {
continue
}
switch evt.Type {
case watch.Deleted:
if p.IsKnown(evt.Host) {
p.RemoveHost(evt.Host)
}
default:
if !p.IsKnown(evt.Host) {
log.Printf("Discovered host %s", evt.Host)
p.AddRemote(evt.Host)
}
}
}
}
// ---------------------------------------------------------------------------
// Local grain management
// ---------------------------------------------------------------------------
func (p *CartPool) statsUpdate() {
p.localMu.RLock()
size := len(p.grains)
cap := p.poolSize
p.localMu.RUnlock()
poolGrains.Set(float64(size))
poolSize.Set(float64(cap))
if cap > 0 {
poolUsage.Set(float64(size) / float64(cap))
}
}
// LocalUsage returns the number of resident grains and configured capacity.
func (p *CartPool) LocalUsage() (int, int) {
p.localMu.RLock()
defer p.localMu.RUnlock()
return len(p.grains), p.poolSize
}
// SetAvailable pre-populates placeholder entries.
func (p *CartPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) {
p.localMu.Lock()
defer p.localMu.Unlock()
for id := range availableWithLastChangeUnix {
k := uint64(id)
if _, ok := p.grains[k]; !ok {
p.grains[k] = nil
p.expiry = append(p.expiry, Ttl{Expires: time.Now().Add(p.ttl)})
}
}
p.statsUpdate()
}
// Purge removes expired grains and broadcasts expiry announcements so that
// other hosts drop stale ownership hints.
func (p *CartPool) Purge() {
now := time.Now()
keepChanged := now.Add(-p.ttl).Unix()
var expired []CartId
p.localMu.Lock()
for i := 0; i < len(p.expiry); {
entry := p.expiry[i]
if entry.Grain == nil {
i++
continue
}
if entry.Expires.After(now) {
break
}
if entry.Grain.GetLastChange() > keepChanged {
// Recently mutated: move to back.
p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
p.expiry = append(p.expiry, entry)
continue
}
id := entry.Grain.GetId()
delete(p.grains, uint64(id))
expired = append(expired, id)
if i < len(p.expiry)-1 {
p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
} else {
p.expiry = p.expiry[:i]
}
}
p.localMu.Unlock()
if len(expired) > 0 {
p.statsUpdate()
go p.broadcastExpiry(expired)
}
}
// RefreshExpiry updates the TTL entry for a given grain.
func (p *CartPool) RefreshExpiry(id CartId) {
p.localMu.Lock()
defer p.localMu.Unlock()
for i := range p.expiry {
g := p.expiry[i].Grain
if g != nil && g.Id == id {
p.expiry[i].Expires = time.Now().Add(p.ttl)
return
}
}
// If no entry existed, append one (safeguard for newly spawned grains).
p.expiry = append(p.expiry, Ttl{Expires: time.Now().Add(p.ttl), Grain: p.grains[uint64(id)]})
}
// DebugGrainCount returns the number of locally resident grains.
func (p *CartPool) DebugGrainCount() int {
p.localMu.RLock()
defer p.localMu.RUnlock()
return len(p.grains)
}
// LocalCartIDs returns the currently owned cart ids (for control-plane RPCs).
func (p *CartPool) LocalCartIDs() []uint64 {
p.localMu.RLock()
defer p.localMu.RUnlock()
ids := make([]uint64, 0, len(p.grains))
for _, g := range p.grains {
if g == nil {
continue
}
ids = append(ids, uint64(g.GetId()))
}
return ids
}
// SnapshotGrains returns a copy of the currently resident grains keyed by id.
func (p *CartPool) SnapshotGrains() map[CartId]*CartGrain {
p.localMu.RLock()
defer p.localMu.RUnlock()
out := make(map[CartId]*CartGrain, len(p.grains))
for _, g := range p.grains {
if g != nil {
out[g.GetId()] = g
}
}
return out
}
func (p *CartPool) removeLocalGrain(id CartId) {
p.localMu.Lock()
delete(p.grains, uint64(id))
for i := range p.expiry {
if p.expiry[i].Grain != nil && p.expiry[i].Grain.GetId() == id {
p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
break
}
}
p.localMu.Unlock()
p.statsUpdate()
}
func (p *CartPool) getLocalGrain(id CartId) (*CartGrain, error) {
key := uint64(id)
grainLookups.Inc()
p.localMu.RLock()
grain, ok := p.grains[key]
p.localMu.RUnlock()
if grain != nil && ok {
return grain, nil
}
p.localMu.Lock()
defer p.localMu.Unlock()
grain, ok = p.grains[key]
if grain == nil || !ok {
if len(p.grains) >= p.poolSize && len(p.expiry) > 0 {
if p.expiry[0].Expires.Before(time.Now()) && p.expiry[0].Grain != nil {
oldID := p.expiry[0].Grain.GetId()
delete(p.grains, uint64(oldID))
p.expiry = p.expiry[1:]
go p.broadcastExpiry([]CartId{oldID})
} else {
return nil, fmt.Errorf("pool is full")
}
}
spawned, err := p.spawn(id)
if err != nil {
return nil, err
}
p.grains[key] = spawned
p.expiry = append(p.expiry, Ttl{Expires: time.Now().Add(p.ttl), Grain: spawned})
grain = spawned
}
go p.statsUpdate()
return grain, nil
}
// ---------------------------------------------------------------------------
// Cluster ownership and coordination
// ---------------------------------------------------------------------------
func (p *CartPool) TakeOwnership(id CartId) {
p.broadcastOwnership([]CartId{id})
}
func (p *CartPool) AddRemote(host string) (*RemoteHostGRPC, error) {
if host == "" || host == p.hostname {
return nil, fmt.Errorf("invalid host")
}
p.remoteMu.Lock()
if _, exists := p.remoteHosts[host]; exists {
p.remoteMu.Unlock()
return nil, fmt.Errorf("host already exists")
}
p.remoteMu.Unlock()
remote, err := NewRemoteHostGRPC(host)
if err != nil {
log.Printf("AddRemote: NewRemoteHostGRPC %s failed: %v", host, err)
return nil, err
}
p.remoteMu.Lock()
p.remoteHosts[host] = remote
p.remoteMu.Unlock()
connectedRemotes.Set(float64(p.RemoteCount()))
log.Printf("Connected to remote host %s", host)
go p.pingLoop(remote)
go p.initializeRemote(remote)
go p.Negotiate()
return remote, nil
}
func (p *CartPool) initializeRemote(remote *RemoteHostGRPC) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
reply, err := remote.ControlClient.GetCartIds(ctx, &messages.Empty{})
if err != nil {
log.Printf("Init remote %s: GetCartIds error: %v", remote.Host, err)
return
}
count := 0
p.remoteMu.Lock()
for _, cid := range reply.CartIds {
id := CartId(cid)
if _, exists := p.remoteOwners[id]; !exists {
p.remoteOwners[id] = remote
}
count++
}
p.remoteMu.Unlock()
log.Printf("Remote %s reported %d remote-owned carts", remote.Host, count)
}
func (p *CartPool) RemoveHost(host string) {
p.remoteMu.Lock()
remote, exists := p.remoteHosts[host]
if exists {
go remote.Close()
delete(p.remoteHosts, host)
}
for id, owner := range p.remoteOwners {
if owner.Host == host {
delete(p.remoteOwners, id)
}
}
p.remoteMu.Unlock()
if exists {
remote.Conn.Close()
}
connectedRemotes.Set(float64(p.RemoteCount()))
}
func (p *CartPool) RemoteCount() int {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
return len(p.remoteHosts)
}
// RemoteHostNames returns a snapshot of connected remote host identifiers.
func (p *CartPool) RemoteHostNames() []string {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
hosts := make([]string, 0, len(p.remoteHosts))
for host := range p.remoteHosts {
hosts = append(hosts, host)
}
return hosts
}
func (p *CartPool) IsKnown(host string) bool {
if host == p.hostname {
return true
}
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
_, ok := p.remoteHosts[host]
return ok
}
func (p *CartPool) pingLoop(remote *RemoteHostGRPC) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
if !remote.Ping() {
if !remote.IsHealthy() {
log.Printf("Remote %s unhealthy, removing", remote.Host)
p.Close()
p.RemoveHost(remote.Host)
return
}
continue
}
}
}
func (p *CartPool) IsHealthy() bool {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
for _, r := range p.remoteHosts {
if !r.IsHealthy() {
return false
}
}
return true
}
func (p *CartPool) Negotiate() {
negotiationCount.Inc()
p.remoteMu.RLock()
hosts := make([]string, 0, len(p.remoteHosts)+1)
hosts = append(hosts, p.hostname)
remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
for h, r := range p.remoteHosts {
hosts = append(hosts, h)
remotes = append(remotes, r)
}
p.remoteMu.RUnlock()
for _, r := range remotes {
knownByRemote, err := r.Negotiate(hosts)
if err != nil {
log.Printf("Negotiate with %s failed: %v", r.Host, err)
continue
}
for _, h := range knownByRemote {
if !p.IsKnown(h) {
p.AddRemote(h)
}
}
}
}
func (p *CartPool) broadcastOwnership(ids []CartId) {
if len(ids) == 0 {
return
}
uids := make([]uint64, len(ids))
for i, id := range ids {
uids[i] = uint64(id)
}
p.remoteMu.RLock()
remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
for _, r := range p.remoteHosts {
if r.IsHealthy() {
remotes = append(remotes, r)
} else {
log.Printf("Skipping announce to unhealthy remote %s", r.Host)
p.RemoveHost(r.Host)
}
}
p.remoteMu.RUnlock()
for _, remote := range remotes {
go func(rh *RemoteHostGRPC) {
rh.AnnounceOwnership(uids)
}(remote)
}
}
func (p *CartPool) broadcastExpiry(ids []CartId) {
if len(ids) == 0 {
return
}
uids := make([]uint64, len(ids))
for i, id := range ids {
uids[i] = uint64(id)
}
p.remoteMu.RLock()
remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
for _, r := range p.remoteHosts {
if r.IsHealthy() {
remotes = append(remotes, r)
}
}
p.remoteMu.RUnlock()
for _, remote := range remotes {
go func(rh *RemoteHostGRPC) {
rh.AnnounceExpiry(uids)
}(remote)
}
}
func (p *CartPool) AdoptRemoteOwnership(host string, ids []string) {
if host == "" || host == p.hostname {
return
}
remoteHost, ok := p.remoteHosts[host]
if !ok {
log.Printf("AdoptRemoteOwnership: unknown host %s", host)
createdHost, err := p.AddRemote(host)
if err != nil {
log.Printf("AdoptRemoteOwnership: failed to add remote %s: %v", host, err)
return
}
remoteHost = createdHost
}
p.remoteMu.Lock()
defer p.remoteMu.Unlock()
for _, s := range ids {
if s == "" {
continue
}
parsed, ok := ParseCartId(s)
if !ok {
continue
}
if existing, ok := p.remoteOwners[parsed]; ok && existing != remoteHost {
continue
}
p.localMu.RLock()
_, localHas := p.grains[uint64(parsed)]
p.localMu.RUnlock()
if localHas {
continue
}
p.remoteOwners[parsed] = remoteHost
}
}
func (p *CartPool) HandleRemoteExpiry(host string, ids []uint64) {
if host == "" || host == p.hostname {
return
}
p.remoteMu.Lock()
defer p.remoteMu.Unlock()
for _, raw := range ids {
id := CartId(raw)
if owner, ok := p.remoteOwners[id]; ok && owner.Host == host {
delete(p.remoteOwners, id)
}
}
}
func (p *CartPool) getOrClaimGrain(id CartId) (*CartGrain, error) {
p.localMu.RLock()
grain, exists := p.grains[uint64(id)]
p.localMu.RUnlock()
if exists && grain != nil {
return grain, nil
}
p.remoteMu.RLock()
remoteHost, found := p.remoteOwners[id]
p.remoteMu.RUnlock()
if found && remoteHost != nil && remoteHost.Host != "" {
return nil, ErrNotOwner
}
grain, err := p.getLocalGrain(id)
if err != nil {
return nil, err
}
go p.broadcastOwnership([]CartId{id})
return grain, nil
}
// ErrNotOwner is returned when a cart belongs to another host.
var ErrNotOwner = fmt.Errorf("not owner")
// Apply applies a mutation to a grain.
func (p *CartPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) {
grain, err := p.getOrClaimGrain(id)
if err != nil {
return nil, err
}
start := time.Now()
result, applyErr := grain.Apply(mutation, false)
mutationType := "unknown"
if mutation != nil {
if t := reflect.TypeOf(mutation); t != nil {
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
if t.Name() != "" {
mutationType = t.Name()
}
}
}
cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds())
if applyErr == nil && result != nil {
cartMutationsTotal.Inc()
p.RefreshExpiry(id)
cartActiveGrains.Set(float64(p.DebugGrainCount()))
} else if applyErr != nil {
cartMutationFailuresTotal.Inc()
}
return result, applyErr
}
// Get returns the current state of a grain.
func (p *CartPool) Get(id CartId) (*CartGrain, error) {
grain, err := p.getOrClaimGrain(id)
if err != nil {
return nil, err
}
return grain.GetCurrentState()
}
// OwnerHost reports the remote owner (if any) for the supplied cart id.
func (p *CartPool) OwnerHost(id CartId) (Host, bool) {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
owner, ok := p.remoteOwners[id]
return owner, ok
}
// Hostname returns the local hostname (pod IP).
func (p *CartPool) Hostname() string {
return p.hostname
}
// Close notifies remotes that this host is shutting down.
func (p *CartPool) Close() {
p.remoteMu.Lock()
defer p.remoteMu.Unlock()
for _, r := range p.remoteHosts {
go func(rh *RemoteHostGRPC) {
_, err := rh.ControlClient.Closing(context.Background(), &messages.ClosingNotice{Host: p.hostname})
if err != nil {
log.Printf("Close notify to %s failed: %v", rh.Host, err)
}
}(r)
}
if p.purgeTicker != nil {
p.purgeTicker.Stop()
}
}