
Commit 36928dc
crusaderky committed Feb 1, 2022
1 parent 78f9512 commit 36928dc
Showing 5 changed files with 34 additions and 29 deletions.
36 changes: 19 additions & 17 deletions distributed/active_memory_manager.py
@@ -515,28 +515,30 @@ class RetireWorker(ActiveMemoryManagerPolicy):
1. This is the only worker in the cluster, or
2. All workers are either paused or being retired at the same time
- In either case, this policy will fail to move out all keys and
- Scheduler.retire_workers will abort the retirement. The flag ``no_recipients`` will
- be raised.
+ In either case, this policy will fail to move out all keys and set the
+ ``no_recipients`` boolean to True. :meth:`~distributed.Scheduler.retire_workers`
+ will abort the retirement.
There is a third use case, where a task fails to be replicated away for whatever
- reason; in this case we'll just wait for the next AMM iteration and try again
- (possibly with a different receiving worker, e.g. if the receiving worker was
- hung but not yet declared dead).
+ reason, e.g. because its recipient is unresponsive but the Scheduler doesn't know
+ yet. In this case we'll just wait for the next AMM iteration and try again (possibly
+ with a different receiving worker, e.g. if the receiving worker was hung but not yet
+ declared dead).
**Retiring a worker with spilled tasks**
On its very first iteration, this policy suggests that other workers should fetch
- all unique in-memory tasks. Frequently, this means that in the next few moments the
- worker to be retired will be bombarded by :meth:`distributed.worker.Worker.get_data`
- calls from the rest of the cluster. This can be a problem if most of the managed
- memory of the worker has been spilled out, as it could send the worker above the
- terminate threshold. Two measures are in place in order to prevent this:
- - At every iteration, this policy drops all tasks that have already been replicated
-   somewhere else. This makes room for further tasks to be moved out of the spill
-   file in order to be replicated onto another worker.
- - Once a worker passes the ``pause`` threshold,
+ all unique in-memory tasks of the retiring worker. Frequently, this means that in
+ the next few moments the retiring worker will be bombarded by
+ :meth:`distributed.worker.Worker.get_data` calls from the rest of the cluster. This
+ can be a problem if most of the managed memory of the worker has been spilled out,
+ as it could send the worker above its terminate threshold. Two measures are in place
+ in order to prevent this:
+ - At every iteration, this policy drops all tasks on the retiring worker that have
+   already been replicated somewhere else. This makes room for further tasks to be
+   moved out of the spill file in order to be replicated onto another worker.
+ - Once the worker passes the ``pause`` threshold,
:meth:`~distributed.worker.Worker.get_data` throttles the number of outgoing
connections to 1.
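
The two measures above lend themselves to a tiny standalone model. The sketch below is an illustration only (the function plan_retirement, the addresses, and the data are made up; it is not the RetireWorker implementation): given which workers hold each key, it decides what the retiring worker can drop immediately and what still needs to be replicated away.

    # Illustration only, not the RetireWorker code. Keys with a second copy
    # elsewhere are dropped to free the spill file; unique keys must be
    # replicated to another worker before the retiring one can shut down.
    from __future__ import annotations

    def plan_retirement(
        retiring: str, who_has: dict[str, set[str]]
    ) -> tuple[set[str], set[str]]:
        """Return (keys_to_drop, keys_to_replicate) for the retiring worker."""
        to_drop: set[str] = set()
        to_replicate: set[str] = set()
        for key, holders in who_has.items():
            if retiring not in holders:
                continue
            if len(holders) > 1:
                to_drop.add(key)  # another copy exists; free the local one
            else:
                to_replicate.add(key)  # unique; must be copied out first
        return to_drop, to_replicate

    # "x" already has a second copy, "y" lives only on the retiring worker
    print(plan_retirement("tcp://a:1", {"x": {"tcp://a:1", "tcp://b:1"}, "y": {"tcp://a:1"}}))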
@@ -554,7 +556,7 @@ def __init__(self, address: str):
self.no_recipients = False

def __repr__(self) -> str:
return f"RetireWorker({self.address})"
return f"RetireWorker({self.address!r})"

def run(self):
""""""
6 changes: 4 additions & 2 deletions distributed/client.py
@@ -4103,10 +4103,12 @@ def unsubscribe_topic(self, topic):
else:
raise ValueError(f"No event handler known for topic {topic}.")

- def retire_workers(self, workers=None, close_workers=True, **kwargs):
+ def retire_workers(
+     self, workers: list[str] | None = None, close_workers: bool = True, **kwargs
+ ):
"""Retire certain workers on the scheduler
- See dask.distributed.Scheduler.retire_workers for the full docstring.
+ See :meth:`distributed.Scheduler.retire_workers` for the full docstring.
Parameters
----------
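A short usage sketch of the retyped client method, under the assumption of a throwaway LocalCluster; the cluster parameters and the choice of worker are arbitrary, and the replication work happens behind the scenes via the RetireWorker policy described above.

    # Usage sketch (parameters arbitrary): retire one worker by address.
    from distributed import Client, LocalCluster

    cluster = LocalCluster(n_workers=3, threads_per_worker=1)
    client = Client(cluster)

    # Pick any worker address; its unique keys are replicated elsewhere
    # before the worker is closed.
    address = next(iter(client.scheduler_info()["workers"]))
    info = client.retire_workers(workers=[address], close_workers=True)
    print(sorted(info))  # addresses of the workers that were retired

    client.close()
    cluster.close()
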
13 changes: 8 additions & 5 deletions distributed/scheduler.py
@@ -6807,7 +6807,7 @@ async def retire_workers(
comm=None,
*,
workers: "list[str] | None" = None,
names: "list[str] | None" = None,
names: "list | None" = None,
close_workers: bool = False,
remove: bool = True,
**kwargs,
@@ -6816,7 +6816,7 @@
Parameters
----------
- workers: list (optional)
+ workers: list[str] (optional)
List of worker addresses to retire.
names: list (optional)
List of worker names to retire.
@@ -6858,6 +6858,8 @@
raise TypeError("names and workers are mutually exclusive")
if names:
logger.info("Retire worker names %s", names)
+ # Support cases where names are passed through a CLI and become
+ # strings
names_set = {str(name) for name in names}
wss = {
ws
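
The new comment documents why names are stringified before comparison; a small made-up illustration of the mismatch it guards against:

    # Worker names defined in Python may be ints, while the same names arriving
    # through a CLI (e.g. ``dask-worker --name 0``) are strings; comparing
    # their str() forms makes the two spellings match.
    registered_names = [0, 1]   # names as the workers registered them
    cli_names = ["0", "1"]      # the same names, parsed from a command line
    names_set = {str(name) for name in cli_names}
    assert all(str(name) in names_set for name in registered_names)
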
@@ -6904,7 +6906,7 @@
)

coros.append(
- self._retire_worker(
+ self._track_retire_worker(
ws,
policy,
prev_status=prev_status,
@@ -6929,7 +6931,7 @@

return workers_info

- async def _retire_worker(
+ async def _track_retire_worker(
self,
ws: WorkerState,
policy: RetireWorker,
@@ -6942,7 +6944,8 @@ async def _retire_worker(
while not policy.done():
if policy.no_recipients:
# Abort retirement. This time we don't need to worry about race
- # conditions and we can wait for a round-trip.
+ # conditions and we can wait for a scheduler->worker->scheduler
+ # round-trip.
self.stream_comms[ws.address].send(
{"op": "worker-status-change", "status": prev_status.name}
)
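
For orientation, a standalone, hedged model of the loop this hunk lives in; FakePolicy and track_retirement are invented for the example and this is not the scheduler method, only the keep-polling-or-abort control flow:

    import asyncio
    from dataclasses import dataclass

    @dataclass
    class FakePolicy:
        no_recipients: bool = False
        remaining: int = 3

        def done(self) -> bool:
            self.remaining -= 1
            return self.remaining <= 0

    async def track_retirement(policy, revert_status) -> bool:
        """Return True if retirement completed, False if it was aborted."""
        while not policy.done():
            if policy.no_recipients:
                revert_status()  # e.g. send the worker-status-change message
                return False
            await asyncio.sleep(0.05)
        return True

    print(asyncio.run(track_retirement(FakePolicy(), lambda: None)))  # True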
6 changes: 3 additions & 3 deletions distributed/tests/test_active_memory_manager.py
@@ -852,7 +852,7 @@ async def test_RetireWorker_no_recipients(c, s, w1, w2, w3, w4):
assert not s.extensions["amm"].policies
assert set(s.workers) in ({w2.address, w3.address}, {w2.address, w4.address})
# After a Scheduler -> Worker -> WorkerState roundtrip, workers that failed to
- # retired went back from closing_gracefully to running and can run tasks
+ # retire went back from closing_gracefully to running and can run tasks
while any(ws.status != Status.running for ws in s.workers.values()):
await asyncio.sleep(0.01)
assert await c.submit(inc, 1) == 2
@@ -881,7 +881,7 @@ async def test_RetireWorker_all_recipients_are_paused(c, s, a, b):
assert set(s.workers) == {a.address, b.address}

# After a Scheduler -> Worker -> WorkerState roundtrip, workers that failed to
- # retired went back from closing_gracefully to running and can run tasks
+ # retire went back from closing_gracefully to running and can run tasks
while ws_a.status != Status.running:
await asyncio.sleep(0.01)
assert await c.submit(inc, 1) == 2
@@ -1011,7 +1011,7 @@ async def test_ReduceReplicas_stress(c, s, *nannies):
await tensordot_stress(c)


- # @pytest.mark.slow
+ @pytest.mark.slow
@pytest.mark.avoid_ci(reason="distributed#5371")
@pytest.mark.parametrize("use_ReduceReplicas", [False, True])
@gen_cluster(
2 changes: 0 additions & 2 deletions distributed/tests/test_worker.py
@@ -1640,8 +1640,6 @@ def assert_amm_transfer_story(key: str, w_from: Worker, w_to: Worker) -> None:
[
(key, "ensure-task-exists", "released"),
(key, "released", "fetch", "fetch", {}),
(key, "fetch", "missing", "missing", {}),
(key, "missing", "fetch", "fetch", {}),
("gather-dependencies", w_from.address, lambda set_: key in set_),
(key, "fetch", "flight", "flight", {}),
("request-dep", w_from.address, lambda set_: key in set_),
