update
This commit is contained in:
@@ -219,6 +219,7 @@ func (p *CartPool) HandleOwnershipChange(host string, ids []uint64) error {
|
||||
p.localMu.Lock()
|
||||
defer p.localMu.Unlock()
|
||||
for _, id := range ids {
|
||||
log.Printf("Handling ownership change for cart %d to host %s", id, host)
|
||||
delete(p.grains, id)
|
||||
p.remoteOwners[id] = remoteHost
|
||||
}
|
||||
@@ -226,15 +227,11 @@ func (p *CartPool) HandleOwnershipChange(host string, ids []uint64) error {
|
||||
}
|
||||
|
||||
// SnapshotGrains returns a copy of the currently resident grains keyed by id.
|
||||
func (p *CartPool) SnapshotGrains() map[CartId]*CartGrain {
|
||||
func (p *CartPool) SnapshotGrains() map[uint64]*CartGrain {
|
||||
p.localMu.RLock()
|
||||
defer p.localMu.RUnlock()
|
||||
out := make(map[CartId]*CartGrain, len(p.grains))
|
||||
for _, g := range p.grains {
|
||||
if g != nil {
|
||||
out[g.GetId()] = g
|
||||
}
|
||||
}
|
||||
out := maps.Clone(p.grains)
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -258,6 +255,11 @@ func (p *CartPool) SnapshotGrains() map[CartId]*CartGrain {
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func (p *CartPool) TakeOwnership(id uint64) {
|
||||
|
||||
if p.grains[id] != nil {
|
||||
return
|
||||
}
|
||||
log.Printf("taking ownership of: %d", id)
|
||||
p.broadcastOwnership([]uint64{id})
|
||||
}
|
||||
|
||||
@@ -402,19 +404,19 @@ func (p *CartPool) SendNegotiation() {
|
||||
}
|
||||
p.remoteMu.RUnlock()
|
||||
|
||||
for _, r := range remotes {
|
||||
knownByRemote, err := r.Negotiate(hosts)
|
||||
p.forAllHosts(func(remote *proxy.RemoteHost) {
|
||||
knownByRemote, err := remote.Negotiate(hosts)
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Negotiate with %s failed: %v", r.Host, err)
|
||||
continue
|
||||
log.Printf("Negotiate with %s failed: %v", remote.Host, err)
|
||||
return
|
||||
}
|
||||
for _, h := range knownByRemote {
|
||||
if !p.IsKnown(h) {
|
||||
p.AddRemote(h)
|
||||
go p.AddRemote(h)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (p *CartPool) forAllHosts(fn func(*proxy.RemoteHost)) {
|
||||
@@ -493,8 +495,7 @@ func (p *CartPool) Apply(id uint64, mutation any) (*CartGrain, error) {
|
||||
|
||||
if applyErr == nil && result != nil {
|
||||
cartMutationsTotal.Inc()
|
||||
//p.RefreshExpiry(id)
|
||||
//cartActiveGrains.Set(float64(len(p.grains)))
|
||||
|
||||
} else if applyErr != nil {
|
||||
cartMutationFailuresTotal.Inc()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user