Skip to content

Commit

Permalink
Merge branch 'main' into AMM/RetireWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Nov 29, 2021
2 parents c6aaa89 + 5d23af9 commit 3de46eb
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 68 deletions.
122 changes: 70 additions & 52 deletions distributed/active_memory_manager.py
Expand Up @@ -52,6 +52,7 @@ def __init__(
# The following parameters are exposed so that one may create, run, and throw
# away on the fly a specialized manager, separate from the main one.
policies: set[ActiveMemoryManagerPolicy] | None = None,
*,
register: bool = True,
start: bool | None = None,
interval: float | None = None,
Expand Down Expand Up @@ -135,52 +136,9 @@ def run_once(self) -> None:
# populate self.pending
self._run_policies()

drop_by_worker: (
defaultdict[WorkerState, set[TaskState]]
) = defaultdict(set)
repl_by_worker: (
defaultdict[WorkerState, dict[TaskState, set[str]]]
) = defaultdict(dict)

for ts, (pending_repl, pending_drop) in self.pending.items():
if not ts.who_has:
continue
who_has = {ws_snd.address for ws_snd in ts.who_has - pending_drop}
assert who_has # Never drop the last replica
for ws_rec in pending_repl:
assert ws_rec not in ts.who_has
repl_by_worker[ws_rec][ts] = who_has
for ws in pending_drop:
assert ws in ts.who_has
drop_by_worker[ws].add(ts)

# Fire-and-forget enact recommendations from policies
stimulus_id = str(time())
for ws_rec, ts_to_who_has in repl_by_worker.items():
self.scheduler.stream_comms[ws_rec.address].send(
{
"op": "acquire-replicas",
"keys": [ts.key for ts in ts_to_who_has],
"stimulus_id": "acquire-replicas-" + stimulus_id,
"priorities": {ts.key: ts.priority for ts in ts_to_who_has},
"who_has": {ts.key: v for ts, v in ts_to_who_has.items()},
},
)

for ws, tss in drop_by_worker.items():
# The scheduler immediately forgets about the replica and suggests
# the worker to drop it. The worker may refuse, at which point it
# will send back an add-keys message to reinstate it.
for ts in tss:
self.scheduler.remove_replica(ts, ws)
self.scheduler.stream_comms[ws.address].send(
{
"op": "remove-replicas",
"keys": [ts.key for ts in tss],
"stimulus_id": "remove-replicas-" + stimulus_id,
}
)

if self.pending:
logger.debug("Enacting suggestions for %d tasks", len(self.pending))
self._enact_suggestions()
finally:
del self.workers_memory
del self.pending
Expand Down Expand Up @@ -289,6 +247,54 @@ def _find_dropper(
key=lambda ws: (ws.status != Status.running, self.workers_memory[ws]),
)

def _enact_suggestions(self) -> None:
    """Push the suggestions stored in ``self.pending`` to the workers.

    ``self.pending`` is filled by ``self._run_policies()`` and maps each task to
    a ``(pending_repl, pending_drop)`` pair: the workers that should acquire a
    new replica of the task and the workers that should drop theirs.

    The suggestions are regrouped by worker so that every worker receives at
    most one ``acquire-replicas`` and one ``remove-replicas`` message through
    bulk comms. Return immediately, without waiting for the workers to act.
    """
    drop_by_worker: defaultdict[WorkerState, set[TaskState]] = defaultdict(set)
    repl_by_worker: (
        defaultdict[WorkerState, dict[TaskState, set[str]]]
    ) = defaultdict(dict)

    for ts, (pending_repl, pending_drop) in self.pending.items():
        if not ts.who_has:
            # No replica is left to copy from or to drop; nothing to enact
            continue
        # Addresses of the workers that will still hold the key once all the
        # pending drops for this task have been carried out
        who_has = {ws_snd.address for ws_snd in ts.who_has - pending_drop}
        assert who_has  # Never drop the last replica
        for ws_rec in pending_repl:
            assert ws_rec not in ts.who_has
            repl_by_worker[ws_rec][ts] = who_has
        for ws in pending_drop:
            assert ws in ts.who_has
            drop_by_worker[ws].add(ts)

    # Fire-and-forget enact recommendations from policies.
    # All messages of this iteration share the same stimulus_id suffix.
    stimulus_id = str(time())
    for ws_rec, ts_to_who_has in repl_by_worker.items():
        self.scheduler.stream_comms[ws_rec.address].send(
            {
                "op": "acquire-replicas",
                "keys": [ts.key for ts in ts_to_who_has],
                "stimulus_id": "acquire-replicas-" + stimulus_id,
                "priorities": {ts.key: ts.priority for ts in ts_to_who_has},
                "who_has": {ts.key: v for ts, v in ts_to_who_has.items()},
            },
        )

    for ws, tss in drop_by_worker.items():
        # The scheduler immediately forgets about the replica and asks the
        # worker to drop it. The worker may refuse, at which point it will send
        # back an add-keys message to reinstate it.
        for ts in tss:
            self.scheduler.remove_replica(ts, ws)
        # One message per worker, listing every key it should drop
        self.scheduler.stream_comms[ws.address].send(
            {
                "op": "remove-replicas",
                "keys": [ts.key for ts in tss],
                "stimulus_id": "remove-replicas-" + stimulus_id,
            }
        )


class ActiveMemoryManagerPolicy:
"""Abstract parent class"""
Expand Down Expand Up @@ -378,6 +384,9 @@ class ReduceReplicas(ActiveMemoryManagerPolicy):

def run(self):
""""""
nkeys = 0
ndrop = 0

for ts in self.manager.scheduler.replicated_tasks:
desired_replicas = 1 # TODO have a marker on TaskState

Expand All @@ -387,14 +396,23 @@ def run(self):
# worker, don't double count them.
nwaiters = len({waiter.processing_on or waiter for waiter in ts.waiters})

ndrop = len(ts.who_has) - max(desired_replicas, nwaiters)
ndrop_key = len(ts.who_has) - max(desired_replicas, nwaiters)
if ts in self.manager.pending:
pending_repl, pending_drop = self.manager.pending[ts]
ndrop += len(pending_repl) - len(pending_drop)

# ndrop could be negative, which for range() is the same as 0.
for _ in range(ndrop):
yield "drop", ts, None
ndrop_key += len(pending_repl) - len(pending_drop)

if ndrop_key > 0:
nkeys += 1
ndrop += ndrop_key
for _ in range(ndrop_key):
yield "drop", ts, None

if ndrop:
logger.debug(
"ReduceReplicas: Dropping %d superfluous replicas of %d tasks",
ndrop,
nkeys,
)


class RetireWorker(ActiveMemoryManagerPolicy):
Expand Down
48 changes: 36 additions & 12 deletions distributed/tests/test_active_memory_manager.py
@@ -1,4 +1,5 @@
import asyncio
import logging
import random
from time import sleep

Expand All @@ -10,20 +11,13 @@
ActiveMemoryManagerPolicy,
)
from distributed.core import Status
from distributed.utils_test import gen_cluster, inc, slowinc
from distributed.utils_test import captured_logger, gen_cluster, inc, slowinc

NO_AMM_START = {"distributed.scheduler.active-memory-manager.start": False}


@gen_cluster(
client=True,
config={
"distributed.scheduler.active-memory-manager.start": False,
"distributed.scheduler.active-memory-manager.policies": [],
},
)
async def test_no_policies(c, s, a, b):
s.extensions["amm"].run_once()
def captured_amm_logger():
    """Capture the Active Memory Manager log output at DEBUG level.

    Thin wrapper around ``captured_logger`` bound to the
    ``distributed.active_memory_manager`` logger. Intended to be used as
    ``with captured_amm_logger() as logs:``, after which the captured text is
    read with ``logs.getvalue()``.
    """
    return captured_logger("distributed.active_memory_manager", level=logging.DEBUG)


class DemoPolicy(ActiveMemoryManagerPolicy):
Expand Down Expand Up @@ -66,12 +60,30 @@ def demo_config(action, key="x", n=10, candidates=None, start=False, interval=0.
}


@gen_cluster(
    client=True,
    config={
        "distributed.scheduler.active-memory-manager.start": False,
        "distributed.scheduler.active-memory-manager.policies": [],
    },
)
async def test_no_policies(c, s, a, b):
    """Smoke test: one AMM iteration with an empty policy list must not raise."""
    # The AMM is configured not to start on its own, so trigger a single
    # iteration by hand through the scheduler extension.
    s.extensions["amm"].run_once()


@gen_cluster(nthreads=[("", 1)] * 4, client=True, config=demo_config("drop"))
async def test_drop(c, s, *workers):
with captured_amm_logger() as logs:
s.extensions["amm"].run_once()
# Logging is quiet if there are no suggestions
assert logs.getvalue() == ""

futures = await c.scatter({"x": 123}, broadcast=True)
assert len(s.tasks["x"].who_has) == 4
# Also test the extension handler
s.extensions["amm"].run_once()
with captured_amm_logger() as logs:
s.extensions["amm"].run_once()
assert logs.getvalue() == "Enacting suggestions for 1 tasks\n"
while len(s.tasks["x"].who_has) > 1:
await asyncio.sleep(0.01)
# The last copy is never dropped even if the policy asks so
Expand Down Expand Up @@ -518,9 +530,21 @@ async def test_replicate_avoids_paused_workers_2(c, s, a, b):
},
)
async def test_ReduceReplicas(c, s, *workers):
with captured_amm_logger() as logs:
s.extensions["amm"].run_once()
# Logging is quiet if there are no suggestions
assert logs.getvalue() == ""

futures = await c.scatter({"x": 123}, broadcast=True)
assert len(s.tasks["x"].who_has) == 4
s.extensions["amm"].run_once()

with captured_amm_logger() as logs:
s.extensions["amm"].run_once()
assert logs.getvalue() == (
"ReduceReplicas: Dropping 3 superfluous replicas of 1 tasks\n"
"Enacting suggestions for 1 tasks\n" # core AMM extension
)

while len(s.tasks["x"].who_has) > 1:
await asyncio.sleep(0.01)

Expand Down
4 changes: 3 additions & 1 deletion docs/source/scheduling-state.rst
Expand Up @@ -66,7 +66,9 @@ Tasks flow along the following states with the following allowed transitions:
* *No-worker*: Ready to be computed, but no appropriate worker exists
(for example because of resource restrictions, or because no worker is
connected at all).
* *Processing*: Actively being computed by one or more workers
* *Processing*: All dependencies are available and the task is assigned to a
worker for compute (the scheduler doesn't know whether it's in a worker
queue or actively being computed).
* *Memory*: In memory on one or more workers
* *Erred*: Task computation, or one of its dependencies, has encountered an error
* *Forgotten* (not actually a state): Task is no longer needed by any client
Expand Down
7 changes: 4 additions & 3 deletions docs/source/worker.rst
Expand Up @@ -225,9 +225,10 @@ usage then the worker will start dumping unused data to disk, even if internal
Halt worker threads
~~~~~~~~~~~~~~~~~~~

At 80% load, the worker's thread pool will stop accepting new tasks. This
gives time for the write-to-disk functionality to take effect even in the face
of rapidly accumulating data.
At 80% load, the worker's thread pool will stop starting computation on
additional tasks in the worker's queue. This gives time for the write-to-disk
functionality to take effect even in the face of rapidly accumulating data.
Currently executing tasks continue to run.


Kill Worker
Expand Down

0 comments on commit 3de46eb

Please sign in to comment.