Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename futures to tasks #8497

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions continuous_integration/environment-3.10.yaml
Expand Up @@ -45,8 +45,8 @@ dependencies:
- zict # overridden by git tip below
- zstandard
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/dask-contrib/dask-expr
- git+https://github.com/milesgranger/dask@milesgranger/rename-future-to-task
- git+https://github.com/milesgranger/dask-expr@milesgranger/rename-future-to-task
- git+https://github.com/dask/s3fs
- git+https://github.com/dask/zict
- git+https://github.com/fsspec/filesystem_spec
Expand Down
4 changes: 2 additions & 2 deletions continuous_integration/environment-3.11.yaml
Expand Up @@ -45,8 +45,8 @@ dependencies:
- zict # overridden by git tip below
- zstandard
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/dask-contrib/dask-expr
- git+https://github.com/milesgranger/dask@milesgranger/rename-future-to-task
- git+https://github.com/milesgranger/dask-expr@milesgranger/rename-future-to-task
- git+https://github.com/dask/s3fs
- git+https://github.com/dask/zict
- git+https://github.com/fsspec/filesystem_spec
Expand Down
4 changes: 2 additions & 2 deletions continuous_integration/environment-3.12.yaml
Expand Up @@ -45,8 +45,8 @@ dependencies:
- zict # overridden by git tip below
- zstandard
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/dask-contrib/dask-expr
- git+https://github.com/milesgranger/dask@milesgranger/rename-future-to-task
- git+https://github.com/milesgranger/dask-expr@milesgranger/rename-future-to-task
- git+https://github.com/dask/s3fs
- git+https://github.com/dask/zict
- git+https://github.com/fsspec/filesystem_spec
Expand Down
4 changes: 2 additions & 2 deletions continuous_integration/environment-3.9.yaml
Expand Up @@ -52,7 +52,7 @@ dependencies:
- zict
- zstandard
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/dask-contrib/dask-expr
- git+https://github.com/milesgranger/dask@milesgranger/rename-future-to-task
- git+https://github.com/milesgranger/dask-expr@milesgranger/rename-future-to-task
- git+https://github.com/dask/crick # Only tested here
- keras
2 changes: 1 addition & 1 deletion continuous_integration/environment-mindeps.yaml
Expand Up @@ -22,7 +22,7 @@ dependencies:
# Distributed depends on the latest version of Dask
- pip
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/milesgranger/dask@milesgranger/rename-future-to-task
# test dependencies
- pytest
- pytest-cov
Expand Down
14 changes: 13 additions & 1 deletion distributed/__init__.py
Expand Up @@ -20,18 +20,26 @@
from dask.config import config # type: ignore

from distributed._version import get_versions
from distributed.actor import Actor, ActorFuture, BaseActorFuture
from distributed.actor import (
Actor,
ActorFuture,
ActorTask,
BaseActorFuture,
BaseActorTask,
)
from distributed.client import (
Client,
CompatibleExecutor,
Future,
Task,
as_completed,
default_client,
fire_and_forget,
futures_of,
get_task_metadata,
get_task_stream,
performance_report,
tasks_of,
wait,
)
from distributed.core import Status, connect, rpc
Expand Down Expand Up @@ -120,8 +128,10 @@ def _():

__all__ = [
"Actor",
"ActorTask",
"ActorFuture",
"Adaptive",
"BaseActorTask",
"BaseActorFuture",
"CancelledError",
"Client",
Expand All @@ -130,6 +140,7 @@ def _():
"Environ",
"Event",
"Future",
"Task",
"KilledWorker",
"LocalCluster",
"Lock",
Expand Down Expand Up @@ -163,6 +174,7 @@ def _():
"default_client",
"fire_and_forget",
"futures_of",
"tasks_of",
"get_client",
"get_task_metadata",
"get_task_stream",
milesgranger marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
12 changes: 6 additions & 6 deletions distributed/_concurrent_futures_thread.py
Expand Up @@ -51,22 +51,22 @@ def _python_exit():


class _WorkItem:
def __init__(self, future, fn, args, kwargs):
self.future = future
def __init__(self, task, fn, args, kwargs):
self.task = task
self.fn = fn
self.args = args
self.kwargs = kwargs

def run(self):
if not self.future.set_running_or_notify_cancel(): # pragma: no cover
if not self.task.set_running_or_notify_cancel(): # pragma: no cover
return

try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as e:
self.future.set_exception(e)
self.task.set_exception(e)
else:
self.future.set_result(result)
self.task.set_result(result)


def _worker(executor_reference, work_queue):
Expand Down Expand Up @@ -123,7 +123,7 @@ def __init__(self, max_workers=None, thread_name_prefix=""):
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown: # pragma: no cover
raise RuntimeError("cannot schedule new futures after shutdown")
raise RuntimeError("cannot schedule new tasks after shutdown")

f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
Expand Down
62 changes: 34 additions & 28 deletions distributed/actor.py
Expand Up @@ -10,7 +10,7 @@

from tornado.ioloop import IOLoop

from distributed.client import Future
from distributed.client import Task
from distributed.protocol import to_serialize
from distributed.utils import LateLoopEvent, iscoroutinefunction, sync, thread_state
from distributed.utils_comm import WrappedKey
Expand All @@ -24,7 +24,7 @@ class Actor(WrappedKey):

An actor allows remote control of a stateful object living on a remote
worker. Method calls on this object trigger operations on the remote
object and return BaseActorFutures on which we can block to get results.
object and return BaseActorTasks on which we can block to get results.

Examples
--------
Expand All @@ -41,20 +41,20 @@ class Actor(WrappedKey):
You can create an actor by submitting a class with the keyword
``actor=True``.

>>> future = client.submit(Counter, actor=True)
>>> counter = future.result()
>>> task = client.submit(Counter, actor=True)
>>> counter = task.result()
>>> counter
<Actor: Counter, key=Counter-1234abcd>

Calling methods on this object immediately returns deferred ``BaseActorFuture``
Calling methods on this object immediately returns deferred ``BaseActorTask``
objects. You can call ``.result()`` on these objects to block and get the
result of the function call.

>>> future = counter.increment()
>>> future.result()
>>> task = counter.increment()
>>> task.result()
1
>>> future = counter.increment()
>>> future.result()
>>> task = counter.increment()
>>> task.result()
2
"""

Expand All @@ -63,7 +63,7 @@ def __init__(self, cls, address, key, worker=None):
self._cls = cls
self._address = address
self._key = key
self._future = None
self._task = None
self._worker = worker
self._client = None
self._try_bind_worker_client()
Expand All @@ -77,7 +77,7 @@ def _try_bind_worker_client(self):
if not self._client:
try:
self._client = get_client()
self._future = Future(self._key, inform=False)
self._task = Task(self._key, inform=False)
# ^ When running on a worker, only hold a weak reference to the key, otherwise the key could become unreleasable.
except ValueError:
self._client = None
Expand Down Expand Up @@ -139,9 +139,9 @@ def __dir__(self):
return sorted(o)

def __getattr__(self, key):
if self._future and self._future.status not in ("finished", "pending"):
if self._task and self._task.status not in ("finished", "pending"):
raise ValueError(
"Worker holding Actor was lost. Status: " + self._future.status
"Worker holding Actor was lost. Status: " + self._task.status
)
self._try_bind_worker_client()
if (
Expand All @@ -157,7 +157,7 @@ def __getattr__(self, key):
return attr

elif callable(attr):
return lambda *args, **kwargs: EagerActorFuture(attr(*args, **kwargs))
return lambda *args, **kwargs: EagerActorTask(attr(*args, **kwargs))
else:
return attr

Expand All @@ -176,8 +176,8 @@ async def run_actor_function_on_worker():
kwargs={k: to_serialize(v) for k, v in kwargs.items()},
)
except OSError:
if self._future and not self._future.done():
await self._future
if self._task and not self._task.done():
await self._task
return await run_actor_function_on_worker()
else:
exc = OSError("Unable to contact Actor's worker")
Expand All @@ -186,13 +186,13 @@ async def run_actor_function_on_worker():
return _OK(result["result"])
return _Error(result["exception"])

actor_future = ActorFuture(io_loop=self._io_loop)
actor_task = ActorTask(io_loop=self._io_loop)

async def wait_then_set_result():
actor_future._set_result(await run_actor_function_on_worker())
actor_task._set_result(await run_actor_function_on_worker())

self._io_loop.add_callback(wait_then_set_result)
return actor_future
return actor_task

return func

Expand All @@ -211,7 +211,7 @@ async def get_actor_attribute_from_worker():

@property
def client(self):
return self._future.client
return self._task.client


class ProxyRPC:
Expand All @@ -232,10 +232,10 @@ async def func(**msg):
return func


class BaseActorFuture(abc.ABC, Awaitable[_T]):
"""Future to an actor's method call
class BaseActorTask(abc.ABC, Awaitable[_T]):
"""Task to an actor's method call

Whenever you call a method on an Actor you get a BaseActorFuture immediately
Whenever you call a method on an Actor you get a BaseActorTask immediately
while the computation happens in the background. You can call ``.result``
to block and collect the full result

Expand All @@ -252,13 +252,16 @@ def result(self, timeout: str | timedelta | float | None = None) -> _T:
def done(self) -> bool:
...

def __repr__(self) -> Literal["<ActorFuture>"]:
return "<ActorFuture>"
def __repr__(self) -> Literal["<ActorTask>"]:
return "<ActorTask>"


BaseActorFuture = BaseActorTask


@dataclass(frozen=True, eq=False)
class EagerActorFuture(BaseActorFuture[_T]):
"""Future to an actor's method call when an actor calls another actor on the same worker"""
class EagerActorTask(BaseActorTask[_T]):
"""Task to an actor's method call when an actor calls another actor on the same worker"""

_result: _T

Expand Down Expand Up @@ -289,7 +292,7 @@ def unwrap(self) -> NoReturn:
raise self._e


class ActorFuture(BaseActorFuture[_T]):
class ActorTask(BaseActorTask[_T]):
def __init__(self, io_loop: IOLoop):
self._io_loop = io_loop
self._event = LateLoopEvent()
Expand All @@ -313,3 +316,6 @@ def _set_result(self, out: _Error | _OK[_T]) -> None:

def result(self, timeout: str | timedelta | float | None = None) -> _T:
return sync(self._io_loop, self._result, callback_timeout=timeout)


ActorFuture = ActorTask