diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py
index 126608a0173..82385354d4d 100644
--- a/distributed/active_memory_manager.py
+++ b/distributed/active_memory_manager.py
@@ -469,21 +469,31 @@ def run(self):
         nno_rec = 0
 
         for ts in ws.has_what:
-            if len(ts.who_has) == 1:
-                nrepl += 1
-                pending_repl, _ = self.manager.pending[ts]
-                if not pending_repl:
-                    rec_ws = (yield "replicate", ts, None)
-                    if not rec_ws:
-                        # replication was rejected by the AMM (see _find_recipient)
-                        nno_rec += 1
-            else:
-                # This may be rejected by either the AMM (see _find_dropper) or by the
-                # Worker; e.g. a running task may depend on this key. If so we'll try
-                # again at the next iteration. Anyway, this is just to facilitate
-                # spilled tasks to be moved back into memory and not mandatory for
-                # retirement.
-                yield "drop", ts, {ws}
+            if len(ts.who_has) > 1:
+                # Use cases:
+                # 1. The suggestion is accepted by the AMM and by the Worker.
+                #    The replica is dropped.
+                # 2. The suggestion is accepted by the AMM, but rejected by the Worker.
+                #    We'll try again at the next AMM iteration.
+                # 3. The suggestion is rejected by the AMM, because:
+                #    3a. Another policy (e.g. ReduceReplicas) already suggested the same
+                #    3b. All replicas of the key are on workers being retired, and the
+                #        other RetireWorker instances already asked the same suggestion.
+                #        We need to deal with this case explicitly and create a replica
+                #        elsewhere.
+                drop_ws = (yield "drop", ts, {ws})
+                if drop_ws:
+                    continue
+
+            # Either the worker holds the only replica or all replicas are being held
+            # by workers that are being retired
+            nrepl += 1
+            pending_repl, _ = self.manager.pending[ts]
+            if not pending_repl:
+                rec_ws = (yield "replicate", ts, None)
+                if not rec_ws:
+                    # replication was rejected by the AMM (see _find_recipient)
+                    nno_rec += 1
 
         if nno_rec:
             # All workers are paused or closing_gracefully.
diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py
index 344c446c1ef..9d9e813c6b3 100644
--- a/distributed/tests/test_active_memory_manager.py
+++ b/distributed/tests/test_active_memory_manager.py
@@ -616,9 +616,13 @@ async def test_RetireWorker_all_replicas_are_being_retired(c, s, w1, w2, w3):
     """There are multiple replicas of a key, but they all reside on workers that are
     being retired
     """
+    ws1 = s.workers[w1.address]
+    ws2 = s.workers[w2.address]
+    ws3 = s.workers[w3.address]
     fut = await c.scatter({"x": "x"}, workers=[w1.address, w2.address], broadcast=True)
+    assert s.tasks["x"].who_has == {ws1, ws2}
     await c.retire_workers([w1.address, w2.address])
-    assert s.tasks["x"].who_has == {s.workers[w3.address]}
+    assert s.tasks["x"].who_has == {ws3}
 
 
 @gen_cluster(client=True, nthreads=[("", 1)] * 4, config=NO_AMM_START)
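
The mechanism this patch leans on is the AMM suggestion protocol: a policy's run() is a
generator, and each yielded ("drop"/"replicate", ts, candidates) suggestion is answered
by the manager with the WorkerState chosen to drop from or replicate to, or None if the
suggestion was rejected. The new code uses that return value (drop_ws) to detect use
case 3 and fall through to replication instead of losing the key. Below is a minimal
sketch of a custom policy built on the same pattern as exercised in this diff; the
class name DropElsewhereExample, its constructor, and the manager.scheduler lookup are
this sketch's assumptions, not part of the patch:

    from distributed.active_memory_manager import ActiveMemoryManagerPolicy

    class DropElsewhereExample(ActiveMemoryManagerPolicy):
        """Sketch: for a single worker, drop replicas that also exist
        elsewhere; if a drop is rejected, suggest a new replica instead.
        """

        def __init__(self, address):
            # hypothetical parameter: address of the worker to clear out
            self.address = address

        def run(self):
            # self.manager is set by the AMM when the policy is registered
            ws = self.manager.scheduler.workers.get(self.address)
            if ws is None:
                return
            for ts in ws.has_what:
                if len(ts.who_has) > 1:
                    # The yield's return value is the WorkerState the AMM
                    # picked to drop from, or None if it rejected the idea
                    # (e.g. another policy already suggested the same drop).
                    drop_ws = (yield "drop", ts, {ws})
                    if drop_ws:
                        continue
                # Sole replica, or drop rejected: ask for a copy elsewhere.
                # None as candidates lets the AMM pick any suitable worker.
                yield "replicate", ts, None

Note the design choice the patch exploits: because rejections are reported back through
the generator, RetireWorker can distinguish "drop accepted" (nothing more to do) from
"all replicas sit on retiring workers" (must replicate first), which is exactly use
case 3b in the comment above.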