package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"reflect"
	"sync"
	"time"

	proto "git.tornberg.me/go-cart-actor/proto"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"k8s.io/apimachinery/pkg/watch"
)

// SyncedPool coordinates cart grain ownership across nodes using the gRPC
// control-plane and cart actor services.
//
// Responsibilities:
//   - Local grain access (delegates to GrainLocalPool)
//   - Cluster membership (AddRemote via discovery + negotiation)
//   - Health/ping monitoring and remote removal
//   - (Legacy) ring-based ownership, removed in the first-touch model
//
// Thread-safety: public methods that mutate internal maps lock p.mu (RWMutex).
type SyncedPool struct {
	LocalHostname string
	local         *GrainLocalPool

	// Ownership tracking (first-touch / announcement model).
	// remoteOwners maps cart id -> owning host; locally owned carts live in
	// local.grains and are not recorded here.
	remoteOwners map[CartId]*RemoteHostGRPC

	mu sync.RWMutex

	// Remote host state (gRPC only): host -> remote host.
	remoteHosts map[string]*RemoteHostGRPC

	// Discovery handler for re-adding hosts after failures.
	discardedHostHandler *DiscardedHostHandler
}

// RemoteHostGRPC tracks a remote host's clients and health.
type RemoteHostGRPC struct {
	Host          string
	Conn          *grpc.ClientConn
	Transport     *http.Transport
	Client        *http.Client
	ControlClient proto.ControlPlaneClient
	MissedPings   int
}

func (h *RemoteHostGRPC) Name() string {
	return h.Host
}

// Proxy forwards the incoming HTTP request to the remote host and streams the
// response back, returning true once a response (or error) has been written to w.
// Note: h.Host is assumed to carry whatever scheme/port is needed to reach the
// remote HTTP endpoint; the original request path and query are preserved.
func (h *RemoteHostGRPC) Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error) {
	req, err := http.NewRequestWithContext(r.Context(), r.Method, h.Host+r.URL.RequestURI(), r.Body)
	if err != nil {
		http.Error(w, "proxy build error", http.StatusBadGateway)
		return true, err
	}
	for k, v := range r.Header {
		for _, vv := range v {
			req.Header.Add(k, vv)
		}
	}
	res, err := h.Client.Do(req)
	if err != nil {
		http.Error(w, "proxy request error", http.StatusBadGateway)
		return true, err
	}
	defer res.Body.Close()

	for k, v := range res.Header {
		for _, vv := range v {
			w.Header().Add(k, vv)
		}
	}
	w.Header().Set("X-Cart-Owner-Routed", "true")
	if res.StatusCode >= 200 && res.StatusCode <= 299 {
		w.WriteHeader(res.StatusCode)
		if _, copyErr := io.Copy(w, res.Body); copyErr != nil {
			return true, copyErr
		}
		return true, nil
	}

	return false, fmt.Errorf("proxy response status %d", res.StatusCode)
}
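
// Example (sketch): an HTTP handler that does not own a cart might forward the
// request to the recorded owner roughly like this. The handler wiring and the
// type assertion are illustrative and live outside this file:
//
//	if owner, ok := pool.OwnerHost(id); ok {
//		if handled, err := owner.(*RemoteHostGRPC).Proxy(id, w, r); handled {
//			if err != nil {
//				log.Printf("proxy to %s failed: %v", owner.Name(), err)
//			}
//			return
//		}
//	}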

// IsHealthy reports whether the remote has responded to recent pings
// (fewer than three consecutive misses).
func (r *RemoteHostGRPC) IsHealthy() bool {
	return r.MissedPings < 3
}

var (
	negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_remote_negotiation_total",
		Help: "The total number of remote negotiations.",
	})
	connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_connected_remotes",
		Help: "The number of connected remotes.",
	})
	cartMutationsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_mutations_total",
		Help: "Total number of cart state mutations applied.",
	})
	cartMutationFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_mutation_failures_total",
		Help: "Total number of failed cart state mutations.",
	})
	cartMutationLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "cart_mutation_latency_seconds",
		Help:    "Latency of cart mutations in seconds.",
		Buckets: prometheus.DefBuckets,
	}, []string{"mutation"})
	cartActiveGrains = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_active_grains",
		Help: "Number of active (resident) local grains.",
	})
)
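
// The latency histogram above can be queried per mutation type in Prometheus;
// a typical (illustrative) p99 query would be:
//
//	histogram_quantile(0.99,
//	  sum by (le, mutation) (rate(cart_mutation_latency_seconds_bucket[5m])))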

// NewSyncedPool builds a SyncedPool around a local grain pool and, when a
// Discovery implementation is provided, starts a watcher that adds and removes
// remote hosts as they appear and disappear.
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
	p := &SyncedPool{
		LocalHostname:        hostname,
		local:                local,
		remoteHosts:          make(map[string]*RemoteHostGRPC),
		remoteOwners:         make(map[CartId]*RemoteHostGRPC),
		discardedHostHandler: NewDiscardedHostHandler(1338),
	}
	p.discardedHostHandler.SetReconnectHandler(p.AddRemote)

	if discovery != nil {
		go func() {
			time.Sleep(3 * time.Second) // allow gRPC server startup
			log.Printf("Starting discovery watcher")
			ch, err := discovery.Watch()
			if err != nil {
				log.Printf("Discovery error: %v", err)
				return
			}
			for evt := range ch {
				if evt.Host == "" {
					continue
				}
				switch evt.Type {
				case watch.Deleted:
					if p.IsKnown(evt.Host) {
						p.RemoveHost(evt.Host)
					}
				default:
					if !p.IsKnown(evt.Host) {
						log.Printf("Discovered host %s", evt.Host)
						p.AddRemote(evt.Host)
					}
				}
			}
		}()
	} else {
		log.Printf("No discovery configured; expecting manual AddRemote or static host injection")
	}

	return p, nil
}
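
// Typical wiring (sketch; the surrounding server setup and the local pool
// constructor name are illustrative and live outside this file):
//
//	local := ... // a *GrainLocalPool built elsewhere in the package
//	pool, err := NewSyncedPool(local, hostname, discovery)
//	if err != nil {
//		log.Fatalf("pool init: %v", err)
//	}
//	defer pool.Close()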

// ------------------------- Remote Host Management -----------------------------

// AddRemote dials a remote host, verifies it responds to pings, and registers
// it for ownership announcements, health checks, and negotiation.
func (p *SyncedPool) AddRemote(host string) {
	if host == "" || host == p.LocalHostname {
		return
	}

	p.mu.Lock()
	if _, exists := p.remoteHosts[host]; exists {
		p.mu.Unlock()
		return
	}
	p.mu.Unlock()

	target := fmt.Sprintf("%s:1337", host)
	// Lazy, non-blocking client; the legacy blocking DialContext call was removed.
	conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Printf("AddRemote: dial %s failed: %v", target, err)
		return
	}

	controlClient := proto.NewControlPlaneClient(conn)

	// Health check (Ping) with limited retries.
	pings := 3
	for pings > 0 {
		ctxPing, cancelPing := context.WithTimeout(context.Background(), 1*time.Second)
		_, pingErr := controlClient.Ping(ctxPing, &proto.Empty{})
		cancelPing()
		if pingErr == nil {
			break
		}
		pings--
		if pings == 0 {
			log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr)
			conn.Close()
			return
		}
		time.Sleep(200 * time.Millisecond)
	}

	transport := &http.Transport{
		MaxIdleConns:        100,               // maximum idle connections
		MaxIdleConnsPerHost: 100,               // maximum idle connections per host
		IdleConnTimeout:     120 * time.Second, // timeout for idle connections
	}

	client := &http.Client{
		Transport: transport,
		Timeout:   10 * time.Second, // request timeout
	}

	remote := &RemoteHostGRPC{
		Host:          host,
		Conn:          conn,
		Transport:     transport,
		Client:        client,
		ControlClient: controlClient,
		MissedPings:   0,
	}

	p.mu.Lock()
	if _, exists := p.remoteHosts[host]; exists {
		// Another goroutine registered this host while we were dialing.
		p.mu.Unlock()
		conn.Close()
		return
	}
	p.remoteHosts[host] = remote
	p.mu.Unlock()
	connectedRemotes.Set(float64(p.RemoteCount()))
	// Ring rebuild removed (first-touch ownership model no longer uses the ring).

	log.Printf("Connected to remote host %s", host)

	go p.pingLoop(remote)
	go p.initializeRemote(remote)
	go p.Negotiate()
}

// initializeRemote fetches the remote's cart ids and caches their ownership.
func (p *SyncedPool) initializeRemote(remote *RemoteHostGRPC) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	reply, err := remote.ControlClient.GetCartIds(ctx, &proto.Empty{})
	if err != nil {
		log.Printf("Init remote %s: GetCartIds error: %v", remote.Host, err)
		return
	}
	count := 0
	// Record remote ownership (first-touch model) instead of spawning remote grain proxies.
	p.mu.Lock()
	for _, cid := range reply.CartIds {
		// Only set if not already claimed (first claim wins).
		if _, exists := p.remoteOwners[CartId(cid)]; !exists {
			p.remoteOwners[CartId(cid)] = remote
		}
		count++
	}
	p.mu.Unlock()
	log.Printf("Remote %s reported %d remote-owned carts (ownership cached)", remote.Host, count)
}

// RemoveHost drops a remote host and purges its cached ownership entries.
func (p *SyncedPool) RemoveHost(host string) {
	p.mu.Lock()
	remote, exists := p.remoteHosts[host]
	if exists {
		delete(p.remoteHosts, host)
	}
	// Purge remote ownership entries for this host.
	for id, h := range p.remoteOwners {
		if h.Host == host {
			delete(p.remoteOwners, id)
		}
	}
	p.mu.Unlock()

	if exists {
		remote.Conn.Close()
	}
	connectedRemotes.Set(float64(p.RemoteCount()))
	// Ring rebuild removed (first-touch ownership model no longer uses the ring).
}

// RemoteCount returns the number of tracked remote hosts.
func (p *SyncedPool) RemoteCount() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return len(p.remoteHosts)
}

// IsKnown reports whether the host is the local node or an already-tracked remote.
func (p *SyncedPool) IsKnown(host string) bool {
	if host == p.LocalHostname {
		return true
	}
	p.mu.RLock()
	defer p.mu.RUnlock()
	_, ok := p.remoteHosts[host]
	return ok
}

// ExcludeKnown filters out hosts that are already known to this pool.
func (p *SyncedPool) ExcludeKnown(hosts []string) []string {
	ret := make([]string, 0, len(hosts))
	for _, h := range hosts {
		if !p.IsKnown(h) {
			ret = append(ret, h)
		}
	}
	return ret
}

// ------------------------- Health / Ping -------------------------------------

// pingLoop pings the remote every three seconds; after three consecutive
// misses the host is considered unhealthy and removed.
func (p *SyncedPool) pingLoop(remote *RemoteHostGRPC) {
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
		_, err := remote.ControlClient.Ping(ctx, &proto.Empty{})
		cancel()
		if err != nil {
			remote.MissedPings++
			log.Printf("Ping %s failed (%d)", remote.Host, remote.MissedPings)
			if !remote.IsHealthy() {
				log.Printf("Remote %s unhealthy, removing", remote.Host)
				p.RemoveHost(remote.Host)
				return
			}
			continue
		}
		remote.MissedPings = 0
	}
}

// IsHealthy reports whether every tracked remote is currently healthy.
func (p *SyncedPool) IsHealthy() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, r := range p.remoteHosts {
		if !r.IsHealthy() {
			return false
		}
	}
	return true
}

// ------------------------- Negotiation ---------------------------------------

// Negotiate exchanges known-host lists with every remote and dials any hosts
// we learn about that we are not yet connected to.
func (p *SyncedPool) Negotiate() {
	negotiationCount.Inc()

	p.mu.RLock()
	hosts := make([]string, 0, len(p.remoteHosts)+1)
	hosts = append(hosts, p.LocalHostname)
	for h := range p.remoteHosts {
		hosts = append(hosts, h)
	}
	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		remotes = append(remotes, r)
	}
	p.mu.RUnlock()

	for _, r := range remotes {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		reply, err := r.ControlClient.Negotiate(ctx, &proto.NegotiateRequest{KnownHosts: hosts})
		cancel()
		if err != nil {
			log.Printf("Negotiate with %s failed: %v", r.Host, err)
			continue
		}
		for _, h := range reply.Hosts {
			if !p.IsKnown(h) {
				p.AddRemote(h)
			}
		}
	}

	// Ring rebuild removed (first-touch ownership model no longer uses the ring).
}

// ------------------------- Grain Ownership ------------------------------------

// RemoveRemoteGrain and SpawnRemoteGrain were removed: the first-touch model
// no longer keeps remote grain proxies.

// GetHealthyRemotes returns the currently healthy remotes; it is retained
// because ownership announcements are broadcast to them.
func (p *SyncedPool) GetHealthyRemotes() []*RemoteHostGRPC {
	p.mu.RLock()
	defer p.mu.RUnlock()
	ret := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		if r.IsHealthy() {
			ret = append(ret, r)
		}
	}
	return ret
}

// removeLocalGrain drops a grain from the local pool. The local pool's own
// mutex guards local.grains, so lock that rather than p.mu here.
func (p *SyncedPool) removeLocalGrain(id CartId) {
	p.local.mu.Lock()
	delete(p.local.grains, uint64(id))
	p.local.mu.Unlock()
}

// ------------------------- First-Touch Ownership Resolution ------------------

// ErrNotOwner is returned when an operation is attempted on a cart that is
// owned by a different host (according to the first-touch ownership mapping).
var ErrNotOwner = fmt.Errorf("not owner")

// resolveOwnerFirstTouch implements the first-touch semantics:
//  1. If a local grain exists, this host already owns the cart.
//  2. Else, if remoteOwners has an entry, the cart is owned by that host.
//  3. Else, claim the cart locally (spawn a grain) and asynchronously announce
//     ownership to all remotes.
//
// NOTE: This does NOT (yet) reconcile conflicting announcements; the first
// claim wins. Later improvements can add a tie-break via timestamp or host
// ordering.
func (p *SyncedPool) resolveOwnerFirstTouch(id CartId) error {
	// Fast local existence check.
	p.local.mu.RLock()
	_, existsLocal := p.local.grains[uint64(id)]
	p.local.mu.RUnlock()
	if existsLocal {
		return nil
	}

	// Remote ownership map lookup.
	p.mu.RLock()
	remoteHost, foundRemote := p.remoteOwners[id]
	p.mu.RUnlock()
	if foundRemote && remoteHost != nil && remoteHost.Host != "" {
		log.Printf("other owner exists %s", remoteHost.Host)
		return nil
	}

	// Claim: spawn locally.
	if _, err := p.local.GetGrain(id); err != nil {
		return err
	}

	// Announce asynchronously.
	go p.broadcastOwnership([]CartId{id})
	return nil
}
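
// Resolution walk-through (sketch; the cart id is illustrative): the first node
// to receive traffic for cart 42 finds neither a local grain nor a remoteOwners
// entry, spawns the grain locally, and broadcasts an OwnershipAnnounce naming
// itself. Every other node records 42 -> that host in remoteOwners, so later
// lookups on those nodes take branch 2 above instead of claiming.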

// broadcastOwnership sends an AnnounceOwnership RPC to all healthy remotes.
// Best-effort: failures are logged and otherwise ignored.
func (p *SyncedPool) broadcastOwnership(ids []CartId) {
	if len(ids) == 0 {
		return
	}

	uids := make([]uint64, 0, len(ids))
	for _, id := range ids {
		uids = append(uids, uint64(id))
	}

	p.mu.RLock()
	defer p.mu.RUnlock()

	for _, r := range p.remoteHosts {
		if r.IsHealthy() {
			go func(rh *RemoteHostGRPC) {
				ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
				defer cancel()
				if _, err := rh.ControlClient.AnnounceOwnership(ctx, &proto.OwnershipAnnounce{
					Host:    p.LocalHostname,
					CartIds: uids,
				}); err != nil {
					log.Printf("AnnounceOwnership to %s failed: %v", rh.Host, err)
				}
			}(r)
		}
	}
}

// AdoptRemoteOwnership processes an incoming ownership announcement for cart ids.
func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) {
	if host == "" || host == p.LocalHostname {
		return
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	remoteHost, ok := p.remoteHosts[host]
	if !ok {
		log.Printf("AdoptRemoteOwnership: announcement from unknown host %s ignored", host)
		return
	}
	for _, s := range ids {
		if s == "" {
			continue
		}
		id, ok := ParseCartId(s)
		if !ok {
			continue // skip invalid cart id strings
		}
		// Do not overwrite if already claimed by another host (first claim wins).
		if existing, ok := p.remoteOwners[id]; ok && existing != remoteHost {
			continue
		}
		// Skip if we own the cart locally (local wins for our own process).
		p.local.mu.RLock()
		_, localHas := p.local.grains[uint64(id)]
		p.local.mu.RUnlock()
		if localHas {
			continue
		}
		p.remoteOwners[id] = remoteHost
	}
}
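
// Server-side wiring (sketch; assumed to live in the control-plane gRPC handler
// rather than this file): on receiving an OwnershipAnnounce, the handler would
// convert the announced cart ids to the string form ParseCartId expects and
// call pool.AdoptRemoteOwnership(announce.Host, idsAsStrings).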

// getGrain returns a local grain if this host is (or becomes) the owner under
// the first-touch model. Ownership is resolved first so that a fresh claim
// spawns the grain and announces it; remote grain proxy logic and ring-based
// spawning have been removed.
func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
	// Resolve ownership first; a fresh claim spawns the grain and broadcasts it.
	if err := p.resolveOwnerFirstTouch(id); err != nil {
		return nil, err
	}

	// Owner is local (either existing or just claimed); fetch/create the grain.
	grain, err := p.local.GetGrain(id)
	if err != nil {
		return nil, err
	}
	return grain, nil
}

// Apply applies a single mutation to a grain owned by this host.
// Replication (RF>1) scaffolding: a future enhancement will fan out mutations
// to replica owners (best-effort) and reconcile quorum on read.
func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) {
	grain, err := p.getGrain(id)
	if err != nil {
		log.Printf("could not get grain %v", err)
		return nil, err
	}
	// TODO(takeover): if a remote owner is reported but unreachable, delete its
	// remoteOwners mapping and re-claim locally via resolveOwnerFirstTouch
	// (first-touch override); if another host reclaims first, return ErrNotOwner.

	start := time.Now()
	result, applyErr := grain.Apply(mutation, false)

	// Derive the mutation type label (strip the pointer, if any).
	mutationType := "unknown"
	if mutation != nil {
		if t := reflect.TypeOf(mutation); t != nil {
			if t.Kind() == reflect.Ptr {
				t = t.Elem()
			}
			if t.Name() != "" {
				mutationType = t.Name()
			}
		}
	}
	cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds())

	if applyErr == nil && result != nil {
		cartMutationsTotal.Inc()
		// Update the active grains gauge; grains applied here are locally owned.
		cartActiveGrains.Set(float64(p.local.DebugGrainCount()))
	} else if applyErr != nil {
		cartMutationFailuresTotal.Inc()
	}
	return result, applyErr
}
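
// Usage sketch (the mutation type is illustrative; any type the grain's Apply
// accepts works, and its name becomes the latency label):
//
//	updated, err := pool.Apply(cartId, &AddItemMutation{Sku: "abc", Quantity: 1})
//	if err != nil {
//		// ErrNotOwner or a grain/storage error
//	}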

// Get returns the current state of a grain owned by this host.
// Future replication hook: read-repair or quorum reads can be added here.
func (p *SyncedPool) Get(id CartId) (*CartGrain, error) {
	grain, err := p.getGrain(id)
	if err != nil {
		log.Printf("could not get grain %v", err)
		return nil, err
	}
	return grain.GetCurrentState()
}

// Close notifies remotes that this host is terminating.
func (p *SyncedPool) Close() {
	p.mu.RLock()
	remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
	for _, r := range p.remoteHosts {
		remotes = append(remotes, r)
	}
	p.mu.RUnlock()

	for _, r := range remotes {
		go func(rh *RemoteHostGRPC) {
			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
			_, err := rh.ControlClient.Closing(ctx, &proto.ClosingNotice{Host: p.LocalHostname})
			cancel()
			if err != nil {
				log.Printf("Close notify to %s failed: %v", rh.Host, err)
			}
		}(r)
	}
}

// Hostname implements the GrainPool interface, returning this node's hostname.
func (p *SyncedPool) Hostname() string {
	return p.LocalHostname
}

// OwnerHost returns the remote owner recorded for a cart id under the
// first-touch model, if any. Locally owned carts are not present in this map.
func (p *SyncedPool) OwnerHost(id CartId) (Host, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	ownerHost, ok := p.remoteOwners[id]
	return ownerHost, ok
}