New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AMM: Graceful Worker Retirement #5381
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -312,8 +312,7 @@ def log_reject(msg: str) -> None: | |
# THEN temporarily keep the extra replica on the candidate with status=running. | ||
# | ||
# This prevents a ping-pong effect between ReduceReplicas (or any other policy | ||
# that yields drop commands with multiple candidates) and RetireWorker | ||
# (to be later introduced by https://github.com/dask/distributed/pull/5381): | ||
# that yields drop commands with multiple candidates) and RetireWorker: | ||
# 1. RetireWorker replicates in-memory tasks from worker A (very busy and being | ||
# retired) to worker B (idle) | ||
# 2. on the next AMM iteration 2 seconds later, ReduceReplicas drops the same | ||
|
@@ -494,3 +493,151 @@ def run(self): | |
ndrop, | ||
nkeys, | ||
) | ||
|
||
|
||
class RetireWorker(ActiveMemoryManagerPolicy): | ||
"""Replicate somewhere else all unique in-memory tasks on a worker, preparing for | ||
its shutdown. | ||
|
||
At any given time, the AMM may have registered multiple instances of this policy, | ||
one for each worker currently being retired - meaning that most of the time no | ||
instances will be registered at all. For this reason, this policy doesn't figure in | ||
the dask config (:file:`distributed.yaml`). Instances are added by | ||
:meth:`distributed.Scheduler.retire_workers` and automatically remove themselves | ||
once the worker has been retired. If the AMM is disabled in the dask config, | ||
:meth:`~distributed.Scheduler.retire_workers` will start a temporary ad-hoc one. | ||
|
||
**Failure condition** | ||
|
||
There may not be any suitable workers to receive the tasks from the retiring worker. | ||
This happens in two use cases: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this respect resources or host restrictions? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't. The current implementation of It would be good to hear from the GPU people who use a hybrid cluster if they have a use case of GPU data that will fail to unpickle on non-GPU workers (different answers for different libraries I guess?), but then again, nobody complained until today so I guess the answer is no? At any rate this PR does not introduce a regression. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I was worried about GPUs as well but I agree this is not our concern right now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cc @quasiben for visibility. If we need "Respect resources and restrictions in retirement" we should file an extra ticket. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think the current resources design is fit for purpose. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I'm not entirely sure if resources are perfect. host and worker restrictions, maybe. |
||
|
||
1. This is the only worker in the cluster, or | ||
2. All workers are either paused or being retired at the same time | ||
|
||
In either case, this policy will fail to move out all keys and set the | ||
``no_recipients`` boolean to True. :meth:`~distributed.Scheduler.retire_workers` | ||
will abort the retirement. | ||
|
||
There is a third use case, where a task fails to be replicated away for whatever | ||
reason, e.g. because its recipient is unresponsive but the Scheduler doesn't know | ||
yet. In this case we'll just wait for the next AMM iteration and try again (possibly | ||
with a different receiving worker, e.g. if the receiving worker was hung but not yet | ||
declared dead). | ||
|
||
**Retiring a worker with spilled tasks** | ||
|
||
On its very first iteration, this policy suggests that other workers should fetch | ||
all unique in-memory tasks of the retiring worker. Frequently, this means that in | ||
the next few moments the retiring worker will be bombarded by | ||
:meth:`distributed.worker.Worker.get_data` calls from the rest of the cluster. This | ||
can be a problem if most of the managed memory of the worker has been spilled out, | ||
as it could send the worker above its terminate threshold. Two measures are in place | ||
in order to prevent this: | ||
|
||
- At every iteration, this policy drops all tasks on the retiring worker that have | ||
already been replicated somewhere else. This makes room for further tasks to be | ||
moved out of the spill file in order to be replicated onto another worker. | ||
- Once the worker passes the ``pause`` threshold, | ||
:meth:`~distributed.worker.Worker.get_data` throttles the number of outgoing | ||
connections to 1. | ||
|
||
Parameters | ||
========== | ||
address: str | ||
URI of the worker to be retired | ||
""" | ||
|
||
address: str | ||
no_recipients: bool | ||
|
||
def __init__(self, address: str): | ||
self.address = address | ||
self.no_recipients = False | ||
|
||
def __repr__(self) -> str: | ||
return f"RetireWorker({self.address!r})" | ||
|
||
def run(self): | ||
"""""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. docstring? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's to prevent sphinx from repeating the docstring from its parent. I didn't write anything as there isn't anything to add compared to the method being overridden + the class docstring. |
||
ws = self.manager.scheduler.workers.get(self.address) | ||
if ws is None: | ||
logger.debug("Removing policy %s: Worker no longer in cluster", self) | ||
self.manager.policies.remove(self) | ||
return | ||
|
||
nrepl = 0 | ||
nno_rec = 0 | ||
|
||
logger.debug("Retiring %s", ws) | ||
for ts in ws.has_what: | ||
if len(ts.who_has) > 1: | ||
# There are already replicas of this key on other workers. | ||
# Suggest dropping the replica from this worker. | ||
# Use cases: | ||
# 1. The suggestion is accepted by the AMM and by the Worker. | ||
# The replica on this worker is dropped. | ||
# 2. The suggestion is accepted by the AMM, but rejected by the Worker. | ||
# We'll try again at the next AMM iteration. | ||
# 3. The suggestion is rejected by the AMM, because another policy | ||
# (e.g. ReduceReplicas) already suggested the same for this worker | ||
# 4. The suggestion is rejected by the AMM, because the task has | ||
# dependents queued or running on the same worker. | ||
# We'll try again at the next AMM iteration. | ||
# 5. The suggestion is rejected by the AMM, because all replicas of the | ||
# key are on workers being retired and the other RetireWorker | ||
# instances already made the same suggestion. We need to deal with | ||
# this case and create a replica elsewhere. | ||
drop_ws = (yield "drop", ts, {ws}) | ||
if drop_ws: | ||
continue # Use case 1 or 2 | ||
if ts.who_has & self.manager.scheduler.running: | ||
continue # Use case 3 or 4 | ||
# Use case 5 | ||
|
||
# Either the worker holds the only replica or all replicas are being held | ||
# by workers that are being retired | ||
nrepl += 1 | ||
# Don't create an unnecessary additional replica if another policy already | ||
# asked for one | ||
try: | ||
has_pending_repl = bool(self.manager.pending[ts][0]) | ||
except KeyError: | ||
has_pending_repl = False | ||
|
||
if not has_pending_repl: | ||
rec_ws = (yield "replicate", ts, None) | ||
if not rec_ws: | ||
# replication was rejected by the AMM (see _find_recipient) | ||
nno_rec += 1 | ||
|
||
if nno_rec: | ||
# All workers are paused or closing_gracefully. | ||
# Scheduler.retire_workers will read this flag and exit immediately. | ||
# TODO after we implement the automatic transition of workers from paused | ||
# to closing_gracefully after a timeout expires, we should revisit this | ||
# code to wait for paused workers and only exit immediately if all | ||
# workers are in closing_gracefully status. | ||
self.no_recipients = True | ||
logger.warning( | ||
f"Tried retiring worker {self.address}, but {nno_rec} tasks could not " | ||
"be moved as there are no suitable workers to receive them. " | ||
"The worker will not be retired." | ||
) | ||
self.manager.policies.remove(self) | ||
elif nrepl: | ||
logger.info( | ||
f"Retiring worker {self.address}; {nrepl} keys are being moved away.", | ||
) | ||
else: | ||
logger.info( | ||
f"Retiring worker {self.address}; no unique keys need to be moved away." | ||
) | ||
self.manager.policies.remove(self) | ||
|
||
def done(self) -> bool: | ||
"""Return True if it is safe to close the worker down; False otherwise""" | ||
ws = self.manager.scheduler.workers.get(self.address) | ||
if ws is None: | ||
return True | ||
return all(len(ts.who_has) > 1 for ts in ws.has_what) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just want to say this is excellent high-level documentation. I have a very good sense of the purpose of this and how it works without reading any code.