From 1579834ded1a70193b2df2e9b18d6cecf7697fb7 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Tue, 19 Oct 2021 14:43:49 +0100
Subject: [PATCH] retire_workers to use AMM

---
 distributed/active_memory_manager.py |  56 +++++++++
 distributed/scheduler.py             | 167 +++++++++++++++------------
 distributed/worker.py                |  26 ++++-
 3 files changed, 171 insertions(+), 78 deletions(-)

diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py
index 0bcd83522ca..86571c88bce 100644
--- a/distributed/active_memory_manager.py
+++ b/distributed/active_memory_manager.py
@@ -344,3 +344,59 @@ def run(self):
             # ndrop could be negative, which for range() is the same as 0.
             for _ in range(ndrop):
                 yield "drop", ts, None
+
+
+class RetireWorker(ActiveMemoryManagerPolicy):
+    """Replicate somewhere else all unique keys held by a worker, preparing for its shutdown.
+    Once the worker has been retired, this policy automatically removes itself from the
+    Active Memory Manager it's attached to.
+
+    **Retiring a worker with spilled keys**
+
+    On its very first iteration, this policy suggests that other workers fetch all unique
+    in-memory tasks. Frequently, this means that in the next few moments the worker to
+    be retired will be bombarded with ``Worker.get_data`` calls from the rest of the
+    cluster. This can be a problem if most of the managed memory of the worker has been
+    spilled out, as it could send the worker above the terminate threshold.
+    Two measures are in place to avoid this:
+
+    1. At every iteration, this policy drops all keys that have already been replicated
+       somewhere else. This makes room for further keys to be moved out of the spill
+       file in order to be replicated onto another worker.
+    2. Once a worker passes the ``pause`` threshold, ``Worker.get_data`` throttles the
+       number of outgoing connections to 1.
+    """
+
+    address: str
+
+    def __init__(self, address: str):
+        self.address = address
+
+    def __repr__(self) -> str:
+        return f"RetireWorker({self.address}, done={self.done()})"
+
+    def run(self):
+        ws = self.manager.scheduler.workers.get(self.address)
+        if ws is None:
+            self.manager.policies.remove(self)
+            return
+
+        for ts in ws.has_what:
+            if len(ts.who_has) == 1:
+                yield "replicate", ts, None
+            else:
+                # This may be rejected by either the AMM (see _find_dropper) or by the
+                # Worker; if so, we'll try again at the next iteration
+                yield "drop", ts, {ws}
+
+    def done(self) -> bool:
+        """Return True if it is safe to close the worker down, or False otherwise. True
+        doesn't necessarily mean that run() won't issue any more suggestions; it could
+        continue issuing ``drop`` suggestions afterwards.
+        """
+        ws = self.manager.scheduler.workers.get(self.address)
+        if ws is None:
+            return True
+        if ws.processing:
+            return False
+        return all(len(ts.who_has) > 1 for ts in ws.has_what)
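[Editor's note, not part of the patch] A minimal sketch of the ``add_policy`` / ``done()`` contract that this policy exposes, mirroring how ``Scheduler._retire_worker`` (further down in this patch) drives it. ``drive_retirement``, ``scheduler`` and ``ws`` are hypothetical stand-ins for the real scheduler-side objects:

    import asyncio

    from distributed.active_memory_manager import RetireWorker

    async def drive_retirement(scheduler, ws):
        # "amm" is the ActiveMemoryManagerExtension registered on the scheduler
        amm = scheduler.extensions["amm"]
        policy = RetireWorker(ws.address)
        amm.add_policy(policy)

        while not policy.done():
            # Some keys are still unique to the retiring worker; the AMM
            # replicates/drops them in the background on its own interval.
            await asyncio.sleep(0.1)

        # Once done() returns True, the worker holds no unique keys and no
        # processing tasks, so it is safe to close it.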
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index fd873469af9..26e10bbe00c 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -51,7 +51,7 @@
 from . import preloading, profile
 from . import versions as version_module
-from .active_memory_manager import ActiveMemoryManagerExtension
+from .active_memory_manager import ActiveMemoryManagerExtension, RetireWorker
 from .batched import BatchedSend
 from .comm import (
     get_address_host,
@@ -6610,11 +6610,11 @@ def _key(group):
     async def retire_workers(
         self,
         comm=None,
-        workers=None,
-        remove=True,
-        close_workers=False,
-        names=None,
-        lock=True,
+        *,
+        workers: "list[str] | None" = None,
+        names: "list[str] | None" = None,
+        close_workers: bool = False,
+        remove: bool = True,
         **kwargs,
     ) -> dict:
         """Gracefully retire workers from cluster
@@ -6623,16 +6623,18 @@ async def retire_workers(
         ----------
         workers: list (optional)
             List of worker addresses to retire.
-            If not provided we call ``workers_to_close`` which finds a good set
         names: list (optional)
             List of worker names to retire.
-        remove: bool (defaults to True)
-            Whether or not to remove the worker metadata immediately or else
-            wait for the worker to contact us
+            Mutually exclusive with ``workers``.
+            If neither ``workers`` nor ``names`` is provided, we call
+            ``workers_to_close``, which finds a good set.
         close_workers: bool (defaults to False)
             Whether or not to actually close the worker explicitly from here.
             Otherwise we expect some external job scheduler to finish off the
             worker.
+        remove: bool (defaults to True)
+            Whether or not to remove the worker metadata immediately or else
+            wait for the worker to contact us
         **kwargs: dict
             Extra options to pass to workers_to_close to determine which
             workers we should drop
@@ -6650,78 +6652,91 @@ async def retire_workers(
         ws: WorkerState
         ts: TaskState
         with log_errors():
-            async with self._lock if lock else empty_context:
-                if names is not None:
-                    if workers is not None:
-                        raise TypeError("names and workers are mutually exclusive")
-                    if names:
-                        logger.info("Retire worker names %s", names)
-                    names = set(map(str, names))
-                    workers = {
-                        ws._address
-                        for ws in parent._workers_dv.values()
-                        if str(ws._name) in names
-                    }
-                elif workers is None:
-                    while True:
-                        try:
-                            workers = self.workers_to_close(**kwargs)
-                            if not workers:
-                                return {}
-                            return await self.retire_workers(
-                                workers=workers,
-                                remove=remove,
-                                close_workers=close_workers,
-                                lock=False,
-                            )
-                        except KeyError:  # keys left during replicate
-                            pass
-
-                workers = {
-                    parent._workers_dv[w] for w in workers if w in parent._workers_dv
+            if names is not None:
+                if workers is not None:
+                    raise TypeError("names and workers are mutually exclusive")
+                if names:
+                    logger.info("Retire worker names %s", names)
+                names_set = {str(name) for name in names}
+                wss = {
+                    ws
+                    for ws in parent._workers_dv.values()
+                    if str(ws._name) in names_set
                 }
-                if not workers:
-                    return {}
-                logger.info("Retire workers %s", workers)
-
-                # Keys orphaned by retiring those workers
-                keys = {k for w in workers for k in w.has_what}
-                keys = {ts._key for ts in keys if ts._who_has.issubset(workers)}
-
-                if keys:
-                    other_workers = set(parent._workers_dv.values()) - workers
-                    if not other_workers:
-                        return {}
-                    logger.info("Moving %d keys to other workers", len(keys))
-                    await self.replicate(
-                        keys=keys,
-                        workers=[ws._address for ws in other_workers],
-                        n=1,
-                        delete=False,
-                        lock=False,
-                    )
+            elif workers is not None:
+                wss = {
+                    parent._workers_dv[address]
+                    for address in workers
+                    if address in parent._workers_dv
+                }
+            else:
+                wss = {
+                    parent._workers_dv[address]
+                    for address in self.workers_to_close(**kwargs)
+                }
+            if not wss:
+                return {}

-                worker_keys = {ws._address: ws.identity() for ws in workers}
-                if close_workers:
-                    await asyncio.gather(
-                        *[self.close_worker(worker=w, safe=True) for w in worker_keys]
-                    )
-                if remove:
+            workers_info = {ws._address: ws.identity() for ws in wss}
+
+            stop_amm = False
+            amm: ActiveMemoryManagerExtension = self.extensions["amm"]
+            if not amm.started:
+                amm = ActiveMemoryManagerExtension(
+                    self, register=False, start=True, interval=2.0
+                )
+                stop_amm = True
+
+            # This lock makes retire_workers, rebalance, and replicate mutually
+            # exclusive and will no longer be necessary once rebalance and replicate are
+            # migrated to the Active Memory Manager.
+            async with self._lock:
+                try:
                     await asyncio.gather(
-                        *[self.remove_worker(address=w, safe=True) for w in worker_keys]
+                        *(
+                            self._retire_worker(ws, amm, close_workers, remove)
+                            for ws in wss
+                        )
                     )
+                finally:
+                    if stop_amm:
+                        amm.stop()

-                self.log_event(
-                    "all",
-                    {
-                        "action": "retire-workers",
-                        "workers": worker_keys,
-                        "moved-keys": len(keys),
-                    },
-                )
-                self.log_event(list(worker_keys), {"action": "retired"})
+            self.log_event("all", {"action": "retire-workers", "workers": workers_info})
+            self.log_event(list(workers_info), {"action": "retired"})
+
+            return workers_info
+
+    async def _retire_worker(
+        self,
+        ws: WorkerState,
+        amm: ActiveMemoryManagerExtension,
+        close_workers: bool,
+        remove: bool,
+    ) -> None:
+        logger.info("Retiring worker %s", ws)
+
+        policy = RetireWorker(ws._address)
+        amm.add_policy(policy)
+
+        ws.status = Status.closing_gracefully
+        self.running.discard(ws)
+        self.stream_comms[ws.address].send(
+            {"op": "worker-status-change", "status": ws.status.name}
+        )
+
+        while not policy.done():
+            # Sleep 0.1s when there are 10 in-memory tasks or fewer;
+            # sleep 3s when there are 300 or more
+            poll_interval = max(0.1, min(3.0, len(ws.has_what) / 100))
+            await asyncio.sleep(poll_interval)
+
+        if close_workers and ws._address in self._workers_dv:
+            await self.close_worker(worker=ws._address, safe=True)
+        if remove:
+            await self.remove_worker(address=ws._address, safe=True)

-            return worker_keys
+        logger.info("Retired worker %s", ws)

     def add_keys(self, comm=None, worker=None, keys=(), stimulus_id=None):
         """
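[Editor's note, not part of the patch] A hedged usage sketch of the reworked keyword-only API from user code, assuming a running cluster and assuming that the existing ``Client.retire_workers`` wrapper forwards these keyword arguments to ``Scheduler.retire_workers``; the addresses are hypothetical:

    from distributed import Client

    client = Client("tcp://127.0.0.1:8786")      # hypothetical scheduler address

    # Retire one specific worker, letting the AMM replicate its unique keys first
    info = client.retire_workers(
        workers=["tcp://127.0.0.1:36789"],       # hypothetical worker address
        close_workers=True,                      # also shut the worker process down
    )
    # `info` maps each retired worker address to its identity() metadata,
    # matching the `workers_info` dict returned by Scheduler.retire_workers above.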
diff --git a/distributed/worker.py b/distributed/worker.py
index 9fca11e9f8b..df3d3e03eca 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -885,6 +885,7 @@ def __init__(
             "free-keys": self.handle_free_keys,
             "remove-replicas": self.handle_remove_replicas,
             "steal-request": self.handle_steal_request,
+            "worker-status-change": self.handle_worker_status_change,
         }

         super().__init__(
@@ -1551,7 +1552,9 @@ async def close_gracefully(self, restart=None):
             restart = self.lifetime_restart

         logger.info("Closing worker gracefully: %s", self.address)
-        self.status = Status.closing_gracefully
+        # Wait for all tasks to leave the worker and don't accept any new ones.
+        # Scheduler.retire_workers will set the status to closing_gracefully and push it
+        # back to this worker.
         await self.scheduler.retire_workers(workers=[self.address], remove=False)
         await self.close(safe=True, nanny=not restart)

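[Editor's note, not part of the patch] A small illustration of the scheduler-to-worker status handshake introduced here: the scheduler sends a ``worker-status-change`` message over the stream comm (see ``Scheduler._retire_worker`` above), and the worker maps the string back to a ``Status`` member in ``Worker.handle_worker_status_change`` below. This assumes ``Status`` and its ``lookup`` table from ``distributed.core``, as used in the handler:

    from distributed.core import Status

    # The payload sent by Scheduler._retire_worker
    msg = {"op": "worker-status-change", "status": "closing_gracefully"}

    # How the worker-side handler resolves it back into an enum member
    new_status = Status.lookup[msg["status"]]    # string name -> Status member
    assert new_status == Status.closing_gracefully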
@@ -2862,6 +2865,24 @@ def handle_steal_request(self, key, stimulus_id):
             # `transition_constrained_executing`
             self.transition(ts, "forgotten", stimulus_id=stimulus_id)

+    def handle_worker_status_change(self, status: str) -> None:
+        new_status = Status.lookup[status]  # type: ignore
+
+        if new_status == Status.closing_gracefully and self.status not in (
+            Status.running,
+            Status.paused,
+        ):
+            logger.error(
+                "Invalid Worker.status transition: %s -> %s", self.status, new_status
+            )
+            # Reiterate the current status to the scheduler to restore sync
+            # (see status.setter)
+            self.status = self.status
+            return
+
+        # Update the status and send confirmation to the scheduler (see status.setter).
+        self.status = new_status
+
     def release_key(
         self,
         key: str,
@@ -3073,7 +3094,8 @@ async def _maybe_deserialize_task(self, ts, *, stimulus_id):
             raise

     def ensure_computing(self):
-        if self.status == Status.paused:
+        if self.status in (Status.paused, Status.closing_gracefully):
+            # Pending tasks will be picked up by work stealing
             return
         try:
             stimulus_id = f"ensure-computing-{time()}"
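[Editor's note, not part of the patch] The adaptive poll interval used in ``Scheduler._retire_worker`` earlier in this patch, reproduced as a standalone sketch to show its bounds: 0.1s at 10 in-memory tasks or fewer, scaling linearly up to 3s at 300 or more. The function name is hypothetical; the formula is copied from the patch:

    def poll_interval(n_keys: int) -> float:
        # Same expression as in Scheduler._retire_worker
        return max(0.1, min(3.0, n_keys / 100))

    assert poll_interval(5) == 0.1      # few keys left: poll frequently
    assert poll_interval(150) == 1.5    # linear in between
    assert poll_interval(1000) == 3.0   # many keys: back off to 3s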