package main

import (
	"fmt"
	"log"
	"maps"
	"reflect"
	"sync"
	"time"

	"git.tornberg.me/go-cart-actor/pkg/actor"
	"git.tornberg.me/go-cart-actor/pkg/discovery"
	"git.tornberg.me/go-cart-actor/pkg/proxy"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"k8s.io/apimachinery/pkg/watch"
)

// ---------------------------------------------------------------------------
// Metrics shared by the cart pool implementation.
// ---------------------------------------------------------------------------

var (
	poolGrains = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_grains_in_pool",
		Help: "The total number of grains in the local pool",
	})
	poolSize = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_pool_size",
		Help: "Configured capacity of the cart pool",
	})
	poolUsage = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_grain_pool_usage",
		Help: "Current utilisation of the cart pool",
	})
	negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_negotiation_total",
		Help: "The total number of remote host negotiations",
	})
	connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_connected_remotes",
		Help: "Number of connected remote hosts",
	})
	cartMutationsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_mutations_total",
		Help: "Total number of cart state mutations applied",
	})
	cartMutationFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_mutation_failures_total",
		Help: "Total number of failed cart state mutations",
	})
	cartMutationLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "cart_mutation_latency_seconds",
		Help:    "Latency of cart mutations in seconds",
		Buckets: prometheus.DefBuckets,
	}, []string{"mutation"})
)

// CartPool is the grain pool exposed to HTTP handlers and other subsystems.
// It merges the responsibilities that previously belonged to GrainLocalPool
// and SyncedPool: local grain storage together with cluster coordination,
// ownership negotiation and expiry signalling.
type CartPool struct {
	// Local grain state -----------------------------------------------------
	localMu  sync.RWMutex
	grains   map[uint64]*CartGrain
	spawn    func(id CartId) (*CartGrain, error)
	ttl      time.Duration
	poolSize int

	// Cluster coordination --------------------------------------------------
	hostname     string
	remoteMu     sync.RWMutex
	remoteOwners map[uint64]*proxy.RemoteHost
	remoteHosts  map[string]*proxy.RemoteHost
	//discardedHostHandler *DiscardedHostHandler

	// House-keeping ---------------------------------------------------------
	purgeTicker *time.Ticker
}
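// Lock ordering: when both mutexes are needed, remoteMu is acquired before
// localMu (see HandleOwnershipChange and initializeRemote); acquiring them in
// the opposite order elsewhere would risk a deadlock.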
// NewCartPool constructs a unified pool. Discovery may be nil for standalone
// deployments.
func NewCartPool(size int, ttl time.Duration, hostname string, spawn func(id CartId) (*CartGrain, error), hostWatch discovery.Discovery) (*CartPool, error) {
	p := &CartPool{
		grains:       make(map[uint64]*CartGrain),
		spawn:        spawn,
		ttl:          ttl,
		poolSize:     size,
		hostname:     hostname,
		remoteOwners: make(map[uint64]*proxy.RemoteHost),
		remoteHosts:  make(map[string]*proxy.RemoteHost),
	}
	p.purgeTicker = time.NewTicker(time.Minute)
	go func() {
		for range p.purgeTicker.C {
			p.purge()
		}
	}()
	if hostWatch != nil {
		go p.startDiscovery(hostWatch)
	} else {
		log.Printf("No discovery configured; expecting manual AddRemote or static host injection")
	}
	return p, nil
}

// purge evicts grains that have not been accessed within the TTL and announces
// the expired ids to all connected remotes.
func (p *CartPool) purge() {
	purgeLimit := time.Now().Add(-p.ttl)
	p.localMu.Lock()
	purgedIds := make([]uint64, 0, len(p.grains))
	for id, grain := range p.grains {
		if grain.GetLastAccess().Before(purgeLimit) {
			purgedIds = append(purgedIds, id)
			delete(p.grains, id)
		}
	}
	p.localMu.Unlock()
	if len(purgedIds) == 0 {
		return
	}
	p.forAllHosts(func(remote *proxy.RemoteHost) {
		remote.AnnounceExpiry(purgedIds)
	})
}

// startDiscovery subscribes to cluster events and adds/removes hosts.
func (p *CartPool) startDiscovery(disc discovery.Discovery) {
	time.Sleep(3 * time.Second) // allow gRPC server startup
	log.Printf("Starting discovery watcher")
	ch, err := disc.Watch()
	if err != nil {
		log.Printf("Discovery error: %v", err)
		return
	}
	for evt := range ch {
		if evt.Host == "" {
			continue
		}
		switch evt.Type {
		case watch.Deleted:
			if p.IsKnown(evt.Host) {
				p.RemoveHost(evt.Host)
			}
		default:
			if !p.IsKnown(evt.Host) {
				log.Printf("Discovered host %s", evt.Host)
				p.AddRemote(evt.Host)
			}
		}
	}
}

// ---------------------------------------------------------------------------
// Local grain management
// ---------------------------------------------------------------------------

func (p *CartPool) statsUpdate() {
	p.localMu.RLock()
	size := len(p.grains)
	capacity := p.poolSize
	p.localMu.RUnlock()
	poolGrains.Set(float64(size))
	poolSize.Set(float64(capacity))
	if capacity > 0 {
		poolUsage.Set(float64(size) / float64(capacity))
	}
}

// LocalUsage returns the number of resident grains and configured capacity.
func (p *CartPool) LocalUsage() (int, int) {
	p.localMu.RLock()
	defer p.localMu.RUnlock()
	return len(p.grains), p.poolSize
}

// GetLocalIds returns the currently owned cart ids (for control-plane RPCs).
func (p *CartPool) GetLocalIds() []uint64 {
	p.localMu.RLock()
	defer p.localMu.RUnlock()
	ids := make([]uint64, 0, len(p.grains))
	for _, g := range p.grains {
		if g == nil {
			continue
		}
		ids = append(ids, uint64(g.GetId()))
	}
	return ids
}

// HandleRemoteExpiry drops remote ownership records for carts that the given
// host reports as expired.
func (p *CartPool) HandleRemoteExpiry(host string, ids []uint64) error {
	p.remoteMu.Lock()
	defer p.remoteMu.Unlock()
	for _, id := range ids {
		delete(p.remoteOwners, id)
	}
	return nil
}

// HandleOwnershipChange records the given host as the new owner of the listed
// carts and evicts any local copies.
func (p *CartPool) HandleOwnershipChange(host string, ids []uint64) error {
	p.remoteMu.RLock()
	remoteHost, exists := p.remoteHosts[host]
	p.remoteMu.RUnlock()
	if !exists {
		createdHost, err := p.AddRemote(host)
		if err != nil {
			return err
		}
		remoteHost = createdHost
	}
	p.remoteMu.Lock()
	defer p.remoteMu.Unlock()
	p.localMu.Lock()
	defer p.localMu.Unlock()
	for _, id := range ids {
		log.Printf("Handling ownership change for cart %d to host %s", id, host)
		delete(p.grains, id)
		p.remoteOwners[id] = remoteHost
	}
	return nil
}
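// Control-plane flow: HandleRemoteExpiry and HandleOwnershipChange are the
// inbound half of the coordination protocol, invoked when a peer announces
// expired or newly claimed carts; the outbound half is AnnounceExpiry (from
// purge) and AnnounceOwnership (from broadcastOwnership). The transport wiring
// that dispatches these calls lives in the proxy and server code, not in this
// file.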
// SnapshotGrains returns a copy of the currently resident grains keyed by id.
func (p *CartPool) SnapshotGrains() map[uint64]*CartGrain {
	p.localMu.RLock()
	defer p.localMu.RUnlock()
	return maps.Clone(p.grains)
}

// func (p *CartPool) getLocalGrain(key uint64) (*CartGrain, error) {
// 	grainLookups.Inc()
// 	p.localMu.RLock()
// 	grain, ok := p.grains[key]
// 	p.localMu.RUnlock()
// 	if grain != nil && ok {
// 		return grain, nil
// 	}
// 	go p.statsUpdate()
// 	return grain, nil
// }

// ---------------------------------------------------------------------------
// Cluster ownership and coordination
// ---------------------------------------------------------------------------

// TakeOwnership announces to the cluster that this host now owns the given
// cart, unless the grain is already resident locally.
func (p *CartPool) TakeOwnership(id uint64) {
	p.localMu.RLock()
	grain := p.grains[id]
	p.localMu.RUnlock()
	if grain != nil {
		return
	}
	log.Printf("taking ownership of: %d", id)
	p.broadcastOwnership([]uint64{id})
}

// AddRemote connects to a new remote host and starts its ping, initialisation
// and negotiation routines.
func (p *CartPool) AddRemote(host string) (*proxy.RemoteHost, error) {
	if host == "" || host == p.hostname || p.IsKnown(host) {
		return nil, fmt.Errorf("invalid host")
	}
	remote, err := proxy.NewRemoteHost(host)
	if err != nil {
		log.Printf("AddRemote: NewRemoteHost %s failed: %v", host, err)
		return nil, err
	}
	p.remoteMu.Lock()
	p.remoteHosts[host] = remote
	p.remoteMu.Unlock()
	connectedRemotes.Set(float64(p.RemoteCount()))
	log.Printf("Connected to remote host %s", host)
	go p.pingLoop(remote)
	go p.initializeRemote(remote)
	go p.SendNegotiation()
	return remote, nil
}

// initializeRemote records which carts an already-running remote owns so that
// local copies are evicted and lookups are routed to it.
func (p *CartPool) initializeRemote(remote *proxy.RemoteHost) {
	remoteIds := remote.GetActorIds()
	p.remoteMu.Lock()
	defer p.remoteMu.Unlock()
	p.localMu.Lock()
	defer p.localMu.Unlock()
	for _, id := range remoteIds {
		delete(p.grains, id)
		if _, exists := p.remoteOwners[id]; !exists {
			p.remoteOwners[id] = remote
		}
	}
}

// RemoveHost disconnects a remote and forgets any carts it owned.
func (p *CartPool) RemoveHost(host string) {
	p.remoteMu.Lock()
	remote, exists := p.remoteHosts[host]
	if exists {
		delete(p.remoteHosts, host)
	}
	for id, owner := range p.remoteOwners {
		if owner.Host == host {
			delete(p.remoteOwners, id)
		}
	}
	p.remoteMu.Unlock()
	if exists {
		remote.Close()
	}
	connectedRemotes.Set(float64(p.RemoteCount()))
}

// RemoteCount returns the number of connected remote hosts.
func (p *CartPool) RemoteCount() int {
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	return len(p.remoteHosts)
}
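// Static wiring sketch: without a discovery backend, peers can be injected
// manually via AddRemote, matching the "manual AddRemote or static host
// injection" path logged in NewCartPool. The address below is illustrative
// only.
//
//	if _, err := pool.AddRemote("10.0.0.2:9090"); err != nil {
//		log.Printf("static peer injection failed: %v", err)
//	}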
// RemoteHostNames returns a snapshot of connected remote host identifiers.
func (p *CartPool) RemoteHostNames() []string {
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	hosts := make([]string, 0, len(p.remoteHosts))
	for host := range p.remoteHosts {
		hosts = append(hosts, host)
	}
	return hosts
}

// IsKnown reports whether the host is this node or an already-connected remote.
func (p *CartPool) IsKnown(host string) bool {
	if host == p.hostname {
		return true
	}
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	_, ok := p.remoteHosts[host]
	return ok
}

// pingLoop periodically pings a remote and removes it once it is reported
// unhealthy.
func (p *CartPool) pingLoop(remote *proxy.RemoteHost) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		if remote.Ping() {
			continue
		}
		if !remote.IsHealthy() {
			log.Printf("Remote %s unhealthy, removing", remote.Host)
			p.RemoveHost(remote.Host)
			return
		}
	}
}

// IsHealthy reports whether all connected remotes are currently healthy.
func (p *CartPool) IsHealthy() bool {
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	for _, r := range p.remoteHosts {
		if !r.IsHealthy() {
			return false
		}
	}
	return true
}

// Negotiate connects to any host in the supplied list that is not yet known.
func (p *CartPool) Negotiate(otherHosts []string) {
	for _, host := range otherHosts {
		if host == p.hostname {
			continue
		}
		p.remoteMu.RLock()
		_, ok := p.remoteHosts[host]
		p.remoteMu.RUnlock()
		if !ok {
			go p.AddRemote(host)
		}
	}
}

// SendNegotiation shares the full set of known hosts with every remote and
// connects to any host a remote knows about but we do not.
func (p *CartPool) SendNegotiation() {
	negotiationCount.Inc()
	p.remoteMu.RLock()
	hosts := make([]string, 0, len(p.remoteHosts)+1)
	hosts = append(hosts, p.hostname)
	for h := range p.remoteHosts {
		hosts = append(hosts, h)
	}
	p.remoteMu.RUnlock()
	p.forAllHosts(func(remote *proxy.RemoteHost) {
		knownByRemote, err := remote.Negotiate(hosts)
		if err != nil {
			log.Printf("Negotiate with %s failed: %v", remote.Host, err)
			return
		}
		for _, h := range knownByRemote {
			if !p.IsKnown(h) {
				go p.AddRemote(h)
			}
		}
	})
}

// forAllHosts runs fn concurrently against a snapshot of the connected remotes
// and drops any remote that reports itself unhealthy afterwards.
func (p *CartPool) forAllHosts(fn func(*proxy.RemoteHost)) {
	p.remoteMu.RLock()
	rh := maps.Clone(p.remoteHosts)
	p.remoteMu.RUnlock()
	wg := sync.WaitGroup{}
	for _, host := range rh {
		wg.Go(func() {
			fn(host)
		})
	}
	wg.Wait()
	for name, host := range rh {
		if !host.IsHealthy() {
			host.Close()
			p.remoteMu.Lock()
			delete(p.remoteHosts, name)
			p.remoteMu.Unlock()
		}
	}
}

// broadcastOwnership announces newly claimed cart ids to every remote.
func (p *CartPool) broadcastOwnership(ids []uint64) {
	if len(ids) == 0 {
		return
	}
	p.forAllHosts(func(rh *proxy.RemoteHost) {
		rh.AnnounceOwnership(ids)
	})
}

// getOrClaimGrain returns the resident grain for id, spawning it and
// broadcasting ownership when it is not yet resident.
func (p *CartPool) getOrClaimGrain(id uint64) (*CartGrain, error) {
	p.localMu.RLock()
	grain, exists := p.grains[id]
	p.localMu.RUnlock()
	if exists && grain != nil {
		return grain, nil
	}
	grain, err := p.spawn(CartId(id))
	if err != nil {
		return nil, err
	}
	p.localMu.Lock()
	// Re-check under the write lock so concurrent callers do not overwrite a
	// grain another goroutine just spawned.
	if existing, ok := p.grains[id]; ok && existing != nil {
		p.localMu.Unlock()
		return existing, nil
	}
	p.grains[id] = grain
	p.localMu.Unlock()
	go p.broadcastOwnership([]uint64{id})
	return grain, nil
}

// ErrNotOwner is returned when a cart belongs to another host.
var ErrNotOwner = fmt.Errorf("not owner")

// Apply applies a mutation to a grain and records mutation metrics labelled by
// the mutation's concrete type name.
func (p *CartPool) Apply(id uint64, mutation any) (*CartGrain, error) {
	grain, err := p.getOrClaimGrain(id)
	if err != nil {
		return nil, err
	}
	start := time.Now()
	result, applyErr := grain.Apply(mutation, false)
	mutationType := "unknown"
	if mutation != nil {
		if t := reflect.TypeOf(mutation); t != nil {
			if t.Kind() == reflect.Pointer {
				t = t.Elem()
			}
			if t.Name() != "" {
				mutationType = t.Name()
			}
		}
	}
	cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds())
	if applyErr == nil && result != nil {
		cartMutationsTotal.Inc()
	} else if applyErr != nil {
		cartMutationFailuresTotal.Inc()
	}
	return result, applyErr
}
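// Handler-side dispatch sketch: AddItem is a hypothetical mutation type used
// purely for illustration; real mutation types are defined elsewhere in the
// package. The histogram label is derived from the mutation's concrete type
// name, so pointer and value mutations of the same type share one label.
//
//	if owner, remote := pool.OwnerHost(cartId); remote {
//		// the cart lives on another host; proxy the request to owner instead
//		_ = owner
//	} else if _, err := pool.Apply(cartId, &AddItem{Sku: "abc", Quantity: 1}); err != nil {
//		log.Printf("apply failed: %v", err)
//	}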
// Get returns the current state of a grain.
func (p *CartPool) Get(id uint64) (*CartGrain, error) {
	grain, err := p.getOrClaimGrain(id)
	if err != nil {
		return nil, err
	}
	return grain.GetCurrentState()
}

// OwnerHost reports the remote owner (if any) for the supplied cart id.
func (p *CartPool) OwnerHost(id uint64) (actor.Host, bool) {
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	owner, ok := p.remoteOwners[id]
	if !ok {
		// Return an untyped nil so callers comparing against nil are not fooled
		// by a non-nil interface wrapping a nil *proxy.RemoteHost.
		return nil, false
	}
	return owner, true
}

// Hostname returns the local hostname (pod IP).
func (p *CartPool) Hostname() string {
	return p.hostname
}

// Close notifies remotes that this host is shutting down.
func (p *CartPool) Close() {
	p.remoteMu.Lock()
	defer p.remoteMu.Unlock()
	for _, r := range p.remoteHosts {
		go func(rh *proxy.RemoteHost) {
			rh.Close()
		}(r)
	}
	if p.purgeTicker != nil {
		p.purgeTicker.Stop()
	}
}
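// exampleStandalonePool is a sketch of wiring a single-node pool without
// discovery. The capacity, TTL, hostname and the zero-value grain returned by
// the spawn closure are illustrative assumptions; production wiring and the
// real spawn function live elsewhere in this package.
func exampleStandalonePool() (*CartPool, error) {
	spawn := func(id CartId) (*CartGrain, error) {
		// Assumption: a zero-value grain is an acceptable placeholder here;
		// the real spawn function constructs the grain properly.
		return &CartGrain{}, nil
	}
	// nil discovery => purely local grains, no cluster coordination.
	return NewCartPool(1024, 30*time.Minute, "localhost", spawn, nil)
}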