Files
go-cart-actor/synced-pool.go
matst80 8bbe3a6f51
All checks were successful
Build and Publish / BuildAndDeployAmd64 (push) Successful in 29s
Build and Publish / BuildAndDeploy (push) Successful in 2m26s
return hosts not ok
2024-11-14 08:36:04 +01:00

480 lines
11 KiB
Go

package main
import (
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/apimachinery/pkg/watch"
)
// Quorum describes the operations used to coordinate cart ownership with
// other hosts.
type Quorum interface {
	// Negotiate takes the list of hosts known to the caller and returns a
	// list of hosts, or an error.
	Negotiate(knownHosts []string) ([]string, error)
	// OwnerChanged reports an ownership change. Both arguments are plain
	// strings: the cart id in its string form and the host name.
	OwnerChanged(cartID, host string) error
}
// HealthHandler is implemented by components that can report their own
// health status.
type HealthHandler interface {
// IsHealthy reports whether the implementer currently considers itself healthy.
IsHealthy() bool
}
// SyncedPool combines the local grain pool with the set of known remote hosts
// and an index of carts that are served by a remote host.
type SyncedPool struct {
// Server is the listener on which the pool's frame handlers are registered.
Server *GenericListener
// mu is taken when remotes and remoteIndex are modified; some methods also
// hold it while reading or deleting entries of local.grains.
mu sync.RWMutex
// Hostname is this node's own host name; it is never added as a remote.
Hostname string
// local is the pool of grains held by this node.
local *GrainLocalPool
// remotes maps a host name to the RemoteHost tracking that peer.
remotes map[string]*RemoteHost
// remoteIndex maps a cart id to the RemoteGrain registered for it.
remoteIndex map[CartId]*RemoteGrain
}
// Prometheus metrics for the synced pool, created through promauto.
var (
// negotiationCount is incremented for every negotiation frame handled.
negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_remote_negotiation_total",
Help: "The total number of remote negotiations",
})
// grainSyncCount is incremented for every grain owner change frame handled.
grainSyncCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_grain_sync_total",
Help: "The total number of grain owner changes",
})
// connectedRemotes is set to the size of the remotes map whenever a remote
// is added or removed.
connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_connected_remotes",
Help: "The number of connected remotes",
})
// remoteLookupCount is incremented when a grain lookup resolves to a remote grain.
remoteLookupCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_remote_lookup_total",
Help: "The total number of remote lookups",
})
// packetQueue, packetsSent and packetsReceived are not updated in this part
// of the file; presumably they are maintained by the transport code — verify.
packetQueue = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_packet_queue_size",
Help: "The total number of packets in the queue",
})
packetsSent = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_pool_packets_sent_total",
Help: "The total number of packets sent",
})
packetsReceived = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_pool_packets_received_total",
Help: "The total number of packets received",
})
)
// PongHandler replies to a ping by sending a Pong frame with status 200 and an
// empty payload on resultChan. The incoming frame is not inspected and the
// returned error is always nil.
func (p *SyncedPool) PongHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
	emptyPayload := []byte{}
	reply := MakeFrameWithPayload(Pong, 200, emptyPayload)
	resultChan <- reply
	return nil
}
// GetCartIdHandler replies with the ids of all non-nil local grains, joined by
// ";", in a CartIdsResponse frame with status 200. Ids whose string form is
// empty are skipped. The returned error is always nil.
//
// The grain map is read under p.mu's read lock so the iteration cannot overlap
// with the deletions other methods perform under p.mu. The lock is released
// before the reply is sent so a slow receiver never blocks writers.
func (p *SyncedPool) GetCartIdHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
	p.mu.RLock()
	ids := make([]string, 0, len(p.local.grains))
	for id, grain := range p.local.grains {
		if grain == nil {
			continue
		}
		if s := id.String(); s != "" {
			ids = append(ids, s)
		}
	}
	p.mu.RUnlock()

	log.Printf("Returning %d cart ids\n", len(ids))
	resultChan <- MakeFrameWithPayload(CartIdsResponse, 200, []byte(strings.Join(ids, ";")))
	return nil
}
// NegotiateHandler handles a negotiation frame whose payload is a ";"
// separated list of host names. Every non-empty host that is not yet known is
// added asynchronously, and the reply lists the hosts of all currently healthy
// remotes, joined by ";". The returned error is always nil.
func (p *SyncedPool) NegotiateHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
	negotiationCount.Inc()
	log.Printf("Handling negotiation\n")
	for _, host := range p.ExcludeKnown(strings.Split(string(data.Payload), ";")) {
		if host == "" {
			continue
		}
		go p.AddRemote(host)
	}

	// Collect the healthy hosts under the read lock, but release it before
	// sending: the send may block, and holding the lock meanwhile would stall
	// every writer of the pool.
	p.mu.RLock()
	hosts := make([]string, 0, len(p.remotes))
	for _, r := range p.remotes {
		if r.IsHealthy() {
			hosts = append(hosts, r.Host)
		}
	}
	p.mu.RUnlock()

	resultChan <- MakeFrameWithPayload(RemoteNegotiateResponse, 200, []byte(strings.Join(hosts, ";")))
	return nil
}
// GrainOwnerChangeHandler handles an ownership change frame whose payload has
// the form "<cart id>;<host>". A payload that does not split into exactly two
// parts is answered with an AckError frame (status 500). Otherwise, if the
// host is a known healthy remote, a remote grain for the cart is spawned
// asynchronously; the host is also passed to AddRemote, and the frame is
// acknowledged with AckChange (status 200). The returned error is always nil.
func (p *SyncedPool) GrainOwnerChangeHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
	grainSyncCount.Inc()
	idAndHostParts := strings.Split(string(data.Payload), ";")
	if len(idAndHostParts) != 2 {
		log.Printf("Invalid remote grain change message\n")
		resultChan <- MakeFrameWithPayload(AckError, 500, []byte("invalid"))
		return nil
	}
	id := ToCartId(idAndHostParts[0])
	host := idAndHostParts[1]
	log.Printf("Handling remote grain owner change to %s for id %s\n", host, id)

	// The remotes map is mutated by other goroutines under p.mu, so it is only
	// read while holding the read lock.
	ownerIsHealthy := false
	p.mu.RLock()
	for _, r := range p.remotes {
		if r.Host == host && r.IsHealthy() {
			ownerIsHealthy = true
			break
		}
	}
	p.mu.RUnlock()

	if ownerIsHealthy {
		go p.SpawnRemoteGrain(id, host)
	}
	// AddRemote returns immediately for hosts that are already registered.
	go p.AddRemote(host)
	resultChan <- MakeFrameWithPayload(AckChange, 200, []byte("ok"))
	return nil
}
// RemoveRemoteGrain drops the remote index entry for the given cart id, if
// there is one, while holding the pool's write lock.
func (p *SyncedPool) RemoveRemoteGrain(id CartId) {
	p.mu.Lock()
	delete(p.remoteIndex, id)
	p.mu.Unlock()
}
// SpawnRemoteGrain registers a remote grain for cart id that lives on host.
// An id with an empty string form is rejected. If a local grain exists for the
// id, it is dropped from the local pool first. If the remote grain cannot be
// created, the error is logged and the index is left unchanged.
func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
	if id.String() == "" {
		log.Printf("Invalid grain id, %s\n", id)
		return
	}

	// Check and delete under the same lock so the existence test cannot race
	// with other goroutines modifying the local grain map under p.mu.
	p.mu.Lock()
	if p.local.grains[id] != nil {
		log.Printf("Grain %s already exists locally, owner is (%s)\n", id, host)
		delete(p.local.grains, id)
	}
	p.mu.Unlock()

	remote, err := NewRemoteGrain(id, host)
	if err != nil {
		log.Printf("Error creating remote grain %v\n", err)
		return
	}

	// Disabled: watcher that removed the grain and checked the host once the
	// remote grain died.
	// go func() {
	// 	<-remote.Died
	// 	p.RemoveRemoteGrain(id)
	// 	p.HandleHostError(host)
	// 	log.Printf("Remote grain %s died, host: %s\n", id.String(), host)
	// }()

	p.mu.Lock()
	p.remoteIndex[id] = remote
	p.mu.Unlock()
}
// HandleHostError removes the remote registered for host if that remote no
// longer reports itself healthy. Unknown hosts and healthy remotes are left
// untouched.
func (p *SyncedPool) HandleHostError(host string) {
	// Look the remote up under the read lock; RemoveHost takes the write lock
	// itself, so the read lock must be released before calling it.
	var failing *RemoteHost
	p.mu.RLock()
	for _, r := range p.remotes {
		if r.Host == host {
			failing = r
			break
		}
	}
	p.mu.RUnlock()

	if failing != nil && !failing.IsHealthy() {
		p.RemoveHost(failing)
	}
}
// NewSyncedPool starts listening on "<hostname>:1338", registers the pool's
// frame handlers (Ping, GetCartIds, RemoteNegotiate, RemoteGrainChanged) on
// the listener and returns the pool. An error from Listen is returned as is.
//
// When discovery is non-nil, a background goroutine waits five seconds and
// then follows discovery.Watch(): hosts that appear and are not yet known are
// added after a three second start-up grace period, and known hosts that are
// deleted are removed together with their mapped carts. The goroutine ends
// when the watch channel is closed or Watch fails.
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
	listen := fmt.Sprintf("%s:1338", hostname)
	conn := NewConnection(listen)
	server, err := conn.Listen()
	if err != nil {
		return nil, err
	}
	log.Printf("Listening on %s", listen)

	pool := &SyncedPool{
		Server:      server,
		Hostname:    hostname,
		local:       local,
		remotes:     make(map[string]*RemoteHost),
		remoteIndex: make(map[CartId]*RemoteGrain),
	}
	server.AddHandler(Ping, pool.PongHandler)
	server.AddHandler(GetCartIds, pool.GetCartIdHandler)
	server.AddHandler(RemoteNegotiate, pool.NegotiateHandler)
	server.AddHandler(RemoteGrainChanged, pool.GrainOwnerChangeHandler)

	if discovery == nil {
		log.Printf("No discovery, waiting for remotes to connect")
		return pool, nil
	}

	go func() {
		time.Sleep(time.Second * 5)
		log.Printf("Starting discovery")
		ch, err := discovery.Watch()
		if err != nil {
			log.Printf("Error discovering hosts: %v", err)
			return
		}
		for chng := range ch {
			if chng.Host == "" {
				continue
			}
			known := pool.IsKnown(chng.Host)
			switch {
			case chng.Type != watch.Deleted && !known:
				log.Printf("Discovered host %s, waiting for startup", chng.Host)
				time.Sleep(3 * time.Second)
				pool.AddRemote(chng.Host)
			case chng.Type == watch.Deleted && known:
				log.Printf("Host removed %s, removing from index", chng.Host)
				// Find the remote under the read lock; RemoveHost takes the
				// write lock, so it is called only after the lock is released.
				var gone *RemoteHost
				pool.mu.RLock()
				for _, r := range pool.remotes {
					if r.Host == chng.Host {
						gone = r
						break
					}
				}
				pool.mu.RUnlock()
				if gone != nil {
					pool.RemoveHost(gone)
				}
			}
		}
	}()
	return pool, nil
}
// IsHealthy reports whether every registered remote is healthy. A pool without
// remotes is healthy. The remotes map is read under the pool's read lock
// because other goroutines add and remove remotes under p.mu.
func (p *SyncedPool) IsHealthy() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, r := range p.remotes {
		if !r.IsHealthy() {
			return false
		}
	}
	return true
}
// IsKnown reports whether host is this node itself or the host of a registered
// remote. The remotes map is read under the pool's read lock because other
// goroutines add and remove remotes under p.mu.
func (p *SyncedPool) IsKnown(host string) bool {
	if host == p.Hostname {
		return true
	}
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, r := range p.remotes {
		if r.Host == host {
			return true
		}
	}
	return false
}
// ExcludeKnown returns the entries of hosts for which IsKnown is false, in
// their original order. The result is always a non-nil slice.
func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
	unknown := make([]string, 0, len(hosts))
	for i := range hosts {
		candidate := hosts[i]
		if p.IsKnown(candidate) {
			continue
		}
		unknown = append(unknown, candidate)
	}
	return unknown
}
// RemoveHost unregisters host, removes every remote index entry that points to
// it, and updates the connected remotes gauge.
func (p *SyncedPool) RemoveHost(host *RemoteHost) {
	p.mu.Lock()
	delete(p.remotes, host.Host)
	// Take the count while the lock is still held; reading the map after
	// unlocking would race with concurrent additions and removals.
	remaining := len(p.remotes)
	p.mu.Unlock()

	p.RemoveHostMappedCarts(host)
	connectedRemotes.Set(float64(remaining))
}
// RemoveHostMappedCarts deletes every remote index entry whose grain is hosted
// on the given host. The pool's write lock is held for the whole sweep.
func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for cartID, grain := range p.remoteIndex {
		if grain.Host != host.Host {
			continue
		}
		delete(p.remoteIndex, cartID)
	}
}
// Frame types exchanged between pool members. The numeric values are part of
// the wire protocol and are spelled out explicitly.
const (
	RemoteNegotiate         FrameType = 3
	RemoteGrainChanged      FrameType = 4
	AckChange               FrameType = 5
	AckError                FrameType = 6
	Ping                    FrameType = 7
	Pong                    FrameType = 8
	GetCartIds              FrameType = 9
	CartIdsResponse         FrameType = 10
	RemoteNegotiateResponse FrameType = 11
)
// Negotiate sends the list of known hosts (this node first, then every
// registered remote) to each registered remote and adds any host from the
// replies that is not yet known.
//
// The remotes are snapshotted under the read lock before any network call is
// made: AddRemote inserts into p.remotes, so ranging over the live map while
// calling it would both race with other goroutines and make the iteration
// unpredictable. A remote that fails to negotiate is logged and skipped, so
// one unreachable peer does not prevent negotiation with the others.
func (p *SyncedPool) Negotiate() {
	p.mu.RLock()
	peers := make([]*RemoteHost, 0, len(p.remotes))
	knownHosts := make([]string, 0, len(p.remotes)+1)
	knownHosts = append(knownHosts, p.Hostname)
	for _, r := range p.remotes {
		peers = append(peers, r)
		knownHosts = append(knownHosts, r.Host)
	}
	p.mu.RUnlock()

	for _, r := range peers {
		hosts, err := r.Negotiate(knownHosts)
		if err != nil {
			log.Printf("Error negotiating with %s: %v\n", r.Host, err)
			continue
		}
		for _, h := range hosts {
			if !p.IsKnown(h) {
				p.AddRemote(h)
			}
		}
	}
}
// GetHealthyRemotes returns the registered remotes that currently report
// themselves healthy. The result is always a non-nil slice.
//
// NOTE(review): p.mu is not acquired here; the locking calls below are
// disabled in the source. Confirm whether that is still required.
func (p *SyncedPool) GetHealthyRemotes() []*RemoteHost {
	// p.mu.RLock()
	// defer p.mu.RUnlock()
	healthy := make([]*RemoteHost, 0, len(p.remotes))
	for _, candidate := range p.remotes {
		if !candidate.IsHealthy() {
			continue
		}
		healthy = append(healthy, candidate)
	}
	return healthy
}
// RequestOwnership asks every healthy remote to confirm that this node becomes
// the owner of cart id.
//
// A remote that fails to confirm and is no longer healthy is removed from the
// pool and not counted; a remote that fails but is still healthy counts as
// asked but not confirmed. With "all" the number of counted remotes and "ok"
// the number of confirmations, the request fails when fewer than three remotes
// were counted and not all of them confirmed, or when ok is below all/2
// (integer division). On failure the local grain for id is removed and a
// "quorum not reached" error is returned; otherwise the result is nil.
func (p *SyncedPool) RequestOwnership(id CartId) error {
	ok := 0
	all := 0
	for _, r := range p.GetHealthyRemotes() {
		// Health may have changed since the list was built.
		if !r.IsHealthy() {
			continue
		}
		//log.Printf("Asking for confirmation change of %s to %s (me) with %s\n", id, p.Hostname, r.Host)
		err := r.ConfirmChange(id, p.Hostname)
		all++
		if err != nil {
			if !r.IsHealthy() {
				log.Printf("Ownership: Removing host, unable to communicate with %s", r.Host)
				p.RemoveHost(r)
				all--
			} else {
				// Name the remote that rejected the change, not this node.
				log.Printf("Error confirming change: %v from %s\n", err, r.Host)
			}
			continue
		}
		//log.Printf("Remote confirmed change %s\n", r.Host)
		ok++
	}
	if (all < 3 && ok < all) || ok < (all/2) {
		p.removeLocalGrain(id)
		return fmt.Errorf("quorum not reached")
	}
	return nil
}
// removeLocalGrain deletes the grain for id from the local pool's grain map
// while holding the synced pool's write lock.
func (p *SyncedPool) removeLocalGrain(id CartId) {
	p.mu.Lock()
	defer p.mu.Unlock()
	localPool := p.local
	delete(localPool.grains, id)
}
// AddRemote connects to "<host>:1338" and registers it as a remote of the
// pool. Empty host names, this node's own host name and hosts that are already
// registered are ignored.
//
// The host is pinged up to four times, 300ms apart. If no ping succeeds, the
// host is NOT registered; previously it was added and initialized even though
// it had never answered. After a successful ping the remote is stored, the
// connected remotes gauge is updated, a background goroutine starts pinging it
// every three seconds (removing it once it is no longer healthy), and the
// remote is initialized asynchronously.
func (p *SyncedPool) AddRemote(host string) {
	if host == "" || host == p.Hostname {
		return
	}
	p.mu.RLock()
	_, hasHost := p.remotes[host]
	p.mu.RUnlock()
	if hasHost {
		return
	}

	client := NewConnection(fmt.Sprintf("%s:1338", host))
	var r *FrameWithPayload
	var err error
	for pings := 3; pings >= 0; pings-- {
		r, err = client.Call(Ping, nil)
		if err == nil {
			break
		}
		log.Printf("Ping failed when adding %s, trying %d more times\n", host, pings)
		time.Sleep(time.Millisecond * 300)
	}
	if err != nil {
		log.Printf("Unable to reach %s, not adding remote: %v\n", host, err)
		return
	}
	log.Printf("Connected to remote %s: %v\n", host, r)

	remote := &RemoteHost{
		Connection:  client,
		MissedPings: 0,
		Host:        host,
	}

	// Re-check under the write lock: callers invoke AddRemote from several
	// goroutines, and another one may have registered the host while this one
	// was pinging.
	p.mu.Lock()
	if _, exists := p.remotes[host]; exists {
		p.mu.Unlock()
		return
	}
	p.remotes[host] = remote
	remoteCount := len(p.remotes)
	p.mu.Unlock()

	connectedRemotes.Set(float64(remoteCount))
	log.Printf("Added remote %s\n", remote.Host)

	go func() {
		// A stoppable ticker is used so it is released when the remote is removed.
		ticker := time.NewTicker(time.Second * 3)
		defer ticker.Stop()
		for range ticker.C {
			err := remote.Ping()
			for err != nil {
				time.Sleep(time.Millisecond * 200)
				if !remote.IsHealthy() {
					log.Printf("Removing host, unable to communicate with %s", host)
					p.RemoveHost(remote)
					return
				}
				err = remote.Ping()
			}
		}
	}()
	go remote.Initialize(p)
}
// getGrain resolves the grain for id while holding the pool's read lock.
// Resolution order: a grain present in the local grain map, then an entry in
// the remote index (counted in remoteLookupCount), and finally a grain
// obtained from the local pool's GetGrain. In the last case ownership is
// requested in a separate goroutine and its outcome is not awaited.
func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
	p.mu.RLock()
	defer p.mu.RUnlock()

	localGrain, isLocal := p.local.grains[id]
	if isLocal {
		return localGrain, nil
	}

	if remoteGrain, isRemote := p.remoteIndex[id]; isRemote {
		remoteLookupCount.Inc()
		return remoteGrain, nil
	}

	// Neither local nor remote: announce ownership in the background and let
	// the local pool provide the grain.
	go p.RequestOwnership(id)
	var err error
	localGrain, err = p.local.GetGrain(id)
	if err != nil {
		return nil, err
	}
	return localGrain, nil
}
// Process resolves the grain for id and hands it each message in order. It
// returns the frame produced by the last message, or nil when no messages are
// given. The first error, from the lookup or from a message, stops processing
// and is returned with a nil frame.
func (p *SyncedPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) {
	grain, err := p.getGrain(id)
	if err != nil {
		return nil, err
	}
	var last *FrameWithPayload
	for _, msg := range messages {
		last, err = grain.HandleMessage(&msg, false)
		if err != nil {
			return nil, err
		}
	}
	return last, nil
}
// Get resolves the grain for id and returns its current state. A lookup error
// is returned with a nil frame.
func (p *SyncedPool) Get(id CartId) (*FrameWithPayload, error) {
	target, lookupErr := p.getGrain(id)
	if lookupErr == nil {
		return target.GetCurrentState()
	}
	return nil, lookupErr
}