From 4c55e9b509c9e298f1638de3cca5cc301512e97a Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Tue, 19 Oct 2021 12:02:14 +0100
Subject: [PATCH] AMM to avoid paused workers

This reverts commit b658914eb158266b78b8e9d4873495adf8a1410f.
---
 distributed/active_memory_manager.py            | 16 ++++-
 .../tests/test_active_memory_manager.py         | 43 +++++++++++++++++++
 2 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py
index 0bcd83522ca..76ec642f58e 100644
--- a/distributed/active_memory_manager.py
+++ b/distributed/active_memory_manager.py
@@ -9,6 +9,7 @@
 import dask
 from dask.utils import parse_timedelta
 
+from .core import Status
 from .metrics import time
 from .utils import import_term, log_errors
 
@@ -234,11 +235,16 @@ def _find_recipient(
         if ts.state != "memory":
             return None
         if candidates is None:
-            candidates = set(self.scheduler.workers.values())
+            candidates = self.scheduler.running.copy()
+        else:
+            candidates &= self.scheduler.running
+
         candidates -= ts.who_has
         candidates -= pending_repl
         if not candidates:
             return None
+
+        # Select candidate with the lowest memory usage
         return min(candidates, key=self.workers_memory.__getitem__)
 
     def _find_dropper(
@@ -268,7 +274,13 @@ def _find_dropper(
         candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters}
         if not candidates:
             return None
-        return max(candidates, key=self.workers_memory.__getitem__)
+
+        # Select candidate with the highest memory usage.
+        # Drop from workers with status paused or closing_gracefully first.
+        return max(
+            candidates,
+            key=lambda ws: (ws.status != Status.running, self.workers_memory[ws]),
+        )
 
 
 class ActiveMemoryManagerPolicy:
diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py
index 66c29760bae..96f5218c60a 100644
--- a/distributed/tests/test_active_memory_manager.py
+++ b/distributed/tests/test_active_memory_manager.py
@@ -8,6 +8,7 @@
     ActiveMemoryManagerExtension,
     ActiveMemoryManagerPolicy,
 )
+from distributed.core import Status
 from distributed.utils_test import gen_cluster, inc, slowinc
 
 NO_AMM_START = {"distributed.scheduler.active-memory-manager.start": False}
@@ -328,6 +329,22 @@ async def test_drop_with_bad_candidates(c, s, a, b):
     assert s.tasks["x"].who_has == {ws0, ws1}
 
 
+@gen_cluster(client=True, nthreads=[("", 1)] * 10, config=demo_config("drop", n=1))
+async def test_drop_prefers_paused_workers(c, s, *workers):
+    x = await c.scatter({"x": 1}, broadcast=True)
+    ts = s.tasks["x"]
+    assert len(ts.who_has) == 10
+    ws = s.workers[workers[3].address]
+    workers[3].memory_pause_fraction = 1e-15
+    while ws.status != Status.paused:
+        await asyncio.sleep(0.01)
+
+    s.extensions["amm"].run_once()
+    while len(ts.who_has) != 9:
+        await asyncio.sleep(0.01)
+    assert ws not in ts.who_has
+
+
 @gen_cluster(nthreads=[("", 1)] * 4, client=True, config=demo_config("replicate", n=2))
 async def test_replicate(c, s, *workers):
     futures = await c.scatter({"x": 123})
@@ -436,6 +453,32 @@ async def test_replicate_to_candidates_with_key(c, s, a, b):
     assert s.tasks["x"].who_has == {ws0}
 
 
+@gen_cluster(client=True, nthreads=[("", 1)] * 3, config=demo_config("replicate"))
+async def test_replicate_avoids_paused_workers_1(c, s, w0, w1, w2):
+    w1.memory_pause_fraction = 1e-15
+    while s.workers[w1.address].status != Status.paused:
+        await asyncio.sleep(0.01)
+
+    futures = await c.scatter({"x": 1}, workers=[w0.address])
+    s.extensions["amm"].run_once()
+    while "x" not in w2.data:
+        await asyncio.sleep(0.01)
+    await asyncio.sleep(0.2)
+    assert "x" not in w1.data
+
+
+@gen_cluster(client=True, config=demo_config("replicate"))
+async def test_replicate_avoids_paused_workers_2(c, s, a, b):
+    b.memory_pause_fraction = 1e-15
+    while s.workers[b.address].status != Status.paused:
+        await asyncio.sleep(0.01)
+
+    futures = await c.scatter({"x": 1}, workers=[a.address])
+    s.extensions["amm"].run_once()
+    await asyncio.sleep(0.2)
+    assert "x" not in b.data
+
+
 @gen_cluster(
     nthreads=[("", 1)] * 4,
     client=True,