Apply renaming
milesgranger committed Feb 6, 2024
1 parent 3fbbce7 commit d8fbfaa
Showing 112 changed files with 2,429 additions and 2,441 deletions.
14 changes: 7 additions & 7 deletions distributed/__init__.py
@@ -20,18 +20,18 @@
from dask.config import config # type: ignore

from distributed._version import get_versions
from distributed.actor import Actor, ActorFuture, BaseActorFuture
from distributed.actor import Actor, ActorTask, BaseActorTask
from distributed.client import (
Client,
CompatibleExecutor,
Future,
Task,
as_completed,
default_client,
fire_and_forget,
futures_of,
get_task_metadata,
get_task_stream,
performance_report,
tasks_of,
wait,
)
from distributed.core import Status, connect, rpc
@@ -120,16 +120,16 @@ def _():

__all__ = [
"Actor",
"ActorFuture",
"ActorTask",
"Adaptive",
"BaseActorFuture",
"BaseActorTask",
"CancelledError",
"Client",
"CompatibleExecutor",
"CondaInstall",
"Environ",
"Event",
"Future",
"Task",
"KilledWorker",
"LocalCluster",
"Lock",
@@ -162,7 +162,7 @@ def _():
"dask",
"default_client",
"fire_and_forget",
"futures_of",
"tasks_of",
"get_client",
"get_task_metadata",
"get_task_stream",
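To see what the rename means for user code, here is a minimal sketch against the public API as this branch defines it; increment and the local Client() are illustrative, and on released versions the object returned by submit is still named Future rather than Task.

# Illustrative only: assumes this branch, where client.submit returns a Task
# (renamed from Future); the helper below is a stand-in computation.
from distributed import Client, wait

def increment(x):
    return x + 1

if __name__ == "__main__":
    client = Client()                # local cluster, for illustration
    t = client.submit(increment, 1)  # a Task on this branch, a Future on releases
    wait([t])                        # block until the computation finishes
    print(t.result())                # -> 2
    client.close()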
12 changes: 6 additions & 6 deletions distributed/_concurrent_futures_thread.py
@@ -51,22 +51,22 @@ def _python_exit():


class _WorkItem:
def __init__(self, future, fn, args, kwargs):
self.future = future
def __init__(self, task, fn, args, kwargs):
self.task = task
self.fn = fn
self.args = args
self.kwargs = kwargs

def run(self):
if not self.future.set_running_or_notify_cancel(): # pragma: no cover
if not self.task.set_running_or_notify_cancel(): # pragma: no cover
return

try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as e:
self.future.set_exception(e)
self.task.set_exception(e)
else:
self.future.set_result(result)
self.task.set_result(result)


def _worker(executor_reference, work_queue):
@@ -123,7 +123,7 @@ def __init__(self, max_workers=None, thread_name_prefix=""):
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown: # pragma: no cover
raise RuntimeError("cannot schedule new futures after shutdown")
raise RuntimeError("cannot schedule new tasks after shutdown")

f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
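The _WorkItem edits above are mechanical renames of the attribute that holds the underlying concurrent.futures.Future. The snippet below is a self-contained sketch of the pattern, driving a work item by hand instead of through the module's _worker threads, so it is not the module's actual wiring.

# Sketch of the work-item pattern: a Future is completed by whichever thread
# eventually calls run() on the item that wraps it.
import concurrent.futures as cf
import threading
from queue import Queue

class WorkItem:  # mirrors _WorkItem above, using only the public Future API
    def __init__(self, task, fn, args, kwargs):
        self.task = task
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.task.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.task.set_exception(e)
        else:
            self.task.set_result(result)

work_queue = Queue()
f = cf.Future()
work_queue.put(WorkItem(f, pow, (2, 10), {}))
threading.Thread(target=lambda: work_queue.get().run()).start()
print(f.result())  # -> 1024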
54 changes: 27 additions & 27 deletions distributed/actor.py
@@ -10,7 +10,7 @@

from tornado.ioloop import IOLoop

from distributed.client import Future
from distributed.client import Task
from distributed.protocol import to_serialize
from distributed.utils import LateLoopEvent, iscoroutinefunction, sync, thread_state
from distributed.utils_comm import WrappedKey
@@ -41,20 +41,20 @@ class Actor(WrappedKey):
You can create an actor by submitting a class with the keyword
``actor=True``.
>>> future = client.submit(Counter, actor=True)
>>> counter = future.result()
>>> task = client.submit(Counter, actor=True)
>>> counter = task.result()
>>> counter
<Actor: Counter, key=Counter-1234abcd>
Calling methods on this object immediately returns deferred ``BaseActorFuture``
Calling methods on this object immediately returns deferred ``BaseActorTask``
objects. You can call ``.result()`` on these objects to block and get the
result of the function call.
>>> future = counter.increment()
>>> future.result()
>>> task = counter.increment()
>>> task.result()
1
>>> future = counter.increment()
>>> future.result()
>>> task = counter.increment()
>>> task.result()
2
"""

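The docstring example assumes a Counter class; a minimal definition consistent with it, shown purely for illustration, could be:

# Hypothetical Counter matching the example above; any plain class submitted
# with actor=True behaves the same way.
class Counter:
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n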
@@ -63,7 +63,7 @@ def __init__(self, cls, address, key, worker=None):
self._cls = cls
self._address = address
self._key = key
self._future = None
self._task = None
self._worker = worker
self._client = None
self._try_bind_worker_client()
@@ -77,7 +77,7 @@ def _try_bind_worker_client(self):
if not self._client:
try:
self._client = get_client()
self._future = Future(self._key, inform=False)
self._task = Task(self._key, inform=False)
# ^ When running on a worker, only hold a weak reference to the key, otherwise the key could become unreleasable.
except ValueError:
self._client = None
@@ -139,9 +139,9 @@ def __dir__(self):
return sorted(o)

def __getattr__(self, key):
if self._future and self._future.status not in ("finished", "pending"):
if self._task and self._task.status not in ("finished", "pending"):
raise ValueError(
"Worker holding Actor was lost. Status: " + self._future.status
"Worker holding Actor was lost. Status: " + self._task.status
)
self._try_bind_worker_client()
if (
@@ -157,7 +157,7 @@ def __getattr__(self, key):
return attr

elif callable(attr):
return lambda *args, **kwargs: EagerActorFuture(attr(*args, **kwargs))
return lambda *args, **kwargs: EagerActorTask(attr(*args, **kwargs))

(Codecov / codecov/patch warning: added line distributed/actor.py#L160 was not covered by tests)
else:
return attr

@@ -176,8 +176,8 @@ async def run_actor_function_on_worker():
kwargs={k: to_serialize(v) for k, v in kwargs.items()},
)
except OSError:
if self._future and not self._future.done():
await self._future
if self._task and not self._task.done():
await self._task

(Codecov / codecov/patch warning: added line distributed/actor.py#L180 was not covered by tests)
return await run_actor_function_on_worker()
else:
exc = OSError("Unable to contact Actor's worker")
@@ -186,13 +186,13 @@ async def run_actor_function_on_worker():
return _OK(result["result"])
return _Error(result["exception"])

actor_future = ActorFuture(io_loop=self._io_loop)
actor_task = ActorTask(io_loop=self._io_loop)

async def wait_then_set_result():
actor_future._set_result(await run_actor_function_on_worker())
actor_task._set_result(await run_actor_function_on_worker())

self._io_loop.add_callback(wait_then_set_result)
return actor_future
return actor_task

return func

@@ -211,7 +211,7 @@ async def get_actor_attribute_from_worker():

@property
def client(self):
return self._future.client
return self._task.client

(Codecov / codecov/patch warning: added line distributed/actor.py#L214 was not covered by tests)


class ProxyRPC:
@@ -232,10 +232,10 @@ async def func(**msg):
return func


class BaseActorFuture(abc.ABC, Awaitable[_T]):
"""Future to an actor's method call
class BaseActorTask(abc.ABC, Awaitable[_T]):
"""Task to an actor's method call
Whenever you call a method on an Actor you get a BaseActorFuture immediately
Whenever you call a method on an Actor you get a BaseActorTask immediately
while the computation happens in the background. You can call ``.result``
to block and collect the full result
@@ -252,13 +252,13 @@ def result(self, timeout: str | timedelta | float | None = None) -> _T:
def done(self) -> bool:
...

def __repr__(self) -> Literal["<ActorFuture>"]:
return "<ActorFuture>"
def __repr__(self) -> Literal["<ActorTask>"]:
return "<ActorTask>"


@dataclass(frozen=True, eq=False)
class EagerActorFuture(BaseActorFuture[_T]):
"""Future to an actor's method call when an actor calls another actor on the same worker"""
class EagerActorTask(BaseActorTask[_T]):
"""Task to an actor's method call when an actor calls another actor on the same worker"""

_result: _T

@@ -289,7 +289,7 @@ def unwrap(self) -> NoReturn:
raise self._e


class ActorFuture(BaseActorFuture[_T]):
class ActorTask(BaseActorTask[_T]):
def __init__(self, io_loop: IOLoop):
self._io_loop = io_loop
self._event = LateLoopEvent()
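Because BaseActorTask is declared Awaitable, actor calls can also be awaited from an asynchronous client. Below is a minimal sketch under this branch's naming; the Accumulator class and the local cluster are illustrative.

# Sketch only: actor method calls return awaitable tasks (ActorTask here,
# ActorFuture on released versions).
import asyncio
from distributed import Client

class Accumulator:
    def __init__(self):
        self.total = 0

    def add(self, x):
        self.total += x
        return self.total

async def main():
    async with Client(asynchronous=True) as client:
        acc = await client.submit(Accumulator, actor=True)  # Actor proxy
        print(await acc.add(10))  # -> 10
        print(await acc.add(5))   # -> 15

if __name__ == "__main__":
    asyncio.run(main())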
58 changes: 29 additions & 29 deletions distributed/cfexecutor.py
@@ -13,13 +13,13 @@


@gen.coroutine
def _cascade_future(future, cf_future):
def _cascade_task(task, cf_future):
"""
Coroutine that waits on Dask future, then transmits its outcome to
Coroutine that waits on Dask task, then transmits its outcome to
cf_future.
"""
result = yield future._result(raiseit=False)
status = future.status
result = yield task._result(raiseit=False)
status = task.status
if status == "finished":
cf_future.set_result(result)
elif status == "cancelled":
@@ -35,8 +35,8 @@ def _cascade_future(future, cf_future):


@gen.coroutine
def _wait_on_futures(futures):
for fut in futures:
def _wait_on_tasks(tasks):
for fut in tasks:
try:
yield fut
except Exception:
@@ -60,41 +60,41 @@ def __init__(self, client, **kwargs):
% sorted(sk - self._allowed_kwargs)
)
self._client = client
self._futures = weakref.WeakSet()
self._tasks = weakref.WeakSet()
self._shutdown = False
self._kwargs = kwargs

def _wrap_future(self, future):
def _wrap_task(self, task):
"""
Wrap a distributed Future in a concurrent.futures Future.
Wrap a distributed Task in a concurrent.futures Task.
"""
cf_future = cf.Future()

# Support cancelling task through .cancel() on c.f.Future
# Support cancelling task through .cancel() on c.f.Task
def cf_callback(cf_future):
if cf_future.cancelled() and future.status != "cancelled":
future.cancel()
if cf_future.cancelled() and task.status != "cancelled":
task.cancel()

cf_future.add_done_callback(cf_callback)

self._client.loop.add_callback(_cascade_future, future, cf_future)
self._client.loop.add_callback(_cascade_task, task, cf_future)
return cf_future

def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as ``fn(*args, **kwargs)``
and returns a Future instance representing the execution of the callable.
and returns a Task instance representing the execution of the callable.
Returns
-------
A Future representing the given call.
A Task representing the given call.
"""
if self._shutdown:
raise RuntimeError("cannot schedule new futures after shutdown")
future = self._client.submit(fn, *args, **merge(self._kwargs, kwargs))
self._futures.add(future)
return self._wrap_future(future)
raise RuntimeError("cannot schedule new tasks after shutdown")
task = self._client.submit(fn, *args, **merge(self._kwargs, kwargs))
self._tasks.add(task)
return self._wrap_task(task)
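A ClientExecutor is normally obtained through Client.get_executor(); the sketch below, with an illustrative helper and a local cluster, shows submit handing back a plain concurrent.futures future that wraps the underlying Dask task via _wrap_task above.

# Sketch: existing concurrent.futures code keeps working against this executor.
from distributed import Client

def double(x):
    return 2 * x

if __name__ == "__main__":
    client = Client()
    executor = client.get_executor()   # executor bound to this client
    fut = executor.submit(double, 21)  # concurrent.futures future for the Dask task
    print(fut.result())                # -> 42
    client.close()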

def map(self, fn, *iterables, **kwargs):
"""Returns an iterator equivalent to ``map(fn, *iterables)``.
@@ -133,25 +133,25 @@ def map(self, fn, *iterables, **kwargs):
fs = self._client.map(fn, *iterables, **self._kwargs)

# Below iterator relies on fs being an iterator itself, and not just an iterable
# (such as a list), in order to cancel remaining futures
# (such as a list), in order to cancel remaining tasks
fs = iter(fs)

# Yield must be hidden in closure so that the tasks are submitted
# before the first iterator value is required.
def result_iterator():
try:
for future in fs:
self._futures.add(future)
for task in fs:
self._tasks.add(task)
if timeout is not None:
try:
yield future.result(end_time - time())
yield task.result(end_time - time())
except TimeoutError:
raise cf.TimeoutError
else:
yield future.result()
yield task.result()
finally:
remaining = list(fs)
self._futures.update(remaining)
self._tasks.update(remaining)
self._client.cancel(remaining)

return result_iterator()
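A short sketch of map with its optional timeout, matching the lazy result iterator implemented above; square and the local cluster are illustrative, and the timeout keyword is assumed to be supported as the implementation suggests.

# Sketch: results are yielded in order; a task that misses the deadline
# surfaces concurrent.futures.TimeoutError from the iterator.
import concurrent.futures as cf
from distributed import Client

def square(x):
    return x * x

if __name__ == "__main__":
    client = Client()
    executor = client.get_executor()
    try:
        for value in executor.map(square, range(5), timeout=30):
            print(value)  # 0, 1, 4, 9, 16
    except cf.TimeoutError:
        print("a task did not finish within the timeout")
    client.close()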
@@ -165,13 +165,13 @@ def shutdown(self, wait=True):
Parameters
----------
wait : If True then shutdown will not return until all running
futures have finished executing. If False then all running
futures are cancelled immediately.
tasks have finished executing. If False then all running
tasks are cancelled immediately.
"""
if not self._shutdown:
self._shutdown = True
fs = list(self._futures)
fs = list(self._tasks)
if wait:
sync(self._client.loop, _wait_on_futures, fs)
sync(self._client.loop, _wait_on_tasks, fs)
else:
self._client.cancel(fs)
