package actor import ( "fmt" "log" "maps" "sync" "time" "github.com/gogo/protobuf/proto" ) type SimpleGrainPool[V any] struct { // fields and methods localMu sync.RWMutex grains map[uint64]Grain[V] mutationRegistry MutationRegistry spawn func(id uint64) (Grain[V], error) spawnHost func(host string) (Host, error) storage LogStorage[V] ttl time.Duration poolSize int // Cluster coordination -------------------------------------------------- hostname string remoteMu sync.RWMutex remoteOwners map[uint64]Host remoteHosts map[string]Host //discardedHostHandler *DiscardedHostHandler // House-keeping --------------------------------------------------------- purgeTicker *time.Ticker } type GrainPoolConfig[V any] struct { Hostname string Spawn func(id uint64) (Grain[V], error) SpawnHost func(host string) (Host, error) TTL time.Duration PoolSize int MutationRegistry MutationRegistry Storage LogStorage[V] } func NewSimpleGrainPool[V any](config GrainPoolConfig[V]) (*SimpleGrainPool[V], error) { p := &SimpleGrainPool[V]{ grains: make(map[uint64]Grain[V]), mutationRegistry: config.MutationRegistry, storage: config.Storage, spawn: config.Spawn, spawnHost: config.SpawnHost, ttl: config.TTL, poolSize: config.PoolSize, hostname: config.Hostname, remoteOwners: make(map[uint64]Host), remoteHosts: make(map[string]Host), } p.purgeTicker = time.NewTicker(time.Minute) go func() { for range p.purgeTicker.C { p.purge() } }() return p, nil } func (p *SimpleGrainPool[V]) purge() { purgeLimit := time.Now().Add(-p.ttl) purgedIds := make([]uint64, 0, len(p.grains)) p.localMu.Lock() for id, grain := range p.grains { if grain.GetLastAccess().Before(purgeLimit) { purgedIds = append(purgedIds, id) delete(p.grains, id) } } p.localMu.Unlock() p.forAllHosts(func(remote Host) { remote.AnnounceExpiry(purgedIds) }) } // LocalUsage returns the number of resident grains and configured capacity. 
func (p *SimpleGrainPool[V]) LocalUsage() (int, int) { p.localMu.RLock() defer p.localMu.RUnlock() return len(p.grains), p.poolSize } // LocalCartIDs returns the currently owned cart ids (for control-plane RPCs). func (p *SimpleGrainPool[V]) GetLocalIds() []uint64 { p.localMu.RLock() defer p.localMu.RUnlock() ids := make([]uint64, 0, len(p.grains)) for _, g := range p.grains { if g == nil { continue } ids = append(ids, uint64(g.GetId())) } return ids } func (p *SimpleGrainPool[V]) HandleRemoteExpiry(host string, ids []uint64) error { p.remoteMu.Lock() defer p.remoteMu.Unlock() for _, id := range ids { delete(p.remoteOwners, id) } return nil } func (p *SimpleGrainPool[V]) HandleOwnershipChange(host string, ids []uint64) error { p.remoteMu.RLock() remoteHost, exists := p.remoteHosts[host] p.remoteMu.RUnlock() if !exists { createdHost, err := p.AddRemote(host) if err != nil { return err } remoteHost = createdHost } p.remoteMu.Lock() defer p.remoteMu.Unlock() p.localMu.Lock() defer p.localMu.Unlock() for _, id := range ids { log.Printf("Handling ownership change for cart %d to host %s", id, host) delete(p.grains, id) p.remoteOwners[id] = remoteHost } return nil } // TakeOwnership takes ownership of a grain. 
func (p *SimpleGrainPool[V]) TakeOwnership(id uint64) { p.broadcastOwnership([]uint64{id}) } func (p *SimpleGrainPool[V]) AddRemote(host string) (Host, error) { if host == "" { return nil, fmt.Errorf("host is empty") } if host == p.hostname { return nil, fmt.Errorf("same host, this should not happen") } p.remoteMu.RLock() existing, found := p.remoteHosts[host] p.remoteMu.RUnlock() if found { return existing, nil } remote, err := p.spawnHost(host) if err != nil { log.Printf("AddRemote %s failed: %v", host, err) return nil, err } p.remoteMu.Lock() p.remoteHosts[host] = remote p.remoteMu.Unlock() // connectedRemotes.Set(float64(p.RemoteCount())) log.Printf("Connected to remote host %s", host) go p.pingLoop(remote) go p.initializeRemote(remote) go p.SendNegotiation() return remote, nil } func (p *SimpleGrainPool[V]) initializeRemote(remote Host) { remotesIds := remote.GetActorIds() p.remoteMu.Lock() for _, id := range remotesIds { p.localMu.Lock() delete(p.grains, id) p.localMu.Unlock() if _, exists := p.remoteOwners[id]; !exists { p.remoteOwners[id] = remote } } p.remoteMu.Unlock() } func (p *SimpleGrainPool[V]) RemoveHost(host string) { p.remoteMu.Lock() remote, exists := p.remoteHosts[host] if exists { go remote.Close() delete(p.remoteHosts, host) } count := 0 for id, owner := range p.remoteOwners { if owner.Name() == host { count++ delete(p.remoteOwners, id) } } log.Printf("Removing host %s, grains: %d", host, count) p.remoteMu.Unlock() if exists { remote.Close() } // connectedRemotes.Set(float64(p.RemoteCount())) } func (p *SimpleGrainPool[V]) RemoteCount() int { p.remoteMu.RLock() defer p.remoteMu.RUnlock() return len(p.remoteHosts) } // RemoteHostNames returns a snapshot of connected remote host identifiers. 
func (p *SimpleGrainPool[V]) RemoteHostNames() []string { p.remoteMu.RLock() defer p.remoteMu.RUnlock() hosts := make([]string, 0, len(p.remoteHosts)) for host := range p.remoteHosts { hosts = append(hosts, host) } return hosts } func (p *SimpleGrainPool[V]) IsKnown(host string) bool { if host == p.hostname { return true } p.remoteMu.RLock() defer p.remoteMu.RUnlock() _, ok := p.remoteHosts[host] return ok } func (p *SimpleGrainPool[V]) pingLoop(remote Host) { remote.Ping() ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for range ticker.C { if !remote.Ping() { if !remote.IsHealthy() { log.Printf("Remote %s unhealthy, removing", remote.Name()) p.Close() p.RemoveHost(remote.Name()) return } continue } } } func (p *SimpleGrainPool[V]) IsHealthy() bool { p.remoteMu.RLock() defer p.remoteMu.RUnlock() for _, r := range p.remoteHosts { if !r.IsHealthy() { return false } } return true } func (p *SimpleGrainPool[V]) Negotiate(otherHosts []string) { for _, host := range otherHosts { if host != p.hostname { p.remoteMu.RLock() _, ok := p.remoteHosts[host] p.remoteMu.RUnlock() if !ok { go p.AddRemote(host) } } } } func (p *SimpleGrainPool[V]) SendNegotiation() { //negotiationCount.Inc() p.remoteMu.RLock() hosts := make([]string, 0, len(p.remoteHosts)+1) hosts = append(hosts, p.hostname) remotes := make([]Host, 0, len(p.remoteHosts)) for h, r := range p.remoteHosts { hosts = append(hosts, h) remotes = append(remotes, r) } p.remoteMu.RUnlock() p.forAllHosts(func(remote Host) { knownByRemote, err := remote.Negotiate(hosts) if err != nil { log.Printf("Negotiate with %s failed: %v", remote.Name(), err) return } for _, h := range knownByRemote { if !p.IsKnown(h) { go p.AddRemote(h) } } }) } func (p *SimpleGrainPool[V]) forAllHosts(fn func(Host)) { p.remoteMu.RLock() rh := maps.Clone(p.remoteHosts) p.remoteMu.RUnlock() wg := sync.WaitGroup{} for _, host := range rh { wg.Go(func() { fn(host) }) } for name, host := range rh { if !host.IsHealthy() { host.Close() 
p.remoteMu.Lock() delete(p.remoteHosts, name) p.remoteMu.Unlock() } } } func (p *SimpleGrainPool[V]) broadcastOwnership(ids []uint64) { if len(ids) == 0 { return } p.forAllHosts(func(rh Host) { rh.AnnounceOwnership(ids) }) log.Printf("taking ownership of %d ids", len(ids)) // go p.statsUpdate() } func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) { p.localMu.RLock() grain, exists := p.grains[id] p.localMu.RUnlock() if exists && grain != nil { return grain, nil } grain, err := p.spawn(id) if err != nil { return nil, err } p.localMu.Lock() p.grains[id] = grain p.localMu.Unlock() go p.broadcastOwnership([]uint64{id}) return grain, nil } // // ErrNotOwner is returned when a cart belongs to another host. // var ErrNotOwner = fmt.Errorf("not owner") // Apply applies a mutation to a grain. func (p *SimpleGrainPool[V]) Apply(id uint64, mutation ...proto.Message) (*MutationResult[*V], error) { grain, err := p.getOrClaimGrain(id) if err != nil { return nil, err } mutations, err := p.mutationRegistry.Apply(grain, mutation...) if err != nil { return nil, err } if p.storage != nil { go func() { if err := p.storage.AppendEvent(id, mutation...); err != nil { log.Printf("failed to store mutation for grain %d: %v", id, err) } }() } result, err := grain.GetCurrentState() if err != nil { return nil, err } return &MutationResult[*V]{ Result: result, Mutations: mutations, }, nil } // Get returns the current state of a grain. func (p *SimpleGrainPool[V]) Get(id uint64) (*V, error) { grain, err := p.getOrClaimGrain(id) if err != nil { return nil, err } return grain.GetCurrentState() } // OwnerHost reports the remote owner (if any) for the supplied cart id. func (p *SimpleGrainPool[V]) OwnerHost(id uint64) (Host, bool) { p.remoteMu.RLock() defer p.remoteMu.RUnlock() owner, ok := p.remoteOwners[id] return owner, ok } // Hostname returns the local hostname (pod IP). 
func (p *SimpleGrainPool[V]) Hostname() string {
	name := p.hostname
	return name
}

// Close closes every registered remote host and then stops the purge ticker,
// if one was created.
func (p *SimpleGrainPool[V]) Close() {
	closeRemote := func(remote Host) {
		remote.Close()
	}
	p.forAllHosts(closeRemote)

	if p.purgeTicker == nil {
		return
	}
	p.purgeTicker.Stop()
}