retire_workers to use AMM
black


doc tweaks
crusaderky committed Oct 26, 2021
1 parent 7193bb9 commit e1d0e7f
Showing 4 changed files with 181 additions and 85 deletions.
56 changes: 56 additions & 0 deletions distributed/active_memory_manager.py
@@ -356,3 +356,59 @@ def run(self):
# ndrop could be negative, which for range() is the same as 0.
for _ in range(ndrop):
yield "drop", ts, None


class RetireWorker(ActiveMemoryManagerPolicy):
"""Replicate somewhere else all unique keys on a worker, preparing for its shutdown.
Once the worker has been retired, this policy automatically removes itself from the
Active Memory Manager it's attached to.
**Retiring a worker with spilled keys**
On its very first iteration, this policy suggests other workers to fetch all unique
in-memory tasks. Frequently, this means that in the next few moments the worker to
be retired will be bombarded with ``Worker.get_data`` calls from the rest of the
cluster. This can be a problem if most of the managed memory of the worker has been
spilled out, as it could send the worker above the terminate threshold.
Two things are in place in order to avoid this:
1. At every iteration, this policy drops all keys that have already been replicated
somewhere else. This makes room for further keys to be moved out of the spill
file in order to be replicated onto another worker.
2. Once a worker passes the ``pause`` threshold, ``Worker.get_data`` throttles the
number of outgoing connections to 1.
"""

address: str

def __init__(self, address: str):
self.address = address

def __repr__(self) -> str:
return f"RetireWorker({self.address}, done={self.done})"

def run(self):
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
self.manager.policies.remove(self)
return

for ts in ws.has_what:
if len(ts.who_has) == 1:
yield "replicate", ts, None
else:
# This may be rejected by either the AMM (see _find_dropper) or by the
# Worker; if so we'll try again at the next iteration
yield "drop", ts, {ws}

def done(self) -> bool:
"""Return True if it is safe to close the worker down, or False otherwise. True
doesn't necessarily mean that run() won't issue any more suggestions; it could
continue issuing ``drop`` suggestions afterwards.
"""
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
return True
if ws.processing:
return False
return all(len(ts.who_has) > 1 for ts in ws.has_what)
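
For illustration only (not part of this commit's diff): the policy above is normally installed by ``Scheduler.retire_workers`` further down, but it can also be attached by hand through the AMM extension. A minimal sketch, assuming a ``Scheduler`` instance whose Active Memory Manager is enabled and registered under ``extensions["amm"]``, and a worker address ``addr`` to drain:

import asyncio

from distributed.active_memory_manager import RetireWorker


async def drain_worker(scheduler, addr: str) -> None:
    # Attach the policy; the running AMM will call its run() on every iteration
    amm = scheduler.extensions["amm"]
    policy = RetireWorker(addr)
    amm.add_policy(policy)
    # done() flips to True once no unique keys are left on the worker
    while not policy.done():
        await asyncio.sleep(0.1)

This is essentially the loop that ``Scheduler._retire_worker`` runs below, minus the worker status change and the adaptive poll interval.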
167 changes: 91 additions & 76 deletions distributed/scheduler.py
@@ -51,7 +51,7 @@

from . import preloading, profile
from . import versions as version_module
from .active_memory_manager import ActiveMemoryManagerExtension
from .active_memory_manager import ActiveMemoryManagerExtension, RetireWorker
from .batched import BatchedSend
from .comm import (
get_address_host,
@@ -6673,11 +6673,11 @@ def _key(group):
async def retire_workers(
self,
comm=None,
workers=None,
remove=True,
close_workers=False,
names=None,
lock=True,
*,
workers: "list[str] | None" = None,
names: "list[str] | None" = None,
close_workers: bool = False,
remove: bool = True,
**kwargs,
) -> dict:
"""Gracefully retire workers from cluster
@@ -6686,16 +6686,18 @@ async def retire_workers(
----------
workers: list (optional)
List of worker addresses to retire.
If not provided we call ``workers_to_close`` which finds a good set
names: list (optional)
List of worker names to retire.
remove: bool (defaults to True)
Whether or not to remove the worker metadata immediately or else
wait for the worker to contact us
Mutually exclusive with ``workers``.
If neither ``workers`` nor ``names`` are provided, we call
``workers_to_close`` which finds a good set.
close_workers: bool (defaults to False)
Whether or not to actually close the worker explicitly from here.
Otherwise we expect some external job scheduler to finish off the
worker.
remove: bool (defaults to True)
Whether or not to remove the worker metadata immediately or else
wait for the worker to contact us
**kwargs: dict
Extra options to pass to workers_to_close to determine which
workers we should drop
@@ -6713,78 +6715,91 @@
ws: WorkerState
ts: TaskState
with log_errors():
async with self._lock if lock else empty_context:
if names is not None:
if workers is not None:
raise TypeError("names and workers are mutually exclusive")
if names:
logger.info("Retire worker names %s", names)
names = set(map(str, names))
workers = {
ws._address
for ws in parent._workers_dv.values()
if str(ws._name) in names
}
elif workers is None:
while True:
try:
workers = self.workers_to_close(**kwargs)
if not workers:
return {}
return await self.retire_workers(
workers=workers,
remove=remove,
close_workers=close_workers,
lock=False,
)
except KeyError: # keys left during replicate
pass

workers = {
parent._workers_dv[w] for w in workers if w in parent._workers_dv
if names is not None:
if workers is not None:
raise TypeError("names and workers are mutually exclusive")
if names:
logger.info("Retire worker names %s", names)
names_set = {str(name) for name in names}
wss = {
ws
for ws in parent._workers_dv.values()
if str(ws._name) in names_set
}
if not workers:
return {}
logger.info("Retire workers %s", workers)

# Keys orphaned by retiring those workers
keys = {k for w in workers for k in w.has_what}
keys = {ts._key for ts in keys if ts._who_has.issubset(workers)}

if keys:
other_workers = set(parent._workers_dv.values()) - workers
if not other_workers:
return {}
logger.info("Moving %d keys to other workers", len(keys))
await self.replicate(
keys=keys,
workers=[ws._address for ws in other_workers],
n=1,
delete=False,
lock=False,
)
elif workers is not None:
wss = {
parent._workers_dv[address]
for address in workers
if address in parent._workers_dv
}
else:
wss = {
parent._workers_dv[address]
for address in self.workers_to_close(**kwargs)
}
if not wss:
return {}

worker_keys = {ws._address: ws.identity() for ws in workers}
if close_workers:
await asyncio.gather(
*[self.close_worker(worker=w, safe=True) for w in worker_keys]
)
if remove:
workers_info = {ws._address: ws.identity() for ws in wss}

stop_amm = False
amm: ActiveMemoryManagerExtension = self.extensions["amm"]
if not amm.started:
amm = ActiveMemoryManagerExtension(
self, register=False, start=True, interval=2.0
)
stop_amm = True

# This lock makes retire_workers, rebalance, and replicate mutually
# exclusive and will no longer be necessary once rebalance and replicate are
# migrated to the Active Memory Manager.
async with self._lock:
try:
await asyncio.gather(
*[self.remove_worker(address=w, safe=True) for w in worker_keys]
*(
self._retire_worker(ws, amm, close_workers, remove)
for ws in wss
)
)
finally:
if stop_amm:
amm.stop()

self.log_event(
"all",
{
"action": "retire-workers",
"workers": worker_keys,
"moved-keys": len(keys),
},
)
self.log_event(list(worker_keys), {"action": "retired"})
self.log_event("all", {"action": "retire-workers", "workers": workers_info})
self.log_event(list(workers_info), {"action": "retired"})

return workers_info

async def _retire_worker(
self,
ws: WorkerState,
amm: ActiveMemoryManagerExtension,
close_workers: bool,
remove: bool,
) -> None:
logger.info("Retiring worker %s", ws)

policy = RetireWorker(ws._address)
amm.add_policy(policy)

ws.status = Status.closing_gracefully
self.running.discard(ws)
self.stream_comms[ws.address].send(
{"op": "worker-status-change", "status": ws.status.name}
)

while not policy.done():
# Sleep 0.1s when there are 10 in-memory tasks or less
# Sleep 3s when there are 300 or more
poll_interval = max(0.1, min(3.0, len(ws.has_what) / 100))
await asyncio.sleep(poll_interval)

if close_workers and ws._address in self._workers_dv:
await self.close_worker(worker=ws._address, safe=True)
if remove:
await self.remove_worker(address=ws._address, safe=True)

return worker_keys
logger.info("Retired worker %s", ws)

def add_keys(self, comm=None, worker=None, keys=(), stimulus_id=None):
"""
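As a usage illustration (again, not part of the diff): from the client's point of view the API is unchanged, but a ``retire_workers`` call now drains the worker through the ``RetireWorker`` policy. A minimal sketch against a throwaway in-process cluster:

import asyncio

from distributed import Client, LocalCluster


async def main() -> None:
    # processes=False keeps workers in-process so their addresses are easy to grab
    async with LocalCluster(n_workers=2, processes=False, asynchronous=True) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            addr = cluster.workers[0].address
            # Unique in-memory keys on addr are replicated elsewhere before removal
            info = await client.retire_workers(workers=[addr])
            print(list(info))  # [addr]


asyncio.run(main())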
24 changes: 22 additions & 2 deletions distributed/worker.py
@@ -888,6 +888,7 @@ def __init__(
"free-keys": self.handle_free_keys,
"remove-replicas": self.handle_remove_replicas,
"steal-request": self.handle_steal_request,
"worker-status-change": self.handle_worker_status_change,
}

super().__init__(
@@ -1555,7 +1556,9 @@ async def close_gracefully(self, restart=None):
restart = self.lifetime_restart

logger.info("Closing worker gracefully: %s", self.address)
self.status = Status.closing_gracefully
# Wait for all tasks to leave the worker and don't accept any new ones.
# Scheduler.retire_workers will set the status to closing_gracefully and push it
# back to this worker.
await self.scheduler.retire_workers(workers=[self.address], remove=False)
await self.close(safe=True, nanny=not restart)

@@ -2908,6 +2911,22 @@ def handle_steal_request(self, key, stimulus_id):
# `transition_constrained_executing`
self.transition(ts, "forgotten", stimulus_id=stimulus_id)

def handle_worker_status_change(self, status: str) -> None:
new_status = Status.lookup[status] # type: ignore

if new_status == Status.closing_gracefully and self._status not in (
Status.running,
Status.paused,
):
logger.error(
"Invalid Worker.status transition: %s -> %s", self._status, new_status
)
# Reiterate the current status to the scheduler to restore sync
self._send_worker_status_change()
else:
# Update status and send confirmation to the Scheduler (see status.setter)
self.status = new_status

def release_key(
self,
key: str,
@@ -3119,7 +3138,8 @@ async def _maybe_deserialize_task(self, ts, *, stimulus_id):
raise

def ensure_computing(self):
if self.status == Status.paused:
if self.status in (Status.paused, Status.closing_gracefully):
# Pending tasks shall be stolen
return
try:
stimulus_id = f"ensure-computing-{time()}"
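For context (not part of the diff), the new handler relies on the worker status name round-tripping through the scheduler's batched stream: ``Scheduler._retire_worker`` above sends ``ws.status.name`` and the worker maps it back with ``Status.lookup``. A tiny sketch of that mapping:

from distributed.core import Status

# Mirrors the message built in Scheduler._retire_worker and decoded in
# Worker.handle_worker_status_change
msg = {"op": "worker-status-change", "status": Status.closing_gracefully.name}
assert Status.lookup[msg["status"]] is Status.closing_gracefully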
19 changes: 12 additions & 7 deletions docs/source/active_memory_manager.rst
@@ -7,8 +7,9 @@ usage of workers across the Dask cluster. It is disabled by default.
Memory imbalance and duplication
--------------------------------
Whenever a Dask task returns data, it is stored on the worker that executed the task for
as long as it's a dependency of other tasks, is referenced by a ``Client`` through a
``Future``, or is part of a :doc:`published dataset <publish>`.
as long as it's a dependency of other tasks, is referenced by a
:class:`~distributed.Client` through a :class:`~distributed.Future`, or is part of a
:doc:`published dataset <publish>`.

Dask assigns tasks to workers following criteria of CPU occupancy, :doc:`resources`, and
locality. In the trivial use case of tasks that are not connected to each other, take
@@ -38,7 +39,7 @@ The AMM can be enabled through the :doc:`Dask configuration file <configuration>
The above is the recommended setup and will run all enabled *AMM policies* (see below)
every two seconds. Alternatively, you can manually start/stop the AMM from the
``Client`` or trigger a one-off iteration:
:class:`~distributed.Client` or trigger a one-off iteration:

.. code-block:: python
@@ -58,6 +59,10 @@ long as they don't harm data integrity. These suggestions can be of two types:
- Delete one or more replicas of an in-memory task. The AMM will never delete the last
replica of a task, even if a policy asks to.

There are no "move" operations. A move is performed in two passes: first a policy
creates a copy; in the next AMM iteration, the same or another policy deletes the
original (if the copy succeeded).

Unless a policy puts constraints on which workers should be impacted, the AMM will
automatically create replicas on workers with the lowest memory usage first and delete
them from workers with the highest memory usage first.
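
To make the two-pass "move" concrete, here is a hypothetical custom policy; the class name and its attributes are invented for this illustration and are not part of the documented API. It relocates a single key by first replicating it onto a target worker and, on a later iteration, dropping the replica held by the source worker:

.. code-block:: python

    from distributed.active_memory_manager import ActiveMemoryManagerPolicy


    class MoveKey(ActiveMemoryManagerPolicy):
        """Hypothetical example: move one key from ``source`` to ``target``"""

        def __init__(self, key: str, source: str, target: str):
            self.key = key
            self.source = source
            self.target = target

        def run(self):
            ts = self.manager.scheduler.tasks.get(self.key)
            if ts is None:
                return
            workers = self.manager.scheduler.workers
            src = workers.get(self.source)
            dst = workers.get(self.target)
            if dst is not None and dst not in ts.who_has:
                # First pass: create a copy on the target worker
                yield "replicate", ts, {dst}
            elif src is not None and src in ts.who_has and len(ts.who_has) > 1:
                # Later pass: the copy now exists, so drop the source replica
                yield "drop", ts, {src}

Even if the drop were suggested too early, the AMM never deletes the last replica of a key, so the data stays safe.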
@@ -90,7 +95,7 @@ Built-in policies
ReduceReplicas
++++++++++++++
class
``distributed.active_memory_manager.ReduceReplicas``
:class:`distributed.active_memory_manager.ReduceReplicas`
parameters
None

@@ -128,7 +133,7 @@ define two methods:
Create one replica of the target task on the worker with the lowest memory among
the listed candidates.
``yield "drop", <TaskState>, None``
Delete one replica of the target task one the worker with the highest memory
Delete one replica of the target task on the worker with the highest memory
usage across the whole cluster.
``yield "drop", <TaskState>, {<WorkerState>, <WorkerState>, ...}``
Delete one replica of the target task on the worker with the highest memory
@@ -186,8 +191,8 @@ Example
The following custom policy ensures that keys "foo" and "bar" are replicated on all
workers at all times. New workers will receive a replica soon after connecting to the
scheduler. The policy will do nothing if the target keys are not in memory somewhere or
if all workers already hold a replica.
Note that this example is incompatible with the ``ReduceReplicas`` built-in policy.
if all workers already hold a replica. Note that this example is incompatible with the
:class:`~distributed.active_memory_manager.ReduceReplicas` built-in policy.
In mymodule.py (it must be accessible by the scheduler):
