retire_workers to use AMM

crusaderky committed Oct 15, 2021
1 parent ce1d0f1 commit 1d4f45c
Showing 3 changed files with 171 additions and 77 deletions.
56 changes: 56 additions & 0 deletions distributed/active_memory_manager.py
@@ -344,3 +344,59 @@ def run(self):
# ndrop could be negative, which for range() is the same as 0.
for _ in range(ndrop):
yield "drop", ts, None


class RetireWorker(ActiveMemoryManagerPolicy):
"""Replicate somewhere else all unique keys on a worker, preparing for its shutdown.
Once the worker has been retired, this policy automatically removes itself from the
Active Memory Manager it's attached to.
**Retiring a worker with spilled keys**
On its very first iteration, this policy suggests other workers to fetch all unique
in-memory tasks. Frequently, this means that in the next few moments the worker to
be retired will be bombarded with ``Worker.get_data`` calls from the rest of the
cluster. This can be a problem if most of the managed memory of the worker has been
spilled out, as it could send the worker above the terminate threshold.
Two things are in place in order to avoid this:
1. At every iteration, this policy drops all keys that have already been replicated
somewhere else. This makes room for further keys to be moved out of the spill
file in order to be replicated onto another worker.
2. Once a worker passes the ``pause`` threshold, ``Worker.get_data`` throttles the
number of outgoing connections to 1.
"""

address: str

def __init__(self, address: str):
self.address = address

def __repr__(self) -> str:
return f"RetireWorker({self.address}, done={self.done})"

def run(self):
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
self.manager.policies.remove(self)
return

for ts in ws.has_what:
if len(ts.who_has) == 1:
yield "replicate", ts, None
else:
# This may be rejected by either the AMM (see _find_dropper) or by the
# Worker; if so we'll try again at the next iteration
yield "drop", ts, {ws}

def done(self) -> bool:
"""Return True if it is safe to close the worker down, or False otherwise. True
doesn't necessarily mean that run() won't issue any more suggestions - it could
continue issuing ``drop`` suggestions afterwards.
"""
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
return True
if ws.processing:
return False
return all(len(ts.who_has) > 1 for ts in ws.has_what)
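
For orientation, here is a minimal sketch of the suggestion protocol that RetireWorker builds on: a policy's ``run()`` is a generator that yields ``("replicate" | "drop", TaskState, optional candidate workers)`` tuples, which the Active Memory Manager validates and enacts on its next iteration. Only the base class and the yield format come from the code above; the policy name and its selection rule below are invented for illustration.

from distributed.active_memory_manager import ActiveMemoryManagerPolicy


class DropExtraReplicas(ActiveMemoryManagerPolicy):
    """Toy policy: suggest dropping every replica beyond the first one."""

    def run(self):
        for ts in self.manager.scheduler.tasks.values():
            # One suggestion per excess replica; passing None as candidates lets
            # the AMM decide which worker should lose its copy.
            for _ in range(len(ts.who_has) - 1):
                yield "drop", ts, None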
167 changes: 91 additions & 76 deletions distributed/scheduler.py
@@ -51,7 +51,7 @@

from . import preloading, profile
from . import versions as version_module
from .active_memory_manager import ActiveMemoryManagerExtension
from .active_memory_manager import ActiveMemoryManagerExtension, RetireWorker
from .batched import BatchedSend
from .comm import (
get_address_host,
@@ -6607,11 +6607,11 @@ def _key(group):
async def retire_workers(
self,
comm=None,
workers=None,
remove=True,
close_workers=False,
names=None,
lock=True,
*,
workers: "list[str] | None" = None,
names: "list[str] | None" = None,
close_workers: bool = False,
remove: bool = True,
**kwargs,
) -> dict:
"""Gracefully retire workers from cluster
@@ -6620,16 +6620,18 @@ async def retire_workers(
----------
workers: list (optional)
List of worker addresses to retire.
If not provided we call ``workers_to_close`` which finds a good set
names: list (optional)
List of worker names to retire.
remove: bool (defaults to True)
Whether or not to remove the worker metadata immediately or else
wait for the worker to contact us
Mutually exclusive with ``workers``.
If neither ``workers`` nor ``names`` are provided, we call
``workers_to_close`` which finds a good set.
close_workers: bool (defaults to False)
Whether or not to actually close the worker explicitly from here.
Otherwise we expect some external job scheduler to finish off the
worker.
remove: bool (defaults to True)
Whether or not to remove the worker metadata immediately or else
wait for the worker to contact us
**kwargs: dict
Extra options to pass to workers_to_close to determine which
workers we should drop
@@ -6647,78 +6649,91 @@
ws: WorkerState
ts: TaskState
with log_errors():
async with self._lock if lock else empty_context:
if names is not None:
if workers is not None:
raise TypeError("names and workers are mutually exclusive")
if names:
logger.info("Retire worker names %s", names)
names = set(map(str, names))
workers = {
ws._address
for ws in parent._workers_dv.values()
if str(ws._name) in names
}
elif workers is None:
while True:
try:
workers = self.workers_to_close(**kwargs)
if not workers:
return {}
return await self.retire_workers(
workers=workers,
remove=remove,
close_workers=close_workers,
lock=False,
)
except KeyError: # keys left during replicate
pass

workers = {
parent._workers_dv[w] for w in workers if w in parent._workers_dv
if names is not None:
if workers is not None:
raise TypeError("names and workers are mutually exclusive")
if names:
logger.info("Retire worker names %s", names)
names_set = {str(name) for name in names}
wss = {
ws
for ws in parent._workers_dv.values()
if str(ws._name) in names_set
}
if not workers:
return {}
logger.info("Retire workers %s", workers)

# Keys orphaned by retiring those workers
keys = {k for w in workers for k in w.has_what}
keys = {ts._key for ts in keys if ts._who_has.issubset(workers)}

if keys:
other_workers = set(parent._workers_dv.values()) - workers
if not other_workers:
return {}
logger.info("Moving %d keys to other workers", len(keys))
await self.replicate(
keys=keys,
workers=[ws._address for ws in other_workers],
n=1,
delete=False,
lock=False,
)
elif workers is not None:
wss = {
parent._workers_dv[address]
for address in workers
if address in parent._workers_dv
}
else:
wss = {
parent._workers_dv[address]
for address in self.workers_to_close(**kwargs)
}
if not wss:
return {}

worker_keys = {ws._address: ws.identity() for ws in workers}
if close_workers:
await asyncio.gather(
*[self.close_worker(worker=w, safe=True) for w in worker_keys]
)
if remove:
workers_info = {ws._address: ws.identity() for ws in wss}

stop_amm = False
amm: ActiveMemoryManagerExtension = self.extensions["amm"]
if not amm.started:
amm = ActiveMemoryManagerExtension(
self, register=False, start=True, interval=2.0
)
stop_amm = True

# This lock makes retire_workers, rebalance, and replicate mutually
# exclusive and will no longer be necessary once rebalance and replicate are
# migrated to the Active Memory Manager.
async with self._lock:
try:
await asyncio.gather(
*[self.remove_worker(address=w, safe=True) for w in worker_keys]
*(
self._retire_worker(ws, amm, close_workers, remove)
for ws in wss
)
)
finally:
if stop_amm:
amm.stop()

self.log_event(
"all",
{
"action": "retire-workers",
"workers": worker_keys,
"moved-keys": len(keys),
},
)
self.log_event(list(worker_keys), {"action": "retired"})
self.log_event("all", {"action": "retire-workers", "workers": workers_info})
self.log_event(list(workers_info), {"action": "retired"})

return workers_info

async def _retire_worker(
self,
ws: WorkerState,
amm: ActiveMemoryManagerExtension,
close_workers: bool,
remove: bool,
) -> None:
logger.info("Retiring worker %s", ws)

policy = RetireWorker(ws._address)
amm.add_policy(policy)

ws.status = Status.closing_gracefully
self.running.discard(ws)
self.stream_comms[ws.address].send(
{"op": "worker-status-change", "status": ws.status.name}
)

while not policy.done():
# Sleep 0.1s when there are 10 in-memory tasks or less
# Sleep 3s when there are 300 or more
poll_interval = max(0.1, min(3.0, len(ws.has_what) / 100))
await asyncio.sleep(poll_interval)

if close_workers and ws._address in self._workers_dv:
await self.close_worker(worker=ws._address, safe=True)
if remove:
await self.remove_worker(address=ws._address, safe=True)

return worker_keys
logger.info("Retired worker %s", ws)

def add_keys(self, comm=None, worker=None, keys=(), stimulus_id=None):
"""
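As a usage sketch, not part of this commit, the reworked keyword-only signature is usually reached through the client, which forwards the call to ``Scheduler.retire_workers``. The scheduler and worker addresses below are placeholders, and the pass-through via ``Client.retire_workers`` is assumed rather than shown in this diff.

import asyncio

from distributed import Client


async def main() -> None:
    async with Client("tcp://scheduler-host:8786", asynchronous=True) as client:
        # Retire two workers by address; the scheduler keeps each one in
        # closing_gracefully until its unique in-memory keys have a replica elsewhere.
        info = await client.retire_workers(
            workers=["tcp://10.0.0.5:33221", "tcp://10.0.0.6:44113"],
            close_workers=True,
        )
        print(sorted(info))  # addresses of the retired workers


if __name__ == "__main__":
    asyncio.run(main())
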
25 changes: 24 additions & 1 deletion distributed/worker.py
@@ -767,6 +767,7 @@ def __init__(
"free-keys": self.handle_free_keys,
"remove-replicas": self.handle_remove_replicas,
"steal-request": self.handle_steal_request,
"worker-status-change": self.handle_worker_status_change,
}

super().__init__(
@@ -1434,6 +1435,9 @@ async def close_gracefully(self, restart=None):
restart = self.lifetime_restart

logger.info("Closing worker gracefully: %s", self.address)
# Scheduler.retire_workers will set the status to closing_gracefully and push it
# back to this worker. We're additionally setting it here immediately to prevent
# race conditions during the round trip.
self.status = Status.closing_gracefully
await self.scheduler.retire_workers(workers=[self.address], remove=False)
await self.close(safe=True, nanny=not restart)
@@ -2745,6 +2749,24 @@ def handle_steal_request(self, key, stimulus_id):
# `transition_constrained_executing`
self.transition(ts, "forgotten", stimulus_id=stimulus_id)

def handle_worker_status_change(self, status: str) -> None:
new_status = Status.lookup[status] # type: ignore

if new_status == Status.closing_gracefully and self.status not in (
Status.running,
Status.paused,
):
logger.error(
"Invalid Worker.status transition: %s -> %s", self.status, new_status
)
# Reiterate the current status to the scheduler to restore sync
# (see status.setter)
self.status = self.status
return

# Update status and send confirmation to the Scheduler (see status.setter).
self.status = new_status

def release_key(
self,
key: Hashable,
@@ -2956,7 +2978,8 @@ async def _maybe_deserialize_task(self, ts, *, stimulus_id):
raise

def ensure_computing(self):
if self.status == Status.paused:
if self.status in (Status.paused, Status.closing_gracefully):
# Pending tasks shall be stolen
return
try:
stimulus_id = f"ensure-computing-{time()}"
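To make the new control-plane message concrete, here is a small sketch, not in the diff, of the round trip: the scheduler sends the Status name over the batched stream, and the worker resolves it through ``Status.lookup`` inside the handler registered above. The ``worker`` object referenced in the comments is assumed.

from distributed.core import Status

# Message pushed by Scheduler._retire_worker over the worker's batched stream
msg = {"op": "worker-status-change", "status": Status.closing_gracefully.name}

# On the worker, stream messages are dispatched by their "op" key, so this one lands
# in Worker.handle_worker_status_change, roughly:
#   handler = worker.stream_handlers[msg["op"]]
#   handler(status=msg["status"])
# The handler maps the string back to the enum and echoes the resulting status to the
# scheduler through the Worker.status setter.
assert Status.lookup[msg["status"]] is Status.closing_gracefully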
