Unit tests
crusaderky committed Nov 21, 2021
1 parent b955440 commit f1de6c0
Showing 3 changed files with 247 additions and 37 deletions.
42 changes: 29 additions & 13 deletions distributed/active_memory_manager.py
@@ -409,16 +409,22 @@ class RetireWorker(ActiveMemoryManagerPolicy):
once the worker has been retired. If the AMM is disabled in the dask config,
:meth:`~distributed.Scheduler.retire_workers` will start a temporary ad-hoc one.
**Hang condition**
**Failure condition**
There may not be any suitable workers to receive the tasks from the retiring worker.
This typically happens in two use cases:
This happens in two use cases:
1. This is the only worker in the cluster, or
2. All workers are either paused or being retired at the same time
In either case, this policy will fail to move out all keys within a single AMM
iteration and try again at the next one (typically 2 seconds later).
In either case, this policy will fail to move out all keys and
Scheduler.retire_workers will abort the retirement. The flag ``no_recipients`` will
be raised.
There is a third use case, where a task fails to be replicated away for whatever
reason; in this case we'll just wait for the next AMM iteration and try again
(possibly with a different receiving worker, e.g. if the receiving worker was
hung but not yet declared dead).
**Retiring a worker with spilled tasks**
@@ -443,9 +449,11 @@ class RetireWorker(ActiveMemoryManagerPolicy):
"""

address: str
no_recipients: bool

def __init__(self, address: str):
self.address = address
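# Set to True by run() when there are no suitable workers left to receive this
# worker's unique keys; Scheduler.retire_workers reads it to abort the retirement.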
self.no_recipients = False

def __repr__(self) -> str:
return f"RetireWorker({self.address}, done={self.done})"
@@ -478,23 +486,31 @@ def run(self):
yield "drop", ts, {ws}

if nno_rec:
# All workers are paused or closing_gracefully.
# scheduler.retire_workers will read this flag and exit immediately.
# TODO after we implement the automatic transition of workers from paused
# to closing_gracefully after a timeout expires, we should revisit this
# code to wait for paused workers and only exit immediately if all
# workers are in closing_gracefully status.
self.no_recipients = True
logger.warning(
f"Retiring worker {self.address}; {nrepl - nno_rec} in-memory tasks "
f"are being moved away while {nno_rec} keys won't be moved for now as "
"there are no suitable workers to receive them; this typically happens "
"when this is the only worker on the cluster or all other workers are "
"either paused or retiring themselves. Trying again later..."
f"Tried retiring worker {self.address}, but {nno_rec} tasks could not "
"be moved as there are no suitable workers to receive them. "
"The worker will not be retired."
)
self.manager.policies.remove(self)
elif nrepl:
logger.info(
f"Retiring worker {self.address}; {nrepl} keys are being moved away.",
)
else:
logger.info(
f"Retiring worker {self.address}; no unique keys need to be moved away."
)
self.manager.policies.remove(self)

def done(self) -> bool:
"""Return True if it is safe to close the worker down; False otherwise. True
doesn't necessarily mean that run() won't issue any more suggestions - it could
continue issuing ``drop`` suggestions afterwards.
"""
"""Return True if it is safe to close the worker down; False otherwise"""
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
return True
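Illustrative sketch (not part of this commit's diff): the failure condition documented above, as seen from the client side. The scheduler and worker addresses below are hypothetical; per the docstring and the tests further down, retire_workers returns an empty dict and leaves the worker running when no recipient is available.

from distributed import Client

client = Client("tcp://scheduler-host:8786")  # hypothetical address

# Ask to retire a worker whose unique keys have nowhere to go (e.g. it is the
# only worker, or all other workers are paused). Instead of retrying forever,
# the AMM's RetireWorker policy sets no_recipients=True and the retirement is
# aborted by the scheduler.
info = client.retire_workers(["tcp://worker-host:40000"])  # hypothetical address
if not info:
    print("Retirement aborted: no suitable workers to receive the keys")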
64 changes: 44 additions & 20 deletions distributed/scheduler.py
@@ -6802,8 +6802,6 @@ async def retire_workers(
if not wss:
return {}

workers_info = {ws._address: ws.identity() for ws in wss}

stop_amm = False
amm: ActiveMemoryManagerExtension = self.extensions["amm"]
if not amm.running:
@@ -6817,12 +6815,39 @@ async def retire_workers(
# migrated to the Active Memory Manager.
async with self._lock:
try:
await asyncio.gather(
*(
self._retire_worker(ws, amm, close_workers, remove)
for ws in wss
coros = []
for ws in wss:
logger.info("Retiring worker %s", ws._address)

policy = RetireWorker(ws._address)
amm.add_policy(policy)

# Change Worker.status to closing_gracefully. Immediately set
# the same on the scheduler to prevent race conditions.
prev_status = ws.status
ws.status = Status.closing_gracefully
self.running.discard(ws)
self.stream_comms[ws.address].send(
{"op": "worker-status-change", "status": ws.status.name}
)
)

coros.append(
self._retire_worker(
ws,
policy,
prev_status=prev_status,
close_workers=close_workers,
remove=remove,
)
)

# Give the AMM a kick, in addition to its periodic running. This is
# to avoid unnecessarily waiting for a potentially arbitrarily long
# time (depending on interval settings)
amm.run_once()

workers_info = dict(await asyncio.gather(*coros))
workers_info.pop(None, None)
finally:
if stop_amm:
amm.stop()
@@ -6835,24 +6860,22 @@ async def retire_workers(
async def _retire_worker(
self,
ws: WorkerState,
amm: ActiveMemoryManagerExtension,
policy: RetireWorker,
prev_status: Status,
close_workers: bool,
remove: bool,
) -> None:
) -> tuple: # tuple[str | None, dict]
parent: SchedulerState = cast(SchedulerState, self)

logger.info("Retiring worker %s", ws._address)

policy = RetireWorker(ws._address)
amm.add_policy(policy)

ws.status = Status.closing_gracefully
self.running.discard(ws)
self.stream_comms[ws.address].send(
{"op": "worker-status-change", "status": ws.status.name}
)

while not policy.done():
if policy.no_recipients:
# Abort retirement. This time we don't need to worry about race
# conditions and we can wait for a round-trip.
self.stream_comms[ws.address].send(
{"op": "worker-status-change", "status": prev_status.name}
)
return None, {}

# Sleep 0.01s when there's a single in-memory task
# Sleep 2s when there are 200 or more
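# e.g. 1 key -> 0.01s, 50 keys -> 0.5s, 200 or more keys -> capped at 2.0s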
poll_interval = min(len(ws.has_what) / 100, 2.0)
@@ -6868,6 +6891,7 @@ async def _retire_worker(
await self.remove_worker(address=ws._address, safe=True)

logger.info("Retired worker %s", ws._address)
return ws._address, ws.identity()

def add_keys(self, comm=None, worker=None, keys=(), stimulus_id=None):
"""
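A self-contained sketch (not part of this commit) of the aggregation pattern introduced above: each per-worker coroutine resolves to (address, info) on success or (None, {}) when the retirement is aborted, and dict() plus pop(None, None) collapses the results into workers_info. The helper name and addresses are made up for illustration.

import asyncio


async def fake_retire_worker(addr, ok):
    # Stand-in for Scheduler._retire_worker: (None, {}) means "retirement aborted".
    await asyncio.sleep(0)
    return (addr, {"address": addr}) if ok else (None, {})


async def main():
    coros = [
        fake_retire_worker("tcp://w1:40001", True),
        fake_retire_worker("tcp://w2:40002", False),  # e.g. no recipients available
    ]
    workers_info = dict(await asyncio.gather(*coros))
    workers_info.pop(None, None)  # drop aborted retirements
    print(workers_info)  # {'tcp://w1:40001': {'address': 'tcp://w1:40001'}}


asyncio.run(main())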
178 changes: 174 additions & 4 deletions distributed/tests/test_active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from distributed import Nanny
from distributed import Nanny, wait
from distributed.active_memory_manager import (
ActiveMemoryManagerExtension,
ActiveMemoryManagerPolicy,
@@ -524,6 +524,144 @@ async def test_ReduceReplicas(c, s, *workers):
await asyncio.sleep(0.01)


@pytest.mark.parametrize("start_amm", [False, True])
@gen_cluster(client=True)
async def test_RetireWorker_amm_on_off(c, s, a, b, start_amm):
"""retire_workers must work both with and without the AMM started"""
if start_amm:
await c.amm.start()
else:
await c.amm.stop()

futures = await c.scatter({"x": 1}, workers=[a.address])
await c.retire_workers([a.address])
assert a.address not in s.workers
assert "x" in b.data


@gen_cluster(
client=True,
config={
"distributed.scheduler.active-memory-manager.start": True,
"distributed.scheduler.active-memory-manager.interval": 0.1,
"distributed.scheduler.active-memory-manager.policies": [],
},
)
async def test_RetireWorker_no_remove(c, s, a, b):
"""Test RetireWorker behaviour on retire_workers(..., remove=False)"""

x = await c.scatter({"x": "x"}, workers=[a.address])
await c.retire_workers([a.address], close_workers=False, remove=False)
# Wait 2 AMM iterations
# retire_workers may return before all keys have been dropped from a
while s.tasks["x"].who_has != {s.workers[b.address]}:
await asyncio.sleep(0.01)
assert a.address in s.workers
# Policy has been removed without waiting for worker to disappear from
# Scheduler.workers
assert not s.extensions["amm"].policies


@pytest.mark.slow
@pytest.mark.parametrize("use_ReduceReplicas", [False, True])
@gen_cluster(
client=True,
Worker=Nanny,
config={
"distributed.scheduler.active-memory-manager.start": True,
"distributed.scheduler.active-memory-manager.interval": 0.1,
"distributed.scheduler.active-memory-manager.policies": [
{"class": "distributed.active_memory_manager.ReduceReplicas"},
],
},
)
async def test_RetireWorker_with_ReduceReplicas(c, s, *nannies, use_ReduceReplicas):
"""RetireWorker and ReduceReplicas work well with each other.
If ReduceReplicas is enabled,
1. On the first AMM iteration, either ReduceReplicas or RetireWorker (arbitrarily
depending on which comes first in the iteration of
ActiveMemoryManagerExtension.policies) deletes non-unique keys, choosing from
workers to be retired first. At the same time, RetireWorker replicates unique
keys.
2. On the second AMM iteration, either ReduceReplicas or RetireWorker deletes the
keys replicated at the previous round from the worker to be retired.
If ReduceReplicas is not enabled, all drops are performed by RetireWorker.
This test fundamentally relies on workers in the process of being retired to be
always picked first by ActiveMemoryManagerExtension._find_dropper.
"""
ws_a, ws_b = s.workers.values()
if not use_ReduceReplicas:
s.extensions["amm"].policies.clear()

x = c.submit(lambda: "x" * 2 ** 26, key="x", workers=[ws_a.address]) # 64 MiB
y = c.submit(lambda: "y" * 2 ** 26, key="y", workers=[ws_a.address]) # 64 MiB
z = c.submit(lambda x: None, x, key="z", workers=[ws_b.address]) # copy x to ws_b
# Make sure that the worker NOT being retired has the most RAM usage to test that
# it is not being picked first since there's a retiring worker.
w = c.submit(lambda: "w" * 2 ** 28, key="w", workers=[ws_b.address]) # 256 MiB
await wait([x, y, z, w])

await c.retire_workers([ws_a.address], remove=False)
# retire_workers may return before all keys have been dropped from a
while ws_a.has_what:
await asyncio.sleep(0.01)
assert {ts.key for ts in ws_b.has_what} == {"x", "y", "z", "w"}


@gen_cluster(client=True, nthreads=[("", 1)] * 3, config=NO_AMM_START)
async def test_RetireWorker_all_replicas_are_being_retired(c, s, w1, w2, w3):
"""There are multiple replicas of a key, but they all reside on workers that are
being retired
"""
fut = await c.scatter({"x": "x"}, workers=[w1.address, w2.address], broadcast=True)
await c.retire_workers([w1.address, w2.address])
assert s.tasks["x"].who_has == {s.workers[w3.address]}


@gen_cluster(client=True, nthreads=[("", 1)] * 4, config=NO_AMM_START)
async def test_RetireWorker_no_recipients(c, s, w1, w2, w3, w4):
"""All workers are retired at once.
Test use cases:
1. (w1) worker contains no data -> it is retired
2. (w2) worker contains unique data -> it is not retired
3. (w3, w4) worker contains non-unique data, but all replicas are on workers that
are being retired -> it is not retired
"""
x = await c.scatter("x", workers=[w2.address])
y = await c.scatter("y", workers=[w3.address, w4.address], broadcast=True)

out = await c.retire_workers([w1.address, w2.address, w3.address, w4.address])
assert out.keys() == {w1.address}
assert not s.extensions["amm"].policies
assert set(s.workers) == {w2.address, w3.address, w4.address}
while any(ws.status != Status.running for ws in s.workers.values()):
# Must wait for a roundtrip Scheduler -> Worker -> WorkerState
await asyncio.sleep(0.01)


@gen_cluster(client=True, config=NO_AMM_START)
async def test_RetireWorker_all_recipients_are_paused(c, s, a, b):
ws_a = s.workers[a.address]
ws_b = s.workers[b.address]

b.memory_pause_fraction = 1e-15
while ws_b.status != Status.paused:
await asyncio.sleep(0.01)

x = await c.scatter("x", workers=[a.address])
out = await c.retire_workers([a.address])
assert out == {}
assert not s.extensions["amm"].policies
assert set(s.workers) == {a.address, b.address}
while ws_a.status != Status.running:
# Must wait for a roundtrip Scheduler -> Worker -> WorkerState
await asyncio.sleep(0.01)


class DropEverything(ActiveMemoryManagerPolicy):
"""Inanely suggest to drop every single key in the cluster"""

@@ -546,7 +684,7 @@ def run(self):
self.manager.policies.remove(self)


async def _tensordot_stress(c):
async def tensordot_stress(c):
da = pytest.importorskip("dask.array")

rng = da.random.RandomState(0)
@@ -576,7 +714,7 @@ async def test_drop_stress(c, s, *nannies):
See also: test_ReduceReplicas_stress
"""
await _tensordot_stress(c)
await tensordot_stress(c)


@pytest.mark.slow
@@ -599,4 +737,36 @@ async def test_ReduceReplicas_stress(c, s, *nannies):
test_drop_stress above, this test does not stop running after a few seconds - the
policy must not disrupt the computation too much.
"""
await _tensordot_stress(c)
await tensordot_stress(c)


@pytest.mark.slow
@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371")
@gen_cluster(
client=True,
nthreads=[("", 1)] * 10,
Worker=Nanny,
config={
"distributed.scheduler.active-memory-manager.start": True,
"distributed.scheduler.active-memory-manager.interval": 0.1,
"distributed.scheduler.active-memory-manager.policies": [
# Also test interactions between RetireWorker and ReduceReplicas
{"class": "distributed.active_memory_manager.ReduceReplicas"},
],
},
timeout=120,
)
async def test_RetireWorkers_stress(c, s, *nannies):
"""It is safe to retire the best part of a cluster in the middle of a computation"""
addrs = list(s.workers)
random.shuffle(addrs)

tasks = [asyncio.create_task(tensordot_stress(c))]
await asyncio.sleep(1)
tasks.append(asyncio.create_task(c.retire_workers(addrs[0:2])))
await asyncio.sleep(1)
tasks.append(asyncio.create_task(c.retire_workers(addrs[2:5])))
await asyncio.sleep(1)
tasks.append(asyncio.create_task(c.retire_workers(addrs[5:9])))

await asyncio.gather(*tasks)
