From d3fa246f03f28035a7c6c0636dbe89c4365e5c04 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 3 Dec 2021 10:21:20 +0000 Subject: [PATCH] retire_workers to use AMM polish Split out #5530 docs Unit tests f fix test_lifetime fix tests test unresponsive worker Replace xfail with avoid_ci f --- distributed/active_memory_manager.py | 137 +++++++++- distributed/scheduler.py | 195 ++++++++----- .../tests/test_active_memory_manager.py | 257 +++++++++++++++++- distributed/tests/test_client.py | 28 +- distributed/tests/test_worker.py | 21 +- distributed/worker.py | 27 +- docs/source/active_memory_manager.rst | 5 +- 7 files changed, 562 insertions(+), 108 deletions(-) diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py index 22f904ddf96..7531aa97836 100644 --- a/distributed/active_memory_manager.py +++ b/distributed/active_memory_manager.py @@ -122,7 +122,7 @@ def add_policy(self, policy: ActiveMemoryManagerPolicy) -> None: def run_once(self) -> None: """Run all policies once and asynchronously (fire and forget) enact their - recommendations to replicate/drop keys + recommendations to replicate/drop tasks """ with log_errors(): # This should never fail since this is a synchronous method @@ -383,6 +383,7 @@ class ReduceReplicas(ActiveMemoryManagerPolicy): """ def run(self): + """""" nkeys = 0 ndrop = 0 @@ -412,3 +413,137 @@ def run(self): ndrop, nkeys, ) + + +class RetireWorker(ActiveMemoryManagerPolicy): + """Replicate somewhere else all unique in-memory tasks on a worker, preparing for + its shutdown. + + At any given time, the AMM may have registered multiple instances of this policy, + one for each worker currently being retired - meaning that most of the time no + instances will be registered at all. For this reason, this policy doesn't figure in + the dask config (:file:`distributed.yaml`). Instances are added by + :meth:`distributed.Scheduler.retire_workers` and automatically remove themselves + once the worker has been retired. If the AMM is disabled in the dask config, + :meth:`~distributed.Scheduler.retire_workers` will start a temporary ad-hoc one. + + **Failure condition** + + There may not be any suitable workers to receive the tasks from the retiring worker. + This happens in two use cases: + + 1. This is the only worker in the cluster, or + 2. All workers are either paused or being retired at the same time + + In either case, this policy will fail to move out all keys and + Scheduler.retire_workers will abort the retirement. The flag ``no_recipients`` will + be raised. + + There is a third use case, where a task fails to be replicated away for whatever + reason; in this case we'll just wait for the next AMM iteration and try again + (possibly with a different receiving worker, e.g. if the receiving worker was + hung but not yet declared dead). + + **Retiring a worker with spilled tasks** + + On its very first iteration, this policy suggests that other workers should fetch + all unique in-memory tasks. Frequently, this means that in the next few moments the + worker to be retired will be bombarded by :meth:`distributed.worker.Worker.get_data` + calls from the rest of the cluster. This can be a problem if most of the managed + memory of the worker has been spilled out, as it could send the worker above the + terminate threshold. Two measures are in place in order to prevent this: + + - At every iteration, this policy drops all tasks that have already been replicated + somewhere else. 
This makes room for further tasks to be moved out of the spill + file in order to be replicated onto another worker. + - Once a worker passes the ``pause`` threshold, + :meth:`~distributed.worker.Worker.get_data` throttles the number of outgoing + connections to 1. + + Parameters + ========== + address: str + URI of the worker to be retired + """ + + address: str + no_recipients: bool + + def __init__(self, address: str): + self.address = address + self.no_recipients = False + + def __repr__(self) -> str: + return f"RetireWorker({self.address}, done={self.done})" + + def run(self): + """""" + ws = self.manager.scheduler.workers.get(self.address) + if ws is None: + self.manager.policies.remove(self) + return + + nrepl = 0 + nno_rec = 0 + + for ts in ws.has_what: + pending_repl, pending_drop = self.manager.pending[ts] + + if len(ts.who_has) > 1: + # There are already replicas of this key on other workers. + # Use cases: + # 1. The suggestion is accepted by the AMM and by the Worker. + # The replica on this worker is dropped. + # 2. The suggestion is accepted by the AMM, but rejected by the Worker. + # We'll try again at the next AMM iteration. + # 3. The suggestion is rejected by the AMM, because another policy + # (e.g. ReduceReplicas) already suggested the same for this worker + # 4. The suggestion is rejected by the AMM, because all replicas of the + # key are on workers being retired and the other RetireWorker + # instances already asked the same suggestion. We need to deal with + # this case and create a replica elsewhere. + if ws in pending_drop: + continue # Use case 3 + drop_ws = (yield "drop", ts, {ws}) + if drop_ws: + continue # Use case 1 or 2 + + # Either the worker holds the only replica or all replicas are being held + # by workers that are being retired + nrepl += 1 + if not pending_repl: + rec_ws = (yield "replicate", ts, None) + if not rec_ws: + # replication was rejected by the AMM (see _find_recipient) + nno_rec += 1 + + if nno_rec: + # All workers are paused or closing_gracefully. + # Scheduler.retire_workers will read this flag and exit immediately. + # TODO after we implement the automatic transition of workers from paused + # to closing_gracefully after a timeout expires, we should revisit this + # code to wait for paused workers and only exit immediately if all + # workers are in closing_gracefully status. + self.no_recipients = True + logger.warning( + f"Tried retiring worker {self.address}, but {nno_rec} tasks could not " + "be moved as there are no suitable workers to receive them. " + "The worker will not be retired." + ) + self.manager.policies.remove(self) + elif nrepl: + logger.info( + f"Retiring worker {self.address}; {nrepl} keys are being moved away.", + ) + else: + logger.info( + f"Retiring worker {self.address}; no unique keys need to be moved away." + ) + self.manager.policies.remove(self) + + def done(self) -> bool: + """Return True if it is safe to close the worker down; False otherwise""" + ws = self.manager.scheduler.workers.get(self.address) + if ws is None: + return True + return all(len(ts.who_has) > 1 for ts in ws.has_what) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 859699f9b8e..05ab0e8f908 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -53,7 +53,7 @@ from . import preloading, profile from . 
import versions as version_module -from .active_memory_manager import ActiveMemoryManagerExtension +from .active_memory_manager import ActiveMemoryManagerExtension, RetireWorker from .batched import BatchedSend from .comm import ( Comm, @@ -6740,11 +6740,11 @@ def _key(group): async def retire_workers( self, comm=None, - workers=None, - remove=True, - close_workers=False, - names=None, - lock=True, + *, + workers: "list[str] | None" = None, + names: "list[str] | None" = None, + close_workers: bool = False, + remove: bool = True, **kwargs, ) -> dict: """Gracefully retire workers from cluster @@ -6753,16 +6753,18 @@ async def retire_workers( ---------- workers: list (optional) List of worker addresses to retire. - If not provided we call ``workers_to_close`` which finds a good set names: list (optional) List of worker names to retire. - remove: bool (defaults to True) - Whether or not to remove the worker metadata immediately or else - wait for the worker to contact us + Mutually exclusive with ``workers``. + If neither ``workers`` nor ``names`` are provided, we call + ``workers_to_close`` which finds a good set. close_workers: bool (defaults to False) Whether or not to actually close the worker explicitly from here. Otherwise we expect some external job scheduler to finish off the worker. + remove: bool (defaults to True) + Whether or not to remove the worker metadata immediately or else + wait for the worker to contact us **kwargs: dict Extra options to pass to workers_to_close to determine which workers we should drop @@ -6780,78 +6782,121 @@ async def retire_workers( ws: WorkerState ts: TaskState with log_errors(): - async with self._lock if lock else empty_context: - if names is not None: - if workers is not None: - raise TypeError("names and workers are mutually exclusive") - if names: - logger.info("Retire worker names %s", names) - names = set(map(str, names)) - workers = { - ws._address - for ws in parent._workers_dv.values() - if str(ws._name) in names - } - elif workers is None: - while True: - try: - workers = self.workers_to_close(**kwargs) - if not workers: - return {} - return await self.retire_workers( - workers=workers, - remove=remove, + if names is not None: + if workers is not None: + raise TypeError("names and workers are mutually exclusive") + if names: + logger.info("Retire worker names %s", names) + names_set = {str(name) for name in names} + wss = { + ws + for ws in parent._workers_dv.values() + if str(ws._name) in names_set + } + elif workers is not None: + wss = { + parent._workers_dv[address] + for address in workers + if address in parent._workers_dv + } + else: + wss = { + parent._workers_dv[address] + for address in self.workers_to_close(**kwargs) + } + if not wss: + return {} + + stop_amm = False + amm: ActiveMemoryManagerExtension = self.extensions["amm"] + if not amm.running: + amm = ActiveMemoryManagerExtension( + self, policies=set(), register=False, start=True, interval=2.0 + ) + stop_amm = True + + # This lock makes retire_workers, rebalance, and replicate mutually + # exclusive and will no longer be necessary once rebalance and replicate are + # migrated to the Active Memory Manager. + async with self._lock: + try: + coros = [] + for ws in wss: + logger.info("Retiring worker %s", ws._address) + + policy = RetireWorker(ws._address) + amm.add_policy(policy) + + # Change Worker.status to closing_gracefully. Immediately set + # the same on the scheduler to prevent race conditions. 
+ prev_status = ws.status + ws.status = Status.closing_gracefully + self.running.discard(ws) + self.stream_comms[ws.address].send( + {"op": "worker-status-change", "status": ws.status.name} + ) + + coros.append( + self._retire_worker( + ws, + policy, + prev_status=prev_status, close_workers=close_workers, - lock=False, + remove=remove, ) - except KeyError: # keys left during replicate - pass + ) - workers = { - parent._workers_dv[w] for w in workers if w in parent._workers_dv - } - if not workers: - return {} - logger.info("Retire workers %s", workers) - - # Keys orphaned by retiring those workers - keys = {k for w in workers for k in w.has_what} - keys = {ts._key for ts in keys if ts._who_has.issubset(workers)} - - if keys: - other_workers = set(parent._workers_dv.values()) - workers - if not other_workers: - return {} - logger.info("Moving %d keys to other workers", len(keys)) - await self.replicate( - keys=keys, - workers=[ws._address for ws in other_workers], - n=1, - delete=False, - lock=False, - ) + # Give the AMM a kick, in addition to its periodic running. This is + # to avoid unnecessarily waiting for a potentially arbitrarily long + # time (depending on interval settings) + amm.run_once() - worker_keys = {ws._address: ws.identity() for ws in workers} - if close_workers: - await asyncio.gather( - *[self.close_worker(worker=w, safe=True) for w in worker_keys] - ) - if remove: - await asyncio.gather( - *[self.remove_worker(address=w, safe=True) for w in worker_keys] - ) + workers_info = dict(await asyncio.gather(*coros)) + workers_info.pop(None, None) + finally: + if stop_amm: + amm.stop() - self.log_event( - "all", - { - "action": "retire-workers", - "workers": worker_keys, - "moved-keys": len(keys), - }, + self.log_event("all", {"action": "retire-workers", "workers": workers_info}) + self.log_event(list(workers_info), {"action": "retired"}) + + return workers_info + + async def _retire_worker( + self, + ws: WorkerState, + policy: RetireWorker, + prev_status: Status, + close_workers: bool, + remove: bool, + ) -> tuple: # tuple[str | None, dict] + parent: SchedulerState = cast(SchedulerState, self) + + while not policy.done(): + if policy.no_recipients: + # Abort retirement. This time we don't need to worry about race + # conditions and we can wait for a round-trip. 
+ self.stream_comms[ws.address].send( + {"op": "worker-status-change", "status": prev_status.name} ) - self.log_event(list(worker_keys), {"action": "retired"}) + return None, {} + + # Sleep 0.01s when there are 4 tasks or less + # Sleep 0.5s when there are 200 or more + poll_interval = max(0.01, min(0.5, len(ws.has_what) / 400)) + await asyncio.sleep(poll_interval) + + logger.debug( + "All unique keys on worker %s have been replicated elsewhere", ws._address + ) + + if close_workers and ws._address in parent._workers_dv: + await self.close_worker(worker=ws._address, safe=True) + if remove: + await self.remove_worker(address=ws._address, safe=True) - return worker_keys + logger.info("Retired worker %s", ws._address) + return ws._address, ws.identity() def add_keys(self, comm=None, worker=None, keys=(), stimulus_id=None): """ diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py index 80942cf0c19..85d9a2a4554 100644 --- a/distributed/tests/test_active_memory_manager.py +++ b/distributed/tests/test_active_memory_manager.py @@ -1,10 +1,11 @@ import asyncio import logging import random +from time import sleep import pytest -from distributed import Nanny +from distributed import Nanny, wait from distributed.active_memory_manager import ( ActiveMemoryManagerExtension, ActiveMemoryManagerPolicy, @@ -548,6 +549,218 @@ async def test_ReduceReplicas(c, s, *workers): await asyncio.sleep(0.01) +@pytest.mark.parametrize("start_amm", [False, True]) +@gen_cluster(client=True) +async def test_RetireWorker_amm_on_off(c, s, a, b, start_amm): + """retire_workers must work both with and without the AMM started""" + if start_amm: + await c.amm.start() + else: + await c.amm.stop() + + futures = await c.scatter({"x": 1}, workers=[a.address]) + await c.retire_workers([a.address]) + assert a.address not in s.workers + assert "x" in b.data + + +@gen_cluster( + client=True, + config={ + "distributed.scheduler.active-memory-manager.start": True, + "distributed.scheduler.active-memory-manager.interval": 0.1, + "distributed.scheduler.active-memory-manager.policies": [], + }, +) +async def test_RetireWorker_no_remove(c, s, a, b): + """Test RetireWorker behaviour on retire_workers(..., remove=False)""" + + x = await c.scatter({"x": "x"}, workers=[a.address]) + await c.retire_workers([a.address], close_workers=False, remove=False) + # Wait 2 AMM iterations + # retire_workers may return before all keys have been dropped from a + while s.tasks["x"].who_has != {s.workers[b.address]}: + await asyncio.sleep(0.01) + assert a.address in s.workers + # Policy has been removed without waiting for worker to disappear from + # Scheduler.workers + assert not s.extensions["amm"].policies + + +@pytest.mark.slow +@pytest.mark.parametrize("use_ReduceReplicas", [False, True]) +@gen_cluster( + client=True, + Worker=Nanny, + config={ + "distributed.scheduler.active-memory-manager.start": True, + "distributed.scheduler.active-memory-manager.interval": 0.1, + "distributed.scheduler.active-memory-manager.policies": [ + {"class": "distributed.active_memory_manager.ReduceReplicas"}, + ], + }, +) +async def test_RetireWorker_with_ReduceReplicas(c, s, *nannies, use_ReduceReplicas): + """RetireWorker and ReduceReplicas work well with each other. + + If ReduceReplicas is enabled, + 1. 
On the first AMM iteration, either ReduceReplicas or RetireWorker (arbitrarily + depending on which comes first in the iteration of + ActiveMemoryManagerExtension.policies) deletes non-unique keys, choosing from + workers to be retired first. At the same time, RetireWorker replicates unique + keys. + 2. On the second AMM iteration, either ReduceReplicas or RetireWorker deletes the + keys replicated at the previous round from the worker to be retired. + + If ReduceReplicas is not enabled, all drops are performed by RetireWorker. + + This test fundamentally relies on workers in the process of being retired to be + always picked first by ActiveMemoryManagerExtension._find_dropper. + """ + ws_a, ws_b = s.workers.values() + if not use_ReduceReplicas: + s.extensions["amm"].policies.clear() + + x = c.submit(lambda: "x" * 2 ** 26, key="x", workers=[ws_a.address]) # 64 MiB + y = c.submit(lambda: "y" * 2 ** 26, key="y", workers=[ws_a.address]) # 64 MiB + z = c.submit(lambda x: None, x, key="z", workers=[ws_b.address]) # copy x to ws_b + # Make sure that the worker NOT being retired has the most RAM usage to test that + # it is not being picked first since there's a retiring worker. + w = c.submit(lambda: "w" * 2 ** 28, key="w", workers=[ws_b.address]) # 256 MiB + await wait([x, y, z, w]) + + await c.retire_workers([ws_a.address], remove=False) + # retire_workers may return before all keys have been dropped from a + while ws_a.has_what: + await asyncio.sleep(0.01) + assert {ts.key for ts in ws_b.has_what} == {"x", "y", "z", "w"} + + +@gen_cluster(client=True, nthreads=[("", 1)] * 3, config=NO_AMM_START) +async def test_RetireWorker_all_replicas_are_being_retired(c, s, w1, w2, w3): + """There are multiple replicas of a key, but they all reside on workers that are + being retired + """ + ws1 = s.workers[w1.address] + ws2 = s.workers[w2.address] + ws3 = s.workers[w3.address] + fut = await c.scatter({"x": "x"}, workers=[w1.address, w2.address], broadcast=True) + assert s.tasks["x"].who_has == {ws1, ws2} + await c.retire_workers([w1.address, w2.address]) + assert s.tasks["x"].who_has == {ws3} + + +@gen_cluster( + client=True, + nthreads=[("", 1)] * 4, + config={ + "distributed.scheduler.active-memory-manager.start": True, + # test that we're having a manual amm.run_once() "kick" from retire_workers + "distributed.scheduler.active-memory-manager.interval": 999, + "distributed.scheduler.active-memory-manager.policies": [], + }, +) +async def test_RetireWorker_no_recipients(c, s, w1, w2, w3, w4): + """All workers are retired at once. + + Test use cases: + 1. (w1) worker contains no data -> it is retired + 2. (w2) worker contains unique data -> it is not retired + 3. 
(w3, w4) worker contains non-unique data, but all replicas are on workers that
+       are being retired -> all but one are retired
+    """
+    x = await c.scatter({"x": "x"}, workers=[w2.address])
+    y = await c.scatter({"y": "y"}, workers=[w3.address, w4.address], broadcast=True)
+
+    print(w1.address, w2.address, w3.address, w4.address)
+
+    out = await c.retire_workers([w1.address, w2.address, w3.address, w4.address])
+
+    assert out.keys() in ({w1.address, w3.address}, {w1.address, w4.address})
+    assert not s.extensions["amm"].policies
+    assert set(s.workers) in ({w2.address, w3.address}, {w2.address, w4.address})
+    # After a Scheduler -> Worker -> WorkerState roundtrip, workers that failed to
+    # retire went back from closing_gracefully to running and can run tasks
+    while any(ws.status != Status.running for ws in s.workers.values()):
+        await asyncio.sleep(0.01)
+    assert await c.submit(inc, 1) == 2
+
+
+@gen_cluster(
+    client=True,
+    config={
+        "distributed.scheduler.active-memory-manager.start": True,
+        "distributed.scheduler.active-memory-manager.interval": 999,
+        "distributed.scheduler.active-memory-manager.policies": [],
+    },
+)
+async def test_RetireWorker_all_recipients_are_paused(c, s, a, b):
+    ws_a = s.workers[a.address]
+    ws_b = s.workers[b.address]
+
+    b.memory_pause_fraction = 1e-15
+    while ws_b.status != Status.paused:
+        await asyncio.sleep(0.01)
+
+    x = await c.scatter("x", workers=[a.address])
+    out = await c.retire_workers([a.address])
+    assert out == {}
+    assert not s.extensions["amm"].policies
+    assert set(s.workers) == {a.address, b.address}
+
+    # After a Scheduler -> Worker -> WorkerState roundtrip, workers that failed to
+    # retire went back from closing_gracefully to running and can run tasks
+    while ws_a.status != Status.running:
+        await asyncio.sleep(0.01)
+    assert await c.submit(inc, 1) == 2
+
+
+@pytest.mark.slow
+@gen_cluster(
+    client=True,
+    Worker=Nanny,
+    nthreads=[("", 1)] * 3,
+    config={
+        # FIXME an unresponsive worker takes ~88s to be removed from the scheduler.
+        # The below setting seems to do nothing.
+        "distributed.scheduler.worker-ttl": "5s",
+        "distributed.scheduler.active-memory-manager.start": True,
+        "distributed.scheduler.active-memory-manager.interval": 0.1,
+        "distributed.scheduler.active-memory-manager.policies": [],
+    },
+    timeout=240,
+)
+async def test_RetireWorker_faulty_recipient(c, s, *nannies):
+    """RetireWorker requests to replicate a key onto an unresponsive worker.
+    The AMM will iterate multiple times, repeating the command, until eventually the
+    scheduler declares the worker dead and removes it from the pool; at that point the
+    AMM will choose another valid worker and complete the job.
+ """ + # ws1 is being retired + # ws2 has the lowest RAM usage and is chosen as a recipient, but is unresponsive + ws1, ws2, ws3 = s.workers.values() + f = c.submit(lambda: "x", key="x", workers=[ws1.address]) + await wait(f) + assert s.tasks["x"].who_has == {ws1} + + # Fill ws3 with 400 MB of managed memory + # We're using plenty to make sure it's safely more than the unmanaged memory of ws2 + clutter = c.map(lambda i: "x" * 4_000_000, range(50), workers=[ws3.address]) + await wait([f] + clutter) + while ws3.memory.process < 200_000_000: + # Wait for heartbeat + await asyncio.sleep(0.01) + assert ws2.memory.process < ws3.memory.process + + # Make ws2 unresponsive + cripple_fut = asyncio.create_task(c.run(sleep, 3600, workers=[ws2.address])) + await asyncio.sleep(0.5) + + await c.retire_workers([ws1.address]) + assert ws1.address not in s.workers + assert s.tasks["x"].who_has == {ws3} + + class DropEverything(ActiveMemoryManagerPolicy): """Inanely suggest to drop every single key in the cluster""" @@ -570,7 +783,7 @@ def run(self): self.manager.policies.remove(self) -async def _tensordot_stress(c): +async def tensordot_stress(c): da = pytest.importorskip("dask.array") rng = da.random.RandomState(0) @@ -580,7 +793,7 @@ async def _tensordot_stress(c): @pytest.mark.slow -@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371") +@pytest.mark.avoid_ci(reason="distributed#5371") @gen_cluster( client=True, nthreads=[("", 1)] * 4, @@ -600,11 +813,11 @@ async def test_drop_stress(c, s, *nannies): See also: test_ReduceReplicas_stress """ - await _tensordot_stress(c) + await tensordot_stress(c) @pytest.mark.slow -@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371") +@pytest.mark.avoid_ci(reason="distributed#5371") @gen_cluster( client=True, nthreads=[("", 1)] * 4, @@ -623,4 +836,36 @@ async def test_ReduceReplicas_stress(c, s, *nannies): test_drop_stress above, this test does not stop running after a few seconds - the policy must not disrupt the computation too much. 
""" - await _tensordot_stress(c) + await tensordot_stress(c) + + +@pytest.mark.slow +@pytest.mark.avoid_ci(reason="distributed#5371") +@gen_cluster( + client=True, + nthreads=[("", 1)] * 10, + Worker=Nanny, + config={ + "distributed.scheduler.active-memory-manager.start": True, + "distributed.scheduler.active-memory-manager.interval": 0.1, + "distributed.scheduler.active-memory-manager.policies": [ + # Also test interactions between RetireWorker and ReduceReplicas + {"class": "distributed.active_memory_manager.ReduceReplicas"}, + ], + }, + timeout=120, +) +async def test_RetireWorker_stress(c, s, *nannies): + """It is safe to retire the best part of a cluster in the middle of a computation""" + addrs = list(s.workers) + random.shuffle(addrs) + + tasks = [asyncio.create_task(tensordot_stress(c))] + await asyncio.sleep(1) + tasks.append(asyncio.create_task(c.retire_workers(addrs[0:2]))) + await asyncio.sleep(1) + tasks.append(asyncio.create_task(c.retire_workers(addrs[2:5]))) + await asyncio.sleep(1) + tasks.append(asyncio.create_task(c.retire_workers(addrs[5:9]))) + + await asyncio.gather(*tasks) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 4209b801446..db58e82beb2 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -4280,24 +4280,24 @@ async def test_retire_workers_2(c, s, a, b): assert a.address not in s.workers -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10) -async def test_retire_many_workers(c, s, *workers): - futures = await c.scatter(list(range(100))) - - await s.retire_workers(workers=[w.address for w in workers[:7]]) - - results = await c.gather(futures) - assert results == list(range(100)) - - while len(s.workers) != 3: - await asyncio.sleep(0.01) - - assert len(s.has_what) == len(s.nthreads) == 3 +@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, Worker=Nanny) +async def test_retire_many_workers(c, s, *nannies): + # When moving data away from a retiring worker, the Active Memory Manager picks + # receiving workers by process memory. Use new processes and chunk sizes large + # enough so that imbalances in unmanaged memory don't cause flakiness. 
+ to_retire = list(s.workers)[:7] + futures = c.map(lambda i: "x" * 2 ** 20, range(500), workers=to_retire) + await wait(futures) + await s.retire_workers(workers=to_retire) + assert len(s.has_what) == len(s.workers) == 3 assert all(future.done() for future in futures) assert all(s.tasks[future.key].state == "memory" for future in futures) + + # Notably, this check would fail if retire_workers() under the hood didn't create a + # single Active Memory Manager with multiple policies that interact with each other for w, keys in s.has_what.items(): - assert 15 < len(keys) < 50 + assert 100 < len(keys) < 250 @gen_cluster( diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index e1d95f91f0c..75aa818d836 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1565,17 +1565,18 @@ async def test_worker_listens_on_same_interface_by_default(cleanup, Worker): @gen_cluster(client=True) async def test_close_gracefully(c, s, a, b): futures = c.map(slowinc, range(200), delay=0.1) - while not b.data: - await asyncio.sleep(0.1) + while not b.data: + await asyncio.sleep(0.01) mem = set(b.data) - proc = [ts for ts in b.tasks.values() if ts.state == "executing"] + proc = {ts for ts in b.tasks.values() if ts.state == "executing"} + assert proc await b.close_gracefully() assert b.status == Status.closed assert b.address not in s.workers - assert mem.issubset(set(a.data)) + assert mem.issubset(a.data.keys()) for ts in proc: assert ts.state in ("executing", "memory") @@ -1584,11 +1585,15 @@ async def test_close_gracefully(c, s, a, b): @gen_cluster(client=True, nthreads=[]) async def test_lifetime(c, s): async with Worker(s.address) as a, Worker(s.address, lifetime="1 seconds") as b: - futures = c.map(slowinc, range(200), delay=0.1, worker=[b.address]) - await asyncio.sleep(1.5) - assert b.status not in (Status.running, Status.paused) + futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address]) + await asyncio.sleep(0.5) + assert not a.data + assert b.data + b_keys = set(b.data) + while b.status == Status.running: + await asyncio.sleep(0.01) await b.finished() - assert set(b.data) == set(a.data) # successfully moved data over + assert b_keys.issubset(a.data) # successfully moved data over from b to a @gen_cluster(worker_kwargs={"lifetime": "10s", "lifetime_stagger": "2s"}) diff --git a/distributed/worker.py b/distributed/worker.py index 42b16c1adb5..e1c1446570c 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -910,6 +910,7 @@ def __init__( "free-keys": self.handle_free_keys, "remove-replicas": self.handle_remove_replicas, "steal-request": self.handle_steal_request, + "worker-status-change": self.handle_worker_status_change, } super().__init__( @@ -1624,8 +1625,12 @@ async def close_gracefully(self, restart=None): restart = self.lifetime_restart logger.info("Closing worker gracefully: %s", self.address) - self.status = Status.closing_gracefully - await self.scheduler.retire_workers(workers=[self.address], remove=False) + # Wait for all tasks to leave the worker and don't accept any new ones. + # Scheduler.retire_workers will set the status to closing_gracefully and push it + # back to this worker. 
+ await self.scheduler.retire_workers( + workers=[self.address], close_workers=False, remove=False + ) await self.close(safe=True, nanny=not restart) async def terminate(self, comm=None, report=True, **kwargs): @@ -3081,6 +3086,22 @@ def handle_steal_request(self, key, stimulus_id): # `transition_constrained_executing` self.transition(ts, "released", stimulus_id=stimulus_id) + def handle_worker_status_change(self, status: str) -> None: + new_status = Status.lookup[status] # type: ignore + + if new_status == Status.closing_gracefully and self._status not in ( + Status.running, + Status.paused, + ): + logger.error( + "Invalid Worker.status transition: %s -> %s", self._status, new_status + ) + # Reiterate the current status to the scheduler to restore sync + self._send_worker_status_change() + else: + # Update status and send confirmation to the Scheduler (see status.setter) + self.status = new_status + def release_key( self, key: str, @@ -3286,7 +3307,7 @@ async def _maybe_deserialize_task(self, ts, *, stimulus_id): raise def ensure_computing(self): - if self.status == Status.paused: + if self.status in (Status.paused, Status.closing_gracefully): return try: stimulus_id = f"ensure-computing-{time()}" diff --git a/docs/source/active_memory_manager.rst b/docs/source/active_memory_manager.rst index 8f515204f1a..7e839f20085 100644 --- a/docs/source/active_memory_manager.rst +++ b/docs/source/active_memory_manager.rst @@ -254,6 +254,9 @@ API reference .. autoclass:: distributed.active_memory_manager.AMMClientProxy :members: - :undoc-members: .. autoclass:: distributed.active_memory_manager.ReduceReplicas + :members: + +.. autoclass:: distributed.active_memory_manager.RetireWorker + :members:
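
Usage sketch (illustrative, not part of the patch): the AMM-backed retirement path is
driven entirely through the existing public API. The snippet below assumes a local
cluster and uses only documented calls (``dask.config.set``, ``Client.retire_workers``);
the config keys mirror the ones exercised in the tests above.

    import dask
    from dask.distributed import Client, LocalCluster

    # Enable the Active Memory Manager explicitly; if it is disabled,
    # retire_workers() falls back to a temporary ad-hoc AMM for the duration
    # of the retirement.
    with dask.config.set(
        {
            "distributed.scheduler.active-memory-manager.start": True,
            "distributed.scheduler.active-memory-manager.interval": 0.1,
        }
    ):
        with LocalCluster(n_workers=3, threads_per_worker=1) as cluster:
            with Client(cluster) as client:
                futures = client.map(lambda i: i + 1, range(100))
                client.gather(futures)

                # Gracefully retire one worker: the RetireWorker policy replicates
                # its unique in-memory results to the surviving workers before the
                # worker is shut down.
                victim = next(iter(client.scheduler_info()["workers"]))
                client.retire_workers(workers=[victim])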