package actor

import (
	"context"
	"fmt"
	"log"
	"maps"
	"sync"
	"time"

	"google.golang.org/protobuf/proto"
)

type SimpleGrainPool[V any] struct {
	localMu          sync.RWMutex
	grains           map[uint64]Grain[V]
	mutationRegistry MutationRegistry
	spawn            func(ctx context.Context, id uint64) (Grain[V], error)
	destroy          func(grain Grain[V]) error
	spawnHost        func(host string) (Host[V], error)
	listeners        []LogListener
	storage          LogStorage[V]
	ttl              time.Duration
	poolSize         int

	// Cluster coordination --------------------------------------------------
	hostname     string
	remoteMu     sync.RWMutex
	remoteOwners map[uint64]Host[V]
	remoteHosts  map[string]Host[V]
	//discardedHostHandler *DiscardedHostHandler

	// House-keeping ---------------------------------------------------------
	purgeTicker *time.Ticker
}

type GrainPoolConfig[V any] struct {
	Hostname         string
	Spawn            func(ctx context.Context, id uint64) (Grain[V], error)
	SpawnHost        func(host string) (Host[V], error)
	Destroy          func(grain Grain[V]) error
	TTL              time.Duration
	PoolSize         int
	MutationRegistry MutationRegistry
	Storage          LogStorage[V]
}

func NewSimpleGrainPool[V any](config GrainPoolConfig[V]) (*SimpleGrainPool[V], error) {
	p := &SimpleGrainPool[V]{
		grains:           make(map[uint64]Grain[V]),
		mutationRegistry: config.MutationRegistry,
		storage:          config.Storage,
		spawn:            config.Spawn,
		spawnHost:        config.SpawnHost,
		destroy:          config.Destroy,
		ttl:              config.TTL,
		poolSize:         config.PoolSize,
		hostname:         config.Hostname,
		remoteOwners:     make(map[uint64]Host[V]),
		remoteHosts:      make(map[string]Host[V]),
	}
	p.purgeTicker = time.NewTicker(time.Minute)
	go func() {
		for range p.purgeTicker.C {
			p.purge()
		}
	}()
	return p, nil
}

// AddListener registers a mutation-log listener. The listeners slice is not
// mutex-protected; register listeners before the pool starts serving traffic.
func (p *SimpleGrainPool[V]) AddListener(listener LogListener) {
	p.listeners = append(p.listeners, listener)
}

func (p *SimpleGrainPool[V]) RemoveListener(listener LogListener) {
	for i, l := range p.listeners {
		if l == listener {
			p.listeners = append(p.listeners[:i], p.listeners[i+1:]...)
			break
		}
	}
}

// purge evicts grains that have not been accessed within the TTL and
// announces the evicted ids to all remote hosts.
func (p *SimpleGrainPool[V]) purge() {
	purgeLimit := time.Now().Add(-p.ttl)

	p.localMu.Lock()
	purgedIds := make([]uint64, 0, len(p.grains))
	for id, grain := range p.grains {
		if grain.GetLastAccess().Before(purgeLimit) {
			purgedIds = append(purgedIds, id)
			if err := p.destroy(grain); err != nil {
				log.Printf("failed to destroy grain %d: %v", id, err)
			}
			delete(p.grains, id)
		}
	}
	p.localMu.Unlock()

	if len(purgedIds) == 0 {
		return
	}
	p.forAllHosts(func(remote Host[V]) {
		remote.AnnounceExpiry(purgedIds)
	})
}

// LocalUsage returns the number of resident grains and configured capacity.
func (p *SimpleGrainPool[V]) LocalUsage() (int, int) {
	p.localMu.RLock()
	defer p.localMu.RUnlock()
	return len(p.grains), p.poolSize
}
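// Construction sketch (illustrative only): CartState, newCartGrain, and
// dialHost below are hypothetical placeholders, not part of this package.
//
//	pool, err := NewSimpleGrainPool(GrainPoolConfig[CartState]{
//		Hostname: os.Getenv("POD_IP"),
//		Spawn: func(ctx context.Context, id uint64) (Grain[CartState], error) {
//			return newCartGrain(id), nil // hypothetical grain constructor
//		},
//		Destroy:   func(g Grain[CartState]) error { return nil },
//		SpawnHost: dialHost, // hypothetical peer dialer (e.g. gRPC)
//		TTL:       30 * time.Minute,
//		PoolSize:  10_000,
//	})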
// GetLocalIds returns the currently owned cart ids (for control-plane RPCs).
func (p *SimpleGrainPool[V]) GetLocalIds() []uint64 {
	p.localMu.RLock()
	defer p.localMu.RUnlock()
	ids := make([]uint64, 0, len(p.grains))
	for _, g := range p.grains {
		if g == nil {
			continue
		}
		ids = append(ids, uint64(g.GetId()))
	}
	return ids
}

// HandleRemoteExpiry forgets the remote ownership of the given ids.
func (p *SimpleGrainPool[V]) HandleRemoteExpiry(host string, ids []uint64) error {
	p.remoteMu.Lock()
	defer p.remoteMu.Unlock()
	for _, id := range ids {
		delete(p.remoteOwners, id)
	}
	return nil
}

// HandleOwnershipChange records host as the new owner of the given ids and
// drops any local copies of those grains.
func (p *SimpleGrainPool[V]) HandleOwnershipChange(host string, ids []uint64) error {
	p.remoteMu.RLock()
	remoteHost, exists := p.remoteHosts[host]
	p.remoteMu.RUnlock()
	if !exists {
		createdHost, err := p.AddRemote(host)
		if err != nil {
			return err
		}
		remoteHost = createdHost
	}

	p.remoteMu.Lock()
	defer p.remoteMu.Unlock()
	p.localMu.Lock()
	defer p.localMu.Unlock()
	for _, id := range ids {
		log.Printf("Handling ownership change for cart %d to host %s", id, host)
		delete(p.grains, id)
		p.remoteOwners[id] = remoteHost
	}
	return nil
}

// TakeOwnership announces local ownership of a grain to all remote hosts.
func (p *SimpleGrainPool[V]) TakeOwnership(id uint64) {
	p.broadcastOwnership([]uint64{id})
}

// AddRemoteHost is a fire-and-forget wrapper around AddRemote; failures are
// logged by AddRemote itself.
func (p *SimpleGrainPool[V]) AddRemoteHost(host string) {
	p.AddRemote(host)
}

func (p *SimpleGrainPool[V]) AddRemote(host string) (Host[V], error) {
	if host == "" {
		return nil, fmt.Errorf("host is empty")
	}
	if host == p.hostname {
		return nil, fmt.Errorf("refusing to add local host %s as a remote", host)
	}

	p.remoteMu.RLock()
	existing, found := p.remoteHosts[host]
	p.remoteMu.RUnlock()
	if found {
		return existing, nil
	}

	remote, err := p.spawnHost(host)
	if err != nil {
		log.Printf("AddRemote %s failed: %v", host, err)
		return nil, err
	}

	p.remoteMu.Lock()
	// Re-check under the write lock: a concurrent AddRemote may have won
	// the race while we were dialing.
	if existing, found := p.remoteHosts[host]; found {
		p.remoteMu.Unlock()
		remote.Close()
		return existing, nil
	}
	p.remoteHosts[host] = remote
	p.remoteMu.Unlock()
	// connectedRemotes.Set(float64(p.RemoteCount()))
	log.Printf("Connected to remote host %s", host)

	go p.pingLoop(remote)
	go p.initializeRemote(remote)
	go p.SendNegotiation()
	return remote, nil
}

// initializeRemote imports the remote's grain ids, dropping any conflicting
// local copies. Lock order (remoteMu before localMu) matches
// HandleOwnershipChange to avoid deadlocks.
func (p *SimpleGrainPool[V]) initializeRemote(remote Host[V]) {
	remoteIds := remote.GetActorIds()
	p.remoteMu.Lock()
	defer p.remoteMu.Unlock()
	p.localMu.Lock()
	defer p.localMu.Unlock()
	for _, id := range remoteIds {
		delete(p.grains, id)
		if _, exists := p.remoteOwners[id]; !exists {
			p.remoteOwners[id] = remote
		}
	}
}

func (p *SimpleGrainPool[V]) RemoveHost(host string) {
	p.remoteMu.Lock()
	remote, exists := p.remoteHosts[host]
	delete(p.remoteHosts, host)
	count := 0
	for id, owner := range p.remoteOwners {
		if owner.Name() == host {
			count++
			delete(p.remoteOwners, id)
		}
	}
	log.Printf("Removing host %s, grains: %d", host, count)
	p.remoteMu.Unlock()

	// Close the connection once, outside the lock.
	if exists {
		remote.Close()
	}
	// connectedRemotes.Set(float64(p.RemoteCount()))
}

func (p *SimpleGrainPool[V]) RemoteCount() int {
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	return len(p.remoteHosts)
}
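// Ownership flow, end to end (names as defined in this file): when host A
// first touches grain 42 it spawns the grain locally and fans out
// AnnounceOwnership("A", [42]); each peer's HandleOwnershipChange then drops
// its local copy of 42 and points remoteOwners[42] at A. Expiry is the
// inverse: purge destroys idle grains and AnnounceExpiry lets peers forget
// the mapping via HandleRemoteExpiry.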
// RemoteHostNames returns a snapshot of connected remote host identifiers.
func (p *SimpleGrainPool[V]) RemoteHostNames() []string {
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	hosts := make([]string, 0, len(p.remoteHosts))
	for host := range p.remoteHosts {
		hosts = append(hosts, host)
	}
	return hosts
}

func (p *SimpleGrainPool[V]) IsKnown(host string) bool {
	if host == p.hostname {
		return true
	}
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	_, ok := p.remoteHosts[host]
	return ok
}

// pingLoop pings the remote every five seconds and removes it from the pool
// once it fails a ping and reports unhealthy.
func (p *SimpleGrainPool[V]) pingLoop(remote Host[V]) {
	remote.Ping()
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		if !remote.Ping() && !remote.IsHealthy() {
			log.Printf("Remote %s unhealthy, removing", remote.Name())
			p.RemoveHost(remote.Name())
			return
		}
	}
}

func (p *SimpleGrainPool[V]) IsHealthy() bool {
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	for _, r := range p.remoteHosts {
		if !r.IsHealthy() {
			return false
		}
	}
	return true
}

// Negotiate connects to any host in the supplied list that is not yet known.
func (p *SimpleGrainPool[V]) Negotiate(otherHosts []string) {
	for _, host := range otherHosts {
		if host == p.hostname {
			continue
		}
		p.remoteMu.RLock()
		_, ok := p.remoteHosts[host]
		p.remoteMu.RUnlock()
		if !ok {
			go p.AddRemote(host)
		}
	}
}

// SendNegotiation gossips the local membership view to every remote and
// connects to any host a remote knows that we do not.
func (p *SimpleGrainPool[V]) SendNegotiation() {
	//negotiationCount.Inc()
	p.remoteMu.RLock()
	hosts := make([]string, 0, len(p.remoteHosts)+1)
	hosts = append(hosts, p.hostname)
	for h := range p.remoteHosts {
		hosts = append(hosts, h)
	}
	p.remoteMu.RUnlock()

	p.forAllHosts(func(remote Host[V]) {
		knownByRemote, err := remote.Negotiate(hosts)
		if err != nil {
			log.Printf("Negotiate with %s failed: %v", remote.Name(), err)
			return
		}
		for _, h := range knownByRemote {
			if !p.IsKnown(h) {
				go p.AddRemote(h)
			}
		}
	})
}

// forAllHosts runs fn concurrently against a snapshot of the remote hosts,
// waits for all calls to finish, and evicts any host that turned unhealthy.
func (p *SimpleGrainPool[V]) forAllHosts(fn func(Host[V])) {
	p.remoteMu.RLock()
	rh := maps.Clone(p.remoteHosts)
	p.remoteMu.RUnlock()

	wg := sync.WaitGroup{}
	for _, host := range rh {
		wg.Go(func() {
			fn(host)
		})
	}
	wg.Wait()

	for name, host := range rh {
		if !host.IsHealthy() {
			host.Close()
			p.remoteMu.Lock()
			delete(p.remoteHosts, name)
			p.remoteMu.Unlock()
		}
	}
}

func (p *SimpleGrainPool[V]) broadcastOwnership(ids []uint64) {
	if len(ids) == 0 {
		return
	}
	p.forAllHosts(func(rh Host[V]) {
		rh.AnnounceOwnership(p.hostname, ids)
	})
	log.Printf("%s taking ownership of %d ids", p.hostname, len(ids))
	// go p.statsUpdate()
}
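// Grain access ------------------------------------------------------------
//
// getOrClaimGrain below implements claim-on-first-use: a read-locked fast
// path for resident grains, and on a miss a spawn followed by an ownership
// broadcast, with a second lookup under the write lock to resolve concurrent
// spawns of the same id.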
func (p *SimpleGrainPool[V]) getOrClaimGrain(ctx context.Context, id uint64) (Grain[V], error) {
	p.localMu.RLock()
	grain, exists := p.grains[id]
	p.localMu.RUnlock()
	if exists && grain != nil {
		return grain, nil
	}

	grain, err := p.spawn(ctx, id)
	if err != nil {
		return nil, err
	}

	p.localMu.Lock()
	// Re-check under the write lock: a concurrent caller may have claimed
	// the same id while we were spawning.
	if existing, ok := p.grains[id]; ok && existing != nil {
		p.localMu.Unlock()
		if err := p.destroy(grain); err != nil {
			log.Printf("failed to destroy duplicate grain %d: %v", id, err)
		}
		return existing, nil
	}
	p.grains[id] = grain
	p.localMu.Unlock()

	go p.broadcastOwnership([]uint64{id})
	return grain, nil
}

// // ErrNotOwner is returned when a cart belongs to another host.
// var ErrNotOwner = fmt.Errorf("not owner")

// Apply applies one or more mutations to a grain and returns the resulting
// state together with the mutations that were actually applied.
func (p *SimpleGrainPool[V]) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (*MutationResult[V], error) {
	grain, err := p.getOrClaimGrain(ctx, id)
	if err != nil {
		return nil, err
	}

	mutations, err := p.mutationRegistry.Apply(ctx, grain, mutation...)
	if err != nil {
		return nil, err
	}

	if p.storage != nil {
		go func() {
			if err := p.storage.AppendMutations(id, mutation...); err != nil {
				log.Printf("failed to store mutation for grain %d: %v", id, err)
			}
		}()
	}
	for _, listener := range p.listeners {
		go listener.AppendMutations(id, mutations...)
	}

	result, err := grain.GetCurrentState()
	if err != nil {
		return nil, err
	}
	return &MutationResult[V]{
		Result:    *result,
		Mutations: mutations,
	}, nil
}

// Get returns the current state of a grain.
func (p *SimpleGrainPool[V]) Get(ctx context.Context, id uint64) (*V, error) {
	grain, err := p.getOrClaimGrain(ctx, id)
	if err != nil {
		return nil, err
	}
	return grain.GetCurrentState()
}

// OwnerHost reports the remote owner (if any) for the supplied cart id.
func (p *SimpleGrainPool[V]) OwnerHost(id uint64) (Host[V], bool) {
	p.remoteMu.RLock()
	defer p.remoteMu.RUnlock()
	owner, ok := p.remoteOwners[id]
	return owner, ok
}

// Hostname returns the local hostname (pod IP).
func (p *SimpleGrainPool[V]) Hostname() string {
	return p.hostname
}

// Close notifies remotes that this host is shutting down.
func (p *SimpleGrainPool[V]) Close() {
	p.forAllHosts(func(rh Host[V]) {
		rh.Close()
	})
	if p.purgeTicker != nil {
		p.purgeTicker.Stop()
	}
}
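// Usage sketch (illustrative; CartAddItem is a hypothetical proto.Message
// assumed to be registered with the pool's MutationRegistry):
//
//	res, err := pool.Apply(ctx, 42, &CartAddItem{Sku: "A-1", Qty: 1})
//	if err != nil {
//		log.Fatal(err)
//	}
//	_ = res.Result    // state after the mutation
//	_ = res.Mutations // mutations actually applied
//	state, _ := pool.Get(ctx, 42)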