From e1d0e7f0e565c53d03b54c171f34cb5cb56e65bd Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 26 Oct 2021 17:04:36 +0100 Subject: [PATCH] retire_workers to use AMM black doc tweaks --- distributed/active_memory_manager.py | 56 +++++++++ distributed/scheduler.py | 167 ++++++++++++++------------ distributed/worker.py | 24 +++- docs/source/active_memory_manager.rst | 19 +-- 4 files changed, 181 insertions(+), 85 deletions(-) diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py index 4b368ff5220..095614d2ea8 100644 --- a/distributed/active_memory_manager.py +++ b/distributed/active_memory_manager.py @@ -356,3 +356,59 @@ def run(self): # ndrop could be negative, which for range() is the same as 0. for _ in range(ndrop): yield "drop", ts, None + + +class RetireWorker(ActiveMemoryManagerPolicy): + """Replicate somewhere else all unique keys on a worker, preparing for its shutdown. + Once the worker has been retired, this policy automatically removes itself from the + Active Memory Manager it's attached to. + + **Retiring a worker with spilled keys** + + On its very first iteration, this policy suggests other workers to fetch all unique + in-memory tasks. Frequently, this means that in the next few moments the worker to + be retired will be bombarded with ``Worker.get_data`` calls from the rest of the + cluster. This can be a problem if most of the managed memory of the worker has been + spilled out, as it could send the worker above the terminate threshold. + Two things are in place in order to avoid this: + + 1. At every iteration, this policy drops all keys that have already been replicated + somewhere else. This makes room for further keys to be moved out of the spill + file in order to be replicated onto another worker. + 2. Once a worker passes the ``pause`` threshold, ``Worker.get_data`` throttles the + number of outgoing connections to 1. + """ + + address: str + + def __init__(self, address: str): + self.address = address + + def __repr__(self) -> str: + return f"RetireWorker({self.address}, done={self.done})" + + def run(self): + ws = self.manager.scheduler.workers.get(self.address) + if ws is None: + self.manager.policies.remove(self) + return + + for ts in ws.has_what: + if len(ts.who_has) == 1: + yield "replicate", ts, None + else: + # This may be rejected by either the AMM (see _find_dropper) or by the + # Worker; if so we'll try again at the next iteration + yield "drop", ts, {ws} + + def done(self) -> bool: + """Return True if it is safe to close the worker down, or False otherwise. True + doesn't necessarily mean that run() won't issue any more suggestions - it could + continue issuing ``drop`` suggestions afterwards. + """ + ws = self.manager.scheduler.workers.get(self.address) + if ws is None: + return True + if ws.processing: + return False + return all(len(ts.who_has) > 1 for ts in ws.has_what) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 185ecacc6f6..a5ac892c970 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -51,7 +51,7 @@ from . import preloading, profile from . import versions as version_module -from .active_memory_manager import ActiveMemoryManagerExtension +from .active_memory_manager import ActiveMemoryManagerExtension, RetireWorker from .batched import BatchedSend from .comm import ( get_address_host, @@ -6673,11 +6673,11 @@ def _key(group): async def retire_workers( self, comm=None, - workers=None, - remove=True, - close_workers=False, - names=None, - lock=True, + *, + workers: "list[str] | None" = None, + names: "list[str] | None" = None, + close_workers: bool = False, + remove: bool = True, **kwargs, ) -> dict: """Gracefully retire workers from cluster @@ -6686,16 +6686,18 @@ async def retire_workers( ---------- workers: list (optional) List of worker addresses to retire. - If not provided we call ``workers_to_close`` which finds a good set names: list (optional) List of worker names to retire. - remove: bool (defaults to True) - Whether or not to remove the worker metadata immediately or else - wait for the worker to contact us + Mutually exclusive with ``workers``. + If neither ``workers`` nor ``names`` are provided, we call + ``workers_to_close`` which finds a good set. close_workers: bool (defaults to False) Whether or not to actually close the worker explicitly from here. Otherwise we expect some external job scheduler to finish off the worker. + remove: bool (defaults to True) + Whether or not to remove the worker metadata immediately or else + wait for the worker to contact us **kwargs: dict Extra options to pass to workers_to_close to determine which workers we should drop @@ -6713,78 +6715,91 @@ async def retire_workers( ws: WorkerState ts: TaskState with log_errors(): - async with self._lock if lock else empty_context: - if names is not None: - if workers is not None: - raise TypeError("names and workers are mutually exclusive") - if names: - logger.info("Retire worker names %s", names) - names = set(map(str, names)) - workers = { - ws._address - for ws in parent._workers_dv.values() - if str(ws._name) in names - } - elif workers is None: - while True: - try: - workers = self.workers_to_close(**kwargs) - if not workers: - return {} - return await self.retire_workers( - workers=workers, - remove=remove, - close_workers=close_workers, - lock=False, - ) - except KeyError: # keys left during replicate - pass - - workers = { - parent._workers_dv[w] for w in workers if w in parent._workers_dv + if names is not None: + if workers is not None: + raise TypeError("names and workers are mutually exclusive") + if names: + logger.info("Retire worker names %s", names) + names_set = {str(name) for name in names} + wss = { + ws + for ws in parent._workers_dv.values() + if str(ws._name) in names_set } - if not workers: - return {} - logger.info("Retire workers %s", workers) - - # Keys orphaned by retiring those workers - keys = {k for w in workers for k in w.has_what} - keys = {ts._key for ts in keys if ts._who_has.issubset(workers)} - - if keys: - other_workers = set(parent._workers_dv.values()) - workers - if not other_workers: - return {} - logger.info("Moving %d keys to other workers", len(keys)) - await self.replicate( - keys=keys, - workers=[ws._address for ws in other_workers], - n=1, - delete=False, - lock=False, - ) + elif workers is not None: + wss = { + parent._workers_dv[address] + for address in workers + if address in parent._workers_dv + } + else: + wss = { + parent._workers_dv[address] + for address in self.workers_to_close(**kwargs) + } + if not wss: + return {} - worker_keys = {ws._address: ws.identity() for ws in workers} - if close_workers: - await asyncio.gather( - *[self.close_worker(worker=w, safe=True) for w in worker_keys] - ) - if remove: + workers_info = {ws._address: ws.identity() for ws in wss} + + stop_amm = False + amm: ActiveMemoryManagerExtension = self.extensions["amm"] + if not amm.started: + amm = ActiveMemoryManagerExtension( + self, register=False, start=True, interval=2.0 + ) + stop_amm = True + + # This lock makes retire_workers, rebalance, and replicate mutually + # exclusive and will no longer be necessary once rebalance and replicate are + # migrated to the Active Memory Manager. + async with self._lock: + try: await asyncio.gather( - *[self.remove_worker(address=w, safe=True) for w in worker_keys] + *( + self._retire_worker(ws, amm, close_workers, remove) + for ws in wss + ) ) + finally: + if stop_amm: + amm.stop() - self.log_event( - "all", - { - "action": "retire-workers", - "workers": worker_keys, - "moved-keys": len(keys), - }, - ) - self.log_event(list(worker_keys), {"action": "retired"}) + self.log_event("all", {"action": "retire-workers", "workers": workers_info}) + self.log_event(list(workers_info), {"action": "retired"}) + + return workers_info + + async def _retire_worker( + self, + ws: WorkerState, + amm: ActiveMemoryManagerExtension, + close_workers: bool, + remove: bool, + ) -> None: + logger.info("Retiring worker %s", ws) + + policy = RetireWorker(ws._address) + amm.add_policy(policy) + + ws.status = Status.closing_gracefully + self.running.discard(ws) + self.stream_comms[ws.address].send( + {"op": "worker-status-change", "status": ws.status.name} + ) + + while not policy.done(): + # Sleep 0.1s when there are 10 in-memory tasks or less + # Sleep 3s when there are 300 or more + poll_interval = max(0.1, min(3.0, len(ws.has_what) / 100)) + await asyncio.sleep(poll_interval) + + if close_workers and ws._address in self._workers_dv: + await self.close_worker(worker=ws._address, safe=True) + if remove: + await self.remove_worker(address=ws._address, safe=True) - return worker_keys + logger.info("Retired worker %s", ws) def add_keys(self, comm=None, worker=None, keys=(), stimulus_id=None): """ diff --git a/distributed/worker.py b/distributed/worker.py index af44f1f6609..712caa06485 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -888,6 +888,7 @@ def __init__( "free-keys": self.handle_free_keys, "remove-replicas": self.handle_remove_replicas, "steal-request": self.handle_steal_request, + "worker-status-change": self.handle_worker_status_change, } super().__init__( @@ -1555,7 +1556,9 @@ async def close_gracefully(self, restart=None): restart = self.lifetime_restart logger.info("Closing worker gracefully: %s", self.address) - self.status = Status.closing_gracefully + # Wait for all tasks to leave the worker and don't accept any new ones. + # Scheduler.retire_workers will set the status to closing_gracefully and push it + # back to this worker. await self.scheduler.retire_workers(workers=[self.address], remove=False) await self.close(safe=True, nanny=not restart) @@ -2908,6 +2911,22 @@ def handle_steal_request(self, key, stimulus_id): # `transition_constrained_executing` self.transition(ts, "forgotten", stimulus_id=stimulus_id) + def handle_worker_status_change(self, status: str) -> None: + new_status = Status.lookup[status] # type: ignore + + if new_status == Status.closing_gracefully and self._status not in ( + Status.running, + Status.paused, + ): + logger.error( + "Invalid Worker.status transition: %s -> %s", self._status, new_status + ) + # Reiterate the current status to the scheduler to restore sync + self._send_worker_status_change() + else: + # Update status and send confirmation to the Scheduler (see status.setter) + self.status = new_status + def release_key( self, key: str, @@ -3119,7 +3138,8 @@ async def _maybe_deserialize_task(self, ts, *, stimulus_id): raise def ensure_computing(self): - if self.status == Status.paused: + if self.status in (Status.paused, Status.closing_gracefully): + # Pending tasks shall be stolen return try: stimulus_id = f"ensure-computing-{time()}" diff --git a/docs/source/active_memory_manager.rst b/docs/source/active_memory_manager.rst index 3fd5a68f238..40d2be643df 100644 --- a/docs/source/active_memory_manager.rst +++ b/docs/source/active_memory_manager.rst @@ -7,8 +7,9 @@ usage of workers across the Dask cluster. It is disabled by default. Memory imbalance and duplication -------------------------------- Whenever a Dask task returns data, it is stored on the worker that executed the task for -as long as it's a dependency of other tasks, is referenced by a ``Client`` through a -``Future``, or is part of a :doc:`published dataset `. +as long as it's a dependency of other tasks, is referenced by a +:class:`~distributed.Client` through a :class:`~distributed.Future`, or is part of a +:doc:`published dataset `. Dask assigns tasks to workers following criteria of CPU occupancy, :doc:`resources`, and locality. In the trivial use case of tasks that are not connected to each other, take @@ -38,7 +39,7 @@ The AMM can be enabled through the :doc:`Dask configuration file The above is the recommended setup and will run all enabled *AMM policies* (see below) every two seconds. Alternatively, you can manually start/stop the AMM from the -``Client`` or trigger a one-off iteration: +:class:`~distributed.`Client` or trigger a one-off iteration: .. code-block:: python @@ -58,6 +59,10 @@ long as they don't harm data integrity. These suggestions can be of two types: - Delete one or more replicas of an in-memory task. The AMM will never delete the last replica of a task, even if a policy asks to. +There are no "move" operations. A move is performed in two passes: first a policy +creates a copy; in the next AMM iteration, the same or another policy deletes the +original (if the copy succeeded). + Unless a policy puts constraints on which workers should be impacted, the AMM will automatically create replicas on workers with the lowest memory usage first and delete them from workers with the highest memory usage first. @@ -90,7 +95,7 @@ Built-in policies ReduceReplicas ++++++++++++++ class - ``distributed.active_memory_manager.ReduceReplicas`` + :class:`distributed.active_memory_manager.ReduceReplicas` parameters None @@ -128,7 +133,7 @@ define two methods: Create one replica of the target task on the worker with the lowest memory among the listed candidates. ``yield "drop", , None`` - Delete one replica of the target task one the worker with the highest memory + Delete one replica of the target task on the worker with the highest memory usage across the whole cluster. ``yield "drop", , {, , ...}`` Delete one replica of the target task on the worker with the highest memory @@ -186,8 +191,8 @@ Example The following custom policy ensures that keys "foo" and "bar" are replicated on all workers at all times. New workers will receive a replica soon after connecting to the scheduler. The policy will do nothing if the target keys are not in memory somewhere or -if all workers already hold a replica. -Note that this example is incompatible with the ``ReduceReplicas`` built-in policy. +if all workers already hold a replica. Note that this example is incompatible with the +:class:`~distributed.active_memory_manager.ReduceReplicas built-in policy. In mymodule.py (it must be accessible by the scheduler):