retire_workers to use AMM

polish
Split out #5530
docs
Unit tests
f
fix test_lifetime
fix tests
test unresponsive worker
Replace xfail with avoid_ci
f
crusaderky committed Dec 3, 2021
1 parent e2e2dda commit d3fa246
Showing 7 changed files with 562 additions and 108 deletions.
137 changes: 136 additions & 1 deletion distributed/active_memory_manager.py
@@ -122,7 +122,7 @@ def add_policy(self, policy: ActiveMemoryManagerPolicy) -> None:

def run_once(self) -> None:
"""Run all policies once and asynchronously (fire and forget) enact their
recommendations to replicate/drop keys
recommendations to replicate/drop tasks
"""
with log_errors():
# This should never fail since this is a synchronous method
@@ -383,6 +383,7 @@ class ReduceReplicas(ActiveMemoryManagerPolicy):
"""

def run(self):
""""""
nkeys = 0
ndrop = 0

@@ -412,3 +413,137 @@ def run(self):
ndrop,
nkeys,
)
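
Not part of this commit, but useful context for the RetireWorker policy added below: AMM policies hand suggestions to the manager through a generator protocol. ``run()`` yields ``("replicate", ts, candidates)`` or ``("drop", ts, candidates)`` tuples, and the yield expression evaluates to the ``WorkerState`` chosen by the AMM, or ``None`` if the suggestion was rejected. A minimal sketch of a hypothetical policy using that protocol (``DropExtraReplicas`` is illustrative only and does not exist in the codebase):

from distributed.active_memory_manager import ActiveMemoryManagerPolicy

class DropExtraReplicas(ActiveMemoryManagerPolicy):
    """Toy policy: for every task that has more than one replica, suggest
    dropping one replica from whichever worker the AMM deems most suitable.
    """

    def run(self):
        for ts in self.manager.scheduler.tasks.values():
            if len(ts.who_has) > 1:
                # Let the AMM pick the worker (candidates=None). The yield
                # expression returns the chosen WorkerState, or None if the
                # suggestion was rejected (e.g. a drop for this task was
                # already requested by another policy).
                yield "drop", ts, None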


class RetireWorker(ActiveMemoryManagerPolicy):
"""Replicate somewhere else all unique in-memory tasks on a worker, preparing for
its shutdown.
At any given time, the AMM may have registered multiple instances of this policy,
one for each worker currently being retired - meaning that most of the time no
instances will be registered at all. For this reason, this policy doesn't figure in
the dask config (:file:`distributed.yaml`). Instances are added by
:meth:`distributed.Scheduler.retire_workers` and automatically remove themselves
once the worker has been retired. If the AMM is disabled in the dask config,
:meth:`~distributed.Scheduler.retire_workers` will start a temporary ad-hoc one.
**Failure condition**
There may not be any suitable workers to receive the tasks from the retiring worker.
This happens in two use cases:
1. This is the only worker in the cluster, or
2. All workers are either paused or being retired at the same time
In either case, this policy will fail to move out all keys and
Scheduler.retire_workers will abort the retirement. The flag ``no_recipients`` will
be raised.
There is a third use case, where a task fails to be replicated away for whatever
reason; in this case we'll just wait for the next AMM iteration and try again
(possibly with a different receiving worker, e.g. if the receiving worker was
hung but not yet declared dead).
**Retiring a worker with spilled tasks**
On its very first iteration, this policy suggests that other workers should fetch
all unique in-memory tasks. Frequently, this means that in the next few moments the
worker to be retired will be bombarded by :meth:`distributed.worker.Worker.get_data`
calls from the rest of the cluster. This can be a problem if most of the managed
memory of the worker has been spilled out, as it could send the worker above the
terminate threshold. Two measures are in place in order to prevent this:
- At every iteration, this policy drops all tasks that have already been replicated
somewhere else. This makes room for further tasks to be moved out of the spill
file in order to be replicated onto another worker.
- Once a worker passes the ``pause`` threshold,
:meth:`~distributed.worker.Worker.get_data` throttles the number of outgoing
connections to 1.
Parameters
==========
address: str
URI of the worker to be retired
"""

address: str
no_recipients: bool

def __init__(self, address: str):
self.address = address
self.no_recipients = False

def __repr__(self) -> str:
return f"RetireWorker({self.address}, done={self.done})"

def run(self):
""""""
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
self.manager.policies.remove(self)
return

nrepl = 0
nno_rec = 0

for ts in ws.has_what:
pending_repl, pending_drop = self.manager.pending[ts]

if len(ts.who_has) > 1:
# There are already replicas of this key on other workers.
# Use cases:
# 1. The suggestion is accepted by the AMM and by the Worker.
# The replica on this worker is dropped.
# 2. The suggestion is accepted by the AMM, but rejected by the Worker.
# We'll try again at the next AMM iteration.
# 3. The suggestion is rejected by the AMM, because another policy
# (e.g. ReduceReplicas) already suggested the same for this worker
# 4. The suggestion is rejected by the AMM, because all replicas of the
# key are on workers being retired and the other RetireWorker
# instances already asked the same suggestion. We need to deal with
# this case and create a replica elsewhere.
if ws in pending_drop:
continue # Use case 3
drop_ws = (yield "drop", ts, {ws})
if drop_ws:
continue # Use case 1 or 2

# Either the worker holds the only replica or all replicas are being held
# by workers that are being retired
nrepl += 1
if not pending_repl:
rec_ws = (yield "replicate", ts, None)
if not rec_ws:
# replication was rejected by the AMM (see _find_recipient)
nno_rec += 1

if nno_rec:
# All workers are paused or closing_gracefully.
# Scheduler.retire_workers will read this flag and exit immediately.
# TODO after we implement the automatic transition of workers from paused
# to closing_gracefully after a timeout expires, we should revisit this
# code to wait for paused workers and only exit immediately if all
# workers are in closing_gracefully status.
self.no_recipients = True
logger.warning(
f"Tried retiring worker {self.address}, but {nno_rec} tasks could not "
"be moved as there are no suitable workers to receive them. "
"The worker will not be retired."
)
self.manager.policies.remove(self)
elif nrepl:
logger.info(
f"Retiring worker {self.address}; {nrepl} keys are being moved away.",
)
else:
logger.info(
f"Retiring worker {self.address}; no unique keys need to be moved away."
)
self.manager.policies.remove(self)

def done(self) -> bool:
"""Return True if it is safe to close the worker down; False otherwise"""
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
return True
return all(len(ts.who_has) > 1 for ts in ws.has_what)
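
Not part of this commit, but putting the pieces of this class together: the way ``Scheduler.retire_workers`` (in scheduler.py below) drives a RetireWorker instance can be sketched roughly as follows. ``retire_one_worker`` is a hypothetical helper; ``scheduler`` is assumed to be a running Scheduler and ``address`` the worker to retire, and the ad-hoc extension arguments mirror the values used in this commit:

import asyncio

from distributed.active_memory_manager import (
    ActiveMemoryManagerExtension,
    RetireWorker,
)

async def retire_one_worker(scheduler, address: str) -> bool:
    # Reuse the running AMM extension, or start a temporary ad-hoc one,
    # mirroring what retire_workers does in scheduler.py below.
    amm = scheduler.extensions["amm"]
    if not amm.running:
        amm = ActiveMemoryManagerExtension(
            scheduler, policies=set(), register=False, start=True, interval=2.0
        )

    policy = RetireWorker(address)
    amm.add_policy(policy)
    amm.run_once()  # kick immediately instead of waiting for the next interval

    # Wait until every unique key on the worker has a replica elsewhere
    while not policy.done():
        if policy.no_recipients:
            return False  # nowhere to copy the data to; retirement must abort
        await asyncio.sleep(0.1)
    return True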
195 changes: 120 additions & 75 deletions distributed/scheduler.py
@@ -53,7 +53,7 @@

from . import preloading, profile
from . import versions as version_module
from .active_memory_manager import ActiveMemoryManagerExtension
from .active_memory_manager import ActiveMemoryManagerExtension, RetireWorker
from .batched import BatchedSend
from .comm import (
Comm,
@@ -6740,11 +6740,11 @@ def _key(group):
async def retire_workers(
self,
comm=None,
workers=None,
remove=True,
close_workers=False,
names=None,
lock=True,
*,
workers: "list[str] | None" = None,
names: "list[str] | None" = None,
close_workers: bool = False,
remove: bool = True,
**kwargs,
) -> dict:
"""Gracefully retire workers from cluster
@@ -6753,16 +6753,18 @@ async def retire_workers(
----------
workers: list (optional)
List of worker addresses to retire.
If not provided we call ``workers_to_close`` which finds a good set
names: list (optional)
List of worker names to retire.
remove: bool (defaults to True)
Whether or not to remove the worker metadata immediately or else
wait for the worker to contact us
Mutually exclusive with ``workers``.
If neither ``workers`` nor ``names`` are provided, we call
``workers_to_close`` which finds a good set.
close_workers: bool (defaults to False)
Whether or not to actually close the worker explicitly from here.
Otherwise we expect some external job scheduler to finish off the
worker.
remove: bool (defaults to True)
Whether or not to remove the worker metadata immediately or else
wait for the worker to contact us
**kwargs: dict
Extra options to pass to workers_to_close to determine which
workers we should drop
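
Not part of this commit, but as an illustration of the new keyword-only signature, a call would look roughly like the following; the worker addresses are hypothetical and the return value is a dict mapping each retired worker's address to its identity info:

# Retire two specific workers by address:
info = await scheduler.retire_workers(
    workers=["tcp://10.0.0.1:40000", "tcp://10.0.0.2:40000"],
    close_workers=True,
    remove=True,
)

# Or let the scheduler pick, forwarding options such as n to workers_to_close:
info = await scheduler.retire_workers(n=2)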
Expand All @@ -6780,78 +6782,121 @@ async def retire_workers(
ws: WorkerState
ts: TaskState
with log_errors():
async with self._lock if lock else empty_context:
if names is not None:
if workers is not None:
raise TypeError("names and workers are mutually exclusive")
if names:
logger.info("Retire worker names %s", names)
names = set(map(str, names))
workers = {
ws._address
for ws in parent._workers_dv.values()
if str(ws._name) in names
}
elif workers is None:
while True:
try:
workers = self.workers_to_close(**kwargs)
if not workers:
return {}
return await self.retire_workers(
workers=workers,
remove=remove,
if names is not None:
if workers is not None:
raise TypeError("names and workers are mutually exclusive")
if names:
logger.info("Retire worker names %s", names)
names_set = {str(name) for name in names}
wss = {
ws
for ws in parent._workers_dv.values()
if str(ws._name) in names_set
}
elif workers is not None:
wss = {
parent._workers_dv[address]
for address in workers
if address in parent._workers_dv
}
else:
wss = {
parent._workers_dv[address]
for address in self.workers_to_close(**kwargs)
}
if not wss:
return {}

stop_amm = False
amm: ActiveMemoryManagerExtension = self.extensions["amm"]
if not amm.running:
amm = ActiveMemoryManagerExtension(
self, policies=set(), register=False, start=True, interval=2.0
)
stop_amm = True

# This lock makes retire_workers, rebalance, and replicate mutually
# exclusive and will no longer be necessary once rebalance and replicate are
# migrated to the Active Memory Manager.
async with self._lock:
try:
coros = []
for ws in wss:
logger.info("Retiring worker %s", ws._address)

policy = RetireWorker(ws._address)
amm.add_policy(policy)

# Change Worker.status to closing_gracefully. Immediately set
# the same on the scheduler to prevent race conditions.
prev_status = ws.status
ws.status = Status.closing_gracefully
self.running.discard(ws)
self.stream_comms[ws.address].send(
{"op": "worker-status-change", "status": ws.status.name}
)

coros.append(
self._retire_worker(
ws,
policy,
prev_status=prev_status,
close_workers=close_workers,
lock=False,
remove=remove,
)
except KeyError: # keys left during replicate
pass
)

workers = {
parent._workers_dv[w] for w in workers if w in parent._workers_dv
}
if not workers:
return {}
logger.info("Retire workers %s", workers)

# Keys orphaned by retiring those workers
keys = {k for w in workers for k in w.has_what}
keys = {ts._key for ts in keys if ts._who_has.issubset(workers)}

if keys:
other_workers = set(parent._workers_dv.values()) - workers
if not other_workers:
return {}
logger.info("Moving %d keys to other workers", len(keys))
await self.replicate(
keys=keys,
workers=[ws._address for ws in other_workers],
n=1,
delete=False,
lock=False,
)
# Give the AMM a kick, in addition to its periodic running. This is
# to avoid unnecessarily waiting for a potentially arbitrarily long
# time (depending on interval settings)
amm.run_once()

worker_keys = {ws._address: ws.identity() for ws in workers}
if close_workers:
await asyncio.gather(
*[self.close_worker(worker=w, safe=True) for w in worker_keys]
)
if remove:
await asyncio.gather(
*[self.remove_worker(address=w, safe=True) for w in worker_keys]
)
workers_info = dict(await asyncio.gather(*coros))
workers_info.pop(None, None)
finally:
if stop_amm:
amm.stop()

self.log_event(
"all",
{
"action": "retire-workers",
"workers": worker_keys,
"moved-keys": len(keys),
},
self.log_event("all", {"action": "retire-workers", "workers": workers_info})
self.log_event(list(workers_info), {"action": "retired"})

return workers_info

async def _retire_worker(
self,
ws: WorkerState,
policy: RetireWorker,
prev_status: Status,
close_workers: bool,
remove: bool,
) -> tuple: # tuple[str | None, dict]
parent: SchedulerState = cast(SchedulerState, self)

while not policy.done():
if policy.no_recipients:
# Abort retirement. This time we don't need to worry about race
# conditions and we can wait for a round-trip.
self.stream_comms[ws.address].send(
{"op": "worker-status-change", "status": prev_status.name}
)
self.log_event(list(worker_keys), {"action": "retired"})
return None, {}

# Sleep 0.01s when there are 4 tasks or less
# Sleep 0.5s when there are 200 or more
poll_interval = max(0.01, min(0.5, len(ws.has_what) / 400))
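# e.g. with 100 tasks in memory on the worker: max(0.01, min(0.5, 100 / 400)) = 0.25s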
await asyncio.sleep(poll_interval)

logger.debug(
"All unique keys on worker %s have been replicated elsewhere", ws._address
)

if close_workers and ws._address in parent._workers_dv:
await self.close_worker(worker=ws._address, safe=True)
if remove:
await self.remove_worker(address=ws._address, safe=True)

return worker_keys
logger.info("Retired worker %s", ws._address)
return ws._address, ws.identity()

def add_keys(self, comm=None, worker=None, keys=(), stimulus_id=None):
"""