Unit tests
crusaderky committed Nov 21, 2021
1 parent b955440 commit d19e737
Showing 3 changed files with 276 additions and 52 deletions.
82 changes: 54 additions & 28 deletions distributed/active_memory_manager.py
@@ -409,16 +409,22 @@ class RetireWorker(ActiveMemoryManagerPolicy):
     once the worker has been retired. If the AMM is disabled in the dask config,
     :meth:`~distributed.Scheduler.retire_workers` will start a temporary ad-hoc one.
 
-    **Hang condition**
+    **Failure condition**
 
     There may not be any suitable workers to receive the tasks from the retiring worker.
-    This typically happens in two use cases:
+    This happens in two use cases:
 
     1. This is the only worker in the cluster, or
     2. All workers are either paused or being retired at the same time
 
-    In either case, this policy will fail to move out all keys within a single AMM
-    iteration and try again at the next one (typically 2 seconds later).
+    In either case, this policy will fail to move out all keys and
+    Scheduler.retire_workers will abort the retirement. The flag ``no_recipients`` will
+    be raised.
+
+    There is a third use case, where a task fails to be replicated away for whatever
+    reason; in this case we'll just wait for the next AMM iteration and try again
+    (possibly with a different receiving worker, e.g. if the receiving worker was
+    hung but not yet declared dead).
 
     **Retiring a worker with spilled tasks**
@@ -443,9 +449,11 @@ class RetireWorker(ActiveMemoryManagerPolicy):
     """
 
     address: str
+    no_recipients: bool
 
     def __init__(self, address: str):
         self.address = address
+        self.no_recipients = False
 
     def __repr__(self) -> str:
         return f"RetireWorker({self.address}, done={self.done})"
Expand All @@ -461,40 +469,58 @@ def run(self):
nno_rec = 0

for ts in ws.has_what:
if len(ts.who_has) == 1:
nrepl += 1
pending_repl, _ = self.manager.pending[ts]
if not pending_repl:
rec_ws = (yield "replicate", ts, None)
if not rec_ws:
# replication was rejected by the AMM (see _find_recipient)
nno_rec += 1
else:
# This may be rejected by either the AMM (see _find_dropper) or by the
# Worker; e.g. a running task may depend on this key. If so we'll try
# again at the next iteration. Anyway, this is just to facilitate
# spilled tasks to be moved back into memory and not mandatory for
# retirement.
yield "drop", ts, {ws}
if len(ts.who_has) > 1:
# Use cases:
# 1. The suggestion is accepted by the AMM and by the Worker.
# Replica is dropped.
# 2. The suggestion is accepted by the AMM, but rejected by the Worker.
# We'll try again at the next AMM iteration.
# 3. The suggestion is rejected by the AMM, because:
# 3a. Another policy (e.g. ReduceReplicas) already suggested the same
# 3b. All replicas of the key are on workers being retired, and the
# other RetireWorker instances already asked the same suggestion.
# We need to deal with this case explicitly and create a replica
# elsewhere.
drop_ws = (yield "drop", ts, {ws})
if drop_ws:
continue

# Either the worker holds the only replica or all replicas are being held
# by workers that are being retired
nrepl += 1
pending_repl, _ = self.manager.pending[ts]
if not pending_repl:
rec_ws = (yield "replicate", ts, None)
if not rec_ws:
# replication was rejected by the AMM (see _find_recipient)
nno_rec += 1

if nno_rec:
# All workers are paused or closing_gracefully.
# scheduler.retire_workers will read this flag and exit immediately.
# TODO after we implement the automatic transition of workers from paused
# to closing_gracefully after a timeout expires, we should revisit this
# code to wait for paused workers and only exit immediately if all
# workers are in closing_gracefully status.
self.no_recipients = True
logger.warning(
f"Retiring worker {self.address}; {nrepl - nno_rec} in-memory tasks "
f"are being moved away while {nno_rec} keys won't be moved for now as "
"there are no suitable workers to receive them; this typically happens "
"when this is the only worker on the cluster or all other workers are "
"either paused or retiring themselves. Trying again later..."
f"Tried retiring worker {self.address}, but {nno_rec} tasks could not "
"be moved as there are no suitable workers to receive them. "
"The worker will not be retired."
)
self.manager.policies.remove(self)
elif nrepl:
logger.info(
f"Retiring worker {self.address}; {nrepl} keys are being moved away.",
)
else:
logger.info(
f"Retiring worker {self.address}; no unique keys need to be moved away."
)
self.manager.policies.remove(self)

def done(self) -> bool:
"""Return True if it is safe to close the worker down; False otherwise. True
doesn't necessarily mean that run() won't issue any more suggestions - it could
continue issuing ``drop`` suggestions afterwards.
"""
"""Return True if it is safe to close the worker down; False otherwise"""
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
return True
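To make the generator handshake in the hunk above easier to follow, here is a minimal, self-contained sketch of the same suggestion/answer pattern. It is not the real AMM API: `MiniRetirePolicy`, `drive`, the string keys, and the plain replica counts are invented stand-ins. What it mirrors from the diff is the control flow only: yield a "drop" or "replicate" suggestion, read back whether it was accepted, and set a `no_recipients`-style flag when nothing can be moved.

```python
from __future__ import annotations


class MiniRetirePolicy:
    """Toy stand-in for RetireWorker (invented for illustration; not the dask API)."""

    def __init__(self, address: str):
        self.address = address
        self.no_recipients = False

    def run(self, replica_counts: dict[str, int]):
        """Yield (verb, key) suggestions; receive back the outcome of each one.

        For "drop" the outcome is truthy if the extra replica was dropped; for
        "replicate" it is the recipient address, or None if no worker can take
        the key (mirroring a rejected suggestion).
        """
        no_rec = 0
        for key, nreplicas in replica_counts.items():
            if nreplicas > 1:
                # Another worker already holds a copy: try to drop ours first.
                dropped = yield ("drop", key)
                if dropped:
                    continue
            recipient = yield ("replicate", key)
            if recipient is None:
                no_rec += 1
        if no_rec:
            # Same role as RetireWorker.no_recipients: the caller aborts retirement.
            self.no_recipients = True


def drive(policy: MiniRetirePolicy, replica_counts: dict[str, int], outcomes: list):
    """Toy AMM loop: print each suggestion and feed the next canned outcome back."""
    gen = policy.run(replica_counts)
    answers = iter(outcomes)
    try:
        suggestion = next(gen)
        while True:
            print("suggestion:", suggestion)
            suggestion = gen.send(next(answers))
    except StopIteration:
        pass


# "x" has a spare replica elsewhere; "y" is unique and nobody can receive it.
policy = MiniRetirePolicy("tcp://10.0.0.1:1234")
drive(policy, {"x": 2, "y": 1}, [True, None])
print("no_recipients:", policy.no_recipients)  # True -> retirement would be aborted
```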
64 changes: 44 additions & 20 deletions distributed/scheduler.py
@@ -6802,8 +6802,6 @@ async def retire_workers(
             if not wss:
                 return {}
 
-            workers_info = {ws._address: ws.identity() for ws in wss}
-
             stop_amm = False
             amm: ActiveMemoryManagerExtension = self.extensions["amm"]
             if not amm.running:
@@ -6817,12 +6815,39 @@
             # migrated to the Active Memory Manager.
             async with self._lock:
                 try:
-                    await asyncio.gather(
-                        *(
-                            self._retire_worker(ws, amm, close_workers, remove)
-                            for ws in wss
-                        )
-                    )
+                    coros = []
+                    for ws in wss:
+                        logger.info("Retiring worker %s", ws._address)
+
+                        policy = RetireWorker(ws._address)
+                        amm.add_policy(policy)
+
+                        # Change Worker.status to closing_gracefully. Immediately set
+                        # the same on the scheduler to prevent race conditions.
+                        prev_status = ws.status
+                        ws.status = Status.closing_gracefully
+                        self.running.discard(ws)
+                        self.stream_comms[ws.address].send(
+                            {"op": "worker-status-change", "status": ws.status.name}
+                        )
+
+                        coros.append(
+                            self._retire_worker(
+                                ws,
+                                policy,
+                                prev_status=prev_status,
+                                close_workers=close_workers,
+                                remove=remove,
+                            )
+                        )
+
+                    # Give the AMM a kick, in addition to its periodic running. This is
+                    # to avoid unnecessarily waiting for a potentially arbitrarily long
+                    # time (depending on interval settings)
+                    amm.run_once()
+
+                    workers_info = dict(await asyncio.gather(*coros))
+                    workers_info.pop(None, None)
                 finally:
                     if stop_amm:
                         amm.stop()
@@ -6835,24 +6860,22 @@ async def retire_workers(
     async def _retire_worker(
         self,
         ws: WorkerState,
-        amm: ActiveMemoryManagerExtension,
+        policy: RetireWorker,
+        prev_status: Status,
         close_workers: bool,
         remove: bool,
-    ) -> None:
+    ) -> tuple:  # tuple[str | None, dict]
         parent: SchedulerState = cast(SchedulerState, self)
 
-        logger.info("Retiring worker %s", ws._address)
-
-        policy = RetireWorker(ws._address)
-        amm.add_policy(policy)
-
-        ws.status = Status.closing_gracefully
-        self.running.discard(ws)
-        self.stream_comms[ws.address].send(
-            {"op": "worker-status-change", "status": ws.status.name}
-        )
-
         while not policy.done():
+            if policy.no_recipients:
+                # Abort retirement. This time we don't need to worry about race
+                # conditions and we can wait for a round-trip.
+                self.stream_comms[ws.address].send(
+                    {"op": "worker-status-change", "status": prev_status.name}
+                )
+                return None, {}
+
             # Sleep 0.01s when there's a single in-memory task
             # Sleep 2s when there are 200 or more
             poll_interval = min(len(ws.has_what) / 100, 2.0)
@@ -6868,6 +6891,7 @@ async def _retire_worker(
             await self.remove_worker(address=ws._address, safe=True)
 
         logger.info("Retired worker %s", ws._address)
+        return ws._address, ws.identity()
 
     def add_keys(self, comm=None, worker=None, keys=(), stimulus_id=None):
         """
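As a rough illustration of why `_retire_worker` now returns a tuple, here is a hedged, self-contained sketch of the gather-and-filter pattern used in the scheduler hunks above. The helper names (`_retire_one`, `retire_many`) and the hard-coded outcomes are invented for the example; the point it demonstrates is that each per-worker coroutine returns either `(address, info)` or `(None, {})`, so the caller can build a dict from `asyncio.gather` and discard every aborted retirement with a single `pop(None, None)`.

```python
from __future__ import annotations

import asyncio


async def _retire_one(address: str, has_recipients: bool) -> tuple[str | None, dict]:
    """Toy version of the per-worker coroutine: poll until done, or abort.

    Returns (address, info) on success and (None, {}) on abort, so the caller
    can turn the gathered results into a dict and drop the aborted entries.
    """
    for _ in range(3):  # stand-in for "while not policy.done()"
        if not has_recipients:  # stand-in for "if policy.no_recipients"
            return None, {}
        await asyncio.sleep(0.01)  # stand-in for the scaled poll_interval
    return address, {"address": address}


async def retire_many() -> dict:
    coros = [
        _retire_one("tcp://10.0.0.1:1234", True),
        _retire_one("tcp://10.0.0.2:1234", False),  # no recipients: aborts
        _retire_one("tcp://10.0.0.3:1234", True),
    ]
    workers_info = dict(await asyncio.gather(*coros))
    workers_info.pop(None, None)  # discard all aborted retirements at once
    return workers_info


print(asyncio.run(retire_many()))
# only the two workers that completed retirement appear in the result
```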
