AMM: cosmetic tweaks (#5584)
crusaderky committed Dec 10, 2021
1 parent b247fc2 commit b9f0517
Showing 4 changed files with 31 additions and 24 deletions.
12 changes: 11 additions & 1 deletion distributed/active_memory_manager.py
@@ -122,7 +122,7 @@ def add_policy(self, policy: ActiveMemoryManagerPolicy) -> None:

def run_once(self) -> None:
"""Run all policies once and asynchronously (fire and forget) enact their
-     recommendations to replicate/drop keys
+     recommendations to replicate/drop tasks
"""
with log_errors():
# This should never fail since this is a synchronous method
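For orientation: a policy's run() method is a generator yielding suggestions of the form ("replicate" | "drop", ts, candidates), which run_once() collects and enacts. A minimal sketch of a custom policy, assuming the interface in this module (the manager attribute is injected by add_policy; DropExtraReplicas is a made-up name, not part of the commit):

    from distributed.active_memory_manager import ActiveMemoryManagerPolicy

    class DropExtraReplicas(ActiveMemoryManagerPolicy):
        """Suggest dropping one replica of every task held in more than one place."""

        def run(self):
            for ts in self.manager.scheduler.tasks.values():
                if len(ts.who_has) > 1:
                    # candidates=None lets _find_dropper pick the best worker
                    yield "drop", ts, None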
@@ -198,6 +198,9 @@ def _find_recipient(
drops), or None if no eligible candidates are available.
"""
if ts.state != "memory":
+ logger.debug(
+     "(replicate, %s, %s) rejected: ts.state = %s", ts, candidates, ts.state
+ )
return None
if candidates is None:
candidates = self.scheduler.running.copy()
@@ -207,6 +210,9 @@
candidates -= ts.who_has
candidates -= pending_repl
if not candidates:
+ logger.debug(
+     "(replicate, %s, %s) rejected: no valid candidates", ts, candidates
+ )
return None

# Select candidate with the lowest memory usage
@@ -230,6 +236,9 @@ def _find_dropper(
drops), or None if no eligible candidates are available.
"""
if len(ts.who_has) - len(pending_drop) < 2:
+ logger.debug(
+     "(drop, %s, %s) rejected: less than 2 replicas exist", ts, candidates
+ )
return None
if candidates is None:
candidates = ts.who_has.copy()
@@ -238,6 +247,7 @@ def _find_dropper(
candidates -= pending_drop
candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters}
if not candidates:
+ logger.debug("(drop, %s, %s) rejected: no valid candidates", ts, candidates)
return None

# Select candidate with the highest memory usage.
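The two selection comments above are deliberately symmetric: a replication recipient is the eligible worker with the lowest memory usage, a drop candidate the one with the highest. An illustrative sketch of that tie-breaking, assuming some mapping from workers to their measured memory (workers_memory is a placeholder name, not necessarily the real attribute):

    def pick_recipient(candidates, workers_memory):
        # Replicate onto the emptiest eligible worker.
        return min(candidates, key=workers_memory.get)

    def pick_dropper(candidates, workers_memory):
        # Drop from the fullest eligible worker.
        return max(candidates, key=workers_memory.get)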
20 changes: 11 additions & 9 deletions distributed/tests/test_active_memory_manager.py
@@ -70,7 +70,7 @@ async def test_no_policies(c, s, a, b):
s.extensions["amm"].run_once()


- @gen_cluster(nthreads=[("", 1)] * 4, client=True, config=demo_config("drop"))
+ @gen_cluster(nthreads=[("", 1)] * 4, client=True, config=demo_config("drop", n=5))
async def test_drop(c, s, *workers):
with captured_amm_logger() as logs:
s.extensions["amm"].run_once()
@@ -82,7 +82,11 @@ async def test_drop(c, s, *workers):
# Also test the extension handler
with captured_amm_logger() as logs:
s.extensions["amm"].run_once()
- assert logs.getvalue() == "Enacting suggestions for 1 tasks\n"
+ assert logs.getvalue() == (
+     "(drop, <TaskState 'x' memory>, None) rejected: less than 2 replicas exist\n"
+     "(drop, <TaskState 'x' memory>, None) rejected: less than 2 replicas exist\n"
+     "Enacting suggestions for 1 tasks\n"
+ )
while len(s.tasks["x"].who_has) > 1:
await asyncio.sleep(0.01)
# The last copy is never dropped even if the policy asks so
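The exact strings asserted above come from the logger.debug calls added in active_memory_manager.py. Outside the test suite they can be surfaced the usual way, assuming that module uses the conventional logging.getLogger(__name__):

    import logging

    # Enables the "(drop, ..., ...) rejected: ..." messages asserted above.
    logging.getLogger("distributed.active_memory_manager").setLevel(logging.DEBUG)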
@@ -570,7 +574,7 @@ def run(self):
self.manager.policies.remove(self)


- async def _tensordot_stress(c):
+ async def tensordot_stress(c):
da = pytest.importorskip("dask.array")

rng = da.random.RandomState(0)
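The rest of the helper is collapsed in this view. A sketch of what a tensordot-style stress computation of this shape might look like (shapes, chunking, and the final check are assumptions, not the actual values):

    a = rng.random_sample((20, 20), chunks=(1, 1))  # many tiny chunks -> many tasks
    b = (a @ a.T).sum().round(3)                    # all-to-all transfer pattern
    assert await c.compute(b) > 0                   # the real test asserts a fixed checksum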
@@ -580,7 +584,7 @@ async def _tensordot_stress(c):


@pytest.mark.slow
- @pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371")
+ @pytest.mark.avoid_ci(reason="distributed#5371")
@gen_cluster(
client=True,
nthreads=[("", 1)] * 4,
@@ -592,19 +596,18 @@ async def _tensordot_stress(c):
{"class": "distributed.tests.test_active_memory_manager.DropEverything"},
],
},
- timeout=120,
)
async def test_drop_stress(c, s, *nannies):
"""A policy which suggests dropping everything won't break a running computation,
but only slow it down.
See also: test_ReduceReplicas_stress
"""
- await _tensordot_stress(c)
+ await tensordot_stress(c)
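Since avoid_ci replaces the built-in xfail marker here, it is presumably a custom marker that must be registered with pytest to silence unknown-marker warnings; a typical registration (file name and wording are assumptions, not taken from this commit) looks like:

    # setup.cfg (or pytest.ini) -- hypothetical registration
    [tool:pytest]
    markers =
        slow: marks tests as slow
        avoid_ci: tests that should not run on CI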


@pytest.mark.slow
- @pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371")
+ @pytest.mark.avoid_ci(reason="distributed#5371")
@gen_cluster(
client=True,
nthreads=[("", 1)] * 4,
@@ -616,11 +619,10 @@ async def test_drop_stress(c, s, *nannies):
{"class": "distributed.active_memory_manager.ReduceReplicas"},
],
},
- timeout=120,
)
async def test_ReduceReplicas_stress(c, s, *nannies):
"""Running ReduceReplicas compulsively won't break a running computation. Unlike
test_drop_stress above, this test does not stop running after a few seconds - the
policy must not disrupt the computation too much.
"""
- await _tensordot_stress(c)
+ await tensordot_stress(c)
9 changes: 5 additions & 4 deletions distributed/tests/test_worker.py
@@ -1568,17 +1568,18 @@ async def test_worker_listens_on_same_interface_by_default(cleanup, Worker):
@gen_cluster(client=True)
async def test_close_gracefully(c, s, a, b):
futures = c.map(slowinc, range(200), delay=0.1)
- while not b.data:
-     await asyncio.sleep(0.1)
+
+ while not b.data:
+     await asyncio.sleep(0.01)
mem = set(b.data)
- proc = [ts for ts in b.tasks.values() if ts.state == "executing"]
+ proc = {ts for ts in b.tasks.values() if ts.state == "executing"}
assert proc

await b.close_gracefully()

assert b.status == Status.closed
assert b.address not in s.workers
- assert mem.issubset(set(a.data))
+ assert mem.issubset(a.data.keys())
for ts in proc:
assert ts.state in ("executing", "memory")

14 changes: 4 additions & 10 deletions distributed/worker.py
@@ -1847,7 +1847,7 @@ def handle_remove_replicas(self, keys, stimulus_id):
{"op": "add-keys", "keys": rejected, "stimulus_id": stimulus_id}
)

- self.transitions(recommendations=recommendations, stimulus_id=stimulus_id)
+ self.transitions(recommendations, stimulus_id=stimulus_id)

return "OK"

@@ -1889,7 +1889,6 @@ def handle_acquire_replicas(
self, comm=None, keys=None, priorities=None, who_has=None, stimulus_id=None
):
recommendations = {}
- scheduler_msgs = []
for k in keys:
ts = self.ensure_task_exists(
k,
@@ -1900,9 +1899,6 @@
recommendations[ts] = "fetch"

self.update_who_has(who_has, stimulus_id=stimulus_id)

- for msg in scheduler_msgs:
-     self.batched_stream.send(msg)
self.transitions(recommendations, stimulus_id=stimulus_id)

def ensure_task_exists(
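The deleted loop looks like dead code: scheduler_msgs was initialized empty and, at least in the visible hunks, never populated, so nothing was ever sent. What remains is the batching idiom this commit standardizes on, sketched here as a hypothetical helper (enact_fetches and tasks_to_fetch are illustrative names, not part of the worker API):

    def enact_fetches(worker, tasks_to_fetch, stimulus_id):
        # Collect per-task recommendations, then apply them in one call,
        # passing the dict positionally (the cosmetic change in this file).
        recommendations = {ts: "fetch" for ts in tasks_to_fetch}
        worker.transitions(recommendations, stimulus_id=stimulus_id)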
@@ -2680,7 +2676,7 @@ def ensure_communicating(self):
self.comm_nbytes += total_nbytes
self.in_flight_workers[worker] = to_gather
recommendations = {self.tasks[d]: ("flight", worker) for d in to_gather}
- self.transitions(recommendations=recommendations, stimulus_id=stimulus_id)
+ self.transitions(recommendations, stimulus_id=stimulus_id)

self.loop.add_callback(
self.gather_dep,
@@ -3030,9 +3026,7 @@ async def gather_dep(
)
recommendations[ts] = "fetch"
del data, response
- self.transitions(
-     recommendations=recommendations, stimulus_id=stimulus_id
- )
+ self.transitions(recommendations, stimulus_id=stimulus_id)
self.ensure_computing()

if not busy:
@@ -3104,7 +3098,7 @@ def update_who_has(self, who_has, *, stimulus_id):
self.has_what[worker].add(dep)
self.pending_data_per_worker[worker].append(dep_ts.key)

- self.transitions(recommendations=recommendations, stimulus_id=stimulus_id)
+ self.transitions(recommendations, stimulus_id=stimulus_id)
except Exception as e:
logger.exception(e)
if LOG_PDB:
