Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMM: AMM to avoid paused workers #5440

Merged
merged 3 commits into from Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 14 additions & 2 deletions distributed/active_memory_manager.py
Expand Up @@ -9,6 +9,7 @@
import dask
from dask.utils import parse_timedelta

from .core import Status
from .metrics import time
from .utils import import_term, log_errors

Expand Down Expand Up @@ -234,11 +235,16 @@ def _find_recipient(
if ts.state != "memory":
return None
if candidates is None:
candidates = set(self.scheduler.workers.values())
candidates = self.scheduler.running.copy()
else:
candidates &= self.scheduler.running

candidates -= ts.who_has
candidates -= pending_repl
if not candidates:
return None

# Select candidate with the lowest memory usage
return min(candidates, key=self.workers_memory.__getitem__)

def _find_dropper(
Expand Down Expand Up @@ -268,7 +274,13 @@ def _find_dropper(
candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters}
if not candidates:
return None
return max(candidates, key=self.workers_memory.__getitem__)

# Select candidate with the highest memory usage.
# Drop from workers with status paused or closing_gracefully first.
return max(
candidates,
key=lambda ws: (ws.status != Status.running, self.workers_memory[ws]),
)


class ActiveMemoryManagerPolicy:
Expand Down
43 changes: 43 additions & 0 deletions distributed/tests/test_active_memory_manager.py
Expand Up @@ -8,6 +8,7 @@
ActiveMemoryManagerExtension,
ActiveMemoryManagerPolicy,
)
from distributed.core import Status
from distributed.utils_test import gen_cluster, inc, slowinc

NO_AMM_START = {"distributed.scheduler.active-memory-manager.start": False}
Expand Down Expand Up @@ -328,6 +329,22 @@ async def test_drop_with_bad_candidates(c, s, a, b):
assert s.tasks["x"].who_has == {ws0, ws1}


@gen_cluster(
    client=True,
    nthreads=[("", 1)] * 10,
    config=demo_config("drop", n=1),
)
async def test_drop_prefers_paused_workers(c, s, *workers):
    """Check that a paused worker is the one that loses its replica.

    Key ``x`` is broadcast to all ten workers, then the fourth worker gets a
    tiny ``memory_pause_fraction`` and the test waits until the scheduler
    reports it as paused. After a single AMM pass the key must be down to
    nine replicas, none of which is on the paused worker.
    """
    # NOTE(review): the result is never read; presumably it is held only to
    # keep the scattered key alive for the duration of the test - confirm.
    scattered = await c.scatter({"x": 1}, broadcast=True)
    task = s.tasks["x"]
    assert len(task.who_has) == 10

    target = workers[3]
    target_state = s.workers[target.address]
    target.memory_pause_fraction = 1e-15
    # Poll until the scheduler-side state of the worker reads as paused.
    while True:
        if target_state.status == Status.paused:
            break
        await asyncio.sleep(0.01)

    amm = s.extensions["amm"]
    amm.run_once()
    # Poll until exactly one replica has been dropped.
    while True:
        if len(task.who_has) == 9:
            break
        await asyncio.sleep(0.01)
    assert target_state not in task.who_has


@gen_cluster(nthreads=[("", 1)] * 4, client=True, config=demo_config("replicate", n=2))
async def test_replicate(c, s, *workers):
futures = await c.scatter({"x": 123})
Expand Down Expand Up @@ -436,6 +453,32 @@ async def test_replicate_to_candidates_with_key(c, s, a, b):
assert s.tasks["x"].who_has == {ws0}


@gen_cluster(
    client=True,
    nthreads=[("", 1)] * 3,
    config=demo_config("replicate"),
)
async def test_replicate_avoids_paused_workers_1(c, s, w0, w1, w2):
    """Check that replication goes to the running worker, not the paused one.

    ``w1`` gets a tiny ``memory_pause_fraction`` and the test waits until the
    scheduler reports it as paused. Key ``x`` is then scattered to ``w0``
    only and the AMM is run once. The key must show up on ``w2`` and, after
    a short grace period, still be absent from ``w1``.
    """
    w1.memory_pause_fraction = 1e-15
    # Poll until the scheduler-side state of w1 reads as paused.
    while True:
        if s.workers[w1.address].status == Status.paused:
            break
        await asyncio.sleep(0.01)

    # NOTE(review): the result is never read; presumably it is held only to
    # keep the scattered key alive for the duration of the test - confirm.
    scattered = await c.scatter({"x": 1}, workers=[w0.address])
    amm = s.extensions["amm"]
    amm.run_once()

    # Poll until the replica has landed on the worker that is not paused.
    while True:
        if "x" in w2.data:
            break
        await asyncio.sleep(0.01)

    # Leave time for a replica to appear on w1 if one was wrongly requested.
    await asyncio.sleep(0.2)
    assert "x" not in w1.data


@gen_cluster(client=True, config=demo_config("replicate"))
async def test_replicate_avoids_paused_workers_2(c, s, a, b):
    """Check that nothing is replicated when the only other worker is paused.

    ``b`` gets a tiny ``memory_pause_fraction`` and the test waits until the
    scheduler reports it as paused. Key ``x`` is then scattered to ``a``
    only and the AMM is run once. After a short grace period the key must
    still be absent from ``b``.
    """
    b.memory_pause_fraction = 1e-15
    # Poll until the scheduler-side state of b reads as paused.
    while True:
        if s.workers[b.address].status == Status.paused:
            break
        await asyncio.sleep(0.01)

    # NOTE(review): the result is never read; presumably it is held only to
    # keep the scattered key alive for the duration of the test - confirm.
    scattered = await c.scatter({"x": 1}, workers=[a.address])
    amm = s.extensions["amm"]
    amm.run_once()

    # Leave time for a replica to appear on b if one was wrongly requested.
    await asyncio.sleep(0.2)
    assert "x" not in b.data


@gen_cluster(
nthreads=[("", 1)] * 4,
client=True,
Expand Down