package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"reflect"
	"sync"
	"time"

	messages "git.tornberg.me/go-cart-actor/proto"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"k8s.io/apimachinery/pkg/watch"
)

// ---------------------------------------------------------------------------
// Metrics shared by the cart pool implementation.
// ---------------------------------------------------------------------------

var (
	poolGrains = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_grains_in_pool",
		Help: "The total number of grains in the local pool",
	})
	poolSize = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_pool_size",
		Help: "Configured capacity of the cart pool",
	})
	poolUsage = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_grain_pool_usage",
		Help: "Current utilisation of the cart pool",
	})
	negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_negotiation_total",
		Help: "The total number of remote host negotiations",
	})
	connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_connected_remotes",
		Help: "Number of connected remote hosts",
	})
	cartMutationsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_mutations_total",
		Help: "Total number of cart state mutations applied",
	})
	cartMutationFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_mutation_failures_total",
		Help: "Total number of failed cart state mutations",
	})
	cartMutationLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "cart_mutation_latency_seconds",
		Help:    "Latency of cart mutations in seconds",
		Buckets: prometheus.DefBuckets,
	}, []string{"mutation"})
	cartActiveGrains = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_active_grains",
		Help: "Number of active (resident) local grains",
	})
)

// GrainPool is the interface exposed to HTTP handlers and other subsystems.
type GrainPool interface {
	Apply(id CartId, mutation interface{}) (*CartGrain, error)
	Get(id CartId) (*CartGrain, error)
	OwnerHost(id CartId) (Host, bool)
	Hostname() string
	TakeOwnership(id CartId)
	IsHealthy() bool
	Close()
}

// Host abstracts a remote node capable of proxying cart requests.
type Host interface {
	Name() string
	Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error)
}

// Ttl tracks the expiry deadline for an in-memory grain.
type Ttl struct {
	Expires time.Time
	Grain   *CartGrain
}

// CartPool merges the responsibilities that previously belonged to
// GrainLocalPool and SyncedPool. It provides local grain storage together
// with cluster coordination, ownership negotiation and expiry signalling.
type CartPool struct {
	// Local grain state -----------------------------------------------------
	localMu  sync.RWMutex
	grains   map[uint64]*CartGrain
	expiry   []Ttl
	spawn    func(id CartId) (*CartGrain, error)
	ttl      time.Duration
	poolSize int

	// Cluster coordination --------------------------------------------------
	hostname             string
	remoteMu             sync.RWMutex
	remoteOwners         map[CartId]*RemoteHostGRPC
	remoteHosts          map[string]*RemoteHostGRPC
	discardedHostHandler *DiscardedHostHandler

	// House-keeping ---------------------------------------------------------
	purgeTicker *time.Ticker
}
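// The sketch below is illustrative only and not wired into any route: it shows
// how an HTTP handler is expected to drive GrainPool and Host, namely resolve
// remote ownership first, proxy to the owning node when the cart lives
// elsewhere, and otherwise serve (and implicitly claim) the grain locally.
// The X-Cart-Id header and the ParseCartId helper are assumptions taken from
// how the rest of this file uses them.
func handleCartRequestSketch(pool GrainPool, w http.ResponseWriter, r *http.Request) {
	id, ok := ParseCartId(r.Header.Get("X-Cart-Id"))
	if !ok {
		http.Error(w, "invalid cart id", http.StatusBadRequest)
		return
	}
	// Another node owns this cart: let it serve the request transparently.
	if owner, isRemote := pool.OwnerHost(id); isRemote {
		handled, err := owner.Proxy(id, w, r)
		if err != nil {
			log.Printf("proxy to %s failed: %v", owner.Name(), err)
		}
		if !handled {
			http.Error(w, "cart owner unreachable", http.StatusBadGateway)
		}
		return
	}
	// Local (or unclaimed) cart: fetching it spawns and claims the grain.
	if _, err := pool.Get(id); err != nil {
		http.Error(w, err.Error(), http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusNoContent) // a real handler would encode the grain state here
}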
// RemoteHostGRPC mirrors the lightweight controller used for remote node
// interaction.
type RemoteHostGRPC struct {
	Host          string
	HTTPBase      string
	Conn          *grpc.ClientConn
	Transport     *http.Transport
	Client        *http.Client
	ControlClient messages.ControlPlaneClient
	MissedPings   int
}

func (h *RemoteHostGRPC) Name() string { return h.Host }

func (h *RemoteHostGRPC) Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error) {
	target := fmt.Sprintf("%s%s", h.HTTPBase, r.URL.RequestURI())
	var bodyCopy []byte
	if r.Body != nil && r.Body != http.NoBody {
		var err error
		bodyCopy, err = io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "proxy read error", http.StatusBadGateway)
			return false, err
		}
	}
	if r.Body != nil {
		r.Body.Close()
	}
	var reqBody io.Reader
	if len(bodyCopy) > 0 {
		reqBody = bytes.NewReader(bodyCopy)
	}
	req, err := http.NewRequestWithContext(r.Context(), r.Method, target, reqBody)
	if err != nil {
		http.Error(w, "proxy build error", http.StatusBadGateway)
		return false, err
	}
	r.Body = io.NopCloser(bytes.NewReader(bodyCopy))
	req.Header.Set("X-Forwarded-Host", r.Host)
	if idStr := id.String(); idStr != "" {
		req.Header.Set("X-Cart-Id", idStr)
	}
	for k, v := range r.Header {
		for _, vv := range v {
			req.Header.Add(k, vv)
		}
	}
	res, err := h.Client.Do(req)
	if err != nil {
		http.Error(w, "proxy request error", http.StatusBadGateway)
		return false, err
	}
	defer res.Body.Close()
	for k, v := range res.Header {
		for _, vv := range v {
			w.Header().Add(k, vv)
		}
	}
	w.Header().Set("X-Cart-Owner-Routed", "true")
	if res.StatusCode >= 200 && res.StatusCode <= 299 {
		w.WriteHeader(res.StatusCode)
		_, copyErr := io.Copy(w, res.Body)
		if copyErr != nil {
			return true, copyErr
		}
		return true, nil
	}
	return false, fmt.Errorf("proxy response status %d", res.StatusCode)
}

func (h *RemoteHostGRPC) IsHealthy() bool { return h.MissedPings < 3 }

// NewCartPool constructs a unified pool. Discovery may be nil for standalone
// deployments.
func NewCartPool(size int, ttl time.Duration, hostname string, spawn func(id CartId) (*CartGrain, error), discovery Discovery) (*CartPool, error) {
	p := &CartPool{
		grains:       make(map[uint64]*CartGrain),
		expiry:       make([]Ttl, 0),
		spawn:        spawn,
		ttl:          ttl,
		poolSize:     size,
		hostname:     hostname,
		remoteOwners: make(map[CartId]*RemoteHostGRPC),
		remoteHosts:  make(map[string]*RemoteHostGRPC),
	}
	p.discardedHostHandler = NewDiscardedHostHandler(1338)
	p.discardedHostHandler.SetReconnectHandler(p.AddRemote)
	p.purgeTicker = time.NewTicker(time.Minute)
	go func() {
		for range p.purgeTicker.C {
			p.Purge()
		}
	}()
	if discovery != nil {
		go p.startDiscovery(discovery)
	} else {
		log.Printf("No discovery configured; expecting manual AddRemote or static host injection")
	}
	return p, nil
}
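// newStandaloneCartPoolSketch shows one way the constructor above might be
// wired for a single-node deployment: fixed capacity, a 30 minute TTL and a
// nil Discovery, so peers would have to be added manually via AddRemote. The
// spawn callback is taken as a parameter because the CartGrain constructor is
// not defined in this file; the size and TTL values are illustrative, not
// recommendations.
func newStandaloneCartPoolSketch(hostname string, spawn func(id CartId) (*CartGrain, error)) (*CartPool, error) {
	return NewCartPool(10_000, 30*time.Minute, hostname, spawn, nil)
}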
// startDiscovery subscribes to cluster events and adds/removes hosts.
func (p *CartPool) startDiscovery(discovery Discovery) {
	time.Sleep(3 * time.Second) // allow gRPC server startup
	log.Printf("Starting discovery watcher")
	ch, err := discovery.Watch()
	if err != nil {
		log.Printf("Discovery error: %v", err)
		return
	}
	for evt := range ch {
		if evt.Host == "" {
			continue
		}
		switch evt.Type {
		case watch.Deleted:
			if p.IsKnown(evt.Host) {
				p.RemoveHost(evt.Host)
			}
		default:
			if !p.IsKnown(evt.Host) {
				log.Printf("Discovered host %s", evt.Host)
				p.AddRemote(evt.Host)
			}
		}
	}
}

// ---------------------------------------------------------------------------
// Local grain management
// ---------------------------------------------------------------------------

func (p *CartPool) statsUpdate() {
	p.localMu.RLock()
	size := len(p.grains)
	capacity := p.poolSize
	p.localMu.RUnlock()
	poolGrains.Set(float64(size))
	poolSize.Set(float64(capacity))
	if capacity > 0 {
		poolUsage.Set(float64(size) / float64(capacity))
	}
}

// LocalUsage returns the number of resident grains and configured capacity.
func (p *CartPool) LocalUsage() (int, int) {
	p.localMu.RLock()
	defer p.localMu.RUnlock()
	return len(p.grains), p.poolSize
}

// SetAvailable pre-populates placeholder entries.
func (p *CartPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) {
	p.localMu.Lock()
	for id := range availableWithLastChangeUnix {
		k := uint64(id)
		if _, ok := p.grains[k]; !ok {
			p.grains[k] = nil
			p.expiry = append(p.expiry, Ttl{Expires: time.Now().Add(p.ttl)})
		}
	}
	p.localMu.Unlock()
	// statsUpdate takes its own read lock, so it must run after the write
	// lock has been released (RWMutex is not reentrant).
	p.statsUpdate()
}

// Purge removes expired grains and broadcasts expiry announcements so that
// other hosts drop stale ownership hints.
func (p *CartPool) Purge() {
	now := time.Now()
	keepChanged := now.Add(-p.ttl).Unix()
	var expired []CartId
	p.localMu.Lock()
	for i := 0; i < len(p.expiry); {
		entry := p.expiry[i]
		if entry.Grain == nil {
			i++
			continue
		}
		if entry.Expires.After(now) {
			break
		}
		if entry.Grain.GetLastChange() > keepChanged {
			// Recently mutated: give the entry a fresh deadline and move it
			// to the back, so this pass does not revisit it indefinitely.
			entry.Expires = now.Add(p.ttl)
			p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
			p.expiry = append(p.expiry, entry)
			continue
		}
		id := entry.Grain.GetId()
		delete(p.grains, uint64(id))
		expired = append(expired, id)
		p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
	}
	p.localMu.Unlock()
	if len(expired) > 0 {
		p.statsUpdate()
		go p.broadcastExpiry(expired)
	}
}

// RefreshExpiry updates the TTL entry for a given grain.
func (p *CartPool) RefreshExpiry(id CartId) {
	p.localMu.Lock()
	defer p.localMu.Unlock()
	for i := range p.expiry {
		g := p.expiry[i].Grain
		if g != nil && g.Id == id {
			p.expiry[i].Expires = time.Now().Add(p.ttl)
			return
		}
	}
	// If no entry existed, append one (safeguard for newly spawned grains).
	p.expiry = append(p.expiry, Ttl{Expires: time.Now().Add(p.ttl), Grain: p.grains[uint64(id)]})
}

// DebugGrainCount returns the number of locally resident grains.
func (p *CartPool) DebugGrainCount() int {
	p.localMu.RLock()
	defer p.localMu.RUnlock()
	return len(p.grains)
}

// LocalCartIDs returns the currently owned cart ids (for control-plane RPCs).
func (p *CartPool) LocalCartIDs() []uint64 {
	p.localMu.RLock()
	defer p.localMu.RUnlock()
	ids := make([]uint64, 0, len(p.grains))
	for _, g := range p.grains {
		if g == nil {
			continue
		}
		ids = append(ids, uint64(g.GetId()))
	}
	return ids
}
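// preloadSketch illustrates the intended interplay of SetAvailable and the
// lazy spawn path: ids known from persistent storage (map values being their
// last-change unix timestamps) are registered as placeholders up front, and
// the grains themselves are only spawned on first access. The loader that
// produces the map is assumed to exist elsewhere in the package.
func preloadSketch(p *CartPool, knownCarts map[CartId]int64) {
	p.SetAvailable(knownCarts)
	entries, capacity := p.LocalUsage()
	log.Printf("preloaded %d placeholder entries (pool capacity %d)", entries, capacity)
}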
// SnapshotGrains returns a copy of the currently resident grains keyed by id.
func (p *CartPool) SnapshotGrains() map[CartId]*CartGrain {
	p.localMu.RLock()
	defer p.localMu.RUnlock()
	out := make(map[CartId]*CartGrain, len(p.grains))
	for _, g := range p.grains {
		if g != nil {
			out[g.GetId()] = g
		}
	}
	return out
}

func (p *CartPool) removeLocalGrain(id CartId) {
	p.localMu.Lock()
	delete(p.grains, uint64(id))
	for i := range p.expiry {
		if p.expiry[i].Grain != nil && p.expiry[i].Grain.GetId() == id {
			p.expiry = append(p.expiry[:i], p.expiry[i+1:]...)
			break
		}
	}
	p.localMu.Unlock()
	p.statsUpdate()
}

func (p *CartPool) getLocalGrain(id CartId) (*CartGrain, error) {
	key := uint64(id)
	grainLookups.Inc()
	p.localMu.RLock()
	grain, ok := p.grains[key]
	p.localMu.RUnlock()
	if grain != nil && ok {
		return grain, nil
	}
	p.localMu.Lock()
	defer p.localMu.Unlock()
	grain, ok = p.grains[key]
	if grain == nil || !ok {
		if len(p.grains) >= p.poolSize && len(p.expiry) > 0 {
			if p.expiry[0].Expires.Before(time.Now()) && p.expiry[0].Grain != nil {
				// Evict the oldest expired grain to make room.
				oldID := p.expiry[0].Grain.GetId()
				delete(p.grains, uint64(oldID))
				p.expiry = p.expiry[1:]
				go p.broadcastExpiry([]CartId{oldID})
			} else {
				return nil, fmt.Errorf("pool is full")
			}
		}
		spawned, err := p.spawn(id)
		if err != nil {
			return nil, err
		}
		p.grains[key] = spawned
		p.expiry = append(p.expiry, Ttl{Expires: time.Now().Add(p.ttl), Grain: spawned})
		grain = spawned
	}
	// statsUpdate needs the read lock, so run it asynchronously; it proceeds
	// once the deferred unlock releases the write lock.
	go p.statsUpdate()
	return grain, nil
}

// ---------------------------------------------------------------------------
// Cluster ownership and coordination
// ---------------------------------------------------------------------------

func (p *CartPool) TakeOwnership(id CartId) {
	p.broadcastOwnership([]CartId{id})
}

func (p *CartPool) AddRemote(host string) {
	if host == "" || host == p.hostname {
		return
	}
	p.remoteMu.Lock()
	if _, exists := p.remoteHosts[host]; exists {
		p.remoteMu.Unlock()
		return
	}
	p.remoteMu.Unlock()
	target := fmt.Sprintf("%s:1337", host)
	dialCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	conn, err := grpc.DialContext(dialCtx, target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
	cancel()
	if err != nil {
		log.Printf("AddRemote: dial %s failed: %v", target, err)
		return
	}
	controlClient := messages.NewControlPlaneClient(conn)
	for retries := 0; retries < 3; retries++ {
		ctx, pingCancel := context.WithTimeout(context.Background(), time.Second)
		_, pingErr := controlClient.Ping(ctx, &messages.Empty{})
		pingCancel()
		if pingErr == nil {
			break
		}
		if retries == 2 {
			log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr)
			conn.Close()
			return
		}
		time.Sleep(200 * time.Millisecond)
	}
	transport := &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 100,
		IdleConnTimeout:     120 * time.Second,
	}
	client := &http.Client{Transport: transport, Timeout: 10 * time.Second}
	remote := &RemoteHostGRPC{
		Host:          host,
		HTTPBase:      fmt.Sprintf("http://%s:8080/cart", host),
		Conn:          conn,
		Transport:     transport,
		Client:        client,
		ControlClient: controlClient,
	}
	p.remoteMu.Lock()
	if _, exists := p.remoteHosts[host]; exists {
		// A concurrent AddRemote registered this host while we were dialling;
		// keep the existing connection and discard the duplicate.
		p.remoteMu.Unlock()
		conn.Close()
		return
	}
	p.remoteHosts[host] = remote
	p.remoteMu.Unlock()
	connectedRemotes.Set(float64(p.RemoteCount()))
	log.Printf("Connected to remote host %s", host)
	go p.pingLoop(remote)
	go p.initializeRemote(remote)
	go p.Negotiate()
}

func (p *CartPool) initializeRemote(remote *RemoteHostGRPC) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	reply, err := remote.ControlClient.GetCartIds(ctx, &messages.Empty{})
	if err != nil {
		log.Printf("Init remote %s: GetCartIds error: %v", remote.Host, err)
		return
	}
	count := 0
	p.remoteMu.Lock()
	for _, cid := range reply.CartIds {
		id := CartId(cid)
		if _, exists := p.remoteOwners[id]; !exists {
			p.remoteOwners[id] = remote
		}
		count++
	}
	p.remoteMu.Unlock()
	log.Printf("Remote %s reported %d remote-owned carts", remote.Host, count)
}

func (p *CartPool) RemoveHost(host string) {
	p.remoteMu.Lock()
	remote, exists := p.remoteHosts[host]
	if exists {
		delete(p.remoteHosts, host)
	}
	for id, owner := range p.remoteOwners {
		if owner.Host == host {
			delete(p.remoteOwners, id)
		}
	}
	p.remoteMu.Unlock()
	if exists {
		remote.Conn.Close()
	}
	connectedRemotes.Set(float64(p.RemoteCount()))
}

func (p *CartPool) RemoteCount() int {
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	return len(p.remoteHosts)
}

// RemoteHostNames returns a snapshot of connected remote host identifiers.
func (p *CartPool) RemoteHostNames() []string {
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	hosts := make([]string, 0, len(p.remoteHosts))
	for host := range p.remoteHosts {
		hosts = append(hosts, host)
	}
	return hosts
}

func (p *CartPool) IsKnown(host string) bool {
	if host == p.hostname {
		return true
	}
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	_, ok := p.remoteHosts[host]
	return ok
}

func (p *CartPool) pingLoop(remote *RemoteHostGRPC) {
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		_, err := remote.ControlClient.Ping(ctx, &messages.Empty{})
		cancel()
		if err != nil {
			remote.MissedPings++
			log.Printf("Ping %s failed (%d)", remote.Host, remote.MissedPings)
			if !remote.IsHealthy() {
				log.Printf("Remote %s unhealthy, removing", remote.Host)
				p.RemoveHost(remote.Host)
				return
			}
			continue
		}
		remote.MissedPings = 0
	}
}

func (p *CartPool) IsHealthy() bool {
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	for _, r := range p.remoteHosts {
		if !r.IsHealthy() {
			return false
		}
	}
	return true
}

func (p *CartPool) Negotiate() {
	negotiationCount.Inc()
	p.remoteMu.RLock()
	hosts := make([]string, 0, len(p.remoteHosts)+1)
	hosts = append(hosts, p.hostname)
	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for h, r := range p.remoteHosts {
		hosts = append(hosts, h)
		remotes = append(remotes, r)
	}
	p.remoteMu.RUnlock()
	for _, r := range remotes {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		reply, err := r.ControlClient.Negotiate(ctx, &messages.NegotiateRequest{KnownHosts: hosts})
		cancel()
		if err != nil {
			log.Printf("Negotiate with %s failed: %v", r.Host, err)
			continue
		}
		for _, h := range reply.Hosts {
			if !p.IsKnown(h) {
				p.AddRemote(h)
			}
		}
	}
}

func (p *CartPool) broadcastOwnership(ids []CartId) {
	if len(ids) == 0 {
		return
	}
	uids := make([]uint64, len(ids))
	for i, id := range ids {
		uids[i] = uint64(id)
	}
	p.remoteMu.RLock()
	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		if r.IsHealthy() {
			remotes = append(remotes, r)
		}
	}
	p.remoteMu.RUnlock()
	for _, remote := range remotes {
		go func(rh *RemoteHostGRPC) {
			_, err := rh.ControlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{
				Host:    p.hostname,
				CartIds: uids,
			})
			if err != nil {
				log.Printf("ownership announce to %s failed: %v", rh.Host, err)
			}
		}(remote)
	}
}

func (p *CartPool) broadcastExpiry(ids []CartId) {
	if len(ids) == 0 {
		return
	}
	uids := make([]uint64, len(ids))
	for i, id := range ids {
		uids[i] = uint64(id)
	}
	p.remoteMu.RLock()
	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		if r.IsHealthy() {
			remotes = append(remotes, r)
		}
	}
	p.remoteMu.RUnlock()
	for _, remote := range remotes {
		go func(rh *RemoteHostGRPC) {
			_, err := rh.ControlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{
				Host:    p.hostname,
				CartIds: uids,
			})
			if err != nil {
				log.Printf("expiry announce to %s failed: %v", rh.Host, err)
			}
		}(remote)
	}
}

func (p *CartPool) AdoptRemoteOwnership(host string, ids []string) {
	if host == "" || host == p.hostname {
		return
	}
	p.remoteMu.Lock()
	defer p.remoteMu.Unlock()
	// Resolve the host under the lock so the lookup cannot race with
	// AddRemote/RemoveHost; without a known host there is nothing to adopt.
	remoteHost, ok := p.remoteHosts[host]
	if !ok {
		log.Printf("AdoptRemoteOwnership: unknown host %s", host)
		return
	}
	for _, s := range ids {
		if s == "" {
			continue
		}
		parsed, ok := ParseCartId(s)
		if !ok {
			continue
		}
		if existing, ok := p.remoteOwners[parsed]; ok && existing != remoteHost {
			continue
		}
		p.localMu.RLock()
		_, localHas := p.grains[uint64(parsed)]
		p.localMu.RUnlock()
		if localHas {
			continue
		}
		p.remoteOwners[parsed] = remoteHost
	}
}

func (p *CartPool) HandleRemoteExpiry(host string, ids []uint64) {
	if host == "" || host == p.hostname {
		return
	}
	p.remoteMu.Lock()
	defer p.remoteMu.Unlock()
	for _, raw := range ids {
		id := CartId(raw)
		if owner, ok := p.remoteOwners[id]; ok && owner.Host == host {
			delete(p.remoteOwners, id)
		}
	}
}

func (p *CartPool) getOrClaimGrain(id CartId) (*CartGrain, error) {
	p.localMu.RLock()
	grain, exists := p.grains[uint64(id)]
	p.localMu.RUnlock()
	if exists && grain != nil {
		return grain, nil
	}
	p.remoteMu.RLock()
	remoteHost, found := p.remoteOwners[id]
	p.remoteMu.RUnlock()
	if found && remoteHost != nil && remoteHost.Host != "" {
		return nil, ErrNotOwner
	}
	grain, err := p.getLocalGrain(id)
	if err != nil {
		return nil, err
	}
	go p.broadcastOwnership([]CartId{id})
	return grain, nil
}

// ErrNotOwner is returned when a cart belongs to another host.
var ErrNotOwner = fmt.Errorf("not owner")

// Apply applies a mutation to a grain.
func (p *CartPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) {
	grain, err := p.getOrClaimGrain(id)
	if err != nil {
		return nil, err
	}
	start := time.Now()
	result, applyErr := grain.Apply(mutation, false)
	mutationType := "unknown"
	if mutation != nil {
		if t := reflect.TypeOf(mutation); t != nil {
			if t.Kind() == reflect.Ptr {
				t = t.Elem()
			}
			if t.Name() != "" {
				mutationType = t.Name()
			}
		}
	}
	cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds())
	if applyErr == nil && result != nil {
		cartMutationsTotal.Inc()
		p.RefreshExpiry(id)
		cartActiveGrains.Set(float64(p.DebugGrainCount()))
	} else if applyErr != nil {
		cartMutationFailuresTotal.Inc()
	}
	return result, applyErr
}

// Get returns the current state of a grain.
func (p *CartPool) Get(id CartId) (*CartGrain, error) {
	grain, err := p.getOrClaimGrain(id)
	if err != nil {
		return nil, err
	}
	return grain.GetCurrentState()
}

// OwnerHost reports the remote owner (if any) for the supplied cart id.
func (p *CartPool) OwnerHost(id CartId) (Host, bool) {
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	owner, ok := p.remoteOwners[id]
	return owner, ok
}

// Hostname returns the local hostname (pod IP).
func (p *CartPool) Hostname() string {
	return p.hostname
}

// Close notifies remotes that this host is shutting down.
func (p *CartPool) Close() {
	p.remoteMu.Lock()
	defer p.remoteMu.Unlock()
	for _, r := range p.remoteHosts {
		go func(rh *RemoteHostGRPC) {
			_, err := rh.ControlClient.Closing(context.Background(), &messages.ClosingNotice{Host: p.hostname})
			if err != nil {
				log.Printf("Close notify to %s failed: %v", rh.Host, err)
			}
		}(r)
	}
	if p.purgeTicker != nil {
		p.purgeTicker.Stop()
	}
}
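// hypotheticalAddItem and addItemSketch demonstrate the Apply flow from the
// caller's side. The real mutation types live elsewhere in the package (the
// pool only sees interface{}); this type exists purely to show the call shape
// and how the mutation's concrete type name becomes the latency metric label.
type hypotheticalAddItem struct {
	Sku      string
	Quantity int
}

func addItemSketch(pool GrainPool, id CartId) error {
	// Recorded under cart_mutation_latency_seconds{mutation="hypotheticalAddItem"}.
	_, err := pool.Apply(id, &hypotheticalAddItem{Sku: "sku-123", Quantity: 1})
	return err
}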