AMM high level documentation #5456

Merged · 3 commits · Oct 22, 2021
Changes from 2 commits
6 changes: 3 additions & 3 deletions distributed/active_memory_manager.py
@@ -304,9 +304,9 @@ def run(
You may optionally retrieve which worker it was decided the key will be
replicated to or dropped from, as follows:

-```python
-choice = yield "replicate", ts, None
-```
+.. code-block:: python
+
+    choice = yield "replicate", ts, None

``choice`` is either a WorkerState or None; the latter is returned if the
ActiveMemoryManager chose to disregard the request.
242 changes: 242 additions & 0 deletions docs/source/active_memory_manager.rst
@@ -0,0 +1,242 @@
Active Memory Manager
=====================
The Active Memory Manager, or *AMM*, is an experimental daemon that optimizes memory
usage of workers across the Dask cluster. It is disabled by default.


Memory imbalance and duplication
--------------------------------
Whenever a Dask task returns data, it is stored on the worker that executed the task for
as long as it's a dependency of other tasks, is referenced by a ``Client`` through a
``Future``, or is part of a :doc:`published dataset <publish>`.

Dask assigns tasks to workers following criteria of CPU occupancy, :doc:`resources`, and
locality. In the trivial use case of tasks that are not connected to each other, take
the same time to compute, return data of the same size, and have no resource
constraints, one will observe a perfect balance in memory occupation across workers too.
In all other use cases, however, memory usage may become unbalanced across workers
as the computation progresses.

When a task runs on a worker and requires as input the output of a task from a different
worker, Dask will transparently transfer the data between workers, ending up with
multiple copies of the same data on different workers. This is generally desirable, as
it avoids re-transferring the data if it's required again later on. However, it also
causes increased overall memory usage across the cluster.


Enabling the Active Memory Manager
----------------------------------
The AMM can be enabled through the :doc:`Dask configuration file <configuration>`:

.. code-block:: yaml

    distributed:
      scheduler:
        active-memory-manager:
          start: true
          interval: 2s

Member: Minor nit - is it too late to change these config fields? I'd find ``enable: true`` to be a clearer config name.

Collaborator Author: IMHO I find ``enable`` to be ambiguous. You can call ``run_once`` while it's not started.

The above is the recommended setup and will run all enabled *AMM policies* (see below)
every two seconds. Alternatively, you can manually start/stop the AMM from the
``Client`` or trigger a one-off iteration:

.. code-block:: python

    >>> client.scheduler.amm_start()  # Start running every 2 seconds
    >>> client.scheduler.amm_stop()  # Stop running periodically
    >>> client.scheduler.amm_run_once()  # Trigger a one-off iteration


Policies
--------
The AMM by itself doesn't do anything. The user must enable *policies* which suggest
actions regarding Dask data. The AMM runs the policies and enacts their suggestions, as
long as they don't harm data integrity. These suggestions can be of two types:

- Replicate the data of an in-memory Dask task from one worker to another.
This should not be confused with replication caused by task dependencies.
- Delete one or more replicas of an in-memory task. The AMM will never delete the last
replica of a task, even if a policy asks to.

Unless a policy puts constraints on which workers should be impacted, the AMM will
automatically create replicas on workers with the lowest memory usage first and delete
them from workers with the highest memory usage first.

Individual policies are enabled, disabled, and configured through the Dask config:


.. code-block:: yaml

    distributed:
      scheduler:
        active-memory-manager:
          start: true
          interval: 2s
          policies:
            - class: distributed.active_memory_manager.ReduceReplicas
            - class: my_package.MyPolicy
              arg1: foo
              arg2: bar

See *Custom policies* below for how to write policies such as ``my_package.MyPolicy``
in the example above.
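
The extra keys in each policy mapping are passed to the policy's ``__init__`` as
keyword arguments. As a sketch, the hypothetical ``my_package.MyPolicy`` above
would start like this:

.. code-block:: python

    from distributed.active_memory_manager import ActiveMemoryManagerPolicy


    class MyPolicy(ActiveMemoryManagerPolicy):
        def __init__(self, arg1, arg2):
            # arg1 and arg2 arrive from the Dask config entry above
            self.arg1 = arg1
            self.arg2 = arg2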

The default Dask config file contains a sane selection of built-in policies that
should be generally desirable. Try it first with just ``start: true`` in your Dask
config and see if it fits your needs before tweaking individual policies.


Built-in policies
-----------------
ReduceReplicas
++++++++++++++
class
    ``distributed.active_memory_manager.ReduceReplicas``
parameters
    None

This policy is enabled in the default Dask config. Whenever a Dask task is replicated
on more than one worker and the additional replicas don't appear to serve an ongoing
computation, this policy drops all excess replicas.

.. note::
    This policy is incompatible with :meth:`~distributed.Client.replicate` and with
    the ``broadcast`` parameter of :meth:`~distributed.Client.scatter`.
Member: Can you comment more on what this incompatibility entails? What happens if AMM is enabled and ``scatter(..., broadcast=True)`` is called in user code?

Collaborator Author: done


Custom policies
---------------
Power users can write their own policies by subclassing
:class:`~distributed.active_memory_manager.ActiveMemoryManagerPolicy`. The class should
define two methods:

``__init__``
    A custom policy may load parameters from the Dask config through ``__init__``
    parameters. If you don't need configuration, you don't need to implement this
    method.
``run``
    This method accepts no parameters and is invoked by the AMM every 2 seconds (or
    whatever the AMM interval is).
    It must yield zero or more of the following *suggestion* tuples:

``yield "replicate", <TaskState>, None``
Create one replica of the target task on the worker with the lowest memory usage
that doesn't hold a replica yet. To create more than one replica, you need to
yield the same command more than once.
``yield "replicate", <TaskState>, {<WorkerState>, <WorkerState>, ...}``
Create one replica of the target task on the worker with the lowest memory among
the listed candidates.
``yield "drop", <TaskState>, None``
Delete one replica of the target task one the worker with the highest memory
usage across the whole cluster.
``yield "drop", <TaskState>, {<WorkerState>, <WorkerState>, ...}``
Delete one replica of the target task on the worker with the highest memory
among the listed candidates.
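
    For illustration, here is a minimal sketch of a ``run`` method emitting such
    suggestions; the replica threshold of 2 is an arbitrary assumption, not a
    recommended value:

    .. code-block:: python

        def run(self):
            # Suggest dropping replicas of any task held in memory by more
            # than two workers; the AMM picks the most loaded worker each time
            for ts in self.manager.scheduler.tasks.values():
                for _ in range(len(ts.who_has) - 2):
                    yield "drop", ts, None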

The AMM will silently reject unacceptable suggestions, such as:

- Delete the last replica of a task
- Delete a replica from a subset of workers that don't hold any
- Delete a replica from a worker that currently needs it for computation
- Replicate a task that is not yet in memory
- Create more replicas of a task than there are workers
- Create replicas of a task on workers that already hold them
- Create replicas on paused or retiring workers
Member: Is there a way to enable debug logs for issues like these? When developing a new policy, should you try to avoid generating unacceptable suggestions (in which case you'd want to be able to debug these issues), or is generating unacceptable suggestions (and relying on the AMM to ignore them) in a policy fine and intended if it makes the policy code simpler?

Collaborator Author: The latter. Policies should be simple and readable. The subtle edge case management is left to the AMM.

Member: Can you add a note to this section clarifying that policies shouldn't worry about these cases?

Collaborator Author: Done


Optionally, the ``run`` method may retrieve which worker the AMM just selected, as
follows:

.. code-block:: python

    ws = (yield "drop", ts, None)
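
``ws`` will be either the ``WorkerState`` that the AMM selected or ``None`` if it
disregarded the suggestion. A short sketch of how a policy might act on it; the
``print`` diagnostic is just for illustration:

.. code-block:: python

    ws = (yield "drop", ts, None)
    if ws is not None:
        print(f"AMM will drop a replica of {ts.key} from {ws.address}")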

The ``run`` method can access the following attributes:

``self.manager``
    The :class:`~distributed.active_memory_manager.ActiveMemoryManagerExtension` that
    the policy is attached to
``self.manager.scheduler``
    The :class:`~distributed.Scheduler` to which the suggestions will be applied.
    From there you can access various attributes such as ``tasks`` and ``workers``.
``self.manager.workers_memory``
    Read-only mapping of ``{WorkerState: bytes}``, where ``bytes`` is the expected
    RAM usage of the worker after all suggestions accepted so far in the current
    AMM iteration, from all policies, have been enacted. Note that you don't need
    to access this if you are happy to always create/delete replicas on the workers
    with the lowest and highest memory usage respectively - the AMM will handle it
    for you.
``self.manager.pending``
    Read-only mapping of ``{TaskState: ({<WorkerState>, ...}, {<WorkerState>, ...})}``.
    The first set contains the workers that will receive a new replica of the task
    according to the suggestions accepted so far; the second set contains the
    workers which will lose a replica.
``self.manager.policies``
    Set of policies registered in the AMM. A policy can deregister itself as
    follows:

    .. code-block:: python

        def run(self):
            self.manager.policies.remove(self)
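
As an illustration of these attributes, here is a sketch of a policy that
restricts replication to workers whose projected memory stays under a threshold;
the key name ``foo`` and the 4 GiB cutoff are arbitrary assumptions:

.. code-block:: python

    def run(self):
        ts = self.manager.scheduler.tasks.get("foo")  # hypothetical key
        if ts is None:
            return
        # Workers expected to remain under 4 GiB once all suggestions
        # accepted so far in this AMM iteration have been enacted
        candidates = {
            ws
            for ws, nbytes in self.manager.workers_memory.items()
            if nbytes < 4 * 2**30
        }
        if candidates:
            yield "replicate", ts, candidates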

Example
+++++++
The following custom policy ensures that keys "foo" and "bar" are replicated on all
workers at all times. New workers will receive a replica soon after connecting to the
scheduler. The policy will do nothing if the target keys are not in memory somewhere or
if all workers already hold a replica.
Note that this example is incompatible with the ``ReduceReplicas`` built-in policy.

In mymodule.py (it must be accessible by the scheduler):

.. code-block:: python

    from distributed.active_memory_manager import ActiveMemoryManagerPolicy


    class EnsureBroadcast(ActiveMemoryManagerPolicy):
        def __init__(self, key):
            self.key = key

        def run(self):
            ts = self.manager.scheduler.tasks.get(self.key)
            if not ts:
                return
            for _ in range(len(self.manager.scheduler.workers) - len(ts.who_has)):
                yield "replicate", ts, None

Note that the policy doesn't bother testing for edge cases such as paused workers or
other policies also requesting replicas; the AMM takes care of it. In theory you could
rewrite the last two lines as follows (at the cost of some wasted CPU cycles):

.. code-block:: python

    for _ in range(1000):
        yield "replicate", ts, None

In distributed.yaml:

.. code-block:: yaml

    distributed:
      scheduler:
        active-memory-manager:
          start: true
          interval: 2s
          policies:
            - class: mymodule.EnsureBroadcast
              key: foo
            - class: mymodule.EnsureBroadcast
              key: bar

We could have alternatively used a single policy instance with a list of keys - the
above design merely illustrates that you may have multiple instances of the same policy
running side by side.
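
For comparison, here is a sketch of that single-instance variant; it assumes the
config entry would pass the keys as a list, e.g. ``keys: [foo, bar]``:

.. code-block:: python

    class EnsureBroadcast(ActiveMemoryManagerPolicy):
        def __init__(self, keys):
            self.keys = keys

        def run(self):
            for key in self.keys:
                ts = self.manager.scheduler.tasks.get(key)
                if not ts:
                    continue
                # One suggestion per worker that doesn't hold a replica yet
                nmissing = len(self.manager.scheduler.workers) - len(ts.who_has)
                for _ in range(nmissing):
                    yield "replicate", ts, None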


API reference
-------------
.. autoclass:: distributed.active_memory_manager.ActiveMemoryManagerExtension
    :members:

.. autoclass:: distributed.active_memory_manager.ActiveMemoryManagerPolicy
    :members:

.. autoclass:: distributed.active_memory_manager.ReduceReplicas
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -147,7 +147,7 @@
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ["_static"]
+html_static_path = []

# Add any extra paths that contain custom files (such as robots.txt or
# .htaccess) here, relative to this directory. These files are copied
5 changes: 2 additions & 3 deletions docs/source/diagnosing-performance.rst
@@ -19,9 +19,8 @@ identify performance issues.
Fortunately, Dask collects a variety of diagnostic information during
execution. It does this both to provide performance feedback to users, but
also for its own internal scheduling decisions. The primary place to observe
-this feedback is the :doc:`diagnostic dashboard <web>`. This document
-describes the various pieces of performance information available and how to
-access them.
+this feedback is the diagnostic dashboard. This document describes the various
+pieces of performance information available and how to access them.


Task start and stop times
3 changes: 1 addition & 2 deletions docs/source/index.rst
@@ -109,14 +109,13 @@ Contents

    actors
    asynchronous
    configuration
    ipython
    prometheus
Member: Why are these rows deleted?

Collaborator Author: those pages don't exist anywhere

Member: Good catch -- these are over in the dask/dask docs now (e.g. https://docs.dask.org/en/stable/setup/prometheus.html)

    http_services
    publish
    resources
    task-launch
    tls
    active_memory_manager

.. toctree::
    :maxdepth: 1
2 changes: 1 addition & 1 deletion docs/source/tls.rst
@@ -50,7 +50,7 @@ One can also pass additional parameters:

All those parameters can be passed in several ways:

-* through the Dask :ref:`configuration file <configuration>`;
+* through the Dask :doc:`configuration file <configuration>`;
* if using the command line, through options to ``dask-scheduler`` and
``dask-worker``;
* if using the API, through a ``Security`` object. For example, here is