diff --git a/cart b/cart new file mode 100755 index 0000000..e77343b Binary files /dev/null and b/cart differ diff --git a/cart-grain-pool.go b/cart-grain-pool.go deleted file mode 100644 index 1da0173..0000000 --- a/cart-grain-pool.go +++ /dev/null @@ -1,543 +0,0 @@ -package main - -import ( - "fmt" - "log" - "maps" - "reflect" - "sync" - "time" - - "git.tornberg.me/go-cart-actor/pkg/actor" - "git.tornberg.me/go-cart-actor/pkg/discovery" - "git.tornberg.me/go-cart-actor/pkg/proxy" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "k8s.io/apimachinery/pkg/watch" -) - -// --------------------------------------------------------------------------- -// Metrics shared by the cart pool implementation. -// --------------------------------------------------------------------------- - -var ( - poolGrains = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cart_grains_in_pool", - Help: "The total number of grains in the local pool", - }) - poolSize = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cart_pool_size", - Help: "Configured capacity of the cart pool", - }) - poolUsage = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cart_grain_pool_usage", - Help: "Current utilisation of the cart pool", - }) - negotiationCount = promauto.NewCounter(prometheus.CounterOpts{ - Name: "cart_remote_negotiation_total", - Help: "The total number of remote host negotiations", - }) - connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cart_connected_remotes", - Help: "Number of connected remote hosts", - }) - cartMutationsTotal = promauto.NewCounter(prometheus.CounterOpts{ - Name: "cart_mutations_total", - Help: "Total number of cart state mutations applied", - }) - cartMutationFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{ - Name: "cart_mutation_failures_total", - Help: "Total number of failed cart state mutations", - }) - cartMutationLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: "cart_mutation_latency_seconds", - Help: "Latency of cart mutations in seconds", - Buckets: prometheus.DefBuckets, - }, []string{"mutation"}) -) - -// GrainPool is the interface exposed to HTTP handlers and other subsystems. - -// CartPool merges the responsibilities that previously belonged to -// GrainLocalPool and SyncedPool. It provides local grain storage together -// with cluster coordination, ownership negotiation and expiry signalling. -type CartPool struct { - // Local grain state ----------------------------------------------------- - localMu sync.RWMutex - grains map[uint64]*CartGrain - - spawn func(id CartId) (*CartGrain, error) - ttl time.Duration - poolSize int - - // Cluster coordination -------------------------------------------------- - hostname string - remoteMu sync.RWMutex - remoteOwners map[uint64]*proxy.RemoteHost - remoteHosts map[string]*proxy.RemoteHost - //discardedHostHandler *DiscardedHostHandler - - // House-keeping --------------------------------------------------------- - purgeTicker *time.Ticker -} - -// NewCartPool constructs a unified pool. Discovery may be nil for standalone -// deployments. -func NewCartPool(size int, ttl time.Duration, hostname string, spawn func(id CartId) (*CartGrain, error), hostWatch discovery.Discovery) (*CartPool, error) { - p := &CartPool{ - grains: make(map[uint64]*CartGrain), - - spawn: spawn, - ttl: ttl, - poolSize: size, - hostname: hostname, - remoteOwners: make(map[uint64]*proxy.RemoteHost), - remoteHosts: make(map[string]*proxy.RemoteHost), - } - - p.purgeTicker = time.NewTicker(time.Minute) - go func() { - for range p.purgeTicker.C { - p.purge() - } - }() - - if hostWatch != nil { - go p.startDiscovery(hostWatch) - } else { - log.Printf("No discovery configured; expecting manual AddRemote or static host injection") - } - - return p, nil -} - -func (p *CartPool) purge() { - purgeLimit := time.Now().Add(-p.ttl) - purgedIds := make([]uint64, 0, len(p.grains)) - p.localMu.Lock() - for id, grain := range p.grains { - if grain.GetLastAccess().Before(purgeLimit) { - purgedIds = append(purgedIds, id) - - delete(p.grains, id) - } - } - p.localMu.Unlock() - p.forAllHosts(func(remote *proxy.RemoteHost) { - remote.AnnounceExpiry(purgedIds) - }) - -} - -// startDiscovery subscribes to cluster events and adds/removes hosts. -func (p *CartPool) startDiscovery(discovery discovery.Discovery) { - time.Sleep(3 * time.Second) // allow gRPC server startup - log.Printf("Starting discovery watcher") - ch, err := discovery.Watch() - if err != nil { - log.Printf("Discovery error: %v", err) - return - } - for evt := range ch { - if evt.Host == "" { - continue - } - switch evt.Type { - case watch.Deleted: - if p.IsKnown(evt.Host) { - p.RemoveHost(evt.Host) - } - default: - if !p.IsKnown(evt.Host) { - log.Printf("Discovered host %s", evt.Host) - p.AddRemote(evt.Host) - } - } - } -} - -// --------------------------------------------------------------------------- -// Local grain management -// --------------------------------------------------------------------------- - -func (p *CartPool) statsUpdate() { - p.localMu.RLock() - size := len(p.grains) - cap := p.poolSize - p.localMu.RUnlock() - poolGrains.Set(float64(size)) - poolSize.Set(float64(cap)) - if cap > 0 { - poolUsage.Set(float64(size) / float64(cap)) - } -} - -// LocalUsage returns the number of resident grains and configured capacity. -func (p *CartPool) LocalUsage() (int, int) { - p.localMu.RLock() - defer p.localMu.RUnlock() - return len(p.grains), p.poolSize -} - -// LocalCartIDs returns the currently owned cart ids (for control-plane RPCs). -func (p *CartPool) GetLocalIds() []uint64 { - p.localMu.RLock() - defer p.localMu.RUnlock() - ids := make([]uint64, 0, len(p.grains)) - for _, g := range p.grains { - if g == nil { - continue - } - ids = append(ids, uint64(g.GetId())) - } - return ids -} - -func (p *CartPool) HandleRemoteExpiry(host string, ids []uint64) error { - p.remoteMu.Lock() - defer p.remoteMu.Unlock() - for _, id := range ids { - delete(p.remoteOwners, id) - } - return nil -} - -func (p *CartPool) HandleOwnershipChange(host string, ids []uint64) error { - - p.remoteMu.RLock() - remoteHost, exists := p.remoteHosts[host] - p.remoteMu.RUnlock() - if !exists { - createdHost, err := p.AddRemote(host) - if err != nil { - return err - } - remoteHost = createdHost - } - p.remoteMu.Lock() - defer p.remoteMu.Unlock() - p.localMu.Lock() - defer p.localMu.Unlock() - for _, id := range ids { - log.Printf("Handling ownership change for cart %d to host %s", id, host) - delete(p.grains, id) - p.remoteOwners[id] = remoteHost - } - return nil -} - -// SnapshotGrains returns a copy of the currently resident grains keyed by id. -func (p *CartPool) SnapshotGrains() map[uint64]*CartGrain { - p.localMu.RLock() - defer p.localMu.RUnlock() - out := maps.Clone(p.grains) - - return out -} - -// func (p *CartPool) getLocalGrain(key uint64) (*CartGrain, error) { - -// grainLookups.Inc() - -// p.localMu.RLock() -// grain, ok := p.grains[key] -// p.localMu.RUnlock() -// if grain != nil && ok { -// return grain, nil -// } - -// go p.statsUpdate() -// return grain, nil -// } - -// --------------------------------------------------------------------------- -// Cluster ownership and coordination -// --------------------------------------------------------------------------- - -func (p *CartPool) TakeOwnership(id uint64) { - - if p.grains[id] != nil { - return - } - log.Printf("taking ownership of: %d", id) - p.broadcastOwnership([]uint64{id}) -} - -func (p *CartPool) AddRemote(host string) (*proxy.RemoteHost, error) { - if host == "" || host == p.hostname || p.IsKnown(host) { - return nil, fmt.Errorf("invalid host") - } - - remote, err := proxy.NewRemoteHost(host) - if err != nil { - log.Printf("AddRemote: NewRemoteHostGRPC %s failed: %v", host, err) - - return nil, err - } - p.remoteMu.Lock() - p.remoteHosts[host] = remote - p.remoteMu.Unlock() - connectedRemotes.Set(float64(p.RemoteCount())) - - log.Printf("Connected to remote host %s", host) - go p.pingLoop(remote) - go p.initializeRemote(remote) - go p.SendNegotiation() - return remote, nil -} - -func (p *CartPool) initializeRemote(remote *proxy.RemoteHost) { - - remotesIds := remote.GetActorIds() - - p.remoteMu.Lock() - for _, id := range remotesIds { - p.localMu.Lock() - delete(p.grains, id) - p.localMu.Unlock() - if _, exists := p.remoteOwners[id]; !exists { - p.remoteOwners[id] = remote - } - } - p.remoteMu.Unlock() - -} - -func (p *CartPool) RemoveHost(host string) { - p.remoteMu.Lock() - remote, exists := p.remoteHosts[host] - if exists { - go remote.Close() - delete(p.remoteHosts, host) - } - for id, owner := range p.remoteOwners { - if owner.Host == host { - delete(p.remoteOwners, id) - } - } - p.remoteMu.Unlock() - - if exists { - remote.Close() - } - connectedRemotes.Set(float64(p.RemoteCount())) -} - -func (p *CartPool) RemoteCount() int { - p.remoteMu.RLock() - defer p.remoteMu.RUnlock() - return len(p.remoteHosts) -} - -// RemoteHostNames returns a snapshot of connected remote host identifiers. -func (p *CartPool) RemoteHostNames() []string { - p.remoteMu.RLock() - defer p.remoteMu.RUnlock() - hosts := make([]string, 0, len(p.remoteHosts)) - for host := range p.remoteHosts { - hosts = append(hosts, host) - } - return hosts -} - -func (p *CartPool) IsKnown(host string) bool { - if host == p.hostname { - return true - } - p.remoteMu.RLock() - defer p.remoteMu.RUnlock() - _, ok := p.remoteHosts[host] - return ok -} - -func (p *CartPool) pingLoop(remote *proxy.RemoteHost) { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - for range ticker.C { - if !remote.Ping() { - if !remote.IsHealthy() { - log.Printf("Remote %s unhealthy, removing", remote.Host) - p.Close() - p.RemoveHost(remote.Host) - return - } - continue - } - } -} - -func (p *CartPool) IsHealthy() bool { - p.remoteMu.RLock() - defer p.remoteMu.RUnlock() - for _, r := range p.remoteHosts { - if !r.IsHealthy() { - return false - } - } - return true -} - -func (p *CartPool) Negotiate(otherHosts []string) { - - for _, host := range otherHosts { - if host != p.hostname { - p.remoteMu.RLock() - _, ok := p.remoteHosts[host] - p.remoteMu.RUnlock() - if !ok { - go p.AddRemote(host) - } - } - } -} - -func (p *CartPool) SendNegotiation() { - negotiationCount.Inc() - - p.remoteMu.RLock() - hosts := make([]string, 0, len(p.remoteHosts)+1) - hosts = append(hosts, p.hostname) - remotes := make([]*proxy.RemoteHost, 0, len(p.remoteHosts)) - for h, r := range p.remoteHosts { - hosts = append(hosts, h) - remotes = append(remotes, r) - } - p.remoteMu.RUnlock() - - p.forAllHosts(func(remote *proxy.RemoteHost) { - knownByRemote, err := remote.Negotiate(hosts) - - if err != nil { - log.Printf("Negotiate with %s failed: %v", remote.Host, err) - return - } - for _, h := range knownByRemote { - if !p.IsKnown(h) { - go p.AddRemote(h) - } - } - }) -} - -func (p *CartPool) forAllHosts(fn func(*proxy.RemoteHost)) { - p.remoteMu.RLock() - rh := maps.Clone(p.remoteHosts) - p.remoteMu.RUnlock() - - wg := sync.WaitGroup{} - for _, host := range rh { - - wg.Go(func() { fn(host) }) - - } - - for name, host := range rh { - if !host.IsHealthy() { - host.Close() - p.remoteMu.Lock() - delete(p.remoteHosts, name) - p.remoteMu.Unlock() - } - - } -} - -func (p *CartPool) broadcastOwnership(ids []uint64) { - if len(ids) == 0 { - return - } - - p.forAllHosts(func(rh *proxy.RemoteHost) { - rh.AnnounceOwnership(ids) - }) - -} - -func (p *CartPool) getOrClaimGrain(id uint64) (*CartGrain, error) { - p.localMu.RLock() - grain, exists := p.grains[id] - p.localMu.RUnlock() - if exists && grain != nil { - return grain, nil - } - - grain, err := p.spawn(CartId(id)) - if err != nil { - return nil, err - } - p.localMu.Lock() - p.grains[id] = grain - p.localMu.Unlock() - go p.broadcastOwnership([]uint64{id}) - return grain, nil -} - -// ErrNotOwner is returned when a cart belongs to another host. -var ErrNotOwner = fmt.Errorf("not owner") - -// Apply applies a mutation to a grain. -func (p *CartPool) Apply(id uint64, mutation any) (*CartGrain, error) { - grain, err := p.getOrClaimGrain(id) - if err != nil { - return nil, err - } - start := time.Now() - result, applyErr := grain.Apply(mutation, false) - mutationType := "unknown" - if mutation != nil { - if t := reflect.TypeOf(mutation); t != nil { - if t.Kind() == reflect.Pointer { - t = t.Elem() - } - if t.Name() != "" { - mutationType = t.Name() - } - } - } - cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds()) - - if applyErr == nil && result != nil { - cartMutationsTotal.Inc() - - } else if applyErr != nil { - cartMutationFailuresTotal.Inc() - } - - return result, applyErr -} - -// Get returns the current state of a grain. -func (p *CartPool) Get(id uint64) (*CartGrain, error) { - grain, err := p.getOrClaimGrain(id) - if err != nil { - return nil, err - } - return grain.GetCurrentState() -} - -// OwnerHost reports the remote owner (if any) for the supplied cart id. -func (p *CartPool) OwnerHost(id uint64) (actor.Host, bool) { - p.remoteMu.RLock() - defer p.remoteMu.RUnlock() - owner, ok := p.remoteOwners[id] - return owner, ok -} - -// Hostname returns the local hostname (pod IP). -func (p *CartPool) Hostname() string { - return p.hostname -} - -// Close notifies remotes that this host is shutting down. -func (p *CartPool) Close() { - p.remoteMu.Lock() - defer p.remoteMu.Unlock() - for _, r := range p.remoteHosts { - go func(rh *proxy.RemoteHost) { - rh.Close() - }(r) - } - if p.purgeTicker != nil { - p.purgeTicker.Stop() - } -} diff --git a/amqp-order-handler.go b/cmd/cart/amqp-order-handler.go similarity index 100% rename from amqp-order-handler.go rename to cmd/cart/amqp-order-handler.go diff --git a/cart-grain.go b/cmd/cart/cart-grain.go similarity index 99% rename from cart-grain.go rename to cmd/cart/cart-grain.go index 277cfb8..a4ad103 100644 --- a/cart-grain.go +++ b/cmd/cart/cart-grain.go @@ -76,8 +76,8 @@ type CartGrain struct { PaymentStatus string `json:"paymentStatus,omitempty"` } -func (c *CartGrain) GetId() CartId { - return c.Id +func (c *CartGrain) GetId() uint64 { + return uint64(c.Id) } func (c *CartGrain) GetLastChange() time.Time { @@ -222,7 +222,6 @@ func GetTaxAmount(total int64, tax int) int64 { } func (c *CartGrain) Apply(content interface{}, isReplay bool) (*CartGrain, error) { - grainMutations.Inc() updated, err := ApplyRegistered(c, content) if err != nil { diff --git a/cart_id.go b/cmd/cart/cart_id.go similarity index 100% rename from cart_id.go rename to cmd/cart/cart_id.go diff --git a/cart_id_test.go b/cmd/cart/cart_id_test.go similarity index 100% rename from cart_id_test.go rename to cmd/cart/cart_id_test.go diff --git a/checkout_builder.go b/cmd/cart/checkout_builder.go similarity index 100% rename from checkout_builder.go rename to cmd/cart/checkout_builder.go diff --git a/disk-storage.go b/cmd/cart/disk-storage.go similarity index 100% rename from disk-storage.go rename to cmd/cart/disk-storage.go diff --git a/event_log.go b/cmd/cart/event_log.go similarity index 100% rename from event_log.go rename to cmd/cart/event_log.go diff --git a/klarna-client.go b/cmd/cart/klarna-client.go similarity index 100% rename from klarna-client.go rename to cmd/cart/klarna-client.go diff --git a/klarna-types.go b/cmd/cart/klarna-types.go similarity index 100% rename from klarna-types.go rename to cmd/cart/klarna-types.go diff --git a/main.go b/cmd/cart/main.go similarity index 78% rename from main.go rename to cmd/cart/main.go index 794bcf5..3ec7c8f 100644 --- a/main.go +++ b/cmd/cart/main.go @@ -15,9 +15,11 @@ import ( "git.tornberg.me/go-cart-actor/pkg/actor" "git.tornberg.me/go-cart-actor/pkg/discovery" messages "git.tornberg.me/go-cart-actor/pkg/messages" + "git.tornberg.me/go-cart-actor/pkg/proxy" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -37,13 +39,13 @@ var ( }) ) -func spawn(id CartId) (*CartGrain, error) { +func spawn(id uint64) (actor.Grain[CartGrain], error) { grainSpawns.Inc() ret := &CartGrain{ lastItemId: 0, lastDeliveryId: 0, Deliveries: []*CartDelivery{}, - Id: id, + Id: CartId(id), Items: []*CartItem{}, TotalPrice: 0, } @@ -53,7 +55,7 @@ func spawn(id CartId) (*CartGrain, error) { // Legacy loadMessages (no-op) retained; then replay append-only event log //_ = loadMessages(ret, id) - err := ReplayCartEvents(ret, id) + err := ReplayCartEvents(ret, CartId(id)) return ret, err } @@ -63,30 +65,13 @@ func init() { } type App struct { - pool *CartPool + pool *actor.SimpleGrainPool[CartGrain] storage *DiskStorage } -// func (a *App) Save() error { -// for id, grain := range a.pool.SnapshotGrains() { -// if grain == nil { -// continue -// } -// if grain.GetLastChange().After(a.storage.LastSaves[uint64(id)]) { - -// err := a.storage.Store(id, grain) -// if err != nil { -// log.Printf("Error saving grain %s: %v\n", id, err) -// } -// } -// } -// return nil -// } - var podIp = os.Getenv("POD_IP") var name = os.Getenv("POD_NAME") var amqpUrl = os.Getenv("AMQP_URL") -var KlarnaInstance = NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) var tpl = ` @@ -127,11 +112,15 @@ func GetDiscovery() discovery.Discovery { } func main() { + controlPlaneConfig := actor.DefaultServerConfig() storage, err := NewDiskStorage(fmt.Sprintf("data/s_%s.gob", name)) if err != nil { log.Printf("Error loading state: %v\n", err) } - pool, err := NewCartPool(2*65535, 15*time.Minute, podIp, spawn, GetDiscovery()) + + pool, err := actor.NewSimpleGrainPool(2*65535, 15*time.Minute, podIp, spawn, func(host string) (actor.Host, error) { + return proxy.NewRemoteHost(host) + }) if err != nil { log.Fatalf("Error creating cart pool: %v\n", err) } @@ -140,31 +129,52 @@ func main() { storage: storage, } - grpcSrv, err := actor.NewControlServer[*CartGrain](":1337", pool) + grpcSrv, err := actor.NewControlServer[*CartGrain](controlPlaneConfig, pool) if err != nil { log.Fatalf("Error starting control plane gRPC server: %v\n", err) } defer grpcSrv.GracefulStop() - // go func() { - // for range time.Tick(time.Minute * 5) { - // err := app.Save() - // if err != nil { - // log.Printf("Error saving: %v\n", err) - // } - // } - // }() + go func(hw discovery.Discovery) { + if hw == nil { + log.Print("No discovery service available") + return + } + ch, err := hw.Watch() + if err != nil { + log.Printf("Discovery error: %v", err) + return + } + for evt := range ch { + if evt.Host == "" { + continue + } + switch evt.Type { + case watch.Deleted: + if pool.IsKnown(evt.Host) { + pool.RemoveHost(evt.Host) + } + default: + if !pool.IsKnown(evt.Host) { + log.Printf("Discovered host %s", evt.Host) + pool.AddRemote(evt.Host) + } + } + } + }(GetDiscovery()) + orderHandler := &AmqpOrderHandler{ Url: amqpUrl, } + klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD")) - syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp)) + syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient) mux := http.NewServeMux() mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve())) // only for local - // mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) { - // syncedPool.AddRemote(r.PathValue("host")) - // }) + mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) { + pool.AddRemote(r.PathValue("host")) + }) // mux.HandleFunc("GET /save", app.HandleSave) //mux.HandleFunc("/", app.RewritePath) mux.HandleFunc("/debug/pprof/", pprof.Index) @@ -221,30 +231,42 @@ func main() { return } cartId := parsed - order, err = syncedServer.CreateOrUpdateCheckout(r.Host, cartId) + syncedServer.ProxyHandler(func(w http.ResponseWriter, r *http.Request, cartId CartId) error { + order, err = syncedServer.CreateOrUpdateCheckout(r.Host, cartId) + if err != nil { + return err + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")") + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, tpl, order.HTMLSnippet) + return nil + })(cartId, w, r) + if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) } + // v2: Apply now returns *CartGrain; order creation handled inside grain (no payload to unmarshal) } else { - order, err = KlarnaInstance.GetOrder(orderId) + order, err = klarnaClient.GetOrder(orderId) if err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(err.Error())) return } - + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")") + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, tpl, order.HTMLSnippet) } - w.Header().Set("Content-Type", "text/html; charset=utf-8") - w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")") - w.WriteHeader(http.StatusOK) - fmt.Fprintf(w, tpl, order.HTMLSnippet) + }) mux.HandleFunc("/confirmation/{order_id}", func(w http.ResponseWriter, r *http.Request) { orderId := r.PathValue("order_id") - order, err := KlarnaInstance.GetOrder(orderId) + order, err := klarnaClient.GetOrder(orderId) if err != nil { w.WriteHeader(http.StatusInternalServerError) @@ -304,7 +326,7 @@ func main() { orderId := r.URL.Query().Get("order_id") log.Printf("Order confirmation push: %s", orderId) - order, err := KlarnaInstance.GetOrder(orderId) + order, err := klarnaClient.GetOrder(orderId) if err != nil { log.Printf("Error creating request: %v\n", err) @@ -325,7 +347,7 @@ func main() { w.WriteHeader(http.StatusInternalServerError) return } - err = KlarnaInstance.AcknowledgeOrder(orderId) + err = klarnaClient.AcknowledgeOrder(orderId) if err != nil { log.Printf("Error acknowledging order: %v\n", err) } diff --git a/mutation_add_item.go b/cmd/cart/mutation_add_item.go similarity index 100% rename from mutation_add_item.go rename to cmd/cart/mutation_add_item.go diff --git a/mutation_add_request.go b/cmd/cart/mutation_add_request.go similarity index 100% rename from mutation_add_request.go rename to cmd/cart/mutation_add_request.go diff --git a/mutation_change_quantity.go b/cmd/cart/mutation_change_quantity.go similarity index 100% rename from mutation_change_quantity.go rename to cmd/cart/mutation_change_quantity.go diff --git a/mutation_initialize_checkout.go b/cmd/cart/mutation_initialize_checkout.go similarity index 100% rename from mutation_initialize_checkout.go rename to cmd/cart/mutation_initialize_checkout.go diff --git a/mutation_order_created.go b/cmd/cart/mutation_order_created.go similarity index 100% rename from mutation_order_created.go rename to cmd/cart/mutation_order_created.go diff --git a/mutation_registry.go b/cmd/cart/mutation_registry.go similarity index 100% rename from mutation_registry.go rename to cmd/cart/mutation_registry.go diff --git a/mutation_remove_delivery.go b/cmd/cart/mutation_remove_delivery.go similarity index 100% rename from mutation_remove_delivery.go rename to cmd/cart/mutation_remove_delivery.go diff --git a/mutation_remove_item.go b/cmd/cart/mutation_remove_item.go similarity index 100% rename from mutation_remove_item.go rename to cmd/cart/mutation_remove_item.go diff --git a/mutation_set_cart_items.go b/cmd/cart/mutation_set_cart_items.go similarity index 100% rename from mutation_set_cart_items.go rename to cmd/cart/mutation_set_cart_items.go diff --git a/mutation_set_delivery.go b/cmd/cart/mutation_set_delivery.go similarity index 100% rename from mutation_set_delivery.go rename to cmd/cart/mutation_set_delivery.go diff --git a/mutation_set_pickup_point.go b/cmd/cart/mutation_set_pickup_point.go similarity index 100% rename from mutation_set_pickup_point.go rename to cmd/cart/mutation_set_pickup_point.go diff --git a/pool-server.go b/cmd/cart/pool-server.go similarity index 85% rename from pool-server.go rename to cmd/cart/pool-server.go index 74b5002..a344f40 100644 --- a/pool-server.go +++ b/cmd/cart/pool-server.go @@ -9,18 +9,21 @@ import ( "strconv" "time" + "git.tornberg.me/go-cart-actor/pkg/actor" messages "git.tornberg.me/go-cart-actor/pkg/messages" ) type PoolServer struct { - pod_name string - pool *CartPool + pod_name string + pool actor.GrainPool[*CartGrain] + klarnaClient *KlarnaClient } -func NewPoolServer(pool *CartPool, pod_name string) *PoolServer { +func NewPoolServer(pool actor.GrainPool[*CartGrain], pod_name string, klarnaClient *KlarnaClient) *PoolServer { return &PoolServer{ - pod_name: pod_name, - pool: pool, + pod_name: pod_name, + pool: pool, + klarnaClient: klarnaClient, } } @@ -181,24 +184,24 @@ func (s *PoolServer) HandleAddRequest(w http.ResponseWriter, r *http.Request, id return s.WriteResult(w, reply) } -func (s *PoolServer) HandleConfirmation(w http.ResponseWriter, r *http.Request, id CartId) error { - orderId := r.PathValue("orderId") - if orderId == "" { - return fmt.Errorf("orderId is empty") - } - order, err := KlarnaInstance.GetOrder(orderId) +// func (s *PoolServer) HandleConfirmation(w http.ResponseWriter, r *http.Request, id CartId) error { +// orderId := r.PathValue("orderId") +// if orderId == "" { +// return fmt.Errorf("orderId is empty") +// } +// order, err := KlarnaInstance.GetOrder(orderId) - if err != nil { - return err - } +// if err != nil { +// return err +// } - w.Header().Set("Content-Type", "application/json") - w.Header().Set("X-Pod-Name", s.pod_name) - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Access-Control-Allow-Origin", "*") - w.WriteHeader(http.StatusOK) - return json.NewEncoder(w).Encode(order) -} +// w.Header().Set("Content-Type", "application/json") +// w.Header().Set("X-Pod-Name", s.pod_name) +// w.Header().Set("Cache-Control", "no-cache") +// w.Header().Set("Access-Control-Allow-Origin", "*") +// w.WriteHeader(http.StatusOK) +// return json.NewEncoder(w).Encode(order) +// } func getCurrency(country string) string { if country == "no" { @@ -240,9 +243,9 @@ func (s *PoolServer) CreateOrUpdateCheckout(host string, id CartId) (*CheckoutOr } if grain.OrderReference != "" { - return KlarnaInstance.UpdateOrder(grain.OrderReference, bytes.NewReader(payload)) + return s.klarnaClient.UpdateOrder(grain.OrderReference, bytes.NewReader(payload)) } else { - return KlarnaInstance.CreateOrder(bytes.NewReader(payload)) + return s.klarnaClient.CreateOrder(bytes.NewReader(payload)) } } @@ -255,17 +258,19 @@ func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id CartId) }) } -func (s *PoolServer) HandleCheckout(w http.ResponseWriter, r *http.Request, id CartId) error { - klarnaOrder, err := s.CreateOrUpdateCheckout(r.Host, id) - if err != nil { - return err - } +// func (s *PoolServer) HandleCheckout(w http.ResponseWriter, r *http.Request, id CartId) error { +// klarnaOrder, err := s.CreateOrUpdateCheckout(r.Host, id) +// if err != nil { +// return err +// } - s.ApplyCheckoutStarted(klarnaOrder, id) +// s.ApplyCheckoutStarted(klarnaOrder, id) + +// w.Header().Set("Content-Type", "application/json") +// return json.NewEncoder(w).Encode(klarnaOrder) +// } +// - w.Header().Set("Content-Type", "application/json") - return json.NewEncoder(w).Encode(klarnaOrder) -} func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { @@ -391,8 +396,8 @@ func (s *PoolServer) Serve() *http.ServeMux { mux.HandleFunc("POST /delivery", CookieCartIdHandler(s.ProxyHandler(s.HandleSetDelivery))) mux.HandleFunc("DELETE /delivery/{deliveryId}", CookieCartIdHandler(s.ProxyHandler(s.HandleRemoveDelivery))) mux.HandleFunc("PUT /delivery/{deliveryId}/pickupPoint", CookieCartIdHandler(s.ProxyHandler(s.HandleSetPickupPoint))) - mux.HandleFunc("GET /checkout", CookieCartIdHandler(s.ProxyHandler(s.HandleCheckout))) - mux.HandleFunc("GET /confirmation/{orderId}", CookieCartIdHandler(s.ProxyHandler(s.HandleConfirmation))) + //mux.HandleFunc("GET /checkout", CookieCartIdHandler(s.ProxyHandler(s.HandleCheckout))) + //mux.HandleFunc("GET /confirmation/{orderId}", CookieCartIdHandler(s.ProxyHandler(s.HandleConfirmation))) mux.HandleFunc("GET /byid/{id}", CartIdHandler(s.ProxyHandler(s.HandleGet))) mux.HandleFunc("GET /byid/{id}/add/{sku}", CartIdHandler(s.ProxyHandler(s.HandleAddSku))) @@ -402,8 +407,8 @@ func (s *PoolServer) Serve() *http.ServeMux { mux.HandleFunc("POST /byid/{id}/delivery", CartIdHandler(s.ProxyHandler(s.HandleSetDelivery))) mux.HandleFunc("DELETE /byid/{id}/delivery/{deliveryId}", CartIdHandler(s.ProxyHandler(s.HandleRemoveDelivery))) mux.HandleFunc("PUT /byid/{id}/delivery/{deliveryId}/pickupPoint", CartIdHandler(s.ProxyHandler(s.HandleSetPickupPoint))) - mux.HandleFunc("GET /byid/{id}/checkout", CartIdHandler(s.ProxyHandler(s.HandleCheckout))) - mux.HandleFunc("GET /byid/{id}/confirmation", CartIdHandler(s.ProxyHandler(s.HandleConfirmation))) + //mux.HandleFunc("GET /byid/{id}/checkout", CartIdHandler(s.ProxyHandler(s.HandleCheckout))) + //mux.HandleFunc("GET /byid/{id}/confirmation", CartIdHandler(s.ProxyHandler(s.HandleConfirmation))) return mux } diff --git a/product-fetcher.go b/cmd/cart/product-fetcher.go similarity index 100% rename from product-fetcher.go rename to cmd/cart/product-fetcher.go diff --git a/pkg/actor/grain_pool.go b/pkg/actor/grain_pool.go index e2777e3..3a40c95 100644 --- a/pkg/actor/grain_pool.go +++ b/pkg/actor/grain_pool.go @@ -1,6 +1,8 @@ package actor -import "net/http" +import ( + "net/http" +) type GrainPool[V any] interface { Apply(id uint64, mutation any) (V, error) @@ -14,12 +16,19 @@ type GrainPool[V any] interface { GetLocalIds() []uint64 RemoveHost(host string) IsHealthy() bool + IsKnown(string) bool Close() } // Host abstracts a remote node capable of proxying cart requests. type Host interface { + AnnounceExpiry(ids []uint64) + Negotiate(otherHosts []string) ([]string, error) Name() string Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error) GetActorIds() []uint64 + Close() error + Ping() bool + IsHealthy() bool + AnnounceOwnership(ids []uint64) } diff --git a/pkg/actor/grpc_server.go b/pkg/actor/grpc_server.go index d91ce6d..2d46fb4 100644 --- a/pkg/actor/grpc_server.go +++ b/pkg/actor/grpc_server.go @@ -76,15 +76,31 @@ func (s *ControlServer[V]) Closing(ctx context.Context, req *messages.ClosingNot }, nil } +type ServerConfig struct { + Addr string + Options []grpc.ServerOption +} + +func NewServerConfig(addr string, options ...grpc.ServerOption) ServerConfig { + return ServerConfig{ + Addr: addr, + Options: options, + } +} + +func DefaultServerConfig() ServerConfig { + return NewServerConfig(":1337") +} + // StartGRPCServer configures and starts the unified gRPC server on the given address. // It registers both the CartActor and ControlPlane services. -func NewControlServer[V any](addr string, pool GrainPool[V]) (*grpc.Server, error) { - lis, err := net.Listen("tcp", addr) +func NewControlServer[V any](config ServerConfig, pool GrainPool[V]) (*grpc.Server, error) { + lis, err := net.Listen("tcp", config.Addr) if err != nil { return nil, fmt.Errorf("failed to listen: %w", err) } - grpcServer := grpc.NewServer() + grpcServer := grpc.NewServer(config.Options...) server := &ControlServer[V]{ pool: pool, } @@ -92,7 +108,7 @@ func NewControlServer[V any](addr string, pool GrainPool[V]) (*grpc.Server, erro messages.RegisterControlPlaneServer(grpcServer, server) reflection.Register(grpcServer) - log.Printf("gRPC server listening on %s", addr) + log.Printf("gRPC server listening on %s", config.Addr) go func() { if err := grpcServer.Serve(lis); err != nil { log.Fatalf("failed to serve gRPC: %v", err) diff --git a/pkg/actor/simple_grain_pool.go b/pkg/actor/simple_grain_pool.go new file mode 100644 index 0000000..68a8a2b --- /dev/null +++ b/pkg/actor/simple_grain_pool.go @@ -0,0 +1,413 @@ +package actor + +import ( + "fmt" + "log" + "maps" + "sync" + "time" +) + +type SimpleGrainPool[V any] struct { + // fields and methods + localMu sync.RWMutex + grains map[uint64]Grain[V] + + spawn func(id uint64) (Grain[V], error) + spawnHost func(host string) (Host, error) + ttl time.Duration + poolSize int + + // Cluster coordination -------------------------------------------------- + hostname string + remoteMu sync.RWMutex + remoteOwners map[uint64]Host + remoteHosts map[string]Host + //discardedHostHandler *DiscardedHostHandler + + // House-keeping --------------------------------------------------------- + purgeTicker *time.Ticker +} + +func NewSimpleGrainPool[V any](size int, ttl time.Duration, hostname string, spawn func(id uint64) (Grain[V], error), spawnHost func(host string) (Host, error)) (*SimpleGrainPool[V], error) { + p := &SimpleGrainPool[V]{ + grains: make(map[uint64]Grain[V]), + + spawn: spawn, + spawnHost: spawnHost, + ttl: ttl, + poolSize: size, + hostname: hostname, + remoteOwners: make(map[uint64]Host), + remoteHosts: make(map[string]Host), + } + + p.purgeTicker = time.NewTicker(time.Minute) + go func() { + for range p.purgeTicker.C { + p.purge() + } + }() + + return p, nil +} + +func (p *SimpleGrainPool[V]) purge() { + purgeLimit := time.Now().Add(-p.ttl) + purgedIds := make([]uint64, 0, len(p.grains)) + p.localMu.Lock() + for id, grain := range p.grains { + if grain.GetLastAccess().Before(purgeLimit) { + purgedIds = append(purgedIds, id) + + delete(p.grains, id) + } + } + p.localMu.Unlock() + p.forAllHosts(func(remote Host) { + remote.AnnounceExpiry(purgedIds) + }) + +} + +// LocalUsage returns the number of resident grains and configured capacity. +func (p *SimpleGrainPool[V]) LocalUsage() (int, int) { + p.localMu.RLock() + defer p.localMu.RUnlock() + return len(p.grains), p.poolSize +} + +// LocalCartIDs returns the currently owned cart ids (for control-plane RPCs). +func (p *SimpleGrainPool[V]) GetLocalIds() []uint64 { + p.localMu.RLock() + defer p.localMu.RUnlock() + ids := make([]uint64, 0, len(p.grains)) + for _, g := range p.grains { + if g == nil { + continue + } + ids = append(ids, uint64(g.GetId())) + } + return ids +} + +func (p *SimpleGrainPool[V]) HandleRemoteExpiry(host string, ids []uint64) error { + p.remoteMu.Lock() + defer p.remoteMu.Unlock() + for _, id := range ids { + delete(p.remoteOwners, id) + } + return nil +} + +func (p *SimpleGrainPool[V]) HandleOwnershipChange(host string, ids []uint64) error { + + p.remoteMu.RLock() + remoteHost, exists := p.remoteHosts[host] + p.remoteMu.RUnlock() + if !exists { + createdHost, err := p.AddRemote(host) + if err != nil { + return err + } + remoteHost = createdHost + } + p.remoteMu.Lock() + defer p.remoteMu.Unlock() + p.localMu.Lock() + defer p.localMu.Unlock() + for _, id := range ids { + log.Printf("Handling ownership change for cart %d to host %s", id, host) + delete(p.grains, id) + p.remoteOwners[id] = remoteHost + } + return nil +} + +// TakeOwnership takes ownership of a grain. +func (p *SimpleGrainPool[V]) TakeOwnership(id uint64) { + p.broadcastOwnership([]uint64{id}) +} + +func (p *SimpleGrainPool[V]) AddRemote(host string) (Host, error) { + if host == "" || host == p.hostname || p.IsKnown(host) { + return nil, fmt.Errorf("invalid host") + } + + remote, err := p.spawnHost(host) + if err != nil { + log.Printf("AddRemote %s failed: %v", host, err) + + return nil, err + } + p.remoteMu.Lock() + p.remoteHosts[host] = remote + p.remoteMu.Unlock() + // connectedRemotes.Set(float64(p.RemoteCount())) + + log.Printf("Connected to remote host %s", host) + go p.pingLoop(remote) + go p.initializeRemote(remote) + go p.SendNegotiation() + return remote, nil +} + +func (p *SimpleGrainPool[V]) initializeRemote(remote Host) { + + remotesIds := remote.GetActorIds() + + p.remoteMu.Lock() + for _, id := range remotesIds { + p.localMu.Lock() + delete(p.grains, id) + p.localMu.Unlock() + if _, exists := p.remoteOwners[id]; !exists { + p.remoteOwners[id] = remote + } + } + p.remoteMu.Unlock() + +} + +func (p *SimpleGrainPool[V]) RemoveHost(host string) { + p.remoteMu.Lock() + remote, exists := p.remoteHosts[host] + + if exists { + go remote.Close() + delete(p.remoteHosts, host) + } + count := 0 + for id, owner := range p.remoteOwners { + if owner.Name() == host { + count++ + delete(p.remoteOwners, id) + } + } + log.Printf("Removing host %s, grains: %d", host, count) + p.remoteMu.Unlock() + + if exists { + remote.Close() + } + // connectedRemotes.Set(float64(p.RemoteCount())) +} + +func (p *SimpleGrainPool[V]) RemoteCount() int { + p.remoteMu.RLock() + defer p.remoteMu.RUnlock() + return len(p.remoteHosts) +} + +// RemoteHostNames returns a snapshot of connected remote host identifiers. +func (p *SimpleGrainPool[V]) RemoteHostNames() []string { + p.remoteMu.RLock() + defer p.remoteMu.RUnlock() + hosts := make([]string, 0, len(p.remoteHosts)) + for host := range p.remoteHosts { + hosts = append(hosts, host) + } + return hosts +} + +func (p *SimpleGrainPool[V]) IsKnown(host string) bool { + if host == p.hostname { + return true + } + p.remoteMu.RLock() + defer p.remoteMu.RUnlock() + _, ok := p.remoteHosts[host] + return ok +} + +func (p *SimpleGrainPool[V]) pingLoop(remote Host) { + remote.Ping() + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for range ticker.C { + if !remote.Ping() { + if !remote.IsHealthy() { + log.Printf("Remote %s unhealthy, removing", remote.Name()) + p.Close() + p.RemoveHost(remote.Name()) + return + } + continue + } + } +} + +func (p *SimpleGrainPool[V]) IsHealthy() bool { + p.remoteMu.RLock() + defer p.remoteMu.RUnlock() + for _, r := range p.remoteHosts { + if !r.IsHealthy() { + return false + } + } + return true +} + +func (p *SimpleGrainPool[V]) Negotiate(otherHosts []string) { + + for _, host := range otherHosts { + if host != p.hostname { + p.remoteMu.RLock() + _, ok := p.remoteHosts[host] + p.remoteMu.RUnlock() + if !ok { + go p.AddRemote(host) + } + } + } +} + +func (p *SimpleGrainPool[V]) SendNegotiation() { + //negotiationCount.Inc() + + p.remoteMu.RLock() + hosts := make([]string, 0, len(p.remoteHosts)+1) + hosts = append(hosts, p.hostname) + remotes := make([]Host, 0, len(p.remoteHosts)) + for h, r := range p.remoteHosts { + hosts = append(hosts, h) + remotes = append(remotes, r) + } + p.remoteMu.RUnlock() + + p.forAllHosts(func(remote Host) { + knownByRemote, err := remote.Negotiate(hosts) + + if err != nil { + log.Printf("Negotiate with %s failed: %v", remote.Name(), err) + return + } + for _, h := range knownByRemote { + if !p.IsKnown(h) { + go p.AddRemote(h) + } + } + }) +} + +func (p *SimpleGrainPool[V]) forAllHosts(fn func(Host)) { + p.remoteMu.RLock() + rh := maps.Clone(p.remoteHosts) + p.remoteMu.RUnlock() + + wg := sync.WaitGroup{} + for _, host := range rh { + + wg.Go(func() { fn(host) }) + + } + + for name, host := range rh { + if !host.IsHealthy() { + host.Close() + p.remoteMu.Lock() + delete(p.remoteHosts, name) + p.remoteMu.Unlock() + } + + } +} + +func (p *SimpleGrainPool[V]) broadcastOwnership(ids []uint64) { + if len(ids) == 0 { + return + } + + p.forAllHosts(func(rh Host) { + rh.AnnounceOwnership(ids) + }) + log.Printf("taking ownership of %d ids", len(ids)) + // go p.statsUpdate() +} + +func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) { + p.localMu.RLock() + grain, exists := p.grains[id] + p.localMu.RUnlock() + if exists && grain != nil { + return grain, nil + } + + grain, err := p.spawn(id) + if err != nil { + return nil, err + } + p.localMu.Lock() + p.grains[id] = grain + p.localMu.Unlock() + go p.broadcastOwnership([]uint64{id}) + return grain, nil +} + +// ErrNotOwner is returned when a cart belongs to another host. +var ErrNotOwner = fmt.Errorf("not owner") + +// Apply applies a mutation to a grain. +func (p *SimpleGrainPool[V]) Apply(id uint64, mutation any) (*V, error) { + grain, err := p.getOrClaimGrain(id) + if err != nil { + return nil, err + } + //start := time.Now() + result, applyErr := grain.Apply(mutation, false) + //mutationType := "unknown" + // if mutation != nil { + // if t := reflect.TypeOf(mutation); t != nil { + // if t.Kind() == reflect.Pointer { + // t = t.Elem() + // } + // if t.Name() != "" { + // mutationType = t.Name() + // } + // } + // } + // cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds()) + + // if applyErr == nil && result != nil { + // cartMutationsTotal.Inc() + + // } else if applyErr != nil { + // cartMutationFailuresTotal.Inc() + // } + + return result, applyErr +} + +// Get returns the current state of a grain. +func (p *SimpleGrainPool[V]) Get(id uint64) (*V, error) { + grain, err := p.getOrClaimGrain(id) + if err != nil { + return nil, err + } + return grain.GetCurrentState() +} + +// OwnerHost reports the remote owner (if any) for the supplied cart id. +func (p *SimpleGrainPool[V]) OwnerHost(id uint64) (Host, bool) { + p.remoteMu.RLock() + defer p.remoteMu.RUnlock() + owner, ok := p.remoteOwners[id] + return owner, ok +} + +// Hostname returns the local hostname (pod IP). +func (p *SimpleGrainPool[V]) Hostname() string { + return p.hostname +} + +// Close notifies remotes that this host is shutting down. +func (p *SimpleGrainPool[V]) Close() { + + p.forAllHosts(func(rh Host) { + rh.Close() + }) + + if p.purgeTicker != nil { + p.purgeTicker.Stop() + } +} diff --git a/pkg/proxy/remotehost.go b/pkg/proxy/remotehost.go index 4225aa8..8f41f99 100644 --- a/pkg/proxy/remotehost.go +++ b/pkg/proxy/remotehost.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "errors" "fmt" "io" "log" @@ -37,24 +38,27 @@ func NewRemoteHost(host string) (*RemoteHost, error) { } controlClient := messages.NewControlPlaneClient(conn) - for retries := range 3 { - ctx, pingCancel := context.WithTimeout(context.Background(), time.Second) - _, pingErr := controlClient.Ping(ctx, &messages.Empty{}) - pingCancel() - if pingErr == nil { - break - } - if retries == 2 { - log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr) - conn.Close() - return nil, pingErr - } - time.Sleep(500 * time.Millisecond) - } + // go func() { + // for retries := range 3 { + // ctx, pingCancel := context.WithTimeout(context.Background(), time.Second) + // _, pingErr := controlClient.Ping(ctx, &messages.Empty{}) + // pingCancel() + // if pingErr == nil { + // break + // } + // if retries == 2 { + // log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr) + // conn.Close() + // p + // } + // time.Sleep(500 * time.Millisecond) + // } + // }() transport := &http.Transport{ MaxIdleConns: 100, MaxIdleConnsPerHost: 100, + DisableKeepAlives: false, IdleConnTimeout: 120 * time.Second, } client := &http.Client{Transport: transport, Timeout: 10 * time.Second} @@ -82,14 +86,19 @@ func (h *RemoteHost) Close() error { } func (h *RemoteHost) Ping() bool { - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - _, err := h.controlClient.Ping(ctx, &messages.Empty{}) - cancel() - if err != nil { - h.MissedPings++ - log.Printf("Ping %s failed (%d) %v", h.Host, h.MissedPings, err) - return false + var err error = errors.ErrUnsupported + for err != nil { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + _, err = h.controlClient.Ping(ctx, &messages.Empty{}) + cancel() + if err != nil { + h.MissedPings++ + log.Printf("Ping %s failed (%d) %v", h.Host, h.MissedPings, err) + } + if !h.IsHealthy() { + return false + } + time.Sleep(time.Millisecond * 200) } h.MissedPings = 0