From d19e737990819f27c3ab1547c424dc45e9704ad2 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Sun, 21 Nov 2021 20:41:57 +0000 Subject: [PATCH] Unit tests --- distributed/active_memory_manager.py | 82 +++++--- distributed/scheduler.py | 64 ++++-- .../tests/test_active_memory_manager.py | 182 +++++++++++++++++- 3 files changed, 276 insertions(+), 52 deletions(-) diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py index d302ec3fdbb..82385354d4d 100644 --- a/distributed/active_memory_manager.py +++ b/distributed/active_memory_manager.py @@ -409,16 +409,22 @@ class RetireWorker(ActiveMemoryManagerPolicy): once the worker has been retired. If the AMM is disabled in the dask config, :meth:`~distributed.Scheduler.retire_workers` will start a temporary ad-hoc one. - **Hang condition** + **Failure condition** There may not be any suitable workers to receive the tasks from the retiring worker. - This typically happens in two use cases: + This happens in two use cases: 1. This is the only worker in the cluster, or 2. All workers are either paused or being retired at the same time - In either case, this policy will fail to move out all keys within a single AMM - iteration and try again at the next one (typically 2 seconds later). + In either case, this policy will fail to move out all keys and + Scheduler.retire_workers will abort the retirement. The flag ``no_recipients`` will + be raised. + + There is a third use case, where a task fails to be replicated away for whatever + reason; in this case we'll just wait for the next AMM iteration and try again + (possibly with a different receiving worker, e.g. if the receiving worker was + hung but not yet declared dead). **Retiring a worker with spilled tasks** @@ -443,9 +449,11 @@ class RetireWorker(ActiveMemoryManagerPolicy): """ address: str + no_recipients: bool def __init__(self, address: str): self.address = address + self.no_recipients = False def __repr__(self) -> str: return f"RetireWorker({self.address}, done={self.done})" @@ -461,40 +469,58 @@ def run(self): nno_rec = 0 for ts in ws.has_what: - if len(ts.who_has) == 1: - nrepl += 1 - pending_repl, _ = self.manager.pending[ts] - if not pending_repl: - rec_ws = (yield "replicate", ts, None) - if not rec_ws: - # replication was rejected by the AMM (see _find_recipient) - nno_rec += 1 - else: - # This may be rejected by either the AMM (see _find_dropper) or by the - # Worker; e.g. a running task may depend on this key. If so we'll try - # again at the next iteration. Anyway, this is just to facilitate - # spilled tasks to be moved back into memory and not mandatory for - # retirement. - yield "drop", ts, {ws} + if len(ts.who_has) > 1: + # Use cases: + # 1. The suggestion is accepted by the AMM and by the Worker. + # Replica is dropped. + # 2. The suggestion is accepted by the AMM, but rejected by the Worker. + # We'll try again at the next AMM iteration. + # 3. The suggestion is rejected by the AMM, because: + # 3a. Another policy (e.g. ReduceReplicas) already suggested the same + # 3b. All replicas of the key are on workers being retired, and the + # other RetireWorker instances already asked the same suggestion. + # We need to deal with this case explicitly and create a replica + # elsewhere. 
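+                # The "drop" suggestion below returns the WorkerState that the
+                # replica will be dropped from, or None if the AMM rejected it;
+                # a None return is how case 3 is detected, falling through to
+                # the replicate branch below.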
+ drop_ws = (yield "drop", ts, {ws}) + if drop_ws: + continue + + # Either the worker holds the only replica or all replicas are being held + # by workers that are being retired + nrepl += 1 + pending_repl, _ = self.manager.pending[ts] + if not pending_repl: + rec_ws = (yield "replicate", ts, None) + if not rec_ws: + # replication was rejected by the AMM (see _find_recipient) + nno_rec += 1 if nno_rec: + # All workers are paused or closing_gracefully. + # scheduler.retire_workers will read this flag and exit immediately. + # TODO after we implement the automatic transition of workers from paused + # to closing_gracefully after a timeout expires, we should revisit this + # code to wait for paused workers and only exit immediately if all + # workers are in closing_gracefully status. + self.no_recipients = True logger.warning( - f"Retiring worker {self.address}; {nrepl - nno_rec} in-memory tasks " - f"are being moved away while {nno_rec} keys won't be moved for now as " - "there are no suitable workers to receive them; this typically happens " - "when this is the only worker on the cluster or all other workers are " - "either paused or retiring themselves. Trying again later..." + f"Tried retiring worker {self.address}, but {nno_rec} tasks could not " + "be moved as there are no suitable workers to receive them. " + "The worker will not be retired." ) + self.manager.policies.remove(self) elif nrepl: logger.info( f"Retiring worker {self.address}; {nrepl} keys are being moved away.", ) + else: + logger.info( + f"Retiring worker {self.address}; no unique keys need to be moved away." + ) + self.manager.policies.remove(self) def done(self) -> bool: - """Return True if it is safe to close the worker down; False otherwise. True - doesn't necessarily mean that run() won't issue any more suggestions - it could - continue issuing ``drop`` suggestions afterwards. - """ + """Return True if it is safe to close the worker down; False otherwise""" ws = self.manager.scheduler.workers.get(self.address) if ws is None: return True diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9662f1df9dd..657d072d78a 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -6802,8 +6802,6 @@ async def retire_workers( if not wss: return {} - workers_info = {ws._address: ws.identity() for ws in wss} - stop_amm = False amm: ActiveMemoryManagerExtension = self.extensions["amm"] if not amm.running: @@ -6817,12 +6815,39 @@ async def retire_workers( # migrated to the Active Memory Manager. async with self._lock: try: - await asyncio.gather( - *( - self._retire_worker(ws, amm, close_workers, remove) - for ws in wss + coros = [] + for ws in wss: + logger.info("Retiring worker %s", ws._address) + + policy = RetireWorker(ws._address) + amm.add_policy(policy) + + # Change Worker.status to closing_gracefully. Immediately set + # the same on the scheduler to prevent race conditions. + prev_status = ws.status + ws.status = Status.closing_gracefully + self.running.discard(ws) + self.stream_comms[ws.address].send( + {"op": "worker-status-change", "status": ws.status.name} ) - ) + + coros.append( + self._retire_worker( + ws, + policy, + prev_status=prev_status, + close_workers=close_workers, + remove=remove, + ) + ) + + # Give the AMM a kick, in addition to its periodic running. 
This is + # to avoid unnecessarily waiting for a potentially arbitrarily long + # time (depending on interval settings) + amm.run_once() + + workers_info = dict(await asyncio.gather(*coros)) + workers_info.pop(None, None) finally: if stop_amm: amm.stop() @@ -6835,24 +6860,22 @@ async def retire_workers( async def _retire_worker( self, ws: WorkerState, - amm: ActiveMemoryManagerExtension, + policy: RetireWorker, + prev_status: Status, close_workers: bool, remove: bool, - ) -> None: + ) -> tuple: # tuple[str | None, dict] parent: SchedulerState = cast(SchedulerState, self) - logger.info("Retiring worker %s", ws._address) - - policy = RetireWorker(ws._address) - amm.add_policy(policy) - - ws.status = Status.closing_gracefully - self.running.discard(ws) - self.stream_comms[ws.address].send( - {"op": "worker-status-change", "status": ws.status.name} - ) - while not policy.done(): + if policy.no_recipients: + # Abort retirement. This time we don't need to worry about race + # conditions and we can wait for a round-trip. + self.stream_comms[ws.address].send( + {"op": "worker-status-change", "status": prev_status.name} + ) + return None, {} + # Sleep 0.01s when there's a single in-memory task # Sleep 2s when there are 200 or more poll_interval = min(len(ws.has_what) / 100, 2.0) @@ -6868,6 +6891,7 @@ async def _retire_worker( await self.remove_worker(address=ws._address, safe=True) logger.info("Retired worker %s", ws._address) + return ws._address, ws.identity() def add_keys(self, comm=None, worker=None, keys=(), stimulus_id=None): """ diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py index 34165bd0522..9d9e813c6b3 100644 --- a/distributed/tests/test_active_memory_manager.py +++ b/distributed/tests/test_active_memory_manager.py @@ -3,7 +3,7 @@ import pytest -from distributed import Nanny +from distributed import Nanny, wait from distributed.active_memory_manager import ( ActiveMemoryManagerExtension, ActiveMemoryManagerPolicy, @@ -524,6 +524,148 @@ async def test_ReduceReplicas(c, s, *workers): await asyncio.sleep(0.01) +@pytest.mark.parametrize("start_amm", [False, True]) +@gen_cluster(client=True) +async def test_RetireWorker_amm_on_off(c, s, a, b, start_amm): + """retire_workers must work both with and without the AMM started""" + if start_amm: + await c.amm.start() + else: + await c.amm.stop() + + futures = await c.scatter({"x": 1}, workers=[a.address]) + await c.retire_workers([a.address]) + assert a.address not in s.workers + assert "x" in b.data + + +@gen_cluster( + client=True, + config={ + "distributed.scheduler.active-memory-manager.start": True, + "distributed.scheduler.active-memory-manager.interval": 0.1, + "distributed.scheduler.active-memory-manager.policies": [], + }, +) +async def test_RetireWorker_no_remove(c, s, a, b): + """Test RetireWorker behaviour on retire_workers(..., remove=False)""" + + x = await c.scatter({"x": "x"}, workers=[a.address]) + await c.retire_workers([a.address], close_workers=False, remove=False) + # Wait 2 AMM iterations + # retire_workers may return before all keys have been dropped from a + while s.tasks["x"].who_has != {s.workers[b.address]}: + await asyncio.sleep(0.01) + assert a.address in s.workers + # Policy has been removed without waiting for worker to disappear from + # Scheduler.workers + assert not s.extensions["amm"].policies + + +@pytest.mark.slow +@pytest.mark.parametrize("use_ReduceReplicas", [False, True]) +@gen_cluster( + client=True, + Worker=Nanny, + config={ + 
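+        # Start the AMM explicitly and run it every 0.1s so the test converges
+        # quickly; the ReduceReplicas policy below is cleared at runtime when
+        # use_ReduceReplicas is False.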
"distributed.scheduler.active-memory-manager.start": True, + "distributed.scheduler.active-memory-manager.interval": 0.1, + "distributed.scheduler.active-memory-manager.policies": [ + {"class": "distributed.active_memory_manager.ReduceReplicas"}, + ], + }, +) +async def test_RetireWorker_with_ReduceReplicas(c, s, *nannies, use_ReduceReplicas): + """RetireWorker and ReduceReplicas work well with each other. + + If ReduceReplicas is enabled, + 1. On the first AMM iteration, either ReduceReplicas or RetireWorker (arbitrarily + depending on which comes first in the iteration of + ActiveMemoryManagerExtension.policies) deletes non-unique keys, choosing from + workers to be retired first. At the same time, RetireWorker replicates unique + keys. + 2. On the second AMM iteration, either ReduceReplicas or RetireWorker deletes the + keys replicated at the previous round from the worker to be retired. + + If ReduceReplicas is not enabled, all drops are performed by RetireWorker. + + This test fundamentally relies on workers in the process of being retired to be + always picked first by ActiveMemoryManagerExtension._find_dropper. + """ + ws_a, ws_b = s.workers.values() + if not use_ReduceReplicas: + s.extensions["amm"].policies.clear() + + x = c.submit(lambda: "x" * 2 ** 26, key="x", workers=[ws_a.address]) # 64 MiB + y = c.submit(lambda: "y" * 2 ** 26, key="y", workers=[ws_a.address]) # 64 MiB + z = c.submit(lambda x: None, x, key="z", workers=[ws_b.address]) # copy x to ws_b + # Make sure that the worker NOT being retired has the most RAM usage to test that + # it is not being picked first since there's a retiring worker. + w = c.submit(lambda: "w" * 2 ** 28, key="w", workers=[ws_b.address]) # 256 MiB + await wait([x, y, z, w]) + + await c.retire_workers([ws_a.address], remove=False) + # retire_workers may return before all keys have been dropped from a + while ws_a.has_what: + await asyncio.sleep(0.01) + assert {ts.key for ts in ws_b.has_what} == {"x", "y", "z", "w"} + + +@gen_cluster(client=True, nthreads=[("", 1)] * 3, config=NO_AMM_START) +async def test_RetireWorker_all_replicas_are_being_retired(c, s, w1, w2, w3): + """There are multiple replicas of a key, but they all reside on workers that are + being retired + """ + ws1 = s.workers[w1.address] + ws2 = s.workers[w2.address] + ws3 = s.workers[w3.address] + fut = await c.scatter({"x": "x"}, workers=[w1.address, w2.address], broadcast=True) + assert s.tasks["x"].who_has == {ws1, ws2} + await c.retire_workers([w1.address, w2.address]) + assert s.tasks["x"].who_has == {ws3} + + +@gen_cluster(client=True, nthreads=[("", 1)] * 4, config=NO_AMM_START) +async def test_RetireWorker_no_recipients(c, s, w1, w2, w3, w4): + """All workers are retired at once. + + Test use cases: + 1. (w1) worker contains no data -> it is retired + 2. (w2) worker contains unique data -> it is not retired + 3. 
(w3, w4) worker contains non-unique data, but all replicas are on workers that
+       are being retired -> it is not retired
+    """
+    x = await c.scatter("x", workers=[w2.address])
+    y = await c.scatter("y", workers=[w3.address, w4.address], broadcast=True)
+
+    out = await c.retire_workers([w1.address, w2.address, w3.address, w4.address])
+    assert out.keys() == {w1.address}
+    assert not s.extensions["amm"].policies
+    assert set(s.workers) == {w2.address, w3.address, w4.address}
+    while any(ws.status != Status.running for ws in s.workers.values()):
+        # Must wait for a roundtrip Scheduler -> Worker -> WorkerState
+        await asyncio.sleep(0.01)
+
+
+@gen_cluster(client=True, config=NO_AMM_START)
+async def test_RetireWorker_all_recipients_are_paused(c, s, a, b):
+    ws_a = s.workers[a.address]
+    ws_b = s.workers[b.address]
+
+    b.memory_pause_fraction = 1e-15
+    while ws_b.status != Status.paused:
+        await asyncio.sleep(0.01)
+
+    x = await c.scatter("x", workers=[a.address])
+    out = await c.retire_workers([a.address])
+    assert out == {}
+    assert not s.extensions["amm"].policies
+    assert set(s.workers) == {a.address, b.address}
+    while ws_a.status != Status.running:
+        # Must wait for a roundtrip Scheduler -> Worker -> WorkerState
+        await asyncio.sleep(0.01)
+
+
 class DropEverything(ActiveMemoryManagerPolicy):
     """Inanely suggest to drop every single key in the cluster"""
 
@@ -546,7 +688,7 @@ def run(self):
             self.manager.policies.remove(self)
 
 
-async def _tensordot_stress(c):
+async def tensordot_stress(c):
     da = pytest.importorskip("dask.array")
 
     rng = da.random.RandomState(0)
@@ -576,7 +718,7 @@ async def test_drop_stress(c, s, *nannies):
 
     See also: test_ReduceReplicas_stress
     """
-    await _tensordot_stress(c)
+    await tensordot_stress(c)
 
 
 @pytest.mark.slow
@@ -599,4 +741,36 @@ async def test_ReduceReplicas_stress(c, s, *nannies):
     test_drop_stress above, this test does not stop running after a few seconds - the
     policy must not disrupt the computation too much.
    """
-    await _tensordot_stress(c)
+    await tensordot_stress(c)
+
+
+@pytest.mark.slow
+@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371")
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)] * 10,
+    Worker=Nanny,
+    config={
+        "distributed.scheduler.active-memory-manager.start": True,
+        "distributed.scheduler.active-memory-manager.interval": 0.1,
+        "distributed.scheduler.active-memory-manager.policies": [
+            # Also test interactions between RetireWorker and ReduceReplicas
+            {"class": "distributed.active_memory_manager.ReduceReplicas"},
+        ],
+    },
+    timeout=120,
+)
+async def test_RetireWorkers_stress(c, s, *nannies):
+    """It is safe to retire the best part of a cluster in the middle of a computation"""
+    addrs = list(s.workers)
+    random.shuffle(addrs)
+
+    tasks = [asyncio.create_task(tensordot_stress(c))]
+    await asyncio.sleep(1)
+    tasks.append(asyncio.create_task(c.retire_workers(addrs[0:2])))
+    await asyncio.sleep(1)
+    tasks.append(asyncio.create_task(c.retire_workers(addrs[2:5])))
+    await asyncio.sleep(1)
+    tasks.append(asyncio.create_task(c.retire_workers(addrs[5:9])))
+
+    await asyncio.gather(*tasks)
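
A minimal usage sketch of the retire_workers() semantics implemented above, assuming a
connected async Client; `client`, `addr`, and `retire_one` are illustrative names that
are not part of the patch:

    from distributed import Client

    async def retire_one(client: Client, addr: str) -> bool:
        """Try to retire a single worker; return True if it was actually retired."""
        info = await client.retire_workers([addr])
        # With this patch, a retirement that finds no recipients for the worker's
        # unique keys is aborted: the worker reverts to its previous status and is
        # omitted from the returned dict instead of being retried forever.
        return addr in info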