-
-
Notifications
You must be signed in to change notification settings - Fork 710
/
active_memory_manager.py
549 lines (452 loc) · 21 KB
/
active_memory_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
from __future__ import annotations
import logging
from collections import defaultdict
from collections.abc import Generator
from typing import TYPE_CHECKING
from tornado.ioloop import PeriodicCallback
import dask
from dask.utils import parse_timedelta
from .core import Status
from .metrics import time
from .utils import import_term, log_errors
if TYPE_CHECKING: # pragma: nocover
from .client import Client
from .scheduler import Scheduler, TaskState, WorkerState
logger = logging.getLogger(__name__)
class ActiveMemoryManagerExtension:
"""Scheduler extension that optimizes memory usage across the cluster.
It can be either triggered by hand or automatically every few seconds; at every
iteration it performs one or both of the following:
- create new replicas of in-memory tasks
- destroy replicas of in-memory tasks; this never destroys the last available copy.
There are no 'move' operations. A move is performed in two passes: first you create
a copy and, in the next iteration, you delete the original (if the copy succeeded).
This extension is configured by the dask config section
``distributed.scheduler.active-memory-manager``.
"""
scheduler: Scheduler
policies: set[ActiveMemoryManagerPolicy]
interval: float
# These attributes only exist within the scope of self.run()
# Current memory (in bytes) allocated on each worker, plus/minus pending actions
workers_memory: dict[WorkerState, int]
# Pending replications and deletions for each task
pending: defaultdict[TaskState, tuple[set[WorkerState], set[WorkerState]]]
def __init__(
self,
scheduler: Scheduler,
# The following parameters are exposed so that one may create, run, and throw
# away on the fly a specialized manager, separate from the main one.
policies: set[ActiveMemoryManagerPolicy] | None = None,
*,
register: bool = True,
start: bool | None = None,
interval: float | None = None,
):
self.scheduler = scheduler
self.policies = set()
if policies is None:
# Initialize policies from config
policies = set()
for kwargs in dask.config.get(
"distributed.scheduler.active-memory-manager.policies"
):
kwargs = kwargs.copy()
cls = import_term(kwargs.pop("class"))
policies.add(cls(**kwargs))
for policy in policies:
self.add_policy(policy)
if register:
scheduler.extensions["amm"] = self
scheduler.handlers["amm_handler"] = self.amm_handler
if interval is None:
interval = parse_timedelta(
dask.config.get("distributed.scheduler.active-memory-manager.interval")
)
self.interval = interval
if start is None:
start = dask.config.get("distributed.scheduler.active-memory-manager.start")
if start:
self.start()
def amm_handler(self, comm, method: str):
"""Scheduler handler, invoked from the Client by
:class:`~distributed.active_memory_manager.AMMClientProxy`
"""
assert method in {"start", "stop", "run_once", "running"}
out = getattr(self, method)
return out() if callable(out) else out
def start(self) -> None:
"""Start executing every ``self.interval`` seconds until scheduler shutdown"""
if self.running:
return
pc = PeriodicCallback(self.run_once, self.interval * 1000.0)
self.scheduler.periodic_callbacks[f"amm-{id(self)}"] = pc
pc.start()
def stop(self) -> None:
"""Stop periodic execution"""
pc = self.scheduler.periodic_callbacks.pop(f"amm-{id(self)}", None)
if pc:
pc.stop()
@property
def running(self) -> bool:
"""Return True if the AMM is being triggered periodically; False otherwise"""
return f"amm-{id(self)}" in self.scheduler.periodic_callbacks
def add_policy(self, policy: ActiveMemoryManagerPolicy) -> None:
if not isinstance(policy, ActiveMemoryManagerPolicy):
raise TypeError(f"Expected ActiveMemoryManagerPolicy; got {policy!r}")
self.policies.add(policy)
policy.manager = self
def run_once(self) -> None:
"""Run all policies once and asynchronously (fire and forget) enact their
recommendations to replicate/drop tasks
"""
with log_errors():
# This should never fail since this is a synchronous method
assert not hasattr(self, "pending")
self.pending = defaultdict(lambda: (set(), set()))
self.workers_memory = {
w: w.memory.optimistic for w in self.scheduler.workers.values()
}
try:
# populate self.pending
self._run_policies()
if self.pending:
logger.debug("Enacting suggestions for %d tasks", len(self.pending))
self._enact_suggestions()
finally:
del self.workers_memory
del self.pending
def _run_policies(self) -> None:
"""Sequentially run ActiveMemoryManagerPolicy.run() for all registered policies,
obtain replicate/drop suggestions, and use them to populate self.pending.
"""
candidates: set[WorkerState] | None
cmd: str
ws: WorkerState | None
ts: TaskState
nreplicas: int
for policy in list(self.policies): # a policy may remove itself
policy_gen = policy.run()
ws = None
while True:
try:
cmd, ts, candidates = policy_gen.send(ws)
except StopIteration:
break # next policy
pending_repl, pending_drop = self.pending[ts]
if cmd == "replicate":
ws = self._find_recipient(ts, candidates, pending_repl)
if ws:
pending_repl.add(ws)
self.workers_memory[ws] += ts.nbytes
elif cmd == "drop":
ws = self._find_dropper(ts, candidates, pending_drop)
if ws:
pending_drop.add(ws)
self.workers_memory[ws] = max(
0, self.workers_memory[ws] - ts.nbytes
)
else:
raise ValueError(f"Unknown command: {cmd}") # pragma: nocover
def _find_recipient(
self,
ts: TaskState,
candidates: set[WorkerState] | None,
pending_repl: set[WorkerState],
) -> WorkerState | None:
"""Choose a worker to acquire a new replica of an in-memory task among a set of
candidates. If candidates is None, default to all workers in the cluster.
Regardless, workers that either already hold a replica or are scheduled to
receive one at the end of this AMM iteration are not considered.
Returns
-------
The worker with the lowest memory usage (downstream of pending replications and
drops), or None if no eligible candidates are available.
"""
if ts.state != "memory":
return None
if candidates is None:
candidates = self.scheduler.running.copy()
else:
candidates &= self.scheduler.running
candidates -= ts.who_has
candidates -= pending_repl
if not candidates:
return None
# Select candidate with the lowest memory usage
return min(candidates, key=self.workers_memory.__getitem__)
def _find_dropper(
self,
ts: TaskState,
candidates: set[WorkerState] | None,
pending_drop: set[WorkerState],
) -> WorkerState | None:
"""Choose a worker to drop its replica of an in-memory task among a set of
candidates. If candidates is None, default to all workers in the cluster.
Regardless, workers that either do not hold a replica or are already scheduled
to drop theirs at the end of this AMM iteration are not considered.
This method also ensures that a key will not lose its last replica.
Returns
-------
The worker with the highest memory usage (downstream of pending replications and
drops), or None if no eligible candidates are available.
"""
if len(ts.who_has) - len(pending_drop) < 2:
return None
if candidates is None:
candidates = ts.who_has.copy()
else:
candidates &= ts.who_has
candidates -= pending_drop
candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters}
if not candidates:
return None
# Select candidate with the highest memory usage.
# Drop from workers with status paused or closing_gracefully first.
return max(
candidates,
key=lambda ws: (ws.status != Status.running, self.workers_memory[ws]),
)
def _enact_suggestions(self) -> None:
"""Iterate through self.pending, which was filled by self._run_policies(), and
push the suggestions to the workers through bulk comms. Return immediately.
"""
drop_by_worker: (defaultdict[WorkerState, set[TaskState]]) = defaultdict(set)
repl_by_worker: (
defaultdict[WorkerState, dict[TaskState, set[str]]]
) = defaultdict(dict)
for ts, (pending_repl, pending_drop) in self.pending.items():
if not ts.who_has:
continue
who_has = {ws_snd.address for ws_snd in ts.who_has - pending_drop}
assert who_has # Never drop the last replica
for ws_rec in pending_repl:
assert ws_rec not in ts.who_has
repl_by_worker[ws_rec][ts] = who_has
for ws in pending_drop:
assert ws in ts.who_has
drop_by_worker[ws].add(ts)
# Fire-and-forget enact recommendations from policies
stimulus_id = str(time())
for ws_rec, ts_to_who_has in repl_by_worker.items():
self.scheduler.stream_comms[ws_rec.address].send(
{
"op": "acquire-replicas",
"keys": [ts.key for ts in ts_to_who_has],
"stimulus_id": "acquire-replicas-" + stimulus_id,
"priorities": {ts.key: ts.priority for ts in ts_to_who_has},
"who_has": {ts.key: v for ts, v in ts_to_who_has.items()},
},
)
for ws, tss in drop_by_worker.items():
# The scheduler immediately forgets about the replica and suggests the
# worker to drop it. The worker may refuse, at which point it will send back
# an add-keys message to reinstate it.
for ts in tss:
self.scheduler.remove_replica(ts, ws)
self.scheduler.stream_comms[ws.address].send(
{
"op": "remove-replicas",
"keys": [ts.key for ts in tss],
"stimulus_id": "remove-replicas-" + stimulus_id,
}
)
class ActiveMemoryManagerPolicy:
"""Abstract parent class"""
manager: ActiveMemoryManagerExtension
def __repr__(self) -> str:
return f"{self.__class__.__name__}()"
def run(
self,
) -> Generator[
tuple[str, TaskState, set[WorkerState] | None],
WorkerState | None,
None,
]:
"""This method is invoked by the ActiveMemoryManager every few seconds, or
whenever the user invokes ``client.amm.run_once``.
It is an iterator that must emit any of the following:
- "replicate", <TaskState>, None
- "replicate", <TaskState>, {subset of potential workers to replicate to}
- "drop", <TaskState>, None
- "drop", <TaskState>, {subset of potential workers to drop from}
Each element yielded indicates the desire to create or destroy a single replica
of a key. If a subset of workers is not provided, it defaults to all workers on
the cluster. Either the ActiveMemoryManager or the Worker may later decide to
disregard the request, e.g. because it would delete the last copy of a key or
because the key is currently needed on that worker.
You may optionally retrieve which worker it was decided the key will be
replicated to or dropped from, as follows:
.. code-block:: python
choice = (yield "replicate", ts, None)
``choice`` is either a WorkerState or None; the latter is returned if the
ActiveMemoryManager chose to disregard the request.
The current pending (accepted) commands can be inspected on
``self.manager.pending``; this includes the commands previously yielded by this
same method.
The current memory usage on each worker, *downstream of all pending commands*,
can be inspected on ``self.manager.workers_memory``.
"""
raise NotImplementedError("Virtual method") # pragma: nocover
class AMMClientProxy:
"""Convenience accessors to operate the AMM from the dask client
Usage: ``client.amm.start()`` etc.
All methods are asynchronous if the client is asynchronous and synchronous if the
client is synchronous.
"""
_client: Client
def __init__(self, client: Client):
self._client = client
def _run(self, method: str):
"""Remotely invoke ActiveMemoryManagerExtension.amm_handler"""
return self._client.sync(self._client.scheduler.amm_handler, method=method)
def start(self):
return self._run("start")
def stop(self):
return self._run("stop")
def run_once(self):
return self._run("run_once")
def running(self):
return self._run("running")
class ReduceReplicas(ActiveMemoryManagerPolicy):
"""Make sure that in-memory tasks are not replicated on more workers than desired;
drop the excess replicas.
"""
def run(self):
""""""
nkeys = 0
ndrop = 0
for ts in self.manager.scheduler.replicated_tasks:
desired_replicas = 1 # TODO have a marker on TaskState
# If a dependent task has not been assigned to a worker yet, err on the side
# of caution and preserve an additional replica for it.
# However, if two dependent tasks have been already assigned to the same
# worker, don't double count them.
nwaiters = len({waiter.processing_on or waiter for waiter in ts.waiters})
ndrop_key = len(ts.who_has) - max(desired_replicas, nwaiters)
if ts in self.manager.pending:
pending_repl, pending_drop = self.manager.pending[ts]
ndrop_key += len(pending_repl) - len(pending_drop)
if ndrop_key > 0:
nkeys += 1
ndrop += ndrop_key
for _ in range(ndrop_key):
yield "drop", ts, None
if ndrop:
logger.debug(
"ReduceReplicas: Dropping %d superfluous replicas of %d tasks",
ndrop,
nkeys,
)
class RetireWorker(ActiveMemoryManagerPolicy):
"""Replicate somewhere else all unique in-memory tasks on a worker, preparing for
its shutdown.
At any given time, the AMM may have registered multiple instances of this policy,
one for each worker currently being retired - meaning that most of the time no
instances will be registered at all. For this reason, this policy doesn't figure in
the dask config (:file:`distributed.yaml`). Instances are added by
:meth:`distributed.Scheduler.retire_workers` and automatically remove themselves
once the worker has been retired. If the AMM is disabled in the dask config,
:meth:`~distributed.Scheduler.retire_workers` will start a temporary ad-hoc one.
**Failure condition**
There may not be any suitable workers to receive the tasks from the retiring worker.
This happens in two use cases:
1. This is the only worker in the cluster, or
2. All workers are either paused or being retired at the same time
In either case, this policy will fail to move out all keys and
Scheduler.retire_workers will abort the retirement. The flag ``no_recipients`` will
be raised.
There is a third use case, where a task fails to be replicated away for whatever
reason; in this case we'll just wait for the next AMM iteration and try again
(possibly with a different receiving worker, e.g. if the receiving worker was
hung but not yet declared dead).
**Retiring a worker with spilled tasks**
On its very first iteration, this policy suggests that other workers should fetch
all unique in-memory tasks. Frequently, this means that in the next few moments the
worker to be retired will be bombarded by :meth:`distributed.worker.Worker.get_data`
calls from the rest of the cluster. This can be a problem if most of the managed
memory of the worker has been spilled out, as it could send the worker above the
terminate threshold. Two measures are in place in order to prevent this:
- At every iteration, this policy drops all tasks that have already been replicated
somewhere else. This makes room for further tasks to be moved out of the spill
file in order to be replicated onto another worker.
- Once a worker passes the ``pause`` threshold,
:meth:`~distributed.worker.Worker.get_data` throttles the number of outgoing
connections to 1.
Parameters
==========
address: str
URI of the worker to be retired
"""
address: str
no_recipients: bool
def __init__(self, address: str):
self.address = address
self.no_recipients = False
def __repr__(self) -> str:
return f"RetireWorker({self.address}, done={self.done})"
def run(self):
""""""
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
self.manager.policies.remove(self)
return
nrepl = 0
nno_rec = 0
for ts in ws.has_what:
pending_repl, pending_drop = self.manager.pending[ts]
if len(ts.who_has) > 1:
# There are already replicas of this key on other workers.
# Use cases:
# 1. The suggestion is accepted by the AMM and by the Worker.
# The replica on this worker is dropped.
# 2. The suggestion is accepted by the AMM, but rejected by the Worker.
# We'll try again at the next AMM iteration.
# 3. The suggestion is rejected by the AMM, because another policy
# (e.g. ReduceReplicas) already suggested the same for this worker
# 4. The suggestion is rejected by the AMM, because all replicas of the
# key are on workers being retired and the other RetireWorker
# instances already asked the same suggestion. We need to deal with
# this case and create a replica elsewhere.
if ws in pending_drop:
continue # Use case 3
drop_ws = (yield "drop", ts, {ws})
if drop_ws:
continue # Use case 1 or 2
# Either the worker holds the only replica or all replicas are being held
# by workers that are being retired
nrepl += 1
if not pending_repl:
rec_ws = (yield "replicate", ts, None)
if not rec_ws:
# replication was rejected by the AMM (see _find_recipient)
nno_rec += 1
if nno_rec:
# All workers are paused or closing_gracefully.
# Scheduler.retire_workers will read this flag and exit immediately.
# TODO after we implement the automatic transition of workers from paused
# to closing_gracefully after a timeout expires, we should revisit this
# code to wait for paused workers and only exit immediately if all
# workers are in closing_gracefully status.
self.no_recipients = True
logger.warning(
f"Tried retiring worker {self.address}, but {nno_rec} tasks could not "
"be moved as there are no suitable workers to receive them. "
"The worker will not be retired."
)
self.manager.policies.remove(self)
elif nrepl:
logger.info(
f"Retiring worker {self.address}; {nrepl} keys are being moved away.",
)
else:
logger.info(
f"Retiring worker {self.address}; no unique keys need to be moved away."
)
self.manager.policies.remove(self)
def done(self) -> bool:
"""Return True if it is safe to close the worker down; False otherwise"""
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
return True
return all(len(ts.who_has) > 1 for ts in ws.has_what)