major refactor
synced-pool.go
@@ -3,11 +3,14 @@ package main
import (
"context"
"fmt"
"io"
"log"
"net/http"
"reflect"
"sync"
"time"

messages "git.tornberg.me/go-cart-actor/proto"
proto "git.tornberg.me/go-cart-actor/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -31,7 +34,7 @@ type SyncedPool struct {

// New ownership tracking (first-touch / announcement model)
// remoteOwners maps cart id -> owning host (excluding locally owned carts which live in local.grains)
remoteOwners map[CartId]string
remoteOwners map[CartId]*RemoteHostGRPC

mu sync.RWMutex
@@ -46,10 +49,55 @@ type SyncedPool struct {
type RemoteHostGRPC struct {
Host string
Conn *grpc.ClientConn
Transport *http.Transport
Client *http.Client
ControlClient proto.ControlPlaneClient
MissedPings int
}

func (h *RemoteHostGRPC) Name() string {
return h.Host
}

func (h *RemoteHostGRPC) Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error) {

req, err := http.NewRequestWithContext(r.Context(), r.Method, h.Host, r.Body)

if err != nil {
http.Error(w, "proxy build error", http.StatusBadGateway)
return true, err
}
for k, v := range r.Header {
for _, vv := range v {
req.Header.Add(k, vv)
}
}
res, err := h.Client.Do(req)
if err != nil {
http.Error(w, "proxy request error", http.StatusBadGateway)
return true, err
}
defer res.Body.Close()

for k, v := range res.Header {
for _, vv := range v {
w.Header().Add(k, vv)
}
}
w.Header().Set("X-Cart-Owner-Routed", "true")
if res.StatusCode >= 200 && res.StatusCode <= 299 {
w.WriteHeader(res.StatusCode)
_, copyErr := io.Copy(w, res.Body)
if copyErr != nil {
return true, copyErr
}
return true, nil
}

return false, fmt.Errorf("proxy response status %d", res.StatusCode)

}

func (r *RemoteHostGRPC) IsHealthy() bool {
return r.MissedPings < 3
}
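Note: Proxy builds its outbound request from h.Host alone, so the caller's path and query string are not forwarded. A minimal sketch of carrying them over, assuming the peer serves plain HTTP on http://<host>:<port> (the HTTP port is not part of this diff):

// Hypothetical helper, not part of this commit: derive the forward URL from the
// incoming request so path and query survive the hop. The port parameter is an
// assumption; this diff does not show which port peers listen on for HTTP.
func (h *RemoteHostGRPC) forwardURL(r *http.Request, port int) string {
	return fmt.Sprintf("http://%s:%d%s", h.Host, port, r.URL.RequestURI())
}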
@@ -87,12 +135,10 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery)
LocalHostname: hostname,
local: local,
remoteHosts: make(map[string]*RemoteHostGRPC),
remoteOwners: make(map[CartId]string),
remoteOwners: make(map[CartId]*RemoteHostGRPC),
discardedHostHandler: NewDiscardedHostHandler(1338),
}
p.discardedHostHandler.SetReconnectHandler(p.AddRemote)
// Initialize empty ring (will be rebuilt after first AddRemote or discovery event)
p.rebuildRing()

if discovery != nil {
go func() {
@@ -143,9 +189,9 @@ func (p *SyncedPool) AddRemote(host string) {
p.mu.Unlock()

target := fmt.Sprintf("%s:1337", host)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithBlock())
//ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//defer cancel()
conn, err := grpc.NewClient(target) //grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Printf("AddRemote: dial %s failed: %v", target, err)
return
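For reference, grpc.NewClient is called above with no dial options; in grpc-go it, like Dial, needs explicit transport credentials. A minimal sketch of the non-blocking equivalent of the old insecure dial, assuming grpc-go v1.63+ and the google.golang.org/grpc/credentials/insecure package:

// Hypothetical sketch, not part of this commit. grpc.NewClient does not block,
// so the WithBlock/timeout pair from the old DialContext call no longer applies;
// the connection is established lazily on the first RPC.
func dialPeer(target string) (*grpc.ClientConn, error) {
	return grpc.NewClient(target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
}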
@@ -170,11 +216,22 @@ func (p *SyncedPool) AddRemote(host string) {
return
}
}
transport := &http.Transport{
MaxIdleConns: 100, // Maximum idle connections
MaxIdleConnsPerHost: 100, // Maximum idle connections per host
IdleConnTimeout: 120 * time.Second, // Timeout for idle connections
}

client := &http.Client{
Transport: transport,
Timeout: 10 * time.Second, // Request timeout
}

remote := &RemoteHostGRPC{
Host: host,
Conn: conn,

Host: host,
Conn: conn,
Transport: transport,
Client: client,
ControlClient: controlClient,
MissedPings: 0,
}
@@ -184,7 +241,7 @@ func (p *SyncedPool) AddRemote(host string) {
p.mu.Unlock()
connectedRemotes.Set(float64(p.RemoteCount()))
// Rebuild consistent hashing ring including this new host
p.rebuildRing()
//p.rebuildRing()

log.Printf("Connected to remote host %s", host)
@@ -209,7 +266,7 @@ func (p *SyncedPool) initializeRemote(remote *RemoteHostGRPC) {

// Only set if not already claimed (first claim wins)
if _, exists := p.remoteOwners[CartId(cid)]; !exists {
p.remoteOwners[CartId(cid)] = remote.Host
p.remoteOwners[CartId(cid)] = remote
}
count++
}
@@ -226,7 +283,7 @@ func (p *SyncedPool) RemoveHost(host string) {
}
// purge remote ownership entries for this host
for id, h := range p.remoteOwners {
if h == host {
if h.Host == host {
delete(p.remoteOwners, id)
}
}
@@ -237,7 +294,7 @@ func (p *SyncedPool) RemoveHost(host string) {
}
connectedRemotes.Set(float64(p.RemoteCount()))
// Rebuild ring after host removal
p.rebuildRing()
// p.rebuildRing()
}

// RemoteCount returns number of tracked remote hosts.
@@ -355,29 +412,6 @@ func (p *SyncedPool) GetHealthyRemotes() []*RemoteHostGRPC {
return ret
}

// rebuildRing removed (ring no longer used in first-touch ownership model)
func (p *SyncedPool) rebuildRing() {}

// (All ring construction & metrics removed)

// ForceRingRefresh kept as no-op for backward compatibility.
func (p *SyncedPool) ForceRingRefresh() {}

// ownersFor removed (ring-based ownership deprecated)
func (p *SyncedPool) ownersFor(id CartId) []string {
return []string{p.LocalHostname}
}

// ownerHostFor retained as wrapper to satisfy existing calls (always local)
func (p *SyncedPool) ownerHostFor(id CartId) string {
return p.LocalHostname
}

// DebugOwnerHost exposes (for tests) the currently computed primary owner host.
func (p *SyncedPool) DebugOwnerHost(id CartId) string {
return p.ownerHostFor(id)
}

func (p *SyncedPool) removeLocalGrain(id CartId) {
p.mu.Lock()
delete(p.local.grains, uint64(id))
@@ -398,42 +432,33 @@ var ErrNotOwner = fmt.Errorf("not owner")
//
// NOTE: This does NOT (yet) reconcile conflicting announcements; first claim
// wins. Later improvements can add tie-break via timestamp or host ordering.
func (p *SyncedPool) resolveOwnerFirstTouch(id CartId) (string, error) {
func (p *SyncedPool) resolveOwnerFirstTouch(id CartId) error {
// Fast local existence check
p.local.mu.RLock()
_, existsLocal := p.local.grains[uint64(id)]
p.local.mu.RUnlock()
if existsLocal {
return p.LocalHostname, nil
return nil
}

// Remote ownership map lookup
p.mu.RLock()
remoteHost, foundRemote := p.remoteOwners[id]
p.mu.RUnlock()
if foundRemote && remoteHost != "" {
return remoteHost, nil
if foundRemote && remoteHost.Host != "" {
log.Printf("other owner exists %s", remoteHost.Host)
return nil
}

// Claim: spawn locally
_, err := p.local.GetGrain(id)
if err != nil {
return "", err
return err
}

// Record (defensive) in remoteOwners pointing to self (not strictly needed
// for local queries, but keeps a single lookup structure).
p.mu.Lock()
if _, stillMissing := p.remoteOwners[id]; !stillMissing {
// Another goroutine inserted meanwhile; keep theirs (first claim wins).
} else {
p.remoteOwners[id] = p.LocalHostname
}
p.mu.Unlock()

// Announce asynchronously
go p.broadcastOwnership([]CartId{id})
return p.LocalHostname, nil
return nil
}

// broadcastOwnership sends an AnnounceOwnership RPC to all healthy remotes.
@@ -442,33 +467,25 @@ func (p *SyncedPool) broadcastOwnership(ids []CartId) {
if len(ids) == 0 {
return
}
// Prepare payload (convert to string slice)
payload := make([]string, 0, len(ids))

uids := make([]uint64, 0, len(ids))
for _, id := range ids {
if id.String() != "" {
payload = append(payload, id.String())
}
}
if len(payload) == 0 {
return
uids = append(uids, uint64(id))
}

p.mu.RLock()
remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts))
defer p.mu.RUnlock()

for _, r := range p.remoteHosts {
if r.IsHealthy() {
remotes = append(remotes, r)
go func(rh *RemoteHostGRPC) {
rh.ControlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{
Host: p.LocalHostname,
CartIds: uids,
})
}(r)
}
}
p.mu.RUnlock()

for _, r := range remotes {
go func(rh *RemoteHostGRPC) {
// AnnounceOwnership RPC not yet available (proto regeneration pending); no-op broadcast for now.
// Intended announcement: host=p.LocalHostname ids=payload
_ = rh
}(r)
}
}

// AdoptRemoteOwnership processes an incoming ownership announcement for cart ids.
@@ -476,6 +493,10 @@ func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) {
if host == "" || host == p.LocalHostname {
return
}
remoteHost, ok := p.remoteHosts[host]
if !ok {
log.Printf("remote host does not exist!!")
}
p.mu.Lock()
defer p.mu.Unlock()
for _, s := range ids {
@@ -488,7 +509,7 @@ func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) {
}
id := parsed
// Do not overwrite if already claimed by another host (first wins).
if existing, ok := p.remoteOwners[id]; ok && existing != host {
if existing, ok := p.remoteOwners[id]; ok && existing != remoteHost {
continue
}
// Skip if we own locally (local wins for our own process)
@@ -498,7 +519,7 @@ func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) {
if localHas {
continue
}
p.remoteOwners[id] = host
p.remoteOwners[id] = remoteHost
}
}
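The receiving side of the announcement is not shown in this diff. A hedged sketch of how a ControlPlane server might hand an incoming OwnershipAnnounce to AdoptRemoteOwnership; the server type, the ack message name, and the uint64-to-string id conversion are assumptions, not part of this commit:

// Hypothetical server-side wiring (assumed names: controlPlaneServer with a pool
// field, messages.OwnershipAck). Assumes imports "context" and "strconv" plus the
// project's proto package.
func (s *controlPlaneServer) AnnounceOwnership(ctx context.Context, in *messages.OwnershipAnnounce) (*messages.OwnershipAck, error) {
	ids := make([]string, 0, len(in.CartIds))
	for _, cid := range in.CartIds {
		ids = append(ids, strconv.FormatUint(cid, 10))
	}
	// Record the announcing host as owner for each cart id (first claim wins).
	s.pool.AdoptRemoteOwnership(in.Host, ids)
	return &messages.OwnershipAck{}, nil
}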
@@ -506,20 +527,13 @@ func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) {
// the first-touch model. If another host owns the cart, ErrNotOwner is returned.
// Remote grain proxy logic and ring-based spawning have been removed.
func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
owner, err := p.resolveOwnerFirstTouch(id)
if err != nil {
return nil, err
}
if owner != p.LocalHostname {
// Another host owns it; signal caller to proxy / forward.
return nil, ErrNotOwner
}

// Owner is local (either existing or just claimed), fetch/create grain.
grain, err := p.local.GetGrain(id)
if err != nil {
return nil, err
}
p.resolveOwnerFirstTouch(id)
return grain, nil
}
@@ -528,27 +542,31 @@ func (p *SyncedPool) getGrain(id CartId) (Grain, error) {
// to replica owners (best-effort) and reconcile quorum on read.
func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) {
grain, err := p.getGrain(id)
if err == ErrNotOwner {
// Remote owner reported but either unreachable or failed earlier in stack.
// Takeover strategy: remove remote mapping (first-touch override) and claim locally.
p.mu.Lock()
delete(p.remoteOwners, id)
p.mu.Unlock()
if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil {
return nil, terr
} else if owner == p.LocalHostname {
// Fetch (now-local) grain
grain, err = p.local.GetGrain(id)
if err != nil {
return nil, err
}
} else {
// Another host reclaimed before us; treat as not owner.
return nil, ErrNotOwner
}
} else if err != nil {
if err != nil {
log.Printf("could not get grain %v", err)
return nil, err
}
// if err == ErrNotOwner {
// // Remote owner reported but either unreachable or failed earlier in stack.
// // Takeover strategy: remove remote mapping (first-touch override) and claim locally.
// p.mu.Lock()
// delete(p.remoteOwners, id)
// p.mu.Unlock()
// if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil {
// return nil, terr
// } else if owner == p.LocalHostname {
// // Fetch (now-local) grain
// grain, err = p.local.GetGrain(id)
// if err != nil {
// return nil, err
// }
// } else {
// // Another host reclaimed before us; treat as not owner.
// return nil, ErrNotOwner
// }
// } else if err != nil {
// return nil, err
// }

start := time.Now()
result, applyErr := grain.Apply(mutation, false)
@@ -569,10 +587,10 @@ func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error)

if applyErr == nil && result != nil {
cartMutationsTotal.Inc()
if p.ownerHostFor(id) == p.LocalHostname {
// Update active grains gauge only for local ownership
cartActiveGrains.Set(float64(p.local.DebugGrainCount()))
}
//if p.ownerHostFor(id) == p.LocalHostname {
// Update active grains gauge only for local ownership
cartActiveGrains.Set(float64(p.local.DebugGrainCount()))
//}
} else if applyErr != nil {
cartMutationFailuresTotal.Inc()
}
@@ -583,22 +601,8 @@ func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error)
// Future replication hook: Read-repair or quorum read can be added here.
func (p *SyncedPool) Get(id CartId) (*CartGrain, error) {
grain, err := p.getGrain(id)
if err == ErrNotOwner {
// Attempt takeover on read as well (e.g. owner dead).
p.mu.Lock()
delete(p.remoteOwners, id)
p.mu.Unlock()
if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil {
return nil, terr
} else if owner == p.LocalHostname {
grain, err = p.local.GetGrain(id)
if err != nil {
return nil, err
}
} else {
return nil, ErrNotOwner
}
} else if err != nil {
if err != nil {
log.Printf("could not get grain %v", err)
return nil, err
}
return grain.GetCurrentState()
@@ -631,6 +635,7 @@ func (p *SyncedPool) Hostname() string {
}

// OwnerHost returns the primary owning host for a given cart id (ring lookup).
func (p *SyncedPool) OwnerHost(id CartId) string {
return p.ownerHostFor(id)
func (p *SyncedPool) OwnerHost(id CartId) (Host, bool) {
ownerHost, ok := p.remoteOwners[id]
return ownerHost, ok
}
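With the new OwnerHost signature, the HTTP layer can decide whether to serve a cart locally or forward to its owner. A hedged routing sketch, not part of this commit; it assumes the Host value returned by OwnerHost exposes Proxy and Name the way *RemoteHostGRPC does, and that the handler falls back to local handling when no remote claim is recorded:

// Hypothetical caller-side routing, for illustration only.
func routeCart(pool *SyncedPool, id CartId, w http.ResponseWriter, r *http.Request) bool {
	owner, ok := pool.OwnerHost(id)
	if !ok {
		return false // no remote claim recorded; first-touch handles it locally
	}
	handled, err := owner.Proxy(id, w, r)
	if err != nil {
		log.Printf("proxy to %s failed: %v", owner.Name(), err)
	}
	return handled
}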