Skip to content

Commit

Permalink
Revert "Update rearrange_by_column patch for explicit comms" (#1001)
Browse files Browse the repository at this point in the history
Reverts #992, which had led to unexpected issues. See rapidsai/cudf#11800 (review)

cc @wence-

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #1001
  • Loading branch information
rjzamora committed Sep 28, 2022
1 parent 8de9ce3 commit c0ae66c
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 43 deletions.
8 changes: 5 additions & 3 deletions dask_cuda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from ._version import get_versions
from .cuda_worker import CUDAWorker
from .explicit_comms.dataframe.shuffle import get_rearrange_by_column_wrapper
from .explicit_comms.dataframe.shuffle import get_rearrange_by_column_tasks_wrapper
from .local_cuda_cluster import LocalCUDACluster
from .proxify_device_objects import proxify_decorator, unproxify_decorator

Expand All @@ -19,8 +19,10 @@


# Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper(
dask.dataframe.shuffle.rearrange_by_column
dask.dataframe.shuffle.rearrange_by_column_tasks = (
get_rearrange_by_column_tasks_wrapper(
dask.dataframe.shuffle.rearrange_by_column_tasks
)
)


Expand Down
28 changes: 5 additions & 23 deletions dask_cuda/explicit_comms/dataframe/shuffle.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import functools
import inspect
import warnings
from collections import defaultdict
from operator import getitem
from typing import Dict, List, Optional
Expand Down Expand Up @@ -345,7 +344,7 @@ def shuffle(
return new_dd_object(dsk, name, meta, divs).persist()


def get_rearrange_by_column_wrapper(func):
def get_rearrange_by_column_tasks_wrapper(func):
"""Returns a function wrapper that dispatch the shuffle to explicit-comms.
Notice, this is monkey patched into Dask at dask_cuda import
Expand All @@ -355,40 +354,23 @@ def get_rearrange_by_column_wrapper(func):

@functools.wraps(func)
def wrapper(*args, **kwargs):
# Use explicit-comms shuffle if the shuffle kwarg is
# set to "explicit-comms". For now, we also use
# explicit-comms if the shuffle kwarg is set to "tasks"
# and the `dask.config` specifies "explicit-comms".
# However, this behavior should be deprecated in the
# next dask-cuda release (after 22.10)
shuffle_arg = kwargs.pop("shuffle", None) or dask.config.get("shuffle", "disk")
if shuffle_arg == "tasks" and dask.config.get("explicit-comms", False):
shuffle_arg = "explicit-comms"
warnings.warn(
"The 'explicit-comms' config field is now deprecated and "
"will be removed in a future dask-cuda version. Please set "
"the 'shuffle' config to 'explicit-comms' instead, or pass "
"`shuffle='explicit-comms'` to the collection API.",
FutureWarning,
)

if shuffle_arg == "explicit-comms":
if dask.config.get("explicit-comms", False):
try:
import distributed.worker

# Make sure we have an activate client.
distributed.worker.get_client()
except (ImportError, ValueError):
shuffle_arg = "tasks" # Fall back to task-based shuffle
pass
else:
# Convert `*args, **kwargs` to a dict of `keyword -> values`
kw = func_sig.bind(*args, **kwargs)
kw.apply_defaults()
kw = kw.arguments
column = kw["col"]
column = kw["column"]
if isinstance(column, str):
column = [column]
return shuffle(kw["df"], column, kw["npartitions"], kw["ignore_index"])
return func(*args, shuffle=shuffle_arg, **kwargs)
return func(*args, **kwargs)

return wrapper
16 changes: 1 addition & 15 deletions dask_cuda/tests/test_explicit_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,30 +168,16 @@ def check_shuffle(in_cluster):
"""
name = "explicit-comms-shuffle"
ddf = dd.from_pandas(pd.DataFrame({"key": np.arange(10)}), npartitions=2)
# TODO: Update when "explicit-comms" config is deprecated
# See: https://github.com/rapidsai/dask-cuda/pull/992
with dask.config.set(explicit_comms=False):
res = ddf.shuffle(on="key", npartitions=4, shuffle="tasks")
assert all(name not in str(key) for key in res.dask)
with dask.config.set(explicit_comms=True):
with pytest.warns(FutureWarning):
res = ddf.shuffle(on="key", npartitions=4, shuffle="tasks")
res = ddf.shuffle(on="key", npartitions=4, shuffle="tasks")
if in_cluster:
assert any(name in str(key) for key in res.dask)
else: # If not in cluster, we cannot use explicit comms
assert all(name not in str(key) for key in res.dask)

# Passing explicit `shuffle="explicit-comms"` argument
res = dd.shuffle.shuffle(ddf, "key", npartitions=4, shuffle="explicit-comms")
if in_cluster:
assert any(name in str(key) for key in res.dask)
else: # If not in cluster, we cannot use explicit comms
assert all(name not in str(key) for key in res.dask)

# Passing explicit `shuffle="tasks"` argument
res = dd.shuffle.shuffle(ddf, "key", npartitions=4, shuffle="tasks")
assert all(name not in str(key) for key in res.dask)

with LocalCluster(
protocol="tcp",
dashboard_address=None,
Expand Down
5 changes: 3 additions & 2 deletions docs/source/explicit_comms.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ Explicit-comms
==============

Communication and scheduling overhead can be a major bottleneck in Dask/Distributed. Dask-CUDA addresses this by introducing an API for explicit communication in Dask tasks.
The idea is that Dask/Distributed spawns workers and distributes the data as usual while the user can submit tasks on the workers that communicate explicitly.
The idea is that Dask/Distributed spawns workers and distributes data as usual, while the user can submit tasks on the workers that communicate explicitly.

This makes it possible to bypass Distributed's scheduler and write hand-tuned computation and communication patterns. Currently, Dask-CUDA includes an explicit-comms
implementation of the Dataframe `shuffle <https://github.com/rapidsai/dask-cuda/blob/d3c723e2c556dfe18b47b392d0615624453406a5/dask_cuda/explicit_comms/dataframe/shuffle.py#L210>`_ operation used for merging and sorting.
Expand All @@ -11,6 +11,7 @@ implementation of the Dataframe `shuffle <https://github.com/rapidsai/dask-cuda/
Usage
-----

In order to use explicit-comms in Dask/Distributed for shuffle-based DataFrame operations, simply pass the ``shuffle="explicit-comms"`` keyword argument to the appropriate DataFrame API (e.g. ``ddf.sort_values(..., shuffle="explicit-comms")``). You can also change the default shuffle algorithm to explicit-comms by defining the ``DASK_SHUFFLE=explicit-comms`` environment variable, or by setting the ``"shuffle"`` key to ``"explicit-comms"`` in the `Dask configuration <https://docs.dask.org/en/latest/configuration.html>`_.
In order to use explicit-comms in Dask/Distributed automatically, simply define the environment variable ``DASK_EXPLICIT_COMMS=True`` or set the ``"explicit-comms"``
key to ``True`` in the `Dask configuration <https://docs.dask.org/en/latest/configuration.html>`_.

It is also possible to use explicit-comms in tasks manually, see the `API <api.html#explicit-comms>`_ and our `implementation of shuffle <https://github.com/rapidsai/dask-cuda/blob/branch-0.20/dask_cuda/explicit_comms/dataframe/shuffle.py>`_ for guidance.

0 comments on commit c0ae66c

Please sign in to comment.