455 lines
10 KiB
Go
455 lines
10 KiB
Go
package actor
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"maps"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
)
|
|
|
|
type SimpleGrainPool[V any] struct {
|
|
// fields and methods
|
|
localMu sync.RWMutex
|
|
grains map[uint64]Grain[V]
|
|
mutationRegistry MutationRegistry
|
|
spawn func(id uint64) (Grain[V], error)
|
|
spawnHost func(host string) (Host, error)
|
|
listeners []LogListener
|
|
storage LogStorage[V]
|
|
ttl time.Duration
|
|
poolSize int
|
|
|
|
// Cluster coordination --------------------------------------------------
|
|
hostname string
|
|
remoteMu sync.RWMutex
|
|
remoteOwners map[uint64]Host
|
|
remoteHosts map[string]Host
|
|
//discardedHostHandler *DiscardedHostHandler
|
|
|
|
// House-keeping ---------------------------------------------------------
|
|
purgeTicker *time.Ticker
|
|
}
|
|
|
|
type GrainPoolConfig[V any] struct {
|
|
Hostname string
|
|
Spawn func(id uint64) (Grain[V], error)
|
|
SpawnHost func(host string) (Host, error)
|
|
TTL time.Duration
|
|
PoolSize int
|
|
MutationRegistry MutationRegistry
|
|
Storage LogStorage[V]
|
|
}
|
|
|
|
func NewSimpleGrainPool[V any](config GrainPoolConfig[V]) (*SimpleGrainPool[V], error) {
|
|
p := &SimpleGrainPool[V]{
|
|
grains: make(map[uint64]Grain[V]),
|
|
mutationRegistry: config.MutationRegistry,
|
|
storage: config.Storage,
|
|
spawn: config.Spawn,
|
|
spawnHost: config.SpawnHost,
|
|
ttl: config.TTL,
|
|
poolSize: config.PoolSize,
|
|
hostname: config.Hostname,
|
|
remoteOwners: make(map[uint64]Host),
|
|
remoteHosts: make(map[string]Host),
|
|
}
|
|
|
|
p.purgeTicker = time.NewTicker(time.Minute)
|
|
go func() {
|
|
for range p.purgeTicker.C {
|
|
p.purge()
|
|
}
|
|
}()
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) AddListener(listener LogListener) {
|
|
p.listeners = append(p.listeners, listener)
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) RemoveListener(listener LogListener) {
|
|
for i, l := range p.listeners {
|
|
if l == listener {
|
|
p.listeners = append(p.listeners[:i], p.listeners[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) purge() {
|
|
purgeLimit := time.Now().Add(-p.ttl)
|
|
purgedIds := make([]uint64, 0, len(p.grains))
|
|
p.localMu.Lock()
|
|
for id, grain := range p.grains {
|
|
if grain.GetLastAccess().Before(purgeLimit) {
|
|
purgedIds = append(purgedIds, id)
|
|
|
|
delete(p.grains, id)
|
|
}
|
|
}
|
|
p.localMu.Unlock()
|
|
p.forAllHosts(func(remote Host) {
|
|
remote.AnnounceExpiry(purgedIds)
|
|
})
|
|
|
|
}
|
|
|
|
// LocalUsage returns the number of resident grains and configured capacity.
|
|
func (p *SimpleGrainPool[V]) LocalUsage() (int, int) {
|
|
p.localMu.RLock()
|
|
defer p.localMu.RUnlock()
|
|
return len(p.grains), p.poolSize
|
|
}
|
|
|
|
// LocalCartIDs returns the currently owned cart ids (for control-plane RPCs).
|
|
func (p *SimpleGrainPool[V]) GetLocalIds() []uint64 {
|
|
p.localMu.RLock()
|
|
defer p.localMu.RUnlock()
|
|
ids := make([]uint64, 0, len(p.grains))
|
|
for _, g := range p.grains {
|
|
if g == nil {
|
|
continue
|
|
}
|
|
ids = append(ids, uint64(g.GetId()))
|
|
}
|
|
return ids
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) HandleRemoteExpiry(host string, ids []uint64) error {
|
|
p.remoteMu.Lock()
|
|
defer p.remoteMu.Unlock()
|
|
for _, id := range ids {
|
|
delete(p.remoteOwners, id)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) HandleOwnershipChange(host string, ids []uint64) error {
|
|
log.Printf("host %s now owns %d cart ids", host, len(ids))
|
|
p.remoteMu.RLock()
|
|
remoteHost, exists := p.remoteHosts[host]
|
|
p.remoteMu.RUnlock()
|
|
if !exists {
|
|
createdHost, err := p.AddRemote(host)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
remoteHost = createdHost
|
|
}
|
|
p.remoteMu.Lock()
|
|
defer p.remoteMu.Unlock()
|
|
p.localMu.Lock()
|
|
defer p.localMu.Unlock()
|
|
for _, id := range ids {
|
|
log.Printf("Handling ownership change for cart %d to host %s", id, host)
|
|
delete(p.grains, id)
|
|
p.remoteOwners[id] = remoteHost
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TakeOwnership takes ownership of a grain.
|
|
func (p *SimpleGrainPool[V]) TakeOwnership(id uint64) {
|
|
p.broadcastOwnership([]uint64{id})
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) AddRemoteHost(host string) {
|
|
p.AddRemote(host)
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) AddRemote(host string) (Host, error) {
|
|
if host == "" {
|
|
return nil, fmt.Errorf("host is empty")
|
|
}
|
|
if host == p.hostname {
|
|
return nil, fmt.Errorf("same host, this should not happen")
|
|
}
|
|
p.remoteMu.RLock()
|
|
existing, found := p.remoteHosts[host]
|
|
p.remoteMu.RUnlock()
|
|
if found {
|
|
return existing, nil
|
|
}
|
|
|
|
remote, err := p.spawnHost(host)
|
|
if err != nil {
|
|
log.Printf("AddRemote %s failed: %v", host, err)
|
|
|
|
return nil, err
|
|
}
|
|
p.remoteMu.Lock()
|
|
p.remoteHosts[host] = remote
|
|
p.remoteMu.Unlock()
|
|
// connectedRemotes.Set(float64(p.RemoteCount()))
|
|
|
|
log.Printf("Connected to remote host %s", host)
|
|
go p.pingLoop(remote)
|
|
go p.initializeRemote(remote)
|
|
go p.SendNegotiation()
|
|
return remote, nil
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) initializeRemote(remote Host) {
|
|
|
|
remotesIds := remote.GetActorIds()
|
|
|
|
p.remoteMu.Lock()
|
|
for _, id := range remotesIds {
|
|
p.localMu.Lock()
|
|
delete(p.grains, id)
|
|
p.localMu.Unlock()
|
|
if _, exists := p.remoteOwners[id]; !exists {
|
|
p.remoteOwners[id] = remote
|
|
}
|
|
}
|
|
p.remoteMu.Unlock()
|
|
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) RemoveHost(host string) {
|
|
p.remoteMu.Lock()
|
|
remote, exists := p.remoteHosts[host]
|
|
|
|
if exists {
|
|
go remote.Close()
|
|
delete(p.remoteHosts, host)
|
|
}
|
|
count := 0
|
|
for id, owner := range p.remoteOwners {
|
|
if owner.Name() == host {
|
|
count++
|
|
delete(p.remoteOwners, id)
|
|
}
|
|
}
|
|
log.Printf("Removing host %s, grains: %d", host, count)
|
|
p.remoteMu.Unlock()
|
|
|
|
if exists {
|
|
remote.Close()
|
|
}
|
|
// connectedRemotes.Set(float64(p.RemoteCount()))
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) RemoteCount() int {
|
|
p.remoteMu.RLock()
|
|
defer p.remoteMu.RUnlock()
|
|
return len(p.remoteHosts)
|
|
}
|
|
|
|
// RemoteHostNames returns a snapshot of connected remote host identifiers.
|
|
func (p *SimpleGrainPool[V]) RemoteHostNames() []string {
|
|
p.remoteMu.RLock()
|
|
defer p.remoteMu.RUnlock()
|
|
hosts := make([]string, 0, len(p.remoteHosts))
|
|
for host := range p.remoteHosts {
|
|
hosts = append(hosts, host)
|
|
}
|
|
return hosts
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) IsKnown(host string) bool {
|
|
if host == p.hostname {
|
|
return true
|
|
}
|
|
p.remoteMu.RLock()
|
|
defer p.remoteMu.RUnlock()
|
|
_, ok := p.remoteHosts[host]
|
|
return ok
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) pingLoop(remote Host) {
|
|
remote.Ping()
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
defer ticker.Stop()
|
|
for range ticker.C {
|
|
if !remote.Ping() {
|
|
if !remote.IsHealthy() {
|
|
log.Printf("Remote %s unhealthy, removing", remote.Name())
|
|
p.Close()
|
|
p.RemoveHost(remote.Name())
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) IsHealthy() bool {
|
|
p.remoteMu.RLock()
|
|
defer p.remoteMu.RUnlock()
|
|
for _, r := range p.remoteHosts {
|
|
if !r.IsHealthy() {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) Negotiate(otherHosts []string) {
|
|
|
|
for _, host := range otherHosts {
|
|
if host != p.hostname {
|
|
p.remoteMu.RLock()
|
|
_, ok := p.remoteHosts[host]
|
|
p.remoteMu.RUnlock()
|
|
if !ok {
|
|
go p.AddRemote(host)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) SendNegotiation() {
|
|
//negotiationCount.Inc()
|
|
|
|
p.remoteMu.RLock()
|
|
hosts := make([]string, 0, len(p.remoteHosts)+1)
|
|
hosts = append(hosts, p.hostname)
|
|
remotes := make([]Host, 0, len(p.remoteHosts))
|
|
for h, r := range p.remoteHosts {
|
|
hosts = append(hosts, h)
|
|
remotes = append(remotes, r)
|
|
}
|
|
p.remoteMu.RUnlock()
|
|
|
|
p.forAllHosts(func(remote Host) {
|
|
knownByRemote, err := remote.Negotiate(hosts)
|
|
|
|
if err != nil {
|
|
log.Printf("Negotiate with %s failed: %v", remote.Name(), err)
|
|
return
|
|
}
|
|
for _, h := range knownByRemote {
|
|
if !p.IsKnown(h) {
|
|
go p.AddRemote(h)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) forAllHosts(fn func(Host)) {
|
|
p.remoteMu.RLock()
|
|
rh := maps.Clone(p.remoteHosts)
|
|
p.remoteMu.RUnlock()
|
|
|
|
wg := sync.WaitGroup{}
|
|
for _, host := range rh {
|
|
|
|
wg.Go(func() { fn(host) })
|
|
|
|
}
|
|
|
|
for name, host := range rh {
|
|
if !host.IsHealthy() {
|
|
host.Close()
|
|
p.remoteMu.Lock()
|
|
delete(p.remoteHosts, name)
|
|
p.remoteMu.Unlock()
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) broadcastOwnership(ids []uint64) {
|
|
if len(ids) == 0 {
|
|
return
|
|
}
|
|
|
|
p.forAllHosts(func(rh Host) {
|
|
rh.AnnounceOwnership(p.hostname, ids)
|
|
})
|
|
log.Printf("%s taking ownership of %d ids", p.hostname, len(ids))
|
|
// go p.statsUpdate()
|
|
}
|
|
|
|
func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) {
|
|
p.localMu.RLock()
|
|
grain, exists := p.grains[id]
|
|
p.localMu.RUnlock()
|
|
if exists && grain != nil {
|
|
return grain, nil
|
|
}
|
|
|
|
grain, err := p.spawn(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
p.localMu.Lock()
|
|
p.grains[id] = grain
|
|
p.localMu.Unlock()
|
|
go p.broadcastOwnership([]uint64{id})
|
|
return grain, nil
|
|
}
|
|
|
|
// // ErrNotOwner is returned when a cart belongs to another host.
|
|
// var ErrNotOwner = fmt.Errorf("not owner")
|
|
|
|
// Apply applies a mutation to a grain.
|
|
func (p *SimpleGrainPool[V]) Apply(id uint64, mutation ...proto.Message) (*MutationResult[*V], error) {
|
|
grain, err := p.getOrClaimGrain(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mutations, err := p.mutationRegistry.Apply(grain, mutation...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if p.storage != nil {
|
|
go func() {
|
|
if err := p.storage.AppendMutations(id, mutation...); err != nil {
|
|
log.Printf("failed to store mutation for grain %d: %v", id, err)
|
|
}
|
|
}()
|
|
}
|
|
for _, listener := range p.listeners {
|
|
go listener.AppendMutations(id, mutations...)
|
|
}
|
|
result, err := grain.GetCurrentState()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &MutationResult[*V]{
|
|
Result: result,
|
|
Mutations: mutations,
|
|
}, nil
|
|
}
|
|
|
|
// Get returns the current state of a grain.
|
|
func (p *SimpleGrainPool[V]) Get(id uint64) (*V, error) {
|
|
grain, err := p.getOrClaimGrain(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return grain.GetCurrentState()
|
|
}
|
|
|
|
// OwnerHost reports the remote owner (if any) for the supplied cart id.
|
|
func (p *SimpleGrainPool[V]) OwnerHost(id uint64) (Host, bool) {
|
|
p.remoteMu.RLock()
|
|
defer p.remoteMu.RUnlock()
|
|
owner, ok := p.remoteOwners[id]
|
|
return owner, ok
|
|
}
|
|
|
|
// Hostname returns the local hostname (pod IP).
|
|
func (p *SimpleGrainPool[V]) Hostname() string {
|
|
return p.hostname
|
|
}
|
|
|
|
// Close notifies remotes that this host is shutting down.
|
|
func (p *SimpleGrainPool[V]) Close() {
|
|
|
|
p.forAllHosts(func(rh Host) {
|
|
rh.Close()
|
|
})
|
|
|
|
if p.purgeTicker != nil {
|
|
p.purgeTicker.Stop()
|
|
}
|
|
}
|