Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Nov 21, 2021
1 parent f1de6c0 commit 245978b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 16 deletions.
40 changes: 25 additions & 15 deletions distributed/active_memory_manager.py
Expand Up @@ -469,21 +469,31 @@ def run(self):
nno_rec = 0

for ts in ws.has_what:
if len(ts.who_has) == 1:
nrepl += 1
pending_repl, _ = self.manager.pending[ts]
if not pending_repl:
rec_ws = (yield "replicate", ts, None)
if not rec_ws:
# replication was rejected by the AMM (see _find_recipient)
nno_rec += 1
else:
# This may be rejected by either the AMM (see _find_dropper) or by the
# Worker; e.g. a running task may depend on this key. If so we'll try
# again at the next iteration. Anyway, this is just to facilitate
# spilled tasks to be moved back into memory and not mandatory for
# retirement.
yield "drop", ts, {ws}
if len(ts.who_has) > 1:
# Use cases:
# 1. The suggestion is accepted by the AMM and by the Worker.
# Replica is dropped.
# 2. The suggestion is accepted by the AMM, but rejected by the Worker.
# We'll try again at the next AMM iteration.
# 3. The suggestion is rejected by the AMM, because:
# 3a. Another policy (e.g. ReduceReplicas) already suggested the same
# 3b. All replicas of the key are on workers being retired, and the
# other RetireWorker instances already asked the same suggestion.
# We need to deal with this case explicitly and create a replica
# elsewhere.
drop_ws = (yield "drop", ts, {ws})
if drop_ws:
continue

# Either the worker holds the only replica or all replicas are being held
# by workers that are being retired
nrepl += 1
pending_repl, _ = self.manager.pending[ts]
if not pending_repl:
rec_ws = (yield "replicate", ts, None)
if not rec_ws:
# replication was rejected by the AMM (see _find_recipient)
nno_rec += 1

if nno_rec:
# All workers are paused or closing_gracefully.
Expand Down
6 changes: 5 additions & 1 deletion distributed/tests/test_active_memory_manager.py
Expand Up @@ -616,9 +616,13 @@ async def test_RetireWorker_all_replicas_are_being_retired(c, s, w1, w2, w3):
"""There are multiple replicas of a key, but they all reside on workers that are
being retired
"""
ws1 = s.workers[w1.address]
ws2 = s.workers[w2.address]
ws3 = s.workers[w3.address]
fut = await c.scatter({"x": "x"}, workers=[w1.address, w2.address], broadcast=True)
assert s.tasks["x"].who_has == {ws1, ws2}
await c.retire_workers([w1.address, w2.address])
assert s.tasks["x"].who_has == {s.workers[w3.address]}
assert s.tasks["x"].who_has == {ws3}


@gen_cluster(client=True, nthreads=[("", 1)] * 4, config=NO_AMM_START)
Expand Down

0 comments on commit 245978b

Please sign in to comment.