Rename futures to tasks #10906

Open
wants to merge 6 commits into
base: main
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.10.yaml
@@ -74,4 +74,4 @@ dependencies:
- jinja2
- pip
- pip:
- - git+https://github.com/dask/distributed
+ - git+https://github.com/milesgranger/distributed@milesgranger/rename-future-to-task
4 changes: 2 additions & 2 deletions continuous_integration/environment-3.11.yaml
@@ -75,5 +75,5 @@ dependencies:
- jinja2
- pip
- pip:
- - git+https://github.com/dask/distributed
- - git+https://github.com/dask-contrib/dask-expr
+ - git+https://github.com/milesgranger/distributed@milesgranger/rename-future-to-task
+ - git+https://github.com/milesgranger/dask-expr@milesgranger/rename-future-to-task
4 changes: 2 additions & 2 deletions continuous_integration/environment-3.12.yaml
@@ -74,5 +74,5 @@ dependencies:
- jinja2
- pip
- pip:
- - git+https://github.com/dask/distributed
- - git+https://github.com/dask-contrib/dask-expr
+ - git+https://github.com/milesgranger/distributed@milesgranger/rename-future-to-task
+ - git+https://github.com/milesgranger/dask-expr@milesgranger/rename-future-to-task
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.9.yaml
@@ -74,4 +74,4 @@ dependencies:
- jinja2
- pip
- pip:
- - git+https://github.com/dask/distributed
+ - git+https://github.com/milesgranger/distributed@milesgranger/rename-future-to-task
@@ -15,7 +15,7 @@ dependencies:
# optional dependencies pulled in by pip install dask[distributed]
- pip
- pip:
- - git+https://github.com/dask/distributed
+ - git+https://github.com/milesgranger/distributed@milesgranger/rename-future-to-task
# test dependencies
- pre-commit
- pytest
2 changes: 1 addition & 1 deletion continuous_integration/environment-mindeps-optional.yaml
@@ -50,7 +50,7 @@ dependencies:
- pip
- pip:
# optional dependencies pulled in by pip install dask[distributed]
- - git+https://github.com/dask/distributed
+ - git+https://github.com/milesgranger/distributed@milesgranger/rename-future-to-task
# test dependencies
- pre-commit
- pytest
2 changes: 1 addition & 1 deletion dask/array/utils.py
@@ -238,7 +238,7 @@ def _check_chunks(x, check_ndim=True, scheduler=None):
x = x.persist(scheduler=scheduler)
for idx in itertools.product(*(range(len(c)) for c in x.chunks)):
chunk = x.dask[(x.name,) + idx]
- if hasattr(chunk, "result"): # it's a future
+ if hasattr(chunk, "result"): # it's a task
chunk = chunk.result()
if not hasattr(chunk, "dtype"):
chunk = np.array(chunk, dtype="O")
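Note on the hunk above: the `hasattr(chunk, "result")` check is duck typing. Any object exposing a `result` method is treated as a future-like handle and resolved before its `dtype` is inspected. A minimal sketch of the same pattern, assuming the standard library's `concurrent.futures.Future` as a stand-in for the distributed object; the helper name `resolve` is hypothetical and not part of Dask:

```python
from concurrent.futures import Future


def resolve(chunk):
    """Return the concrete value behind ``chunk``.

    Hypothetical helper: objects exposing ``result`` are treated as
    future-like and resolved; anything else is returned unchanged.
    """
    if hasattr(chunk, "result"):
        return chunk.result()
    return chunk


# A completed standard-library future stands in for a distributed handle.
fut = Future()
fut.set_result([1, 2, 3])

resolved = resolve(fut)
passthrough = resolve("plain value")
```

Because the check relies only on the attribute name, it behaves the same whether the class is called `Future` or `Task`, which is why this hunk changes only the comment.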
6 changes: 3 additions & 3 deletions dask/base.py
@@ -317,7 +317,7 @@ def persist(self, **kwargs):
task scheduler. If the task scheduler supports asynchronous computing,
such as is the case of the dask.distributed scheduler, then persist
will return *immediately* and the return value's task graph will
- contain Dask Future objects. However if the task scheduler only
+ contain Dask Task objects. However if the task scheduler only
supports blocking computation then the call to persist will *block*
and the return value's task graph will contain concrete Python results.

@@ -933,7 +933,7 @@ def persist(*args, traverse=True, optimize_graph=True, scheduler=None, **kwargs)
>>> df['in-debt'] = df.balance < 0 # doctest: +SKIP
>>> df = df.persist() # triggers computation # doctest: +SKIP

- >>> df.value().min() # future computations are now fast # doctest: +SKIP
+ >>> df.value().min() # task computations are now fast # doctest: +SKIP
-10
>>> df.value().max() # doctest: +SKIP
100
@@ -1525,7 +1525,7 @@ def wait(x, timeout=None, return_when="ALL_COMPLETED"):
"""Wait until computation has finished

This is a compatibility alias for ``dask.distributed.wait``.
- If it is applied onto Dask collections without Dask Futures or if Dask
+ If it is applied onto Dask collections without Dask Tasks or if Dask
distributed is not installed then it is a no-op
"""
try:
2 changes: 1 addition & 1 deletion dask/dataframe/core.py
@@ -5377,7 +5377,7 @@ def set_index(

With ``sort=True``, this function is much more expensive. Under normal
operation this function does an initial pass over the index column to
- compute approximate quantiles to serve as future divisions. It then passes
+ compute approximate quantiles to serve as task divisions. It then passes
over the data a second time, splitting up each input partition into several
pieces and sharing those pieces to all of the output partitions now in
sorted order.
4 changes: 2 additions & 2 deletions dask/dataframe/io/io.py
@@ -584,7 +584,7 @@ def to_records(df):

@insert_meta_param_description
def from_delayed(
- dfs: Delayed | distributed.Future | Iterable[Delayed | distributed.Future],
+ dfs: Delayed | distributed.Task | Iterable[Delayed | distributed.Task],
meta=None,
divisions: tuple | Literal["sorted"] | None = None,
prefix: str = "from-delayed",
@@ -595,7 +595,7 @@ def from_delayed(
Parameters
----------
dfs :
- A ``dask.delayed.Delayed``, a ``distributed.Future``, or an iterable of either
+ A ``dask.delayed.Delayed``, a ``distributed.Task``, or an iterable of either
of these objects, e.g. returned by ``client.submit``. These comprise the
individual partitions of the resulting dataframe.
If a single object is provided (not an iterable), then the resulting dataframe
4 changes: 2 additions & 2 deletions dask/dataframe/io/orc/core.py
@@ -178,7 +178,7 @@ def to_orc(
Key/value pairs to be passed on to the file-system backend, if any.
compute : bool, default True
If True (default) then the result is computed immediately. If False
- then a ``dask.delayed`` object is returned for future computation.
+ then a ``dask.delayed`` object is returned for task computation.
compute_kwargs : dict, default True
Options to be passed in to the compute method

@@ -235,7 +235,7 @@ def to_orc(
dsk[(final_name, 0)] = (lambda x: None, part_tasks)
graph = HighLevelGraph.from_collections((final_name, 0), dsk, dependencies=[df])

- # Compute or return future
+ # Compute or return task
if compute:
if compute_kwargs is None:
compute_kwargs = dict()
4 changes: 2 additions & 2 deletions dask/dataframe/io/parquet/core.py
@@ -760,7 +760,7 @@ def to_parquet(
compute : bool, default True
If ``True`` (default) then the result is computed immediately. If
``False`` then a ``dask.dataframe.Scalar`` object is returned for
- future computation.
+ task computation.
compute_kwargs : dict, default True
Options to be passed in to the compute method
schema : pyarrow.Schema, dict, "infer", or None, default "infer"
@@ -1100,7 +1100,7 @@ def create_metadata_file(
inputs to be handled by any one task in the tree. Defaults to 32.
compute : bool, optional
If True (default) then the result is computed immediately. If False
- then a ``dask.delayed`` object is returned for future computation.
+ then a ``dask.delayed`` object is returned for task computation.
compute_kwargs : dict, optional
Options to be passed in to the compute method
fs : fsspec object, optional
2 changes: 1 addition & 1 deletion dask/layers.py
@@ -995,7 +995,7 @@ class DataFrameIOLayer(Blockwise):
Whether one or more elements of `inputs` is expected to
contain a nested task. This argument in only used for
serialization purposes, and will be deprecated in the
- future. Default is False.
+ task. Default is False.
creation_info: dict (optional)
Dictionary containing the callable function ('func'),
positional arguments ('args'), and key-word arguments
7 changes: 4 additions & 3 deletions dask/local.py
@@ -110,7 +110,8 @@

import os
from collections.abc import Mapping, Sequence
- from concurrent.futures import Executor, Future
+ from concurrent.futures import Executor
+ from concurrent.futures import Future as ConcurrentFuture
from functools import partial
from queue import Empty, Queue

@@ -538,7 +539,7 @@ class SynchronousExecutor(Executor):
_max_workers = 1

def submit(self, fn, *args, **kwargs):
- fut = Future()
+ fut = ConcurrentFuture()
try:
fut.set_result(fn(*args, **kwargs))
except BaseException as e:
@@ -585,7 +586,7 @@ def submit(self, fn, *args, **kwargs):


def submit_apply_async(apply_async, fn, *args, **kwargs):
- fut = Future()
+ fut = ConcurrentFuture()
apply_async(fn, args, kwargs, fut.set_result, fut.set_exception)
return fut
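
Note on the `dask/local.py` hunks above: both call sites build a standard-library future by hand and fill it in, either immediately or through callbacks. The sketch below shows the two patterns using only the standard library. The names `InlineExecutor` and `submit_via_apply_async` are hypothetical, and the `except` body is an assumption, since the hunk is cut off before it:

```python
from concurrent.futures import Executor
from concurrent.futures import Future as ConcurrentFuture
from multiprocessing.pool import ThreadPool


class InlineExecutor(Executor):
    """Hypothetical executor that runs each call immediately in the caller."""

    def submit(self, fn, *args, **kwargs):
        fut = ConcurrentFuture()
        try:
            fut.set_result(fn(*args, **kwargs))
        except BaseException as e:
            # Assumption: the error is stored on the future, not raised here.
            fut.set_exception(e)
        return fut


def submit_via_apply_async(apply_async, fn, *args, **kwargs):
    """Bridge a callback-style ``apply_async`` to a standard-library future."""
    fut = ConcurrentFuture()
    # Positional order: func, args, kwds, callback, error_callback.
    apply_async(fn, args, kwargs, fut.set_result, fut.set_exception)
    return fut


# Pattern 1: the future is already complete when submit() returns.
inline_result = InlineExecutor().submit(pow, 2, 5).result()

# Pattern 2: the pool's callbacks complete the future from another thread.
with ThreadPool(1) as pool:
    pooled_result = submit_via_apply_async(pool.apply_async, divmod, 7, 2).result()

# An exception raised by the callable is captured on the future.
failed = InlineExecutor().submit(lambda: 1 / 0)
```

Aliasing the import to `ConcurrentFuture` keeps the standard-library class distinct from any distributed class in the module namespace, whichever name that class ends up with.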

18 changes: 9 additions & 9 deletions dask/tests/test_distributed.py
@@ -107,8 +107,8 @@ def test_futures_to_delayed_dataframe(c):

df = pd.DataFrame({"x": [1, 2, 3]})

- futures = c.scatter([df, df])
- ddf = dd.from_delayed(futures)
+ tasks = c.scatter([df, df])
+ ddf = dd.from_delayed(tasks)
dd.utils.assert_eq(ddf.compute(), pd.concat([df, df], axis=0))

# Make sure from_delayed is Blockwise
@@ -242,8 +242,8 @@ def foo():
def test_futures_to_delayed_bag(c):
L = [1, 2, 3]

- futures = c.scatter([L, L])
- b = db.from_delayed(futures)
+ tasks = c.scatter([L, L])
+ b = db.from_delayed(tasks)
assert list(b) == L + L


@@ -254,9 +254,9 @@ def test_futures_to_delayed_array(c):
np = pytest.importorskip("numpy")
x = np.arange(5)

- futures = c.scatter([x, x])
+ tasks = c.scatter([x, x])
A = da.concatenate(
- [da.from_delayed(f, shape=x.shape, dtype=x.dtype) for f in futures], axis=0
+ [da.from_delayed(f, shape=x.shape, dtype=x.dtype) for f in tasks], axis=0
)
assert_eq(A.compute(), np.concatenate([x, x], axis=0))

@@ -469,16 +469,16 @@ def test_blockwise_array_creation(c, io, fuse):
],
)
@pytest.mark.parametrize("fuse", [True, False, None])
- @pytest.mark.parametrize("from_futures", [True, False])
- def test_blockwise_dataframe_io(c, tmpdir, io, fuse, from_futures):
+ @pytest.mark.parametrize("from_tasks", [True, False])
+ def test_blockwise_dataframe_io(c, tmpdir, io, fuse, from_tasks):
pd = pytest.importorskip("pandas")
dd = pytest.importorskip("dask.dataframe")
if dd._dask_expr_enabled():
pytest.xfail("doesn't work yet")

df = pd.DataFrame({"x": [1, 2, 3] * 5, "y": range(15)})

- if from_futures:
+ if from_tasks:
parts = [df.iloc[:5], df.iloc[5:10], df.iloc[10:15]]
futs = c.scatter(parts)
ddf0 = dd.from_delayed(futs, meta=parts[0])
2 changes: 1 addition & 1 deletion dask/typing.py
@@ -309,7 +309,7 @@ def persist(self: CollType, **kwargs: Any) -> CollType:
asynchronous computing, such as is the case of the
dask.distributed scheduler, then persist will return
*immediately* and the return value's task graph will contain
- Dask Future objects. However if the task scheduler only
+ Dask Task objects. However if the task scheduler only
supports blocking computation then the call to persist will
*block* and the return value's task graph will contain
concrete Python results.
8 changes: 4 additions & 4 deletions docs/source/10-minutes-to-dask.rst
@@ -471,10 +471,10 @@ run into code that is parallelizable, but isn't just a big DataFrame or array.

c = c.compute() # This triggers all of the above computations

- .. tab-item:: Futures: Immediate
+ .. tab-item:: Tasks: Immediate

- Unlike the interfaces described so far, Futures are eager. Computation starts as soon
- as the function is submitted (see :doc:`futures`).
+ Unlike the interfaces described so far, Tasks are eager. Computation starts as soon
+ as the function is submitted (see :doc:`tasks`).

.. code-block:: python

@@ -496,7 +496,7 @@ run into code that is parallelizable, but isn't just a big DataFrame or array.

.. note::

- Futures can only be used with distributed cluster. See the section below for more
+ Tasks can only be used with distributed cluster. See the section below for more
information.


4 changes: 2 additions & 2 deletions docs/source/api.rst
@@ -7,7 +7,7 @@ Dask APIs generally follow from upstream APIs:
- :doc:`DataFrames <dataframe-api>` follows Pandas
- :doc:`Bag <bag-api>` follows map/filter/groupby/reduce common in Spark and Python iterators
- :doc:`Delayed <delayed-api>` wraps general Python code
- - :doc:`Futures <futures>` follows `concurrent.futures <https://docs.python.org/3/library/concurrent.futures.html>`_ from the standard library for real-time computation.
+ - :doc:`Tasks <tasks>` follows `concurrent.futures <https://docs.python.org/3/library/concurrent.futures.html>`_ from the standard library for real-time computation.

.. toctree::
:maxdepth: 1
@@ -17,7 +17,7 @@ Dask APIs generally follow from upstream APIs:
DataFrame <dataframe-api.rst>
Bag <bag-api.rst>
Delayed <delayed-api.rst>
- Futures <futures>
+ Tasks <tasks>


Additionally, Dask has its own functions to start computations, persist data in
8 changes: 4 additions & 4 deletions docs/source/caching.rst
@@ -4,16 +4,16 @@ Opportunistic Caching
Dask usually removes intermediate values as quickly as possible in order to
make space for more data to flow through your computation. However, in some
cases, we may want to hold onto intermediate values, because they might be
- useful for future computations in an interactive session.
+ useful for task computations in an interactive session.

We need to balance the following concerns:

- 1. Intermediate results might be useful in future unknown computations
+ 1. Intermediate results might be useful in task unknown computations
2. Intermediate results also fill up memory, reducing space for the rest of our
current computation

Negotiating between these two concerns helps us to leverage the memory that we
- have available to speed up future, unanticipated computations. Which intermediate results
+ have available to speed up task, unanticipated computations. Which intermediate results
should we keep?

This document explains an experimental, opportunistic caching mechanism that automatically
@@ -81,7 +81,7 @@ Automatic Opportunistic Caching
-------------------------------

Another approach is to watch *all* intermediate computations, and *guess* which
- ones might be valuable to keep for the future. Dask has an *opportunistic
+ ones might be valuable to keep for the task. Dask has an *opportunistic
caching mechanism* that stores intermediate tasks that show the following
characteristics:

4 changes: 2 additions & 2 deletions docs/source/custom-collections.rst
@@ -210,7 +210,7 @@ created. It too has three stages:
2. **Computation**

Same as in ``compute``, except in the case of the distributed scheduler,
- where the values in ``results`` are futures instead of values.
+ where the values in ``results`` are tasks instead of values.

3. **Postpersist**

@@ -221,7 +221,7 @@ created. It too has three stages:
- A ``rebuild`` function, which takes in a persisted graph. The keys of
this graph are the same as ``__dask_keys__`` for the corresponding
collection, and the values are computed results (for the single-machine
- scheduler) or futures (for the distributed scheduler).
+ scheduler) or tasks (for the distributed scheduler).
- A tuple of extra arguments to pass to ``rebuild`` after the graph

To build the outputs of ``persist``, the list of collections and results is
2 changes: 1 addition & 1 deletion docs/source/delayed.rst
@@ -161,7 +161,7 @@ Real time
---------

Sometimes you want to create and destroy work during execution, launch tasks
- from other tasks, etc. For this, see the :doc:`Futures <futures>` interface.
+ from other tasks, etc. For this, see the :doc:`Tasks <tasks>` interface.


Best Practices
8 changes: 4 additions & 4 deletions docs/source/deploying-python-advanced.rst
@@ -37,8 +37,8 @@ cleaning everything up.
async with Scheduler() as s:
async with Worker(s.address) as w1, Worker(s.address) as w2:
async with Client(s.address, asynchronous=True) as client:
- future = client.submit(lambda x: x + 1, 10)
- result = await future
+ task = client.submit(lambda x: x + 1, 10)
+ result = await task
print(result)

asyncio.get_event_loop().run_until_complete(f())
@@ -154,8 +154,8 @@ computation, and then allow things to clean up after that computation..
async with Scheduler() as s:
async with Worker(s.address) as w1, Worker(s.address) as w2:
async with Client(s.address, asynchronous=True) as client:
- future = client.submit(lambda x: x + 1, 10)
- result = await future
+ task = client.submit(lambda x: x + 1, 10)
+ result = await task
print(result)

asyncio.get_event_loop().run_until_complete(f())
6 changes: 3 additions & 3 deletions docs/source/develop.rst
@@ -24,7 +24,7 @@ Dask conversation happens in the following places:

For usage questions and bug reports we prefer the use of Discourse, Stack Overflow
and GitHub issues over Slack chat. Discourse, GitHub and Stack Overflow are more easily
- searchable by future users, so conversations had there can be useful to many more people
+ searchable by task users, so conversations had there can be useful to many more people
than just those directly involved.

.. _`Dask Discourse forum`: https://dask.discourse.group
@@ -142,7 +142,7 @@ Test
~~~~

Dask employs extensive unit tests to ensure correctness of code both for today
- and for the future. Test coverage is expected for all code contributions.
+ and for the task. Test coverage is expected for all code contributions.

Tests are written in a py.test style with bare functions:

@@ -375,7 +375,7 @@ Distributed submit PRs. In this case, the gpuCI bot will comment on the PR:
Dask Maintainers can then approve gpuCI builds for these PRs with following choices:

- To only approve the PR contributor for the current PR, leave a comment which states ``ok to test``
- - To approve the current PR and all future PRs from the contributor, leave a comment which states ``add to allowlist``
+ - To approve the current PR and all task PRs from the contributor, leave a comment which states ``add to allowlist``


.. _Sphinx: https://www.sphinx-doc.org/