Make decide_worker and rootish logic private #8457

Open · wants to merge 4 commits into main
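In short, the diff below renames the scheduler's worker-selection hooks from public to private. A hypothetical downstream snippet (with `scheduler` and `ts` standing in for a real `Scheduler` and `TaskState`) shows the effect:

# Hypothetical downstream code; `scheduler` and `ts` stand in for a real
# Scheduler and TaskState. After this PR the old public names are gone.

# Before (public):
#     ws = scheduler.decide_worker_non_rootish(ts)
#     rootish = scheduler.is_rootish(ts)

# After (private; external code should not rely on these):
ws = scheduler._decide_worker_non_rootish(ts)
rootish = scheduler._is_rootish(ts)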
32 changes: 16 additions & 16 deletions distributed/scheduler.py
@@ -1061,7 +1061,7 @@
types: set[str]

#: The worker most recently assigned a task from this group, or None when the group
-#: is not identified to be root-like by `SchedulerState.decide_worker`.
+#: is not identified to be root-like by `SchedulerState._decide_worker`.
last_worker: WorkerState | None

#: If `last_worker` is not None, the number of times that worker should be assigned
@@ -2119,14 +2119,14 @@
assert not ts.actor, f"Actors can't be in `no-worker`: {ts}"
assert ts in self.unrunnable

-if ws := self.decide_worker_non_rootish(ts):
+if ws := self._decide_worker_non_rootish(ts):
self.unrunnable.discard(ts)
return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
# If no worker, task just stays in `no-worker`

return {}, {}, {}

-def decide_worker_rootish_queuing_disabled(
+def _decide_worker_rootish_queuing_disabled(
self, ts: TaskState
) -> WorkerState | None:
"""Pick a worker for a runnable root-ish task, without queuing.
@@ -2150,7 +2150,7 @@
``no-worker``.
"""
if self.validate:
-# See root-ish-ness note below in `decide_worker_rootish_queuing_enabled`
+# See root-ish-ness note below in `_decide_worker_rootish_queuing_enabled`
assert math.isinf(self.WORKER_SATURATION)

pool = self.idle.values() if self.idle else self.running
@@ -2186,7 +2186,7 @@

return ws

-def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
+def _decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
"""Pick a worker for a runnable root-ish task, if not all are busy.

Picks the least-busy worker out of the ``idle`` workers (idle workers have fewer
@@ -2211,7 +2211,7 @@

"""
if self.validate:
-# We don't `assert self.is_rootish(ts)` here, because that check is
+# We don't `assert self._is_rootish(ts)` here, because that check is
# dependent on cluster size. It's possible a task looked root-ish when it
# was queued, but the cluster has since scaled up and it no longer does when
# coming out of the queue. If `is_rootish` changes to a static definition,
@@ -2238,7 +2238,7 @@

return ws

-def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
+def _decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
"""Pick a worker for a runnable non-root task, considering dependencies and
restrictions.

@@ -2263,7 +2263,7 @@
valid_workers = self.running

if ts.dependencies or valid_workers is not None:
-ws = decide_worker(
+ws = _decide_worker(
ts,
self.running,
valid_workers,
@@ -2312,19 +2312,19 @@
"""
ts = self.tasks[key]

-if self.is_rootish(ts):
+if self._is_rootish(ts):
# NOTE: having two root-ish methods is temporary. When the feature flag is
# removed, there should only be one, which combines co-assignment and
# queuing. Eventually, special-casing root tasks might be removed entirely,
# with better heuristics.
if math.isinf(self.WORKER_SATURATION):
-if not (ws := self.decide_worker_rootish_queuing_disabled(ts)):
+if not (ws := self._decide_worker_rootish_queuing_disabled(ts)):
return {ts.key: "no-worker"}, {}, {}
else:
-if not (ws := self.decide_worker_rootish_queuing_enabled()):
+if not (ws := self._decide_worker_rootish_queuing_enabled()):
return {ts.key: "queued"}, {}, {}
else:
-if not (ws := self.decide_worker_non_rootish(ts)):
+if not (ws := self._decide_worker_non_rootish(ts)):
return {ts.key: "no-worker"}, {}, {}

return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
@@ -2791,7 +2791,7 @@
assert not ts.actor, f"Actors can't be queued: {ts}"
assert ts in self.queued

-if ws := self.decide_worker_rootish_queuing_enabled():
+if ws := self._decide_worker_rootish_queuing_enabled():
self.queued.discard(ts)
return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
# If no worker, task just stays `queued`
@@ -2916,7 +2916,7 @@
# Assigning Tasks to Workers #
##############################

-def is_rootish(self, ts: TaskState) -> bool:
+def _is_rootish(self, ts: TaskState) -> bool:
"""
Whether ``ts`` is a root or root-like task.

@@ -8446,7 +8446,7 @@
return {}


-def decide_worker(
+def _decide_worker(
ts: TaskState,
all_workers: set[WorkerState],
valid_workers: set[WorkerState] | None,
@@ -8482,7 +8482,7 @@
candidates = valid_workers
if not candidates:
if ts.loose_restrictions:
-return decide_worker(ts, all_workers, None, objective)
+return _decide_worker(ts, all_workers, None, objective)

if not candidates:
return None
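Taken together, the hunks above contain the whole dispatch that picks among the three private deciders. As a reading aid, here is that flow reassembled from this diff into one condensed sketch (not the verbatim source; validation and comments elided):

import math

# Condensed sketch of Scheduler._transition_waiting_processing, reassembled
# from the hunks above; not the verbatim source.
def _transition_waiting_processing(self, key, stimulus_id):
    ts = self.tasks[key]
    if self._is_rootish(ts):
        if math.isinf(self.WORKER_SATURATION):
            # Queuing disabled: co-assign via the task group's last worker
            if not (ws := self._decide_worker_rootish_queuing_disabled(ts)):
                return {ts.key: "no-worker"}, {}, {}
        else:
            # Queuing enabled: only run now if some worker is idle
            if not (ws := self._decide_worker_rootish_queuing_enabled()):
                return {ts.key: "queued"}, {}, {}
    else:
        # Non-root task: weigh dependencies and restrictions via _decide_worker
        if not (ws := self._decide_worker_non_rootish(ts)):
            return {ts.key: "no-worker"}, {}, {}
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)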
2 changes: 1 addition & 1 deletion distributed/shuffle/tests/test_shuffle.py
@@ -2566,7 +2566,7 @@ async def test_unpack_is_non_rootish(c, s, a, b):

unpack_tss = [ts for key, ts in s.tasks.items() if key_split(key) == "shuffle_p2p"]
assert len(unpack_tss) == 20
-assert not any(s.is_rootish(ts) for ts in unpack_tss)
+assert not any(s._is_rootish(ts) for ts in unpack_tss)
del unpack_tss
scheduler_plugin.block_barrier.set()
result = await result
16 changes: 8 additions & 8 deletions distributed/tests/test_scheduler.py
@@ -286,19 +286,19 @@ async def test_override_is_rootish(c, s):
await async_poll_for(lambda: "x" in s.tasks, timeout=5)
ts_x = s.tasks["x"]
assert ts_x._rootish is None
-assert s.is_rootish(ts_x)
+assert s._is_rootish(ts_x)

ts_x._rootish = False
-assert not s.is_rootish(ts_x)
+assert not s._is_rootish(ts_x)

y = c.submit(lambda y: y + 1, 1, key="y", workers=["not-existing"])
await async_poll_for(lambda: "y" in s.tasks, timeout=5)
ts_y = s.tasks["y"]
assert ts_y._rootish is None
-assert not s.is_rootish(ts_y)
+assert not s._is_rootish(ts_y)

ts_y._rootish = True
-assert s.is_rootish(ts_y)
+assert s._is_rootish(ts_y)


@pytest.mark.skipif(
@@ -344,14 +344,14 @@ async def test_decide_worker_rootish_while_last_worker_is_retiring(c, s, a):
# guarantee that all the y tasks are already on the scheduler. Only
# after at least 5 have been registered, will the task be flagged as
# rootish
while "y-2" not in s.tasks or not s.is_rootish(s.tasks["y-2"]):
while "y-2" not in s.tasks or not s._is_rootish(s.tasks["y-2"]):
await asyncio.sleep(0.01)

# - y-2 has no restrictions
# - TaskGroup(y) has more than 4 tasks (total_nthreads * 2)
# - TaskGroup(y) has less than 5 dependency groups
# - TaskGroup(y) has less than 5 dependency tasks
assert s.is_rootish(s.tasks["y-2"])
assert s._is_rootish(s.tasks["y-2"])

await evx[0].set()
await wait_for_state("y-0", "processing", s)
@@ -4528,9 +4528,9 @@ def assert_rootish():
# Just to verify our assumptions in case the definition changes. This is
# currently a bit brittle
if rootish:
-assert all(s.is_rootish(s.tasks[k]) for k in keys)
+assert all(s._is_rootish(s.tasks[k]) for k in keys)
else:
-assert not any(s.is_rootish(s.tasks[k]) for k in keys)
+assert not any(s._is_rootish(s.tasks[k]) for k in keys)

f1 = submit_tasks()
# Make sure that the worker is properly saturated
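The comments in `test_decide_worker_rootish_while_last_worker_is_retiring` spell out the thresholds, and `test_override_is_rootish` exercises the `_rootish` override. Read together, they imply roughly the following rule; this is a sketch inferred from the tests, and the real `Scheduler._is_rootish` may differ in detail:

# Sketch of the root-ish rule implied by the tests above; the actual
# Scheduler._is_rootish may differ in detail.
def _is_rootish(self, ts):
    if ts._rootish is not None:
        # Explicit override, exercised by test_override_is_rootish
        return ts._rootish
    if ts.worker_restrictions or ts.host_restrictions or ts.resource_restrictions:
        # Restricted tasks (like "y" above, pinned to "not-existing") never qualify
        return False
    tg = ts.group
    return (
        len(tg) > self.total_nthreads * 2       # "more than 4 tasks (total_nthreads * 2)"
        and len(tg.dependencies) < 5            # "less than 5 dependency groups"
        and sum(map(len, tg.dependencies)) < 5  # "less than 5 dependency tasks"
    )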
10 changes: 5 additions & 5 deletions docs/source/scheduling-policies.rst
@@ -123,18 +123,18 @@ start running the soonest, using :meth:`Scheduler.worker_objective`. For each wo
memory, not the :ref:`process <memtypes>` memory.

This process is easy to change (and indeed this document may be outdated). We
-encourage readers to inspect the ``decide_worker`` and ``worker_objective``
+encourage readers to inspect the ``_decide_worker`` and ``worker_objective``
functions in ``scheduler.py``.

.. currentmodule:: distributed.scheduler

-.. autosummary:: decide_worker
+.. autosummary:: _decide_worker

-.. autosummary:: Scheduler.decide_worker_non_rootish
+.. autosummary:: Scheduler._decide_worker_non_rootish

-.. autosummary:: Scheduler.decide_worker_rootish_queuing_disabled
+.. autosummary:: Scheduler._decide_worker_rootish_queuing_disabled

-.. autosummary:: Scheduler.decide_worker_rootish_queuing_enabled
+.. autosummary:: Scheduler._decide_worker_rootish_queuing_enabled

.. autosummary:: Scheduler.worker_objective
Comment on lines -133 to 139
Member:
I think we should nuke this entire document. Definitely the reference to the methods.

Member Author:
I don't care either way. If the interested user wants to read through those implementation details, sure, have at it.

Member Author:
...though I do see the challenge of keeping this up-to-date (which I'm not sure it currently is, and frankly, I'm also not going to read it in full and compare it to the status quo today).

Member Author:
Let's make this a follow-up issue in case someone else has strong opinions.

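Since the page points readers at ``worker_objective``, a simplified illustration of the kind of ranking it performs may help; this is an approximation for exposition (the bandwidth-based transfer estimate and the tie-break are assumptions), not the actual method:

# Simplified illustration of a worker_objective-style ranking: candidates are
# compared by this tuple, and the lowest wins. An approximation, not the
# actual Scheduler method.
def worker_objective(self, ts, ws):
    # Bytes of dependency data that would still have to be transferred to `ws`
    comm_bytes = sum(
        dts.nbytes for dts in ts.dependencies if ws not in dts.who_has
    )
    # Estimated start time: current backlog per thread plus transfer cost,
    # using the scheduler's measured bandwidth (an assumed attribute here)
    start_time = ws.occupancy / ws.nthreads + comm_bytes / self.bandwidth
    # Tie-break on the worker's managed memory, per the note above that only
    # managed memory (not process memory) is considered
    return (start_time, ws.nbytes)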
2 changes: 1 addition & 1 deletion docs/source/scheduling-state.rst
@@ -381,7 +381,7 @@ API
.. autoclass:: ClientState
:members:

-.. autofunction:: decide_worker
+.. autofunction:: _decide_worker

.. autoclass:: MemoryState
:members: