retire_workers to use AMM
black


doc tweaks


fixes


fix regression in unit tests; add logging


TEMP label failing tests


Revert "TEMP label failing tests"

This reverts commit c64f0ae.

more logging


refactor


Fix failing tests
crusaderky committed Nov 18, 2021
1 parent 1721d62 commit 2142bb2
Showing 5 changed files with 293 additions and 151 deletions.
197 changes: 146 additions & 51 deletions distributed/active_memory_manager.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
from collections import defaultdict
from collections.abc import Generator
from typing import TYPE_CHECKING
@@ -17,6 +18,8 @@
from .client import Client
from .scheduler import Scheduler, TaskState, WorkerState

logger = logging.getLogger(__name__)


class ActiveMemoryManagerExtension:
"""Scheduler extension that optimizes memory usage across the cluster.
@@ -49,6 +52,7 @@ def __init__(
# The following parameters are exposed so that one may create, run, and throw
# away on the fly a specialized manager, separate from the main one.
policies: set[ActiveMemoryManagerPolicy] | None = None,
*,
register: bool = True,
start: bool | None = None,
interval: float | None = None,
@@ -132,52 +136,9 @@ def run_once(self) -> None:
# populate self.pending
self._run_policies()

drop_by_worker: (
defaultdict[WorkerState, set[TaskState]]
) = defaultdict(set)
repl_by_worker: (
defaultdict[WorkerState, dict[TaskState, set[str]]]
) = defaultdict(dict)

for ts, (pending_repl, pending_drop) in self.pending.items():
if not ts.who_has:
continue
who_has = {ws_snd.address for ws_snd in ts.who_has - pending_drop}
assert who_has # Never drop the last replica
for ws_rec in pending_repl:
assert ws_rec not in ts.who_has
repl_by_worker[ws_rec][ts] = who_has
for ws in pending_drop:
assert ws in ts.who_has
drop_by_worker[ws].add(ts)

# Fire-and-forget enact recommendations from policies
stimulus_id = str(time())
for ws_rec, ts_to_who_has in repl_by_worker.items():
self.scheduler.stream_comms[ws_rec.address].send(
{
"op": "acquire-replicas",
"keys": [ts.key for ts in ts_to_who_has],
"stimulus_id": "acquire-replicas-" + stimulus_id,
"priorities": {ts.key: ts.priority for ts in ts_to_who_has},
"who_has": {ts.key: v for ts, v in ts_to_who_has.items()},
},
)

for ws, tss in drop_by_worker.items():
# The scheduler immediately forgets about the replica and suggests
# the worker to drop it. The worker may refuse, at which point it
# will send back an add-keys message to reinstate it.
for ts in tss:
self.scheduler.remove_replica(ts, ws)
self.scheduler.stream_comms[ws.address].send(
{
"op": "remove-replicas",
"keys": [ts.key for ts in tss],
"stimulus_id": "remove-replicas-" + stimulus_id,
}
)

if self.pending:
logger.debug("Enacting suggestions for %d keys", len(self.pending))
self._enact_suggestions()
finally:
del self.workers_memory
del self.pending
@@ -286,6 +247,54 @@ def _find_dropper(
key=lambda ws: (ws.status != Status.running, self.workers_memory[ws]),
)

def _enact_suggestions(self) -> None:
"""Iterate through self.pending, which was filled by self._run_policies(), and
push the suggestions to the workers through bulk comms. Return immediately.
"""
drop_by_worker: (defaultdict[WorkerState, set[TaskState]]) = defaultdict(set)
repl_by_worker: (
defaultdict[WorkerState, dict[TaskState, set[str]]]
) = defaultdict(dict)

for ts, (pending_repl, pending_drop) in self.pending.items():
if not ts.who_has:
continue
who_has = {ws_snd.address for ws_snd in ts.who_has - pending_drop}
assert who_has # Never drop the last replica
for ws_rec in pending_repl:
assert ws_rec not in ts.who_has
repl_by_worker[ws_rec][ts] = who_has
for ws in pending_drop:
assert ws in ts.who_has
drop_by_worker[ws].add(ts)

# Fire and forget: enact the recommendations from the policies
stimulus_id = str(time())
for ws_rec, ts_to_who_has in repl_by_worker.items():
self.scheduler.stream_comms[ws_rec.address].send(
{
"op": "acquire-replicas",
"keys": [ts.key for ts in ts_to_who_has],
"stimulus_id": "acquire-replicas-" + stimulus_id,
"priorities": {ts.key: ts.priority for ts in ts_to_who_has},
"who_has": {ts.key: v for ts, v in ts_to_who_has.items()},
},
)

for ws, tss in drop_by_worker.items():
# The scheduler immediately forgets about the replica and suggests that the
# worker drop it. The worker may refuse, at which point it will send back an
# add-keys message to reinstate it.
for ts in tss:
self.scheduler.remove_replica(ts, ws)
self.scheduler.stream_comms[ws.address].send(
{
"op": "remove-replicas",
"keys": [ts.key for ts in tss],
"stimulus_id": "remove-replicas-" + stimulus_id,
}
)


class ActiveMemoryManagerPolicy:
"""Abstract parent class"""
@@ -374,6 +383,9 @@ class ReduceReplicas(ActiveMemoryManagerPolicy):
"""

def run(self):
nkeys = 0
ndrop = 0

for ts in self.manager.scheduler.replicated_tasks:
desired_replicas = 1 # TODO have a marker on TaskState

@@ -383,11 +395,94 @@ def run(self):
# worker, don't double count them.
nwaiters = len({waiter.processing_on or waiter for waiter in ts.waiters})

ndrop = len(ts.who_has) - max(desired_replicas, nwaiters)
ndrop_key = len(ts.who_has) - max(desired_replicas, nwaiters)
if ts in self.manager.pending:
pending_repl, pending_drop = self.manager.pending[ts]
ndrop += len(pending_repl) - len(pending_drop)
ndrop_key += len(pending_repl) - len(pending_drop)

# ndrop could be negative, which for range() is the same as 0.
for _ in range(ndrop):
yield "drop", ts, None
if ndrop_key > 0:
nkeys += 1
ndrop += ndrop_key
for _ in range(ndrop_key):
yield "drop", ts, None

if ndrop:
logger.debug("Dropping %d superfluous replicas of %d tasks", ndrop, nkeys)


class RetireWorker(ActiveMemoryManagerPolicy):
"""Replicate somewhere else all unique keys on a worker, preparing for its shutdown.
Once the worker has been retired, this policy automatically removes itself from the
Active Memory Manager it's attached to.
**Retiring a worker with spilled keys**
On its very first iteration, this policy suggests other workers to fetch all unique
in-memory tasks. Frequently, this means that in the next few moments the worker to
be retired will be bombarded with ``Worker.get_data`` calls from the rest of the
cluster. This can be a problem if most of the managed memory of the worker has been
spilled out, as it could send the worker above the terminate threshold.
Two things are in place in order to avoid this:
# ndrop could be negative, which for range() is the same as 0.
for _ in range(ndrop):
yield "drop", ts, None
1. At every iteration, this policy drops all keys that have already been replicated
somewhere else. This makes room for further keys to be moved out of the spill
file in order to be replicated onto another worker.
2. Once a worker passes the ``pause`` threshold, ``Worker.get_data`` throttles the
number of outgoing connections to 1.
"""

address: str

def __init__(self, address: str):
self.address = address

def __repr__(self) -> str:
return f"RetireWorker({self.address}, done={self.done})"

def run(self):
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
self.manager.policies.remove(self)
return

n_repl = 0
n_no_rec = 0

for ts in ws.has_what:
if len(ts.who_has) == 1:
n_repl += 1
pending_repl, _ = self.manager.pending[ts]
if not pending_repl:
rec_ws = (yield "replicate", ts, None)
if not rec_ws:
# replication was rejected by the AMM (see _find_recipient)
n_no_rec += 1
else:
# This may be rejected by either the AMM (see _find_dropper) or by the
# Worker; e.g. a running task may depend on this key. If so we'll try
# again at the next iteration. Anyway, this is just to allow spilled
# keys to be moved back into memory and not mandatory for retirement.
yield "drop", ts, {ws}

if n_no_rec:
logger.warning(
f"Retiring worker {self.address}; {n_repl - n_no_rec} keys are being "
f"moved away while {n_no_rec} keys won't be moved for now as there are "
"no suitable workers to receive them; this typically happens when this "
"is the only worker on the cluster or all other workers are either "
"paused or are being shut down themselves. Trying again later..."
)
elif n_repl:
logger.info(
f"Retiring worker {self.address}; {n_repl} keys are being moved away.",
)

def done(self) -> bool:
"""Return True if it is safe to close the worker down, or False otherwise. True
doesn't necessarily mean that run() won't issue any more suggestions - it could
continue issuing ``drop`` suggestions afterwards.
"""
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
return True
return all(len(ts.who_has) > 1 for ts in ws.has_what)
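
As a rough illustration of how the new policy and the "create, run, and throw away on
the fly" constructor parameters fit together, here is a minimal, hypothetical sketch.
It is not part of the diff above, and it assumes that the extension's first
constructor argument is the scheduler, which is hidden in the collapsed portion of the
diff:

    import asyncio

    from distributed.active_memory_manager import (
        ActiveMemoryManagerExtension,
        RetireWorker,
    )

    async def retire_one_worker(scheduler, address: str) -> None:
        # Build a throwaway, unregistered AMM that runs a single RetireWorker policy.
        policy = RetireWorker(address)
        amm = ActiveMemoryManagerExtension(
            scheduler, policies={policy}, register=False, start=False, interval=2.0
        )
        while not policy.done():
            amm.run_once()          # fire-and-forget replicate/drop suggestions
            await asyncio.sleep(2)  # give the workers time to act before re-checking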
