Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMM: cosmetic tweaks #5584

Merged
merged 2 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 11 additions & 1 deletion distributed/active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def add_policy(self, policy: ActiveMemoryManagerPolicy) -> None:

def run_once(self) -> None:
"""Run all policies once and asynchronously (fire and forget) enact their
recommendations to replicate/drop keys
recommendations to replicate/drop tasks
"""
with log_errors():
# This should never fail since this is a synchronous method
Expand Down Expand Up @@ -198,6 +198,9 @@ def _find_recipient(
drops), or None if no eligible candidates are available.
"""
if ts.state != "memory":
logger.debug(
"(replicate, %s, %s) rejected: ts.state = %s", ts, candidates, ts.state
)
return None
if candidates is None:
candidates = self.scheduler.running.copy()
Expand All @@ -207,6 +210,9 @@ def _find_recipient(
candidates -= ts.who_has
candidates -= pending_repl
if not candidates:
logger.debug(
"(replicate, %s, %s) rejected: no valid candidates", ts, candidates
)
return None

# Select candidate with the lowest memory usage
Expand All @@ -230,6 +236,9 @@ def _find_dropper(
drops), or None if no eligible candidates are available.
"""
if len(ts.who_has) - len(pending_drop) < 2:
logger.debug(
"(drop, %s, %s) rejected: less than 2 replicas exist", ts, candidates
)
return None
if candidates is None:
candidates = ts.who_has.copy()
Expand All @@ -238,6 +247,7 @@ def _find_dropper(
candidates -= pending_drop
candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters}
if not candidates:
logger.debug("(drop, %s, %s) rejected: no valid candidates", ts, candidates)
return None

# Select candidate with the highest memory usage.
Expand Down
20 changes: 11 additions & 9 deletions distributed/tests/test_active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def test_no_policies(c, s, a, b):
s.extensions["amm"].run_once()


@gen_cluster(nthreads=[("", 1)] * 4, client=True, config=demo_config("drop"))
@gen_cluster(nthreads=[("", 1)] * 4, client=True, config=demo_config("drop", n=5))
async def test_drop(c, s, *workers):
with captured_amm_logger() as logs:
s.extensions["amm"].run_once()
Expand All @@ -82,7 +82,11 @@ async def test_drop(c, s, *workers):
# Also test the extension handler
with captured_amm_logger() as logs:
s.extensions["amm"].run_once()
assert logs.getvalue() == "Enacting suggestions for 1 tasks\n"
assert logs.getvalue() == (
"(drop, <TaskState 'x' memory>, None) rejected: less than 2 replicas exist\n"
"(drop, <TaskState 'x' memory>, None) rejected: less than 2 replicas exist\n"
"Enacting suggestions for 1 tasks\n"
)
while len(s.tasks["x"].who_has) > 1:
await asyncio.sleep(0.01)
# The last copy is never dropped even if the policy asks so
Expand Down Expand Up @@ -570,7 +574,7 @@ def run(self):
self.manager.policies.remove(self)


async def _tensordot_stress(c):
async def tensordot_stress(c):
da = pytest.importorskip("dask.array")

rng = da.random.RandomState(0)
Expand All @@ -580,7 +584,7 @@ async def _tensordot_stress(c):


@pytest.mark.slow
@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371")
@pytest.mark.avoid_ci(reason="distributed#5371")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only a subjective preference, but I find the full hyperlink (https://github.com/dask/distributed/issues/5371) nicer than the short `distributed#5371` form in these situations, because I can copy and paste it straight into a browser and land on the issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, nice use of `avoid_ci` 👍

@gen_cluster(
client=True,
nthreads=[("", 1)] * 4,
Expand All @@ -592,19 +596,18 @@ async def _tensordot_stress(c):
{"class": "distributed.tests.test_active_memory_manager.DropEverything"},
],
},
timeout=120,
)
async def test_drop_stress(c, s, *nannies):
"""A policy which suggests dropping everything won't break a running computation,
but only slow it down.

See also: test_ReduceReplicas_stress
"""
await _tensordot_stress(c)
await tensordot_stress(c)


@pytest.mark.slow
@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371")
@pytest.mark.avoid_ci(reason="distributed#5371")
@gen_cluster(
client=True,
nthreads=[("", 1)] * 4,
Expand All @@ -616,11 +619,10 @@ async def test_drop_stress(c, s, *nannies):
{"class": "distributed.active_memory_manager.ReduceReplicas"},
],
},
timeout=120,
)
async def test_ReduceReplicas_stress(c, s, *nannies):
"""Running ReduceReplicas compulsively won't break a running computation. Unlike
test_drop_stress above, this test does not stop running after a few seconds - the
policy must not disrupt the computation too much.
"""
await _tensordot_stress(c)
await tensordot_stress(c)
9 changes: 5 additions & 4 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1568,17 +1568,18 @@ async def test_worker_listens_on_same_interface_by_default(cleanup, Worker):
@gen_cluster(client=True)
async def test_close_gracefully(c, s, a, b):
futures = c.map(slowinc, range(200), delay=0.1)
while not b.data:
await asyncio.sleep(0.1)

while not b.data:
await asyncio.sleep(0.01)
mem = set(b.data)
proc = [ts for ts in b.tasks.values() if ts.state == "executing"]
proc = {ts for ts in b.tasks.values() if ts.state == "executing"}
assert proc

await b.close_gracefully()

assert b.status == Status.closed
assert b.address not in s.workers
assert mem.issubset(set(a.data))
assert mem.issubset(a.data.keys())
for ts in proc:
assert ts.state in ("executing", "memory")

Expand Down
14 changes: 4 additions & 10 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1847,7 +1847,7 @@ def handle_remove_replicas(self, keys, stimulus_id):
{"op": "add-keys", "keys": rejected, "stimulus_id": stimulus_id}
)

self.transitions(recommendations=recommendations, stimulus_id=stimulus_id)
self.transitions(recommendations, stimulus_id=stimulus_id)

return "OK"

Expand Down Expand Up @@ -1889,7 +1889,6 @@ def handle_acquire_replicas(
self, comm=None, keys=None, priorities=None, who_has=None, stimulus_id=None
):
recommendations = {}
scheduler_msgs = []
for k in keys:
ts = self.ensure_task_exists(
k,
Expand All @@ -1900,9 +1899,6 @@ def handle_acquire_replicas(
recommendations[ts] = "fetch"

self.update_who_has(who_has, stimulus_id=stimulus_id)

for msg in scheduler_msgs:
self.batched_stream.send(msg)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

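Nothing ever appends to `scheduler_msgs`, so the list is always empty and the loop that sends its contents over `batched_stream` is dead code; both can be removed safely.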

self.transitions(recommendations, stimulus_id=stimulus_id)

def ensure_task_exists(
Expand Down Expand Up @@ -2680,7 +2676,7 @@ def ensure_communicating(self):
self.comm_nbytes += total_nbytes
self.in_flight_workers[worker] = to_gather
recommendations = {self.tasks[d]: ("flight", worker) for d in to_gather}
self.transitions(recommendations=recommendations, stimulus_id=stimulus_id)
self.transitions(recommendations, stimulus_id=stimulus_id)

self.loop.add_callback(
self.gather_dep,
Expand Down Expand Up @@ -3030,9 +3026,7 @@ async def gather_dep(
)
recommendations[ts] = "fetch"
del data, response
self.transitions(
recommendations=recommendations, stimulus_id=stimulus_id
)
self.transitions(recommendations, stimulus_id=stimulus_id)
self.ensure_computing()

if not busy:
Expand Down Expand Up @@ -3104,7 +3098,7 @@ def update_who_has(self, who_has, *, stimulus_id):
self.has_what[worker].add(dep)
self.pending_data_per_worker[worker].append(dep_ts.key)

self.transitions(recommendations=recommendations, stimulus_id=stimulus_id)
self.transitions(recommendations, stimulus_id=stimulus_id)
except Exception as e:
logger.exception(e)
if LOG_PDB:
Expand Down