package main

import (
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Quorum describes the cluster-level operations used to agree on which host
// owns a cart grain.
//
// NOTE(review): OwnerChanged declares two string parameters named "CartId" and
// "host" (Go groups the names before the shared type). It was presumably meant
// to be (id CartId, host string); left untouched here because changing it would
// break any existing implementation.
type Quorum interface {
	Negotiate(knownHosts []string) ([]string, error)
	OwnerChanged(CartId, host string) error
}

// RemoteHost is a connected peer: the RPC client used to call it, the host
// name it was dialed with, and the pool that proxies grains it owns.
type RemoteHost struct {
	*Client
	Host        string
	MissedPings int
	Pool        *RemoteGrainPool
}

// SyncedPool combines the local grain pool with a set of remote peers and
// keeps an index of which remote pool owns which cart.
//
// mu guards remotes, remoteIndex and the accesses this type makes to
// local.grains.
type SyncedPool struct {
	*Server
	mu sync.RWMutex
	//Discovery Discovery
	Hostname    string
	local       *GrainLocalPool
	remotes     []*RemoteHost
	remoteIndex map[CartId]*RemoteGrainPool
}

// Prometheus metrics exported by the synced pool.
var (
	negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_negotiation_total",
		Help: "The total number of remote negotiations",
	})
	grainSyncCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_grain_sync_total",
		Help: "The total number of grain owner changes",
	})
	connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_connected_remotes",
		Help: "The number of connected remotes",
	})
	remoteLookupCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_lookup_total",
		Help: "The total number of remote lookups",
	})
	packetQueue = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_packet_queue_size",
		Help: "The total number of packets in the queue",
	})
	packetsSent = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_pool_packets_sent_total",
		Help: "The total number of packets sent",
	})
	packetsReceived = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_pool_packets_received_total",
		Help: "The total number of packets received",
	})
)

// Message type identifiers used on the wire between pools.
const (
	RemoteNegotiate    = uint16(3)
	RemoteGrainChanged = uint16(4)
	AckChange          = uint16(5)
	//AckError = uint16(6)
	Ping                    = uint16(7)
	Pong                    = uint16(8)
	GetCartIds              = uint16(9)
	CartIdsResponse         = uint16(10)
	RemoteNegotiateResponse = uint16(11)
)

// PongHandler answers a Ping by echoing the payload back as a Pong.
func (p *SyncedPool) PongHandler(data []byte) (uint16, []byte, error) {
	return Pong, data, nil
}

// GetCartIdHandler replies with the ids of all grains currently held in the
// local pool, joined with ";".
func (p *SyncedPool) GetCartIdHandler(data []byte) (uint16, []byte, error) {
	// Other handlers delete from local.grains while holding mu, so iterate
	// under the read lock to avoid a concurrent map read/write.
	p.mu.RLock()
	ids := make([]string, 0, len(p.local.grains))
	for id := range p.local.grains {
		ids = append(ids, id.String())
	}
	p.mu.RUnlock()
	return CartIdsResponse, []byte(strings.Join(ids, ";")), nil
}

// NegotiateHandler receives a ";"-separated host list from a peer and tries to
// connect to every host this pool does not know yet. Connection failures are
// logged; the reply is always "ok".
func (p *SyncedPool) NegotiateHandler(data []byte) (uint16, []byte, error) {
	negotiationCount.Inc()
	log.Printf("Handling negotiation\n")
	for _, host := range p.ExcludeKnown(strings.Split(string(data), ";")) {
		if err := p.AddRemote(host); err != nil {
			log.Printf("Error adding remote %s: %v\n", host, err)
		}
	}
	return RemoteNegotiateResponse, []byte("ok"), nil
}

// GrainOwnerChangeHandler handles an "<cartId>;<host>" message announcing that
// host now owns the cart. If host is a known remote, any local copy of the
// grain is dropped and the cart is indexed to that remote's pool.
//
// The reply payload is "ok" on success, "incorrect" for a malformed message
// and "not found" when the host is not a known remote.
func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint16, []byte, error) {
	grainSyncCount.Inc()
	parts := strings.Split(string(data), ";")
	if len(parts) != 2 {
		log.Printf("Invalid remote grain change message\n")
		return AckChange, []byte("incorrect"), nil
	}
	rawID, host := parts[0], parts[1]

	// Look up the remote and update the index under a single lock so the
	// remote cannot be removed between the two steps.
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, r := range p.remotes {
		if r.Host != host {
			continue
		}
		log.Printf("Remote grain %s changed to %s\n", rawID, host)
		id := ToCartId(rawID)
		if p.local.grains[id] != nil {
			log.Printf("Grain %s already exists locally, deleting\n", rawID)
			delete(p.local.grains, id)
		}
		p.remoteIndex[id] = r.Pool
		return AckChange, []byte("ok"), nil
	}
	return AckChange, []byte("not found"), nil
}

// NewSyncedPool starts listening on "<hostname>:1338", registers the protocol
// handlers and, when discovery is non-nil, starts a background goroutine that
// connects to every newly discovered host.
//
// It returns an error only when the listener cannot be created.
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
	listen := fmt.Sprintf("%s:1338", hostname)
	server, err := Listen(listen)
	if err != nil {
		return nil, err
	}
	log.Printf("Listening on %s", listen)

	pool := &SyncedPool{
		Server: server,
		//Discovery: discovery,
		Hostname:    hostname,
		local:       local,
		remotes:     make([]*RemoteHost, 0),
		remoteIndex: make(map[CartId]*RemoteGrainPool),
	}

	server.HandleCall(Ping, pool.PongHandler)
	server.HandleCall(GetCartIds, pool.GetCartIdHandler)
	server.HandleCall(RemoteNegotiate, pool.NegotiateHandler)
	server.HandleCall(RemoteGrainChanged, pool.GrainOwnerChangeHandler)

	// TODO FIX THIS, ONLY CLIENT OR SERVER SHOULD PING
	// go func() {
	// 	for {
	// 		for range time.Tick(time.Second * 2) {
	// 			for _, r := range pool.remotes {
	// 				err := DoPing(r)
	// 				if err != nil {
	// 					r.MissedPings++
	// 					log.Printf("Error pinging remote %s: %v\n, missed pings: %d", r.Host, err, r.MissedPings)
	// 					if r.MissedPings > 3 {
	// 						log.Printf("Removing remote %s\n", r.Host)
	// 						go pool.RemoveHost(r)
	// 						//pool.remotes = append(pool.remotes[:i], pool.remotes[i+1:]...)
	// 					}
	// 				} else {
	// 					r.MissedPings = 0
	// 				}
	// 			}
	// 			connectedRemotes.Set(float64(len(pool.remotes)))
	// 		}
	// 	}
	// }()

	if discovery == nil {
		log.Printf("No discovery, waiting for remotes to connect")
		return pool, nil
	}

	go func() {
		// Delay discovery for a few seconds after the listener is up.
		time.Sleep(time.Second * 5)
		log.Printf("Starting discovery")
		ch, err := discovery.Watch()
		if err != nil {
			log.Printf("Error discovering hosts: %v", err)
			return
		}
		for host := range ch {
			if host == "" || pool.IsKnown(host) {
				continue
			}
			go func(h string) {
				log.Printf("Discovered host %s, waiting for startup", h)
				time.Sleep(time.Second)
				if err := pool.AddRemote(h); err != nil {
					log.Printf("Error adding remote %s: %v", h, err)
				}
			}(host)
		}
	}()

	return pool, nil
}

// remoteSnapshot returns a copy of the current remote list so callers can
// iterate (and make network calls) without holding mu.
func (p *SyncedPool) remoteSnapshot() []*RemoteHost {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return append([]*RemoteHost(nil), p.remotes...)
}

// IsKnown reports whether host is this pool's own hostname or the host of an
// already registered remote.
func (p *SyncedPool) IsKnown(host string) bool {
	if host == p.Hostname {
		return true
	}
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, r := range p.remotes {
		if r.Host == host {
			return true
		}
	}
	return false
}

// ExcludeKnown returns the hosts from the input that are not yet known to this
// pool, preserving their order.
func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
	unknown := make([]string, 0, len(hosts))
	for _, h := range hosts {
		if !p.IsKnown(h) {
			unknown = append(unknown, h)
		}
	}
	return unknown
}

// RemoveHost unregisters host (matched by pointer identity), drops every cart
// mapping that points at its pool and updates the connected-remotes gauge.
// It does nothing when host is not registered.
func (p *SyncedPool) RemoveHost(host *RemoteHost) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for i, r := range p.remotes {
		if r != host {
			continue
		}
		p.removeMappedCartsLocked(r)
		p.remotes = append(p.remotes[:i], p.remotes[i+1:]...)
		connectedRemotes.Set(float64(len(p.remotes)))
		return
	}
}

// RemoveHostMappedCarts deletes every cart in the remote index that is mapped
// to host's pool, calling Delete on the pool for each of them.
func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.removeMappedCartsLocked(host)
}

// removeMappedCartsLocked implements RemoveHostMappedCarts; the caller must
// hold mu for writing.
func (p *SyncedPool) removeMappedCartsLocked(host *RemoteHost) {
	for id, pool := range p.remoteIndex {
		if pool == host.Pool {
			pool.Delete(id)
			delete(p.remoteIndex, id)
		}
	}
}

// Negotiate sends the ";"-joined knownHosts to the remote and returns its
// reply split on ";". The handler in this file replies with the literal "ok".
func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) {
	data, err := h.Call(RemoteNegotiate, RemoteNegotiateResponse, []byte(strings.Join(knownHosts, ";")))
	if err != nil {
		return nil, err
	}
	return strings.Split(string(data), ";"), nil
}

// GetCartMappings asks the remote for the cart ids it holds. The reply is a
// ";"-separated list; an empty reply yields an empty slice.
func (h *RemoteHost) GetCartMappings() ([]CartId, error) {
	data, err := h.Call(GetCartIds, CartIdsResponse, nil)
	if err != nil {
		return nil, err
	}
	parts := strings.Split(string(data), ";")
	ids := make([]CartId, 0, len(parts))
	for _, part := range parts {
		// strings.Split("", ";") returns [""]; skip empty segments so a
		// remote without grains does not produce a bogus cart id.
		if part == "" {
			continue
		}
		ids = append(ids, ToCartId(part))
	}
	return ids, nil
}

// Negotiate forwards knownHosts to every registered remote and returns the
// de-duplicated union of their replies, in unspecified order. It stops at the
// first remote that fails and returns that error.
func (p *SyncedPool) Negotiate(knownHosts []string) ([]string, error) {
	seen := make(map[string]struct{})
	for _, r := range p.remoteSnapshot() {
		hosts, err := r.Negotiate(knownHosts)
		if err != nil {
			return nil, err
		}
		for _, h := range hosts {
			seen[h] = struct{}{}
		}
	}
	merged := make([]string, 0, len(seen))
	for h := range seen {
		merged = append(merged, h)
	}
	return merged, nil
}

// ConfirmChange tells the remote that cart id is now owned by host. It returns
// an error when the call fails or the remote answers anything other than "ok".
func (r *RemoteHost) ConfirmChange(id CartId, host string) error {
	data, err := r.Call(RemoteGrainChanged, AckChange, []byte(fmt.Sprintf("%s;%s", id, host)))
	if err != nil {
		return err
	}
	if reply := string(data); reply != "ok" {
		return fmt.Errorf("remote grain change failed %s", reply)
	}
	return nil
}

// RequestOwnership announces to every remote that this host now owns cart id.
// It stops at, and returns, the first error.
func (p *SyncedPool) RequestOwnership(id CartId) error {
	for _, r := range p.remoteSnapshot() {
		log.Printf("Confirming change of %s to %s (me) with %s\n", id, p.Hostname, r.Host)
		if err := r.ConfirmChange(id, p.Hostname); err != nil {
			// Report the remote that rejected the change, not ourselves.
			log.Printf("Error confirming change: %v from %s\n", err, r.Host)
			return err
		}
	}
	return nil
}

// registerRemote appends remote to the remote list unless a remote with the
// same address is already present. It returns the hosts that were registered
// before the call and whether remote was added.
func (p *SyncedPool) registerRemote(address string, remote *RemoteHost) ([]string, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	known := make([]string, 0, len(p.remotes))
	for _, r := range p.remotes {
		if r.Host == address {
			return nil, false
		}
		known = append(known, r.Host)
	}
	p.remotes = append(p.remotes, remote)
	connectedRemotes.Set(float64(len(p.remotes)))
	return known, true
}

// addRemoteHost registers remote under address and, in the background,
// negotiates the previously known hosts with all remotes and imports the new
// remote's cart mappings into the remote index (dropping local copies of
// those grains).
//
// It returns an error when a remote with the same address already exists.
func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) error {
	known, added := p.registerRemote(address, remote)
	if !added {
		log.Printf("Remote %s already exists\n", address)
		return fmt.Errorf("remote %s already exists", address)
	}
	log.Printf("Added remote %s\n", remote.Host)

	go func() {
		// A failed negotiation is logged but does not stop the mapping import.
		if _, err := p.Negotiate(known); err != nil {
			log.Printf("Error negotiating with remotes: %v\n", err)
		}
		ids, err := remote.GetCartMappings()
		if err != nil {
			log.Printf("Error getting remote mappings: %v\n", err)
			return
		}
		p.mu.Lock()
		defer p.mu.Unlock()
		for _, id := range ids {
			if p.local.grains[id] != nil {
				log.Printf("Grain %s already exists locally, deleting\n", id)
				delete(p.local.grains, id)
			}
			p.remoteIndex[id] = remote.Pool
		}
	}()
	return nil
}

// AddRemote dials "<address>:1338" and registers the peer. Empty or already
// known addresses are ignored and return nil. Once registered, a background
// goroutine watches the client's error channel and removes the host after
// more than three errors.
func (p *SyncedPool) AddRemote(address string) error {
	if address == "" || p.IsKnown(address) {
		return nil
	}
	client, err := Dial(fmt.Sprintf("%s:1338", address))
	if err != nil {
		log.Printf("Error connecting to remote %s: %v\n", address, err)
		return err
	}
	remote := &RemoteHost{
		Client: client,
		Pool:   NewRemoteGrainPool(address),
		Host:   address,
	}
	// Register first: a concurrent AddRemote for the same address may have
	// won the race, and the watcher below must not be started for a remote
	// that was never registered.
	// NOTE(review): the dialed client is not released on this path; confirm
	// whether Client offers a way to close the connection.
	if err := p.addRemoteHost(address, remote); err != nil {
		return err
	}
	go func() {
		// Keep draining the channel even after removal; RemoveHost is a
		// no-op once the host is gone.
		for range client.Errors {
			if client.ErrorCount > 3 {
				p.RemoveHost(remote)
			}
		}
	}()
	return nil
}

// getGrainPool resolves which pool serves cart id: the local pool when the
// grain is held locally, the indexed remote pool when one is registered, and
// otherwise the local pool after ownership has been announced to all remotes.
func (p *SyncedPool) getGrainPool(id CartId) (GrainPool, error) {
	p.mu.RLock()
	_, isLocal := p.local.grains[id]
	remotePool, isRemote := p.remoteIndex[id]
	p.mu.RUnlock()

	if isLocal {
		return p.local, nil
	}
	if isRemote {
		if remotePool == nil {
			// Stale entry: there is no pool to call into, so just drop it
			// (unless it was replaced since the read lock was released).
			p.mu.Lock()
			if p.remoteIndex[id] == nil {
				delete(p.remoteIndex, id)
			}
			p.mu.Unlock()
			return nil, fmt.Errorf("remote pool is nil for %v", id)
		}
		remoteLookupCount.Inc()
		return remotePool, nil
	}
	if err := p.RequestOwnership(id); err != nil {
		return nil, err
	}
	return p.local, nil
}

// Process applies messages to cart id on whichever pool owns it and returns
// that pool's result.
func (p *SyncedPool) Process(id CartId, messages ...Message) ([]byte, error) {
	pool, err := p.getGrainPool(id)
	if err != nil {
		return nil, err
	}
	return pool.Process(id, messages...)
}

// Get returns the state of cart id from whichever pool owns it.
func (p *SyncedPool) Get(id CartId) ([]byte, error) {
	pool, err := p.getGrainPool(id)
	if err != nil {
		return nil, err
	}
	return pool.Get(id)
}