diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py index 3dcf7abb6bb..a1903aab9b9 100644 --- a/distributed/active_memory_manager.py +++ b/distributed/active_memory_manager.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from collections import defaultdict from collections.abc import Generator from typing import TYPE_CHECKING @@ -17,6 +18,8 @@ from .client import Client from .scheduler import Scheduler, TaskState, WorkerState +logger = logging.getLogger(__name__) + class ActiveMemoryManagerExtension: """Scheduler extension that optimizes memory usage across the cluster. @@ -49,6 +52,7 @@ def __init__( # The following parameters are exposed so that one may create, run, and throw # away on the fly a specialized manager, separate from the main one. policies: set[ActiveMemoryManagerPolicy] | None = None, + *, register: bool = True, start: bool | None = None, interval: float | None = None, @@ -132,52 +136,9 @@ def run_once(self) -> None: # populate self.pending self._run_policies() - drop_by_worker: ( - defaultdict[WorkerState, set[TaskState]] - ) = defaultdict(set) - repl_by_worker: ( - defaultdict[WorkerState, dict[TaskState, set[str]]] - ) = defaultdict(dict) - - for ts, (pending_repl, pending_drop) in self.pending.items(): - if not ts.who_has: - continue - who_has = {ws_snd.address for ws_snd in ts.who_has - pending_drop} - assert who_has # Never drop the last replica - for ws_rec in pending_repl: - assert ws_rec not in ts.who_has - repl_by_worker[ws_rec][ts] = who_has - for ws in pending_drop: - assert ws in ts.who_has - drop_by_worker[ws].add(ts) - - # Fire-and-forget enact recommendations from policies - stimulus_id = str(time()) - for ws_rec, ts_to_who_has in repl_by_worker.items(): - self.scheduler.stream_comms[ws_rec.address].send( - { - "op": "acquire-replicas", - "keys": [ts.key for ts in ts_to_who_has], - "stimulus_id": "acquire-replicas-" + stimulus_id, - "priorities": {ts.key: ts.priority for ts in ts_to_who_has}, - "who_has": {ts.key: v for ts, v in ts_to_who_has.items()}, - }, - ) - - for ws, tss in drop_by_worker.items(): - # The scheduler immediately forgets about the replica and suggests - # the worker to drop it. The worker may refuse, at which point it - # will send back an add-keys message to reinstate it. - for ts in tss: - self.scheduler.remove_replica(ts, ws) - self.scheduler.stream_comms[ws.address].send( - { - "op": "remove-replicas", - "keys": [ts.key for ts in tss], - "stimulus_id": "remove-replicas-" + stimulus_id, - } - ) - + if self.pending: + logger.debug("Enacting suggestions for %d keys", len(self.pending)) + self._enact_suggestions() finally: del self.workers_memory del self.pending @@ -286,6 +247,54 @@ def _find_dropper( key=lambda ws: (ws.status != Status.running, self.workers_memory[ws]), ) + def _enact_suggestions(self) -> None: + """Iterate through self.pending, which was filled by self._run_policies(), and + push the suggestions to the workers through bulk comms. Return immediately. 
+ """ + drop_by_worker: (defaultdict[WorkerState, set[TaskState]]) = defaultdict(set) + repl_by_worker: ( + defaultdict[WorkerState, dict[TaskState, set[str]]] + ) = defaultdict(dict) + + for ts, (pending_repl, pending_drop) in self.pending.items(): + if not ts.who_has: + continue + who_has = {ws_snd.address for ws_snd in ts.who_has - pending_drop} + assert who_has # Never drop the last replica + for ws_rec in pending_repl: + assert ws_rec not in ts.who_has + repl_by_worker[ws_rec][ts] = who_has + for ws in pending_drop: + assert ws in ts.who_has + drop_by_worker[ws].add(ts) + + # Fire-and-forget enact recommendations from policies + stimulus_id = str(time()) + for ws_rec, ts_to_who_has in repl_by_worker.items(): + self.scheduler.stream_comms[ws_rec.address].send( + { + "op": "acquire-replicas", + "keys": [ts.key for ts in ts_to_who_has], + "stimulus_id": "acquire-replicas-" + stimulus_id, + "priorities": {ts.key: ts.priority for ts in ts_to_who_has}, + "who_has": {ts.key: v for ts, v in ts_to_who_has.items()}, + }, + ) + + for ws, tss in drop_by_worker.items(): + # The scheduler immediately forgets about the replica and suggests the + # worker to drop it. The worker may refuse, at which point it will send back + # an add-keys message to reinstate it. + for ts in tss: + self.scheduler.remove_replica(ts, ws) + self.scheduler.stream_comms[ws.address].send( + { + "op": "remove-replicas", + "keys": [ts.key for ts in tss], + "stimulus_id": "remove-replicas-" + stimulus_id, + } + ) + class ActiveMemoryManagerPolicy: """Abstract parent class""" @@ -374,6 +383,9 @@ class ReduceReplicas(ActiveMemoryManagerPolicy): """ def run(self): + nkeys = 0 + ndrop = 0 + for ts in self.manager.scheduler.replicated_tasks: desired_replicas = 1 # TODO have a marker on TaskState @@ -383,11 +395,94 @@ def run(self): # worker, don't double count them. nwaiters = len({waiter.processing_on or waiter for waiter in ts.waiters}) - ndrop = len(ts.who_has) - max(desired_replicas, nwaiters) + ndrop_key = len(ts.who_has) - max(desired_replicas, nwaiters) if ts in self.manager.pending: pending_repl, pending_drop = self.manager.pending[ts] - ndrop += len(pending_repl) - len(pending_drop) + ndrop_key += len(pending_repl) - len(pending_drop) + + if ndrop_key > 0: + nkeys += 1 + ndrop += ndrop_key + for _ in range(ndrop_key): + yield "drop", ts, None + + if ndrop: + logger.debug("Dropping %d superfluous replicas of %d tasks", ndrop, nkeys) + + +class RetireWorker(ActiveMemoryManagerPolicy): + """Replicate somewhere else all unique keys on a worker, preparing for its shutdown. + Once the worker has been retired, this policy automatically removes itself from the + Active Memory Manager it's attached to. + + **Retiring a worker with spilled keys** + + On its very first iteration, this policy suggests other workers to fetch all unique + in-memory tasks. Frequently, this means that in the next few moments the worker to + be retired will be bombarded with ``Worker.get_data`` calls from the rest of the + cluster. This can be a problem if most of the managed memory of the worker has been + spilled out, as it could send the worker above the terminate threshold. + Two things are in place in order to avoid this: - # ndrop could be negative, which for range() is the same as 0. - for _ in range(ndrop): - yield "drop", ts, None + 1. At every iteration, this policy drops all keys that have already been replicated + somewhere else. This makes room for further keys to be moved out of the spill + file in order to be replicated onto another worker. 
+ 2. Once a worker passes the ``pause`` threshold, ``Worker.get_data`` throttles the + number of outgoing connections to 1. + """ + + address: str + + def __init__(self, address: str): + self.address = address + + def __repr__(self) -> str: + return f"RetireWorker({self.address}, done={self.done})" + + def run(self): + ws = self.manager.scheduler.workers.get(self.address) + if ws is None: + self.manager.policies.remove(self) + return + + n_repl = 0 + n_no_rec = 0 + + for ts in ws.has_what: + if len(ts.who_has) == 1: + n_repl += 1 + pending_repl, _ = self.manager.pending[ts] + if not pending_repl: + rec_ws = (yield "replicate", ts, None) + if not rec_ws: + # replication was rejected by the AMM (see _find_recipient) + n_no_rec += 1 + else: + # This may be rejected by either the AMM (see _find_dropper) or by the + # Worker; e.g. a running task may depend on this key. If so we'll try + # again at the next iteration. Anyway, this is just to allow spilled + # keys to be moved back into memory and not mandatory for retirement. + yield "drop", ts, {ws} + + if n_no_rec: + logger.warning( + f"Retiring worker {self.address}; {n_repl - n_no_rec} keys are being " + f"moved away while {n_no_rec} keys won't be moved for now as there are " + "no suitable workers to receive them; this typically happens when this " + "is the only worker on the cluster or all other workers are either " + "paused or are being shut down themselves. Trying again later..." + ) + elif n_repl: + logger.info( + f"Retiring worker {self.address}; {n_repl} keys are being moved away.", + ) + + def done(self) -> bool: + """Return True if it is safe to close the worker down, or False otherwise. True + doesn't necessarily mean that run() won't issue any more suggestions - it could + continue issuing ``drop`` suggestions afterwards. + """ + ws = self.manager.scheduler.workers.get(self.address) + if ws is None: + return True + return all(len(ts.who_has) > 1 for ts in ws.has_what) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d56cf8e0bfa..9662f1df9dd 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -53,7 +53,7 @@ from . import preloading, profile from . import versions as version_module -from .active_memory_manager import ActiveMemoryManagerExtension +from .active_memory_manager import ActiveMemoryManagerExtension, RetireWorker from .batched import BatchedSend from .comm import ( Comm, @@ -6735,11 +6735,11 @@ def _key(group): async def retire_workers( self, comm=None, - workers=None, - remove=True, - close_workers=False, - names=None, - lock=True, + *, + workers: "list[str] | None" = None, + names: "list[str] | None" = None, + close_workers: bool = False, + remove: bool = True, **kwargs, ) -> dict: """Gracefully retire workers from cluster @@ -6748,16 +6748,18 @@ async def retire_workers( ---------- workers: list (optional) List of worker addresses to retire. - If not provided we call ``workers_to_close`` which finds a good set names: list (optional) List of worker names to retire. - remove: bool (defaults to True) - Whether or not to remove the worker metadata immediately or else - wait for the worker to contact us + Mutually exclusive with ``workers``. + If neither ``workers`` nor ``names`` are provided, we call + ``workers_to_close`` which finds a good set. close_workers: bool (defaults to False) Whether or not to actually close the worker explicitly from here. Otherwise we expect some external job scheduler to finish off the worker. 
+ remove: bool (defaults to True) + Whether or not to remove the worker metadata immediately or else + wait for the worker to contact us **kwargs: dict Extra options to pass to workers_to_close to determine which workers we should drop @@ -6775,78 +6777,97 @@ async def retire_workers( ws: WorkerState ts: TaskState with log_errors(): - async with self._lock if lock else empty_context: - if names is not None: - if workers is not None: - raise TypeError("names and workers are mutually exclusive") - if names: - logger.info("Retire worker names %s", names) - names = set(map(str, names)) - workers = { - ws._address - for ws in parent._workers_dv.values() - if str(ws._name) in names - } - elif workers is None: - while True: - try: - workers = self.workers_to_close(**kwargs) - if not workers: - return {} - return await self.retire_workers( - workers=workers, - remove=remove, - close_workers=close_workers, - lock=False, - ) - except KeyError: # keys left during replicate - pass - - workers = { - parent._workers_dv[w] for w in workers if w in parent._workers_dv + if names is not None: + if workers is not None: + raise TypeError("names and workers are mutually exclusive") + if names: + logger.info("Retire worker names %s", names) + names_set = {str(name) for name in names} + wss = { + ws + for ws in parent._workers_dv.values() + if str(ws._name) in names_set } - if not workers: - return {} - logger.info("Retire workers %s", workers) - - # Keys orphaned by retiring those workers - keys = {k for w in workers for k in w.has_what} - keys = {ts._key for ts in keys if ts._who_has.issubset(workers)} - - if keys: - other_workers = set(parent._workers_dv.values()) - workers - if not other_workers: - return {} - logger.info("Moving %d keys to other workers", len(keys)) - await self.replicate( - keys=keys, - workers=[ws._address for ws in other_workers], - n=1, - delete=False, - lock=False, - ) + elif workers is not None: + wss = { + parent._workers_dv[address] + for address in workers + if address in parent._workers_dv + } + else: + wss = { + parent._workers_dv[address] + for address in self.workers_to_close(**kwargs) + } + if not wss: + return {} - worker_keys = {ws._address: ws.identity() for ws in workers} - if close_workers: - await asyncio.gather( - *[self.close_worker(worker=w, safe=True) for w in worker_keys] - ) - if remove: + workers_info = {ws._address: ws.identity() for ws in wss} + + stop_amm = False + amm: ActiveMemoryManagerExtension = self.extensions["amm"] + if not amm.running: + amm = ActiveMemoryManagerExtension( + self, policies=set(), register=False, start=True, interval=2.0 + ) + stop_amm = True + + # This lock makes retire_workers, rebalance, and replicate mutually + # exclusive and will no longer be necessary once rebalance and replicate are + # migrated to the Active Memory Manager. 
+ async with self._lock: + try: await asyncio.gather( - *[self.remove_worker(address=w, safe=True) for w in worker_keys] + *( + self._retire_worker(ws, amm, close_workers, remove) + for ws in wss + ) ) + finally: + if stop_amm: + amm.stop() - self.log_event( - "all", - { - "action": "retire-workers", - "workers": worker_keys, - "moved-keys": len(keys), - }, - ) - self.log_event(list(worker_keys), {"action": "retired"}) + self.log_event("all", {"action": "retire-workers", "workers": workers_info}) + self.log_event(list(workers_info), {"action": "retired"}) + + return workers_info + + async def _retire_worker( + self, + ws: WorkerState, + amm: ActiveMemoryManagerExtension, + close_workers: bool, + remove: bool, + ) -> None: + parent: SchedulerState = cast(SchedulerState, self) + + logger.info("Retiring worker %s", ws._address) + + policy = RetireWorker(ws._address) + amm.add_policy(policy) + + ws.status = Status.closing_gracefully + self.running.discard(ws) + self.stream_comms[ws.address].send( + {"op": "worker-status-change", "status": ws.status.name} + ) + + while not policy.done(): + # Sleep 0.01s when there's a single in-memory task + # Sleep 2s when there are 200 or more + poll_interval = min(len(ws.has_what) / 100, 2.0) + await asyncio.sleep(poll_interval) + + logger.debug( + "All unique keys on worker %s have been replicated elsewhere", ws._address + ) + + if close_workers and ws._address in parent._workers_dv: + await self.close_worker(worker=ws._address, safe=True) + if remove: + await self.remove_worker(address=ws._address, safe=True) - return worker_keys + logger.info("Retired worker %s", ws._address) def add_keys(self, comm=None, worker=None, keys=(), stimulus_id=None): """ diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index ff56fa9e052..3741597f62d 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -4279,24 +4279,24 @@ async def test_retire_workers_2(c, s, a, b): assert a.address not in s.workers -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10) -async def test_retire_many_workers(c, s, *workers): - futures = await c.scatter(list(range(100))) - - await s.retire_workers(workers=[w.address for w in workers[:7]]) - - results = await c.gather(futures) - assert results == list(range(100)) - - while len(s.workers) != 3: - await asyncio.sleep(0.01) - - assert len(s.has_what) == len(s.nthreads) == 3 +@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, Worker=Nanny) +async def test_retire_many_workers(c, s, *nannies): + # When moving data away from a retiring worker, the Active Memory Manager picks + # receiving workers by process memory. Use new processes and chunk sizes large + # enough so that imbalances in unmanaged memory don't cause flakiness. 
+ to_retire = list(s.workers)[:7] + futures = c.map(lambda i: "x" * 2 ** 20, range(500), workers=to_retire) + await wait(futures) + await s.retire_workers(workers=to_retire) + assert len(s.has_what) == len(s.workers) == 3 assert all(future.done() for future in futures) assert all(s.tasks[future.key].state == "memory" for future in futures) + + # Notably, this check would fail if retire_workers() under the hood didn't create a + # single Active Memory Manager with multiple policies that interact with each other for w, keys in s.has_what.items(): - assert 15 < len(keys) < 50 + assert 100 < len(keys) < 250 @gen_cluster( diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index e1d95f91f0c..4d854eadb15 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1565,17 +1565,18 @@ async def test_worker_listens_on_same_interface_by_default(cleanup, Worker): @gen_cluster(client=True) async def test_close_gracefully(c, s, a, b): futures = c.map(slowinc, range(200), delay=0.1) - while not b.data: - await asyncio.sleep(0.1) + while not b.data: + await asyncio.sleep(0.01) mem = set(b.data) - proc = [ts for ts in b.tasks.values() if ts.state == "executing"] + proc = {ts for ts in b.tasks.values() if ts.state == "executing"} + assert proc await b.close_gracefully() assert b.status == Status.closed assert b.address not in s.workers - assert mem.issubset(set(a.data)) + assert mem.issubset(a.data.keys()) for ts in proc: assert ts.state in ("executing", "memory") @@ -1584,11 +1585,15 @@ async def test_close_gracefully(c, s, a, b): @gen_cluster(client=True, nthreads=[]) async def test_lifetime(c, s): async with Worker(s.address) as a, Worker(s.address, lifetime="1 seconds") as b: - futures = c.map(slowinc, range(200), delay=0.1, worker=[b.address]) - await asyncio.sleep(1.5) + futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address]) + await asyncio.sleep(0.6) + assert not a.data + assert b.data + b_keys = set(b.data) + await asyncio.sleep(0.9) assert b.status not in (Status.running, Status.paused) await b.finished() - assert set(b.data) == set(a.data) # successfully moved data over + assert b_keys.issubset(a.data) # successfully moved data over @gen_cluster(worker_kwargs={"lifetime": "10s", "lifetime_stagger": "2s"}) diff --git a/distributed/worker.py b/distributed/worker.py index 943fe41aa16..1f67f247fa0 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -917,6 +917,7 @@ def __init__( "free-keys": self.handle_free_keys, "remove-replicas": self.handle_remove_replicas, "steal-request": self.handle_steal_request, + "worker-status-change": self.handle_worker_status_change, } super().__init__( @@ -1632,8 +1633,12 @@ async def close_gracefully(self, restart=None): restart = self.lifetime_restart logger.info("Closing worker gracefully: %s", self.address) - self.status = Status.closing_gracefully - await self.scheduler.retire_workers(workers=[self.address], remove=False) + # Wait for all tasks to leave the worker and don't accept any new ones. + # Scheduler.retire_workers will set the status to closing_gracefully and push it + # back to this worker. 
+ await self.scheduler.retire_workers( + workers=[self.address], close_workers=False, remove=False + ) await self.close(safe=True, nanny=not restart) async def terminate(self, comm=None, report=True, **kwargs): @@ -3086,6 +3091,22 @@ def handle_steal_request(self, key, stimulus_id): # `transition_constrained_executing` self.transition(ts, "released", stimulus_id=stimulus_id) + def handle_worker_status_change(self, status: str) -> None: + new_status = Status.lookup[status] # type: ignore + + if new_status == Status.closing_gracefully and self._status not in ( + Status.running, + Status.paused, + ): + logger.error( + "Invalid Worker.status transition: %s -> %s", self._status, new_status + ) + # Reiterate the current status to the scheduler to restore sync + self._send_worker_status_change() + else: + # Update status and send confirmation to the Scheduler (see status.setter) + self.status = new_status + def release_key( self, key: str, @@ -3291,7 +3312,7 @@ async def _maybe_deserialize_task(self, ts, *, stimulus_id): raise def ensure_computing(self): - if self.status == Status.paused: + if self.status in (Status.paused, Status.closing_gracefully): return try: stimulus_id = f"ensure-computing-{time()}"
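
---

A minimal sketch (not part of the diff above) of the policy hook this patch builds on: ``ActiveMemoryManagerPolicy.run()`` is a generator yielding ``("drop" | "replicate", TaskState, candidate_workers_or_None)`` suggestions, and policies are attached to the running extension with ``add_policy()``, exactly as the ``RetireWorker`` / ``retire_workers()`` hunks do. The names ``DropExtraReplicas`` and ``register_policy`` below are hypothetical, and the example assumes the AMM extension is registered under ``Scheduler.extensions["amm"]`` as the ``retire_workers()`` hunk assumes.

```python
# Sketch only -- not part of the patch. Illustrates the same generator protocol
# used by ReduceReplicas and RetireWorker in the diff above.
from __future__ import annotations

from distributed.active_memory_manager import ActiveMemoryManagerPolicy


class DropExtraReplicas(ActiveMemoryManagerPolicy):  # hypothetical example policy
    """Suggest dropping one replica of every task that currently has more than
    one replica, leaving the choice of which worker drops it to the AMM
    (``None`` candidates). The AMM may reject individual suggestions, e.g. it
    never drops the last replica of a key.
    """

    def run(self):
        for ts in self.manager.scheduler.replicated_tasks:
            # None lets the AMM pick the dropping worker; passing a set of
            # WorkerState objects would restrict the candidates instead.
            yield "drop", ts, None


def register_policy(dask_scheduler):
    # Meant to run on the scheduler, e.g. via Client.run_on_scheduler.
    # Assumes the AMM extension lives at extensions["amm"], as in the
    # retire_workers() hunk above.
    amm = dask_scheduler.extensions["amm"]
    amm.add_policy(DropExtraReplicas())
```

With a client connected to the cluster, ``client.run_on_scheduler(register_policy)`` would attach the policy to the same extension that ``retire_workers()`` adds its ``RetireWorker`` policies to, so both kinds of suggestions are enacted together by ``_enact_suggestions()`` on each AMM iteration.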