Files
go-cart-actor/synced-pool.go
matst80 743849a131
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m52s
health check
2024-11-10 23:20:52 +01:00

489 lines
11 KiB
Go

package main
import (
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// Quorum is the contract for negotiating the set of known hosts with peers
// and for announcing that a cart has a new owning host.
type Quorum interface {
	// Negotiate sends knownHosts to the peers and returns the hosts they report.
	Negotiate(knownHosts []string) ([]string, error)
	// OwnerChanged announces that the cart identified by cartID is now owned
	// by host.
	//
	// NOTE(review): the original declaration was `OwnerChanged(CartId, host string)`,
	// which names the first parameter "CartId" — both parameters are strings.
	// If a CartId-typed id was intended, changing the type alters the method
	// set; confirm against implementers before doing so.
	OwnerChanged(cartID, host string) error
}
// HealthHandler is implemented by components that can report whether they
// currently consider themselves healthy.
type HealthHandler interface{ IsHealthy() bool }
// RemoteHost is a connection to one peer node of the pool.
type RemoteHost struct {
	// Client is the connection used to issue Call requests to the peer.
	*Client
	// Host is the peer's host name; AddRemote dials it on port 1338.
	Host string
	// MissedPings counts consecutive failed pings: Ping increments it on
	// failure and resets it to 0 on success. IsHealthy and the ping watcher
	// started by AddRemote compare it against 3.
	MissedPings int
	//Pool *RemoteGrainPool
}
// SyncedPool serves carts from a local grain pool and, for carts owned by
// another node, returns a RemoteGrain that proxies to that node. Ownership
// changes are coordinated with the connected remotes via RPC calls.
type SyncedPool struct {
	// Server receives the RPC calls registered in NewSyncedPool.
	*Server
	// mu is held around reads/writes of remoteIndex and around deletions
	// from local.grains in this file.
	// NOTE(review): remotes is read and mutated without holding mu
	// (e.g. addRemoteHost, RemoveHost); confirm whether that is safe.
	mu sync.RWMutex
	// Hostname is this node's own host name; IsKnown treats it as known.
	Hostname string
	// local holds the grains owned by this node.
	local *GrainLocalPool
	// remotes are the connected peer nodes.
	remotes []*RemoteHost
	// remoteIndex maps cart ids owned elsewhere to the RemoteGrain proxying them.
	remoteIndex map[CartId]*RemoteGrain
}
// Prometheus metrics for the synced pool, registered through promauto.
var (
	// negotiationCount counts RemoteNegotiate calls handled by NegotiateHandler.
	negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_negotiation_total",
		Help: "The total number of remote negotiations",
	})
	// grainSyncCount counts RemoteGrainChanged calls received by
	// GrainOwnerChangeHandler; it is incremented before the payload is
	// validated, so malformed or rejected messages are counted too.
	grainSyncCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_sync_total",
		Help: "The total number of grain owner changes",
	})
	// connectedRemotes is set to len(remotes) whenever a remote is added or removed.
	connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_connected_remotes",
		Help: "The number of connected remotes",
	})
	// remoteLookupCount counts getGrain lookups answered from remoteIndex.
	remoteLookupCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_lookup_total",
		Help: "The total number of remote lookups",
	})
	// The three packet metrics below are not updated anywhere in this file;
	// presumably the transport code elsewhere updates them — confirm.
	packetQueue = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_packet_queue_size",
		Help: "The total number of packets in the queue",
	})
	packetsSent = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_pool_packets_sent_total",
		Help: "The total number of packets sent",
	})
	packetsReceived = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_pool_packets_received_total",
		Help: "The total number of packets received",
	})
)
// PongHandler answers a Ping call: it replies with the Pong message type and
// echoes the request payload back unchanged.
func (p *SyncedPool) PongHandler(data []byte) (uint32, []byte, error) {
	echo := data
	return Pong, echo, nil
}
// GetCartIdHandler answers a GetCartIds call with the ids of every grain in
// the local pool, joined with ";". An empty pool yields an empty payload.
// The request payload is ignored.
func (p *SyncedPool) GetCartIdHandler(data []byte) (uint32, []byte, error) {
	// Hold the read lock while iterating: GrainOwnerChangeHandler and
	// addRemoteHost delete from local.grains under p.mu, and ranging over a
	// map while it is being written is a data race.
	// NOTE(review): writes made inside GrainLocalPool itself are not covered
	// by p.mu; confirm it synchronizes them separately.
	p.mu.RLock()
	ids := make([]string, 0, len(p.local.grains))
	for id := range p.local.grains {
		ids = append(ids, id.String())
	}
	p.mu.RUnlock()
	return CartIdsResponse, []byte(strings.Join(ids, ";")), nil
}
// NegotiateHandler handles a RemoteNegotiate call. The payload is a
// ";"-separated list of hosts known to the caller; an AddRemote is attempted
// for each host this node does not know yet (failures are logged). The reply
// is the ";"-separated list of hosts this node knows — its own Hostname plus
// its remotes — which is the format RemoteHost.Negotiate parses.
func (p *SyncedPool) NegotiateHandler(data []byte) (uint32, []byte, error) {
	negotiationCount.Inc()
	log.Printf("Handling negotiation\n")
	for _, host := range p.ExcludeKnown(strings.Split(string(data), ";")) {
		err := p.AddRemote(host)
		if err != nil {
			log.Printf("Error adding remote %s: %v\n", host, err)
		}
	}
	// Reply with real host names: the caller splits the reply on ";" as a
	// host list, so a literal "ok" would surface as a host named "ok".
	hosts := make([]string, 0, len(p.remotes)+1)
	hosts = append(hosts, p.Hostname)
	for _, r := range p.remotes {
		hosts = append(hosts, r.Host)
	}
	return RemoteNegotiateResponse, []byte(strings.Join(hosts, ";")), nil
}
// GrainOwnerChangeHandler handles a RemoteGrainChanged call whose payload is
// "cartId;host". If host is one of this node's remotes, any local grain for
// the cart is dropped and a RemoteGrain pointing at host is stored in
// remoteIndex. The reply type is always AckChange, with payload:
//   - "ok" on success,
//   - "incorrect" when the payload does not have exactly two ";"-parts,
//   - "error" when the RemoteGrain cannot be created,
//   - "not found" when host is not a known remote.
func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint32, []byte, error) {
	grainSyncCount.Inc()
	idAndHostParts := strings.Split(string(data), ";")
	if len(idAndHostParts) != 2 {
		log.Printf("Invalid remote grain change message\n")
		return AckChange, []byte("incorrect"), nil
	}
	id := ToCartId(idAndHostParts[0])
	host := idAndHostParts[1]
	for _, r := range p.remotes {
		if r.Host != host {
			continue
		}
		log.Printf("Remote grain %s changed to %s\n", id, host)
		p.mu.Lock()
		if p.local.grains[id] != nil {
			log.Printf("Grain %s already exists locally, deleting\n", id)
			delete(p.local.grains, id)
		}
		grain, err := NewRemoteGrain(id, r.Host)
		if err != nil {
			// Release the lock on this early return too; returning while
			// holding p.mu would block every later Lock/RLock on the pool.
			p.mu.Unlock()
			log.Printf("Error creating remote grain %s: %v\n", id, err)
			return AckChange, []byte("error"), nil
		}
		p.remoteIndex[id] = grain
		p.mu.Unlock()
		return AckChange, []byte("ok"), nil
	}
	return AckChange, []byte("not found"), nil
}
// NewSyncedPool listens on hostname:1338, registers the pool's RPC handlers
// (Ping, GetCartIds, RemoteNegotiate, RemoteGrainChanged) and returns a pool
// with no remotes yet.
//
// When discovery is non-nil, a background goroutine waits 5 seconds, starts
// discovery.Watch and, for every reported host that is non-empty and not yet
// known, waits one more second before calling AddRemote. When discovery is nil
// the pool only gains remotes through incoming negotiation.
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
	addr := fmt.Sprintf("%s:1338", hostname)
	srv, err := Listen(addr)
	if err != nil {
		return nil, err
	}
	log.Printf("Listening on %s", addr)

	pool := &SyncedPool{
		Server:      srv,
		Hostname:    hostname,
		local:       local,
		remotes:     make([]*RemoteHost, 0),
		remoteIndex: make(map[CartId]*RemoteGrain),
	}

	srv.HandleCall(Ping, pool.PongHandler)
	srv.HandleCall(GetCartIds, pool.GetCartIdHandler)
	srv.HandleCall(RemoteNegotiate, pool.NegotiateHandler)
	srv.HandleCall(RemoteGrainChanged, pool.GrainOwnerChangeHandler)

	// Liveness pinging is done per remote by the watcher AddRemote starts,
	// not by a pool-wide loop here. Open TODO carried over from a removed
	// pool-wide ping loop: only one side (client or server) should ping.

	if discovery == nil {
		log.Printf("No discovery, waiting for remotes to connect")
		return pool, nil
	}

	go func() {
		time.Sleep(5 * time.Second)
		log.Printf("Starting discovery")
		discovered, err := discovery.Watch()
		if err != nil {
			log.Printf("Error discovering hosts: %v", err)
			return
		}
		for candidate := range discovered {
			if candidate == "" || pool.IsKnown(candidate) {
				continue
			}
			go func(h string) {
				log.Printf("Discovered host %s, waiting for startup", h)
				time.Sleep(time.Second)
				if err := pool.AddRemote(h); err != nil {
					log.Printf("Error adding remote %s: %v", h, err)
				}
			}(candidate)
		}
	}()
	return pool, nil
}
// IsHealthy reports false when any remote has missed more than three
// consecutive pings, and true otherwise (including when there are no remotes).
func (p *SyncedPool) IsHealthy() bool {
	remotes := p.remotes
	healthy := true
	for i := 0; healthy && i < len(remotes); i++ {
		healthy = remotes[i].MissedPings <= 3
	}
	return healthy
}
// IsKnown reports whether host is this node itself (Hostname) or the Host of
// one of its remotes.
func (p *SyncedPool) IsKnown(host string) bool {
	if host == p.Hostname {
		return true
	}
	for _, remote := range p.remotes {
		if remote.Host == host {
			return true
		}
	}
	return false
}
// ExcludeKnown returns a new slice holding, in their original order, the
// entries of hosts for which IsKnown is false. hosts itself is not modified.
func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
	unknown := make([]string, 0, len(hosts))
	for i := range hosts {
		if p.IsKnown(hosts[i]) {
			continue
		}
		unknown = append(unknown, hosts[i])
	}
	return unknown
}
// RemoveHost removes host (matched by pointer identity) from the remote list.
// It first deletes the remoteIndex entries that point at it, then updates the
// connected-remotes gauge. It does nothing when host is not in the list.
func (p *SyncedPool) RemoveHost(host *RemoteHost) {
	idx := -1
	for i, candidate := range p.remotes {
		if candidate == host {
			idx = i
			break
		}
	}
	if idx < 0 {
		return
	}
	p.RemoveHostMappedCarts(p.remotes[idx])
	p.remotes = append(p.remotes[:idx], p.remotes[idx+1:]...)
	connectedRemotes.Set(float64(len(p.remotes)))
}
// RemoveHostMappedCarts deletes every remoteIndex entry whose RemoteGrain
// lives on host.Host, holding the pool's write lock for the whole sweep.
func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for cartID, grain := range p.remoteIndex {
		if grain.Host != host.Host {
			continue
		}
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(p.remoteIndex, cartID)
	}
}
// Message types exchanged between pool nodes. Requests are paired with their
// replies as follows (see the Call sites in this file):
//   - RemoteNegotiate    -> RemoteNegotiateResponse
//   - RemoteGrainChanged -> AckChange
//   - Ping               -> Pong
//   - GetCartIds         -> CartIdsResponse
const (
	RemoteNegotiate         uint32 = 3
	RemoteGrainChanged      uint32 = 4
	AckChange               uint32 = 5
	// AckError             uint32 = 6 (currently unused)
	Ping                    uint32 = 7
	Pong                    uint32 = 8
	GetCartIds              uint32 = 9
	CartIdsResponse         uint32 = 10
	RemoteNegotiateResponse uint32 = 11
)
// Negotiate sends knownHosts (joined with ";") to the remote as a
// RemoteNegotiate call and returns the hosts listed in its ";"-separated
// reply. Empty entries are dropped, so an empty reply yields an empty slice
// instead of a single "" host.
func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) {
	data, err := h.Call(RemoteNegotiate, RemoteNegotiateResponse, []byte(strings.Join(knownHosts, ";")))
	if err != nil {
		return nil, err
	}
	parts := strings.Split(string(data), ";")
	hosts := make([]string, 0, len(parts))
	for _, host := range parts {
		if host != "" {
			hosts = append(hosts, host)
		}
	}
	return hosts, nil
}
// GetCartMappings asks the remote for the ids of the carts it holds locally
// (a GetCartIds call whose reply is ";"-separated) and converts them with
// ToCartId. Empty entries are skipped, so a remote with no carts yields an
// empty slice.
func (h *RemoteHost) GetCartMappings() ([]CartId, error) {
	data, err := h.Call(GetCartIds, CartIdsResponse, []byte{})
	if err != nil {
		return nil, err
	}
	parts := strings.Split(string(data), ";")
	ids := make([]CartId, 0, len(parts))
	for _, part := range parts {
		// strings.Split("", ";") returns [""]; without this check an empty
		// reply would register a bogus cart id "" as a remote grain.
		if part == "" {
			continue
		}
		ids = append(ids, ToCartId(part))
	}
	return ids, nil
}
// Negotiate calls Negotiate(knownHosts) on every remote in order and returns
// the de-duplicated union of the hosts they report, in unspecified order.
// The first remote error aborts the negotiation and is returned unchanged.
func (p *SyncedPool) Negotiate(knownHosts []string) ([]string, error) {
	seen := map[string]struct{}{}
	for _, remote := range p.remotes {
		reported, err := remote.Negotiate(knownHosts)
		if err != nil {
			return nil, err
		}
		for _, host := range reported {
			seen[host] = struct{}{}
		}
	}
	union := make([]string, 0, len(seen))
	for host := range seen {
		union = append(union, host)
	}
	return union, nil
}
// ConfirmChange tells the remote that cart id is now owned by host, using a
// RemoteGrainChanged call with payload "id;host". It returns the call error,
// or an error carrying the reply when the remote does not answer "ok".
func (h *RemoteHost) ConfirmChange(id CartId, host string) error {
	payload := []byte(fmt.Sprintf("%s;%s", id, host))
	reply, err := h.Call(RemoteGrainChanged, AckChange, payload)
	switch {
	case err != nil:
		return err
	case string(reply) != "ok":
		return fmt.Errorf("remote grain change failed %s", string(reply))
	default:
		return nil
	}
}
// RequestOwnership asks every connected remote, in order, to record this node
// (p.Hostname) as the owner of cart id. It stops at and returns the first
// error; with no remotes it succeeds immediately.
//
// NOTE(review): remotes that confirmed before a later one failed are not
// rolled back, so they will already treat id as owned by this node.
func (p *SyncedPool) RequestOwnership(id CartId) error {
	for _, r := range p.remotes {
		log.Printf("Confirming change of %s to %s (me) with %s\n", id, p.Hostname, r.Host)
		if err := r.ConfirmChange(id, p.Hostname); err != nil {
			// Name the remote that failed, not our own host name.
			log.Printf("Error confirming change: %v from %s\n", err, r.Host)
			return err
		}
	}
	return nil
}
// addRemoteHost appends remote to p.remotes unless a remote with Host ==
// address is already present, in which case it returns an error and changes
// nothing. After adding, a background goroutine:
//  1. negotiates the previously known hosts with all remotes (errors logged);
//  2. fetches the new remote's cart ids and, under p.mu, drops any local grain
//     with one of those ids and maps the id to a RemoteGrain on the remote.
func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) error {
	known := make([]string, 0, len(p.remotes))
	for _, r := range p.remotes {
		if r.Host == address {
			log.Printf("Remote %s already exists\n", address)
			return fmt.Errorf("remote %s already exists", address)
		}
		known = append(known, r.Host)
	}
	p.remotes = append(p.remotes, remote)
	connectedRemotes.Set(float64(len(p.remotes)))
	log.Printf("Added remote %s\n", remote.Host)
	go func() {
		// A failed negotiation does not undo adding the remote, but it must
		// not be silently discarded either.
		if _, err := p.Negotiate(known); err != nil {
			log.Printf("Error negotiating hosts after adding %s: %v\n", remote.Host, err)
		}
		ids, err := remote.GetCartMappings()
		if err != nil {
			log.Printf("Error getting remote mappings: %v\n", err)
			return
		}
		p.mu.Lock()
		for _, id := range ids {
			if p.local.grains[id] != nil {
				log.Printf("Grain %s already exists locally, deleting\n", id)
				delete(p.local.grains, id)
			}
			grain, err := NewRemoteGrain(id, remote.Host)
			if err != nil {
				log.Printf("Error creating remote grain %s: %v\n", id, err)
				continue
			}
			p.remoteIndex[id] = grain
		}
		p.mu.Unlock()
	}()
	return nil
}
// Ping sends a Ping call to the remote and returns the call's error. On
// failure MissedPings is incremented and the failure is logged; on success
// MissedPings is reset to 0.
func (h *RemoteHost) Ping() error {
	_, err := h.Call(Ping, Pong, []byte{})
	if err != nil {
		h.MissedPings++
		// The newline belongs at the end; it previously split the message in two.
		log.Printf("Error pinging remote %s: %v, missed pings: %d\n", h.Host, err, h.MissedPings)
		return err
	}
	h.MissedPings = 0
	return nil
}
// AddRemote connects to host on port 1338, verifies that it answers a Ping
// and registers it as a remote. Empty or already-known hosts are ignored and
// yield a nil error. Dial, ping and registration errors are returned.
//
// Once registered, two background goroutines watch the connection:
//   - one pings every 2 seconds; after a failure it retries every 200ms until
//     a ping succeeds or more than three pings in a row have been missed, in
//     which case it removes the remote and exits;
//   - one drains client.Errors and removes the remote once client.ErrorCount
//     exceeds 3.
func (p *SyncedPool) AddRemote(host string) error {
	if host == "" || p.IsKnown(host) {
		return nil
	}
	client, err := Dial(fmt.Sprintf("%s:1338", host))
	if err != nil {
		log.Printf("Error connecting to remote %s: %v\n", host, err)
		return err
	}
	_, err = client.Call(Ping, Pong, []byte{})
	if err != nil {
		log.Printf("Error pinging remote %s: %v\n", host, err)
		// NOTE(review): client is not closed on this path (nor when
		// addRemoteHost fails below); confirm whether Client has a Close.
		return err
	}
	remote := &RemoteHost{
		Client:      client,
		MissedPings: 0,
		Host:        host,
	}
	// Register before starting the watchers. Concurrent discovery and
	// negotiation can add the same host twice; the loser must not leave a
	// ping loop running forever for a remote the pool does not track.
	if err := p.addRemoteHost(host, remote); err != nil {
		return err
	}
	go func() {
		// NewTicker (unlike time.Tick) can be stopped, so the ticker is
		// released when this goroutine exits after removing the remote.
		ticker := time.NewTicker(time.Second * 2)
		defer ticker.Stop()
		for range ticker.C {
			err := remote.Ping()
			for err != nil {
				time.Sleep(time.Millisecond * 200)
				err = remote.Ping()
				if remote.MissedPings > 3 {
					log.Printf("Error pinging remote %s: %v, missed pings: %d\n", host, err, remote.MissedPings)
					p.RemoveHost(remote)
					return
				}
			}
		}
	}()
	go func() {
		for range client.Errors {
			if client.ErrorCount > 3 {
				log.Printf("Error count exceeded, removing remote %s\n", host)
				p.RemoveHost(remote)
			}
		}
	}()
	return nil
}
// getGrain resolves the grain for cart id:
//   - the local grain, if this node holds one;
//   - otherwise the RemoteGrain recorded in remoteIndex (counted in
//     remoteLookupCount);
//   - otherwise it asks all remotes to accept this node as owner
//     (RequestOwnership) and then obtains the grain from the local pool.
//
// Errors from RequestOwnership or the local pool are returned.
func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
	// Read both maps under the read lock. GrainOwnerChangeHandler and
	// addRemoteHost delete from local.grains while holding p.mu, so an
	// unlocked map read here would race with them.
	// NOTE(review): writes made inside GrainLocalPool itself (e.g. by
	// GetGrain) are not covered by p.mu; confirm it synchronizes them.
	p.mu.RLock()
	localGrain, isLocal := p.local.grains[id]
	remoteGrain, isRemote := p.remoteIndex[id]
	p.mu.RUnlock()
	if isLocal {
		return localGrain, nil
	}
	if isRemote {
		remoteLookupCount.Inc()
		return remoteGrain, nil
	}
	if err := p.RequestOwnership(id); err != nil {
		return nil, err
	}
	localGrain, err := p.local.GetGrain(id)
	if err != nil {
		return nil, err
	}
	return localGrain, nil
}
// Process resolves the grain for cart id (see getGrain) and applies messages
// to it in order. It returns the result of the last message and stops at the
// first error. With no messages it returns (nil, nil) once the grain resolves.
func (p *SyncedPool) Process(id CartId, messages ...Message) ([]byte, error) {
	grain, err := p.getGrain(id)
	if err != nil {
		return nil, err
	}
	var res []byte
	for _, m := range messages {
		// Copy into a per-iteration variable so &msg is a distinct pointer for
		// each message even under pre-Go 1.22 loop semantics, in case
		// HandleMessage retains the pointer.
		msg := m
		res, err = grain.HandleMessage(&msg, false)
		if err != nil {
			return nil, err
		}
	}
	return res, nil
}
// Get returns the serialized state of cart id. For a *RemoteGrain the owning
// node is asked via GetCurrentState; any other grain is JSON-encoded locally.
func (p *SyncedPool) Get(id CartId) ([]byte, error) {
	grain, err := p.getGrain(id)
	if err != nil {
		return nil, err
	}
	switch g := grain.(type) {
	case *RemoteGrain:
		return g.GetCurrentState()
	default:
		return json.Marshal(g)
	}
}