diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py
index f11aa35712..4a61609590 100644
--- a/distributed/active_memory_manager.py
+++ b/distributed/active_memory_manager.py
@@ -515,28 +515,30 @@ class RetireWorker(ActiveMemoryManagerPolicy):
     1. This is the only worker in the cluster, or
     2. All workers are either paused or being retired at the same time
 
-    In either case, this policy will fail to move out all keys and
-    Scheduler.retire_workers will abort the retirement. The flag ``no_recipients`` will
-    be raised.
+    In either case, this policy will fail to move out all keys and set the
+    ``no_recipients`` boolean to True. :meth:`~distributed.Scheduler.retire_workers`
+    will abort the retirement.
 
     There is a third use case, where a task fails to be replicated away for whatever
-    reason; in this case we'll just wait for the next AMM iteration and try again
-    (possibly with a different receiving worker, e.g. if the receiving worker was
-    hung but not yet declared dead).
+    reason, e.g. because its recipient is unresponsive but the Scheduler doesn't know
+    yet. In this case we'll just wait for the next AMM iteration and try again (possibly
+    with a different receiving worker, e.g. if the receiving worker was hung but not yet
+    declared dead).
 
     **Retiring a worker with spilled tasks**
 
     On its very first iteration, this policy suggests that other workers should fetch
-    all unique in-memory tasks. Frequently, this means that in the next few moments the
-    worker to be retired will be bombarded by :meth:`distributed.worker.Worker.get_data`
-    calls from the rest of the cluster. This can be a problem if most of the managed
-    memory of the worker has been spilled out, as it could send the worker above the
-    terminate threshold. Two measures are in place in order to prevent this:
-
-    - At every iteration, this policy drops all tasks that have already been replicated
-      somewhere else. This makes room for further tasks to be moved out of the spill
-      file in order to be replicated onto another worker.
-    - Once a worker passes the ``pause`` threshold,
+    all unique in-memory tasks of the retiring worker. Frequently, this means that in
+    the next few moments the retiring worker will be bombarded by
+    :meth:`distributed.worker.Worker.get_data` calls from the rest of the cluster. This
+    can be a problem if most of the managed memory of the worker has been spilled out,
+    as it could send the worker above its terminate threshold. Two measures are in place
+    in order to prevent this:
+
+    - At every iteration, this policy drops all tasks on the retiring worker that have
+      already been replicated somewhere else. This makes room for further tasks to be
+      moved out of the spill file in order to be replicated onto another worker.
+    - Once the worker passes the ``pause`` threshold,
       :meth:`~distributed.worker.Worker.get_data` throttles the number of outgoing
       connections to 1.
 
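The behaviour described in the docstring above is easiest to see from the client side. Here is a minimal sketch of the no-recipients scenario; `Client.retire_workers` and the AMM `RetireWorker` policy are real distributed APIs, while the addresses and cluster layout are hypothetical:

```python
# Sketch of graceful retirement and the no-recipients abort described
# above. Addresses are hypothetical placeholders.
from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # hypothetical scheduler address

# Ask the scheduler to retire one worker. Under the hood this installs a
# RetireWorker AMM policy, which replicates the worker's unique in-memory
# tasks onto other workers before the worker shuts down.
info = client.retire_workers(workers=["tcp://worker-1:40000"])

# If every other worker is paused or also being retired, the policy sets
# its no_recipients flag and Scheduler.retire_workers aborts the
# retirement; the returned info then does not include that worker.
print(info)
```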
@@ -554,7 +556,7 @@ def __init__(self, address: str):
         self.no_recipients = False
 
     def __repr__(self) -> str:
-        return f"RetireWorker({self.address})"
+        return f"RetireWorker({self.address!r})"
 
     def run(self):
         """"""
diff --git a/distributed/client.py b/distributed/client.py
index 6ab04f9b5c..d3f0427261 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -4103,10 +4103,12 @@ def unsubscribe_topic(self, topic):
         else:
             raise ValueError(f"No event handler known for topic {topic}.")
 
-    def retire_workers(self, workers=None, close_workers=True, **kwargs):
+    def retire_workers(
+        self, workers: list[str] | None = None, close_workers: bool = True, **kwargs
+    ):
         """Retire certain workers on the scheduler
 
-        See dask.distributed.Scheduler.retire_workers for the full docstring.
+        See :meth:`distributed.Scheduler.retire_workers` for the full docstring.
 
         Parameters
         ----------
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 1fb3363ae6..31788d9b21 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -6807,7 +6807,7 @@ async def retire_workers(
         comm=None,
         *,
         workers: "list[str] | None" = None,
-        names: "list[str] | None" = None,
+        names: "list | None" = None,
         close_workers: bool = False,
         remove: bool = True,
         **kwargs,
@@ -6816,7 +6816,7 @@
 
         Parameters
         ----------
-        workers: list (optional)
+        workers: list[str] (optional)
            List of worker addresses to retire.
        names: list (optional)
            List of worker names to retire.
@@ -6858,6 +6858,8 @@
             raise TypeError("names and workers are mutually exclusive")
         if names:
             logger.info("Retire worker names %s", names)
+            # Support cases where names are passed through a CLI and become
+            # strings
            names_set = {str(name) for name in names}
            wss = {
                ws
@@ -6904,7 +6906,7 @@
                 )
 
             coros.append(
-                self._retire_worker(
+                self._track_retire_worker(
                     ws,
                     policy,
                     prev_status=prev_status,
@@ -6929,7 +6931,7 @@
 
         return workers_info
 
-    async def _retire_worker(
+    async def _track_retire_worker(
         self,
         ws: WorkerState,
         policy: RetireWorker,
@@ -6942,7 +6944,8 @@
         while not policy.done():
             if policy.no_recipients:
                 # Abort retirement. This time we don't need to worry about race
-                # conditions and we can wait for a round-trip.
+                # conditions and we can wait for a scheduler->worker->scheduler
+                # round-trip.
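The new comment in `Scheduler.retire_workers` deserves a concrete illustration: worker names may be non-strings (e.g. ints) in Python code but arrive as strings when passed through a CLI. Below is a hedged, standalone mimic of the matching logic; `FakeWorkerState`, `select`, and the sample names are invented for illustration, and only the `str` coercion mirrors the diff:

```python
# Standalone mimic of the name coercion added above. FakeWorkerState and
# the sample data are illustrative, not the real WorkerState.
from dataclasses import dataclass


@dataclass
class FakeWorkerState:
    name: object


workers = [FakeWorkerState(0), FakeWorkerState("alice")]


def select(names):
    # Support cases where names are passed through a CLI and become strings
    names_set = {str(name) for name in names}
    return {ws.name for ws in workers if str(ws.name) in names_set}


assert select([0]) == {0}        # int name, matched directly
assert select(["0"]) == {0}      # same name after a CLI string round-trip
assert select(["alice"]) == {"alice"}
```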
                 self.stream_comms[ws.address].send(
                     {"op": "worker-status-change", "status": prev_status.name}
                 )
diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py
index cc7dcbd88b..934d13d87b 100644
--- a/distributed/tests/test_active_memory_manager.py
+++ b/distributed/tests/test_active_memory_manager.py
@@ -852,7 +852,7 @@ async def test_RetireWorker_no_recipients(c, s, w1, w2, w3, w4):
     assert not s.extensions["amm"].policies
     assert set(s.workers) in ({w2.address, w3.address}, {w2.address, w4.address})
     # After a Scheduler -> Worker -> WorkerState roundtrip, workers that failed to
-    # retired went back from closing_gracefully to running and can run tasks
+    # retire went back from closing_gracefully to running and can run tasks
     while any(ws.status != Status.running for ws in s.workers.values()):
         await asyncio.sleep(0.01)
     assert await c.submit(inc, 1) == 2
@@ -881,7 +881,7 @@ async def test_RetireWorker_all_recipients_are_paused(c, s, a, b):
     assert set(s.workers) == {a.address, b.address}
 
     # After a Scheduler -> Worker -> WorkerState roundtrip, workers that failed to
-    # retired went back from closing_gracefully to running and can run tasks
+    # retire went back from closing_gracefully to running and can run tasks
     while ws_a.status != Status.running:
         await asyncio.sleep(0.01)
     assert await c.submit(inc, 1) == 2
@@ -1011,7 +1011,7 @@ async def test_ReduceReplicas_stress(c, s, *nannies):
     await tensordot_stress(c)
 
 
-# @pytest.mark.slow
+@pytest.mark.slow
 @pytest.mark.avoid_ci(reason="distributed#5371")
 @pytest.mark.parametrize("use_ReduceReplicas", [False, True])
 @gen_cluster(
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index d39ec54bc3..eca8f8572d 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -1640,8 +1640,6 @@ def assert_amm_transfer_story(key: str, w_from: Worker, w_to: Worker) -> None:
         [
             (key, "ensure-task-exists", "released"),
             (key, "released", "fetch", "fetch", {}),
-            (key, "fetch", "missing", "missing", {}),
-            (key, "missing", "fetch", "fetch", {}),
             ("gather-dependencies", w_from.address, lambda set_: key in set_),
             (key, "fetch", "flight", "flight", {}),
             ("request-dep", w_from.address, lambda set_: key in set_),
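For readers unfamiliar with the story-based assertions edited above: each expected event must appear, in order, in the worker's story log, and fields may be callables acting as predicates (as with the `lambda set_: key in set_` matchers). The sketch below is a simplified, invented re-implementation of that idea; the real helper is `distributed.utils_test.assert_story` and is stricter:

```python
# Hedged, simplified re-implementation of in-order story matching.
# assert_story_ordered is invented for illustration only.
def assert_story_ordered(story, expect):
    it = iter(story)
    for expected in expect:
        for event in it:
            # A field in an expected event may be a callable predicate,
            # mirroring the lambda matchers in assert_amm_transfer_story.
            if len(event) >= len(expected) and all(
                f(v) if callable(f) else f == v for f, v in zip(expected, event)
            ):
                break
        else:
            raise AssertionError(f"expected event not found: {expected}")


# The transfer story no longer needs to include the fetch->missing->fetch
# detour removed by this diff:
story = [
    ("x", "ensure-task-exists", "released"),
    ("x", "released", "fetch", "fetch", {}),
    ("x", "fetch", "flight", "flight", {}),
]
assert_story_ordered(
    story,
    [
        ("x", "released", "fetch", "fetch", {}),
        ("x", "fetch", "flight", "flight", {}),
    ],
)
```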