package main

import (
	"fmt"
	"log"
	"net"
	"strings"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/yudhasubki/netpool"
	"k8s.io/apimachinery/pkg/watch"
)

// Quorum describes a peer that can exchange host lists and be informed of
// grain ownership changes.
//
// NOTE(review): as written, OwnerChanged declares two string parameters that
// are named CartId and host (Go applies the trailing type to both names). If
// the first parameter was meant to have type CartId, the signature needs a
// parameter name; left untouched here to keep the interface unchanged.
type Quorum interface {
	Negotiate(knownHosts []string) ([]string, error)
	OwnerChanged(CartId, host string) error
}

// HealthHandler reports whether a component considers itself healthy.
type HealthHandler interface {
	IsHealthy() bool
}

// SyncedPool couples a local grain pool with the set of remote hosts it has
// connected to. The remotes map (keyed by host name in AddRemote) and the
// remoteIndex map (cart id -> remote grain) are guarded by mu; the methods in
// this file also take mu around accesses to local.grains.
type SyncedPool struct {
	Server               *GenericListener
	mu                   sync.RWMutex
	discardedHostHandler *DiscardedHostHandler
	Hostname             string
	local                *GrainLocalPool
	remotes              map[string]*RemoteHost
	remoteIndex          map[CartId]*RemoteGrain
}

// Prometheus metrics registered by this file.
var (
	negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_negotiation_total",
		Help: "The total number of remote negotiations",
	})
	grainSyncCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_sync_total",
		Help: "The total number of grain owner changes",
	})
	connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_connected_remotes",
		Help: "The number of connected remotes",
	})
	remoteLookupCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_lookup_total",
		Help: "The total number of remote lookups",
	})
	packetQueue = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_packet_queue_size",
		Help: "The total number of packets in the queue",
	})
	packetsSent = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_pool_packets_sent_total",
		Help: "The total number of packets sent",
	})
	packetsReceived = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_pool_packets_received_total",
		Help: "The total number of packets received",
	})
)

// findRemote returns the registered remote whose Host equals host, or nil if
// there is none. It takes the read lock itself, so it must not be called
// while p.mu is already held.
func (p *SyncedPool) findRemote(host string) *RemoteHost {
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, r := range p.remotes {
		if r.Host == host {
			return r
		}
	}
	return nil
}

// PongHandler answers a Ping frame with an empty Pong frame (status 200).
func (p *SyncedPool) PongHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
	resultChan <- MakeFrameWithPayload(Pong, 200, []byte{})
	return nil
}

// GetCartIdHandler replies with a CartIdsResponse frame whose payload is the
// ";"-joined ids of all non-nil local grains. Ids whose string form is empty
// are skipped.
func (p *SyncedPool) GetCartIdHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
	// Size and iterate the map under the same read lock; the lock is released
	// before the (possibly blocking) channel send.
	p.mu.RLock()
	ids := make([]string, 0, len(p.local.grains))
	for id, grain := range p.local.grains {
		if grain == nil {
			continue
		}
		s := id.String()
		if s == "" {
			continue
		}
		ids = append(ids, s)
	}
	p.mu.RUnlock()

	log.Printf("Returning %d cart ids\n", len(ids))
	resultChan <- MakeFrameWithPayload(CartIdsResponse, 200, []byte(strings.Join(ids, ";")))
	return nil
}

// NegotiateHandler handles a RemoteNegotiate frame. The payload is a
// ";"-separated host list; every non-empty host that is not yet known is
// added in its own goroutine. The reply is a RemoteNegotiateResponse frame
// carrying the ";"-joined hosts of all currently healthy remotes.
func (p *SyncedPool) NegotiateHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
	negotiationCount.Inc()
	log.Printf("Handling negotiation\n")
	for _, host := range p.ExcludeKnown(strings.Split(string(data.Payload), ";")) {
		if host == "" {
			continue
		}
		go p.AddRemote(host)
	}

	p.mu.RLock()
	hosts := make([]string, 0, len(p.remotes))
	for _, r := range p.remotes {
		if r.IsHealthy() {
			hosts = append(hosts, r.Host)
		}
	}
	p.mu.RUnlock()

	resultChan <- MakeFrameWithPayload(RemoteNegotiateResponse, 200, []byte(strings.Join(hosts, ";")))
	return nil
}

// GrainOwnerChangeHandler handles a RemoteGrainChanged frame whose payload is
// "<cart id>;<host>". A payload that does not split into exactly two parts is
// answered with AckError/500. Otherwise, if the host is a registered, healthy
// remote, a remote grain is spawned for the id; AddRemote is always invoked
// for the host (it returns immediately for hosts that are already registered),
// and the frame is acknowledged with AckChange/200.
func (p *SyncedPool) GrainOwnerChangeHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
	grainSyncCount.Inc()

	idAndHostParts := strings.Split(string(data.Payload), ";")
	if len(idAndHostParts) != 2 {
		log.Printf("Invalid remote grain change message")
		resultChan <- MakeFrameWithPayload(AckError, 500, []byte("invalid"))
		return nil
	}
	id := ToCartId(idAndHostParts[0])
	host := idAndHostParts[1]
	log.Printf("Handling remote grain owner change to %s for id %s", host, id)

	// The remotes map is written by other goroutines, so look the host up
	// through the locked helper rather than ranging over the map directly.
	if r := p.findRemote(host); r != nil && r.IsHealthy() {
		go p.SpawnRemoteGrain(id, host)
	}
	go p.AddRemote(host)

	resultChan <- MakeFrameWithPayload(AckChange, 200, []byte("ok"))
	return nil
}

// RemoveRemoteGrain drops the remote-grain index entry for id.
func (p *SyncedPool) RemoveRemoteGrain(id CartId) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.remoteIndex, id)
}

// SpawnRemoteGrain records that the grain id is owned by host. A non-nil
// local grain with the same id is removed from the local map first. The
// remote grain itself is created asynchronously, using the HostPool of the
// registered remote for host; if no such remote (or pool) exists the attempt
// is logged and abandoned.
func (p *SyncedPool) SpawnRemoteGrain(id CartId, host string) {
	if id.String() == "" {
		log.Printf("Invalid grain id, %s", id)
		return
	}

	p.mu.RLock()
	localGrain, ok := p.local.grains[id]
	p.mu.RUnlock()

	if ok && localGrain != nil {
		log.Printf("Grain %s already exists locally, owner is (%s)", id, host)
		p.mu.Lock()
		delete(p.local.grains, id)
		p.mu.Unlock()
	}

	go func(i CartId, h string) {
		var pool netpool.Netpooler
		if r := p.findRemote(h); r != nil {
			pool = r.HostPool
		}
		if pool == nil {
			log.Printf("Error spawning remote grain, no pool for %s", h)
			return
		}
		remoteGrain := NewRemoteGrain(i, h, pool)

		p.mu.Lock()
		p.remoteIndex[i] = remoteGrain
		p.mu.Unlock()
	}(id, host)
}

// HandleHostError removes the registered remote for host, asynchronously, if
// that remote reports itself unhealthy. Unknown hosts are ignored.
func (p *SyncedPool) HandleHostError(host string) {
	if r := p.findRemote(host); r != nil && !r.IsHealthy() {
		go p.RemoveHost(r)
	}
}

// NewSyncedPool starts a listener on "<hostname>:1338", registers the frame
// handlers defined in this file on it, and returns the resulting pool. An
// error from Listen is returned as-is.
//
// When discovery is non-nil, a background goroutine waits five seconds, then
// consumes discovery.Watch(): hosts that are not yet known and were not
// deleted are added (after a three second grace period), and known hosts that
// were deleted are removed.
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
	listen := fmt.Sprintf("%s:1338", hostname)
	conn := NewConnection(listen, nil)
	server, err := conn.Listen()
	if err != nil {
		return nil, err
	}
	log.Printf("Listening on %s", listen)

	dh := NewDiscardedHostHandler(1338)
	pool := &SyncedPool{
		Server:               server,
		Hostname:             hostname,
		local:                local,
		discardedHostHandler: dh,
		remotes:              make(map[string]*RemoteHost),
		remoteIndex:          make(map[CartId]*RemoteGrain),
	}
	dh.SetReconnectHandler(pool.AddRemote)

	server.AddHandler(Ping, pool.PongHandler)
	server.AddHandler(GetCartIds, pool.GetCartIdHandler)
	server.AddHandler(RemoteNegotiate, pool.NegotiateHandler)
	server.AddHandler(RemoteGrainChanged, pool.GrainOwnerChangeHandler)
	server.AddHandler(Closing, pool.HostTerminatingHandler)

	if discovery == nil {
		log.Printf("No discovery, waiting for remotes to connect")
		return pool, nil
	}

	go func() {
		time.Sleep(time.Second * 5)
		log.Printf("Starting discovery")
		ch, err := discovery.Watch()
		if err != nil {
			log.Printf("Error discovering hosts: %v", err)
			return
		}
		for chng := range ch {
			if chng.Host == "" {
				continue
			}
			known := pool.IsKnown(chng.Host)
			switch {
			case chng.Type != watch.Deleted && !known:
				log.Printf("Discovered host %s, waiting for startup", chng.Host)
				time.Sleep(3 * time.Second)
				pool.AddRemote(chng.Host)
			case chng.Type == watch.Deleted && known:
				log.Printf("Host removed %s, removing from index", chng.Host)
				// Look up under the read lock and release it before
				// RemoveHost, which needs the write lock. The lookup can
				// miss when the deleted host is this pool's own Hostname.
				if r := pool.findRemote(chng.Host); r != nil {
					pool.RemoveHost(r)
				}
			}
		}
	}()

	return pool, nil
}

// HostTerminatingHandler handles a Closing frame whose payload is the name of
// the terminating host. A matching registered remote is removed
// asynchronously; the frame is always answered with Pong/200 "ok".
func (p *SyncedPool) HostTerminatingHandler(data *FrameWithPayload, resultChan chan<- FrameWithPayload) error {
	log.Printf("Remote host terminating")
	host := string(data.Payload)
	if r := p.findRemote(host); r != nil {
		go p.RemoveHost(r)
	}
	resultChan <- MakeFrameWithPayload(Pong, 200, []byte("ok"))
	return nil
}

// IsHealthy reports whether every registered remote is healthy. A pool with
// no remotes is healthy.
func (p *SyncedPool) IsHealthy() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, r := range p.remotes {
		if !r.IsHealthy() {
			return false
		}
	}
	return true
}

// IsKnown reports whether host is this pool's own Hostname or the Host of a
// registered remote.
func (p *SyncedPool) IsKnown(host string) bool {
	if host == p.Hostname {
		return true
	}
	return p.findRemote(host) != nil
}

// ExcludeKnown returns the entries of hosts for which IsKnown is false,
// preserving their order.
func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
	ret := make([]string, 0, len(hosts))
	for _, h := range hosts {
		if !p.IsKnown(h) {
			ret = append(ret, h)
		}
	}
	return ret
}

// RemoveHost unregisters host, drops every remote-grain index entry that
// points at it, hands the host name to the discarded-host handler and updates
// the connected-remotes gauge.
func (p *SyncedPool) RemoveHost(host *RemoteHost) {
	p.mu.Lock()
	delete(p.remotes, host.Host)
	// Capture the count while the lock is held; the map may change again as
	// soon as it is released.
	remaining := len(p.remotes)
	p.mu.Unlock()

	p.RemoveHostMappedCarts(host)
	p.discardedHostHandler.AppendHost(host.Host)
	connectedRemotes.Set(float64(remaining))
}

// RemoveHostMappedCarts deletes every remote-grain index entry whose Host
// equals host.Host.
func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for id, r := range p.remoteIndex {
		if r.Host == host.Host {
			delete(p.remoteIndex, id)
		}
	}
}

// Frame types used for pool-to-pool communication.
const (
	RemoteNegotiate         = FrameType(3)
	RemoteGrainChanged      = FrameType(4)
	AckChange               = FrameType(5)
	AckError                = FrameType(6)
	Ping                    = FrameType(7)
	Pong                    = FrameType(8)
	GetCartIds              = FrameType(9)
	CartIdsResponse         = FrameType(10)
	RemoteNegotiateResponse = FrameType(11)
	Closing                 = FrameType(12)
)

// Negotiate sends this pool's host list (own Hostname first, then every
// registered remote) to each registered remote and adds any host from the
// replies that is not yet known. The first negotiation error is logged and
// aborts the remaining negotiations.
func (p *SyncedPool) Negotiate() {
	// Work on a snapshot: AddRemote below mutates p.remotes and takes the
	// write lock, so the live map must not be ranged over (or locked) here.
	p.mu.RLock()
	remotes := make([]*RemoteHost, 0, len(p.remotes))
	knownHosts := make([]string, 0, len(p.remotes)+1)
	knownHosts = append(knownHosts, p.Hostname)
	for _, r := range p.remotes {
		remotes = append(remotes, r)
		knownHosts = append(knownHosts, r.Host)
	}
	p.mu.RUnlock()

	for _, r := range remotes {
		hosts, err := r.Negotiate(knownHosts)
		if err != nil {
			log.Printf("Error negotiating with %s: %v\n", r.Host, err)
			return
		}
		for _, h := range hosts {
			if !p.IsKnown(h) {
				p.AddRemote(h)
			}
		}
	}
}

// GetHealthyRemotes returns a snapshot of the registered remotes that
// currently report themselves healthy.
func (p *SyncedPool) GetHealthyRemotes() []*RemoteHost {
	p.mu.RLock()
	defer p.mu.RUnlock()
	remotes := make([]*RemoteHost, 0, len(p.remotes))
	for _, r := range p.remotes {
		if r.IsHealthy() {
			remotes = append(remotes, r)
		}
	}
	return remotes
}

// RequestOwnership asks every healthy remote to confirm that this host now
// owns the grain id. Remotes that fail and are no longer healthy are removed
// and not counted. With all = remotes counted and ok = confirmations, the
// request fails when (all < 3 && ok < all) || ok < all/2; in that case the
// local grain is deleted and a "quorum not reached" error is returned.
func (p *SyncedPool) RequestOwnership(id CartId) error {
	ok := 0
	all := 0
	for _, r := range p.GetHealthyRemotes() {
		err := r.ConfirmChange(id, p.Hostname)
		all++
		if err != nil {
			if !r.IsHealthy() {
				log.Printf("Ownership: Removing host, unable to communicate with %s", r.Host)
				p.RemoveHost(r)
				all--
			} else {
				log.Printf("Error confirming change: %v from %s\n", err, p.Hostname)
			}
			continue
		}
		//log.Printf("Remote confirmed change %s\n", r.Host)
		ok++
	}

	if (all < 3 && ok < all) || ok < (all/2) {
		p.removeLocalGrain(id)
		return fmt.Errorf("quorum not reached")
	}
	return nil
}

// removeLocalGrain deletes id from the local grain map.
func (p *SyncedPool) removeLocalGrain(id CartId) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.local.grains, id)
}

// AddRemote connects to host and registers it as a remote. It is a no-op for
// an empty host, this pool's own Hostname, or a host that is already
// registered. A connection pool to port 1338 backs the control connection
// (pinged up to four times, 300ms apart), and a second pool to port 1337 is
// stored as the remote's HostPool. After registration the ping loop and the
// remote's Initialize are started in their own goroutines.
//
// NOTE(review): the write lock is held for the whole call, including the
// pings and sleeps, so other users of p.mu wait for it; confirm this is
// acceptable before restructuring.
func (p *SyncedPool) AddRemote(host string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	_, hasHost := p.remotes[host]
	if host == "" || hasHost || host == p.Hostname {
		return
	}

	hostPool, err := netpool.New(func() (net.Conn, error) {
		return net.Dial("tcp", fmt.Sprintf("%s:1338", host))
	}, netpool.WithMaxPool(128), netpool.WithMinPool(0))
	if err != nil {
		log.Printf("Error creating host pool: %v\n", err)
		return
	}

	client := NewConnection(fmt.Sprintf("%s:1338", host), hostPool)

	// NOTE(review): if every ping fails the remote is still registered
	// below; HandlePing then removes it once it reports unhealthy.
	for pings := 3; pings >= 0; pings-- {
		if _, err = client.Call(Ping, nil); err == nil {
			break
		}
		log.Printf("Ping failed when adding %s, trying %d more times\n", host, pings)
		time.Sleep(time.Millisecond * 300)
	}
	log.Printf("Connected to remote %s", host)

	cartPool, err := netpool.New(func() (net.Conn, error) {
		return net.Dial("tcp", fmt.Sprintf("%s:1337", host))
	}, netpool.WithMaxPool(128), netpool.WithMinPool(0))
	if err != nil {
		log.Printf("Error creating grain pool: %v\n", err)
		return
	}

	remote := RemoteHost{
		HostPool:    cartPool,
		Connection:  client,
		MissedPings: 0,
		Host:        host,
	}
	p.remotes[host] = &remote
	connectedRemotes.Set(float64(len(p.remotes)))

	go p.HandlePing(&remote)
	go remote.Initialize(p)
}

// HandlePing pings remote every three seconds. After a failed ping it
// retries every 200ms until a ping succeeds or the remote reports itself
// unhealthy; in the latter case the remote is removed and the loop ends.
func (p *SyncedPool) HandlePing(remote *RemoteHost) {
	// A stoppable ticker (instead of time.Tick) so its resources are
	// released when this loop returns.
	ticker := time.NewTicker(time.Second * 3)
	defer ticker.Stop()

	for range ticker.C {
		err := remote.Ping()
		for err != nil {
			time.Sleep(time.Millisecond * 200)
			if !remote.IsHealthy() {
				log.Printf("Removing host, unable to communicate with %s", remote.Host)
				p.RemoveHost(remote)
				return
			}
			err = remote.Ping()
		}
	}
}

// getGrain resolves id to a grain: the local grain if one is present in the
// local map, otherwise the indexed remote grain, otherwise a grain obtained
// from the local pool's GetGrain while ownership is requested from the
// remotes in the background.
//
// NOTE(review): p.local.GetGrain runs while only the read lock is held;
// verify that it does not modify p.local.grains without its own
// synchronization.
func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
	var err error
	p.mu.RLock()
	defer p.mu.RUnlock()

	localGrain, ok := p.local.grains[id]
	if !ok {
		// check if remote grain exists
		remoteGrain, ok := p.remoteIndex[id]
		if ok {
			remoteLookupCount.Inc()
			return remoteGrain, nil
		}

		go p.RequestOwnership(id)
		// if err != nil {
		// 	log.Printf("Error requesting ownership: %v\n", err)
		// 	return nil, err
		// }

		localGrain, err = p.local.GetGrain(id)
		if err != nil {
			return nil, err
		}
	}
	return localGrain, nil
}

// Close sends a Closing frame carrying this pool's Hostname to every
// registered remote, each in its own goroutine. It does not wait for the
// calls to finish.
func (p *SyncedPool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	payload := []byte(p.Hostname)
	for _, r := range p.remotes {
		go r.Call(Closing, payload)
	}
}

// Process applies messages, in order, to the grain for id and returns the
// result of the last one. It stops at, and returns, the first error. With no
// messages the result is nil.
func (p *SyncedPool) Process(id CartId, messages ...Message) (*FrameWithPayload, error) {
	grain, err := p.getGrain(id)
	if err != nil {
		return nil, err
	}

	var res *FrameWithPayload
	for _, m := range messages {
		res, err = grain.HandleMessage(&m, false)
		if err != nil {
			return nil, err
		}
	}
	return res, nil
}

// Get returns the current state of the grain for id.
func (p *SyncedPool) Get(id CartId) (*FrameWithPayload, error) {
	grain, err := p.getGrain(id)
	if err != nil {
		return nil, err
	}
	return grain.GetCurrentState()
}