more stuff
All checks were successful
Build and Publish / Metadata (push) Successful in 4s
Build and Publish / BuildAndDeployAmd64 (push) Successful in 49s
Build and Publish / BuildAndDeployArm64 (push) Successful in 3m50s

This commit is contained in:
matst80
2025-10-12 21:36:00 +02:00
parent 0ba7410162
commit b8266d80f9
31 changed files with 578 additions and 778 deletions

539
cart-grain-pool.go Normal file

@@ -0,0 +1,539 @@
package main
import (
"fmt"
"log"
"maps"
"reflect"
"sync"
"time"
"git.tornberg.me/go-cart-actor/pkg/actor"
"git.tornberg.me/go-cart-actor/pkg/discovery"
"git.tornberg.me/go-cart-actor/pkg/proxy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/apimachinery/pkg/watch"
)
// ---------------------------------------------------------------------------
// Metrics shared by the cart pool implementation.
// ---------------------------------------------------------------------------
var (
poolGrains = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_grains_in_pool",
Help: "The total number of grains in the local pool",
})
poolSize = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_pool_size",
Help: "Configured capacity of the cart pool",
})
poolUsage = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_grain_pool_usage",
Help: "Current utilisation of the cart pool",
})
negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_remote_negotiation_total",
Help: "The total number of remote host negotiations",
})
connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_connected_remotes",
Help: "Number of connected remote hosts",
})
cartMutationsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_mutations_total",
Help: "Total number of cart state mutations applied",
})
cartMutationFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_mutation_failures_total",
Help: "Total number of failed cart state mutations",
})
cartMutationLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "cart_mutation_latency_seconds",
Help: "Latency of cart mutations in seconds",
Buckets: prometheus.DefBuckets,
}, []string{"mutation"})
)
// CartPool merges the responsibilities that previously belonged to
// GrainLocalPool and SyncedPool. It provides local grain storage together
// with cluster coordination, ownership negotiation and expiry signalling,
// and backs the GrainPool interface exposed to HTTP handlers and other subsystems.
type CartPool struct {
// Local grain state -----------------------------------------------------
localMu sync.RWMutex
grains map[uint64]*CartGrain
spawn func(id CartId) (*CartGrain, error)
ttl time.Duration
poolSize int
// Cluster coordination --------------------------------------------------
hostname string
remoteMu sync.RWMutex
remoteOwners map[uint64]*proxy.RemoteHost
remoteHosts map[string]*proxy.RemoteHost
//discardedHostHandler *DiscardedHostHandler
// House-keeping ---------------------------------------------------------
purgeTicker *time.Ticker
}
// NewCartPool constructs a unified pool. Discovery may be nil for standalone
// deployments.
func NewCartPool(size int, ttl time.Duration, hostname string, spawn func(id CartId) (*CartGrain, error), hostWatch discovery.Discovery) (*CartPool, error) {
p := &CartPool{
grains: make(map[uint64]*CartGrain),
spawn: spawn,
ttl: ttl,
poolSize: size,
hostname: hostname,
remoteOwners: make(map[uint64]*proxy.RemoteHost),
remoteHosts: make(map[string]*proxy.RemoteHost),
}
p.purgeTicker = time.NewTicker(time.Minute)
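// Periodically evict expired grains; the ticker is stopped by Close.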
go func() {
for range p.purgeTicker.C {
p.purge()
}
}()
if hostWatch != nil {
go p.startDiscovery(hostWatch)
} else {
log.Printf("No discovery configured; expecting manual AddRemote or static host injection")
}
return p, nil
}
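// purge evicts grains whose last access is older than the configured TTL and
// announces the evicted ids to all connected remotes.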
func (p *CartPool) purge() {
purgeLimit := time.Now().Add(-p.ttl)
p.localMu.Lock()
purgedIds := make([]uint64, 0, len(p.grains))
for id, grain := range p.grains {
if grain.GetLastAccess().Before(purgeLimit) {
purgedIds = append(purgedIds, id)
delete(p.grains, id)
}
}
p.localMu.Unlock()
p.forAllHosts(func(remote *proxy.RemoteHost) {
remote.AnnounceExpiry(purgedIds)
})
}
// startDiscovery subscribes to cluster events and adds/removes hosts.
func (p *CartPool) startDiscovery(discovery discovery.Discovery) {
time.Sleep(3 * time.Second) // allow gRPC server startup
log.Printf("Starting discovery watcher")
ch, err := discovery.Watch()
if err != nil {
log.Printf("Discovery error: %v", err)
return
}
for evt := range ch {
if evt.Host == "" {
continue
}
switch evt.Type {
case watch.Deleted:
if p.IsKnown(evt.Host) {
p.RemoveHost(evt.Host)
}
default:
if !p.IsKnown(evt.Host) {
log.Printf("Discovered host %s", evt.Host)
p.AddRemote(evt.Host)
}
}
}
}
// ---------------------------------------------------------------------------
// Local grain management
// ---------------------------------------------------------------------------
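// statsUpdate refreshes the pool gauges from the current grain count and configured capacity.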
func (p *CartPool) statsUpdate() {
p.localMu.RLock()
size := len(p.grains)
capacity := p.poolSize
p.localMu.RUnlock()
poolGrains.Set(float64(size))
poolSize.Set(float64(capacity))
if capacity > 0 {
poolUsage.Set(float64(size) / float64(capacity))
}
}
// LocalUsage returns the number of resident grains and configured capacity.
func (p *CartPool) LocalUsage() (int, int) {
p.localMu.RLock()
defer p.localMu.RUnlock()
return len(p.grains), p.poolSize
}
// GetLocalIds returns the currently owned cart ids (for control-plane RPCs).
func (p *CartPool) GetLocalIds() []uint64 {
p.localMu.RLock()
defer p.localMu.RUnlock()
ids := make([]uint64, 0, len(p.grains))
for _, g := range p.grains {
if g == nil {
continue
}
ids = append(ids, uint64(g.GetId()))
}
return ids
}
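// HandleRemoteExpiry drops the remote-ownership records for cart ids that the
// owning host reports as expired.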
func (p *CartPool) HandleRemoteExpiry(host string, ids []uint64) error {
p.remoteMu.Lock()
defer p.remoteMu.Unlock()
for _, id := range ids {
delete(p.remoteOwners, id)
}
return nil
}
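// HandleOwnershipChange records the given host as owner of the supplied cart ids,
// connecting to it if needed and evicting any local copies of those grains.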
func (p *CartPool) HandleOwnershipChange(host string, ids []uint64) error {
p.remoteMu.RLock()
remoteHost, exists := p.remoteHosts[host]
p.remoteMu.RUnlock()
if !exists {
createdHost, err := p.AddRemote(host)
if err != nil {
return err
}
remoteHost = createdHost
}
p.remoteMu.Lock()
defer p.remoteMu.Unlock()
p.localMu.Lock()
defer p.localMu.Unlock()
for _, id := range ids {
delete(p.grains, id)
p.remoteOwners[id] = remoteHost
}
return nil
}
// SnapshotGrains returns a copy of the currently resident grains keyed by id.
func (p *CartPool) SnapshotGrains() map[CartId]*CartGrain {
p.localMu.RLock()
defer p.localMu.RUnlock()
out := make(map[CartId]*CartGrain, len(p.grains))
for _, g := range p.grains {
if g != nil {
out[g.GetId()] = g
}
}
return out
}
// func (p *CartPool) getLocalGrain(key uint64) (*CartGrain, error) {
// grainLookups.Inc()
// p.localMu.RLock()
// grain, ok := p.grains[key]
// p.localMu.RUnlock()
// if grain != nil && ok {
// return grain, nil
// }
// go p.statsUpdate()
// return grain, nil
// }
// ---------------------------------------------------------------------------
// Cluster ownership and coordination
// ---------------------------------------------------------------------------
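// TakeOwnership announces to every connected remote that this host now owns the cart id.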
func (p *CartPool) TakeOwnership(id uint64) {
p.broadcastOwnership([]uint64{id})
}
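// AddRemote connects to a new remote host, starts its ping loop, imports the
// grains it already owns and triggers a negotiation round.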
func (p *CartPool) AddRemote(host string) (*proxy.RemoteHost, error) {
if host == "" || host == p.hostname || p.IsKnown(host) {
return nil, fmt.Errorf("invalid or already known host %q", host)
}
remote, err := proxy.NewRemoteHost(host)
if err != nil {
log.Printf("AddRemote: NewRemoteHostGRPC %s failed: %v", host, err)
return nil, err
}
p.remoteMu.Lock()
p.remoteHosts[host] = remote
p.remoteMu.Unlock()
connectedRemotes.Set(float64(p.RemoteCount()))
log.Printf("Connected to remote host %s", host)
go p.pingLoop(remote)
go p.initializeRemote(remote)
go p.SendNegotiation()
return remote, nil
}
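// initializeRemote fetches the actor ids already owned by the remote, evicts any
// local copies and records the remote as their owner.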
func (p *CartPool) initializeRemote(remote *proxy.RemoteHost) {
remotesIds := remote.GetActorIds()
p.remoteMu.Lock()
for _, id := range remotesIds {
p.localMu.Lock()
delete(p.grains, id)
p.localMu.Unlock()
if _, exists := p.remoteOwners[id]; !exists {
p.remoteOwners[id] = remote
}
}
p.remoteMu.Unlock()
}
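// RemoveHost disconnects a remote, forgets every ownership record pointing at it
// and updates the connected-remotes gauge.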
func (p *CartPool) RemoveHost(host string) {
p.remoteMu.Lock()
remote, exists := p.remoteHosts[host]
if exists {
delete(p.remoteHosts, host)
}
for id, owner := range p.remoteOwners {
if owner.Host == host {
delete(p.remoteOwners, id)
}
}
p.remoteMu.Unlock()
if exists {
remote.Close()
}
connectedRemotes.Set(float64(p.RemoteCount()))
}
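// RemoteCount returns the number of currently connected remote hosts.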
func (p *CartPool) RemoteCount() int {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
return len(p.remoteHosts)
}
// RemoteHostNames returns a snapshot of connected remote host identifiers.
func (p *CartPool) RemoteHostNames() []string {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
hosts := make([]string, 0, len(p.remoteHosts))
for host := range p.remoteHosts {
hosts = append(hosts, host)
}
return hosts
}
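// IsKnown reports whether host is the local host or an already connected remote.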
func (p *CartPool) IsKnown(host string) bool {
if host == p.hostname {
return true
}
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
_, ok := p.remoteHosts[host]
return ok
}
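// pingLoop pings the remote every five seconds and removes it once it is
// reported unhealthy.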
func (p *CartPool) pingLoop(remote *proxy.RemoteHost) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
if !remote.Ping() && !remote.IsHealthy() {
log.Printf("Remote %s unhealthy, removing", remote.Host)
p.RemoveHost(remote.Host)
return
}
}
}
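// IsHealthy reports whether all connected remote hosts are healthy.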
func (p *CartPool) IsHealthy() bool {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
for _, r := range p.remoteHosts {
if !r.IsHealthy() {
return false
}
}
return true
}
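// Negotiate connects to any host in the supplied list that is not yet known.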
func (p *CartPool) Negotiate(otherHosts []string) {
for _, host := range otherHosts {
if host != p.hostname {
p.remoteMu.RLock()
_, ok := p.remoteHosts[host]
p.remoteMu.RUnlock()
if !ok {
go p.AddRemote(host)
}
}
}
}
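// SendNegotiation shares the local view of the cluster with every remote and
// connects to any additional hosts the remotes report back.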
func (p *CartPool) SendNegotiation() {
negotiationCount.Inc()
p.remoteMu.RLock()
hosts := make([]string, 0, len(p.remoteHosts)+1)
hosts = append(hosts, p.hostname)
remotes := make([]*proxy.RemoteHost, 0, len(p.remoteHosts))
for h, r := range p.remoteHosts {
hosts = append(hosts, h)
remotes = append(remotes, r)
}
p.remoteMu.RUnlock()
for _, r := range remotes {
knownByRemote, err := r.Negotiate(hosts)
if err != nil {
log.Printf("Negotiate with %s failed: %v", r.Host, err)
continue
}
for _, h := range knownByRemote {
if !p.IsKnown(h) {
p.AddRemote(h)
}
}
}
}
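// forAllHosts runs fn concurrently against a snapshot of the connected remotes,
// waits for completion and then drops any remote that became unhealthy.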
func (p *CartPool) forAllHosts(fn func(*proxy.RemoteHost)) {
p.remoteMu.RLock()
rh := maps.Clone(p.remoteHosts)
p.remoteMu.RUnlock()
wg := sync.WaitGroup{}
for _, host := range rh {
wg.Go(func() { fn(host) })
}
wg.Wait()
for name, host := range rh {
if !host.IsHealthy() {
host.Close()
p.remoteMu.Lock()
delete(p.remoteHosts, name)
p.remoteMu.Unlock()
}
}
}
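// broadcastOwnership announces the supplied cart ids to every connected remote.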
func (p *CartPool) broadcastOwnership(ids []uint64) {
if len(ids) == 0 {
return
}
p.forAllHosts(func(rh *proxy.RemoteHost) {
rh.AnnounceOwnership(ids)
})
}
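// getOrClaimGrain returns the locally resident grain, or spawns and stores a new
// one and broadcasts ownership of it to the other hosts.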
func (p *CartPool) getOrClaimGrain(id uint64) (*CartGrain, error) {
p.localMu.RLock()
grain, exists := p.grains[id]
p.localMu.RUnlock()
if exists && grain != nil {
return grain, nil
}
grain, err := p.spawn(CartId(id))
if err != nil {
return nil, err
}
go p.broadcastOwnership([]uint64{id})
return grain, nil
}
// ErrNotOwner is returned when a cart belongs to another host.
var ErrNotOwner = fmt.Errorf("not owner")
// Apply applies a mutation to a grain.
func (p *CartPool) Apply(id uint64, mutation any) (*CartGrain, error) {
grain, err := p.getOrClaimGrain(id)
if err != nil {
return nil, err
}
start := time.Now()
result, applyErr := grain.Apply(mutation, false)
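// Derive a metric label from the concrete mutation type, unwrapping pointers.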
mutationType := "unknown"
if mutation != nil {
if t := reflect.TypeOf(mutation); t != nil {
if t.Kind() == reflect.Pointer {
t = t.Elem()
}
if t.Name() != "" {
mutationType = t.Name()
}
}
}
cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds())
if applyErr == nil && result != nil {
cartMutationsTotal.Inc()
//p.RefreshExpiry(id)
//cartActiveGrains.Set(float64(len(p.grains)))
} else if applyErr != nil {
cartMutationFailuresTotal.Inc()
}
return result, applyErr
}
// Get returns the current state of a grain.
func (p *CartPool) Get(id uint64) (*CartGrain, error) {
grain, err := p.getOrClaimGrain(id)
if err != nil {
return nil, err
}
return grain.GetCurrentState()
}
// OwnerHost reports the remote owner (if any) for the supplied cart id.
func (p *CartPool) OwnerHost(id uint64) (actor.Host, bool) {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
owner, ok := p.remoteOwners[id]
return owner, ok
}
// Hostname returns the local hostname (pod IP).
func (p *CartPool) Hostname() string {
return p.hostname
}
// Close notifies remotes that this host is shutting down.
func (p *CartPool) Close() {
p.remoteMu.Lock()
defer p.remoteMu.Unlock()
for _, r := range p.remoteHosts {
go func(rh *proxy.RemoteHost) {
rh.Close()
}(r)
}
if p.purgeTicker != nil {
p.purgeTicker.Stop()
}
}