AMM to avoid paused workers
This reverts commit b658914eb158266b78b8e9d4873495adf8a1410f.
crusaderky committed Oct 19, 2021
1 parent 6837b77 commit 4c55e9b
Showing 2 changed files with 57 additions and 2 deletions.
16 changes: 14 additions & 2 deletions distributed/active_memory_manager.py
@@ -9,6 +9,7 @@
 import dask
 from dask.utils import parse_timedelta
 
+from .core import Status
 from .metrics import time
 from .utils import import_term, log_errors

@@ -234,11 +235,16 @@ def _find_recipient(
         if ts.state != "memory":
             return None
         if candidates is None:
-            candidates = set(self.scheduler.workers.values())
+            candidates = self.scheduler.running.copy()
+        else:
+            candidates &= self.scheduler.running
+
         candidates -= ts.who_has
         candidates -= pending_repl
         if not candidates:
             return None
+
+        # Select candidate with the lowest memory usage
         return min(candidates, key=self.workers_memory.__getitem__)
 
     def _find_dropper(
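For context (not part of the diff): _find_recipient now draws its candidates from self.scheduler.running, the scheduler's set of workers in status running, so paused or retiring workers can never receive a new replica; the tie-break among survivors is still lowest memory usage. A minimal self-contained sketch of that selection rule, using toy stand-in data rather than real WorkerState objects:

from enum import Enum, auto

class Status(Enum):
    running = auto()
    paused = auto()

# Toy stand-ins: worker name -> (status, memory usage in bytes)
workers = {
    "w0": (Status.running, 4_000_000),
    "w1": (Status.paused, 1_000_000),  # least memory, but paused
    "w2": (Status.running, 2_000_000),
}
who_has = {"w0"}  # pretend w0 already holds the key

running = {name for name, (status, _) in workers.items() if status is Status.running}
candidates = running - who_has

# Same rule as _find_recipient: the least-loaded *running* worker wins.
recipient = min(candidates, key=lambda name: workers[name][1])
assert recipient == "w2"  # before this change, paused w1 would have won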
@@ -268,7 +274,13 @@ def _find_dropper(
         candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters}
         if not candidates:
             return None
-        return max(candidates, key=self.workers_memory.__getitem__)
+
+        # Select candidate with the highest memory usage.
+        # Drop from workers with status paused or closing_gracefully first.
+        return max(
+            candidates,
+            key=lambda ws: (ws.status != Status.running, self.workers_memory[ws]),
+        )
 
 
 class ActiveMemoryManagerPolicy:
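A note on the max() key above (again an illustration, not part of the diff): Python compares tuples element by element and True > False, so (ws.status != Status.running, memory) ranks every non-running worker above every running one, and memory usage only breaks ties within each group. A toy demonstration:

workers_memory = {"running-big": 9_000, "running-small": 2_000, "paused-tiny": 1_000}
running = {"running-big", "running-small"}

# Tuples compare element by element and True > False, so any worker outside
# the running set outranks every running worker; memory only breaks ties.
dropper = max(
    workers_memory,
    key=lambda w: (w not in running, workers_memory[w]),
)
assert dropper == "paused-tiny"  # paused beats even the largest running worker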
43 changes: 43 additions & 0 deletions distributed/tests/test_active_memory_manager.py
@@ -8,6 +8,7 @@
     ActiveMemoryManagerExtension,
     ActiveMemoryManagerPolicy,
 )
+from distributed.core import Status
 from distributed.utils_test import gen_cluster, inc, slowinc
 
 NO_AMM_START = {"distributed.scheduler.active-memory-manager.start": False}
@@ -328,6 +329,22 @@ async def test_drop_with_bad_candidates(c, s, a, b):
     assert s.tasks["x"].who_has == {ws0, ws1}
 
 
+@gen_cluster(client=True, nthreads=[("", 1)] * 10, config=demo_config("drop", n=1))
+async def test_drop_prefers_paused_workers(c, s, *workers):
+    x = await c.scatter({"x": 1}, broadcast=True)
+    ts = s.tasks["x"]
+    assert len(ts.who_has) == 10
+    ws = s.workers[workers[3].address]
+    workers[3].memory_pause_fraction = 1e-15
+    while ws.status != Status.paused:
+        await asyncio.sleep(0.01)
+
+    s.extensions["amm"].run_once()
+    while len(ts.who_has) != 9:
+        await asyncio.sleep(0.01)
+    assert ws not in ts.who_has
+
+
 @gen_cluster(nthreads=[("", 1)] * 4, client=True, config=demo_config("replicate", n=2))
 async def test_replicate(c, s, *workers):
     futures = await c.scatter({"x": 123})
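The test above flips a worker to paused by setting memory_pause_fraction to a near-zero value: the worker's memory monitor sees its process memory exceed pause_fraction * memory_limit and switches the status. A rough sketch of that threshold check (an assumption for illustration, not the actual worker code):

def should_pause(process_memory: int, memory_limit: int, pause_fraction: float) -> bool:
    # With pause_fraction = 1e-15, any nonzero process memory trips the
    # check, so the worker pauses almost immediately.
    return memory_limit > 0 and process_memory / memory_limit > pause_fraction

assert should_pause(process_memory=1, memory_limit=2**30, pause_fraction=1e-15)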
@@ -436,6 +453,32 @@ async def test_replicate_to_candidates_with_key(c, s, a, b):
     assert s.tasks["x"].who_has == {ws0}
 
 
+@gen_cluster(client=True, nthreads=[("", 1)] * 3, config=demo_config("replicate"))
+async def test_replicate_avoids_paused_workers_1(c, s, w0, w1, w2):
+    w1.memory_pause_fraction = 1e-15
+    while s.workers[w1.address].status != Status.paused:
+        await asyncio.sleep(0.01)
+
+    futures = await c.scatter({"x": 1}, workers=[w0.address])
+    s.extensions["amm"].run_once()
+    while "x" not in w2.data:
+        await asyncio.sleep(0.01)
+    await asyncio.sleep(0.2)
+    assert "x" not in w1.data
+
+
+@gen_cluster(client=True, config=demo_config("replicate"))
+async def test_replicate_avoids_paused_workers_2(c, s, a, b):
+    b.memory_pause_fraction = 1e-15
+    while s.workers[b.address].status != Status.paused:
+        await asyncio.sleep(0.01)
+
+    futures = await c.scatter({"x": 1}, workers=[a.address])
+    s.extensions["amm"].run_once()
+    await asyncio.sleep(0.2)
+    assert "x" not in b.data
+
+
 @gen_cluster(
     nthreads=[("", 1)] * 4,
     client=True,
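Outside of the demo_config test helper, the AMM is driven by dask config; the relevant key appears in the tests' NO_AMM_START constant. A sketch of enabling it with the built-in ReduceReplicas policy (key names per the distributed defaults of this era; treat as illustrative):

import dask

# Start the Active Memory Manager on the scheduler and run ReduceReplicas
# every 2 seconds.
dask.config.set(
    {
        "distributed.scheduler.active-memory-manager.start": True,
        "distributed.scheduler.active-memory-manager.interval": "2s",
        "distributed.scheduler.active-memory-manager.policies": [
            {"class": "distributed.active_memory_manager.ReduceReplicas"},
        ],
    }
)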
