Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make decide_worker and rootish logic private #8457

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 15 additions & 15 deletions distributed/scheduler.py
Expand Up @@ -1059,7 +1059,7 @@
types: set[str]

#: The worker most recently assigned a task from this group, or None when the group
#: is not identified to be root-like by `SchedulerState.decide_worker`.
#: is not identified to be root-like by `SchedulerState._decide_worker`.
last_worker: WorkerState | None

#: If `last_worker` is not None, the number of times that worker should be assigned
Expand Down Expand Up @@ -2117,14 +2117,14 @@
assert not ts.actor, f"Actors can't be in `no-worker`: {ts}"
assert ts in self.unrunnable

if ws := self.decide_worker_non_rootish(ts):
if ws := self._decide_worker_non_rootish(ts):
self.unrunnable.discard(ts)
return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
# If no worker, task just stays in `no-worker`

return {}, {}, {}

def decide_worker_rootish_queuing_disabled(
def _decide_worker_rootish_queuing_disabled(
self, ts: TaskState
) -> WorkerState | None:
"""Pick a worker for a runnable root-ish task, without queuing.
Expand All @@ -2148,7 +2148,7 @@
``no-worker``.
"""
if self.validate:
# See root-ish-ness note below in `decide_worker_rootish_queuing_enabled`
# See root-ish-ness note below in `_decide_worker_rootish_queuing_enabled`
assert math.isinf(self.WORKER_SATURATION)

pool = self.idle.values() if self.idle else self.running
Expand Down Expand Up @@ -2184,7 +2184,7 @@

return ws

def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
def _decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
"""Pick a worker for a runnable root-ish task, if not all are busy.

Picks the least-busy worker out of the ``idle`` workers (idle workers have fewer
Expand Down Expand Up @@ -2236,7 +2236,7 @@

return ws

def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
def _decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
"""Pick a worker for a runnable non-root task, considering dependencies and
restrictions.

Expand All @@ -2261,7 +2261,7 @@
valid_workers = self.running

if ts.dependencies or valid_workers is not None:
ws = decide_worker(
ws = _decide_worker(
ts,
self.running,
valid_workers,
Expand Down Expand Up @@ -2310,19 +2310,19 @@
"""
ts = self.tasks[key]

if self.is_rootish(ts):
if self._is_rootish(ts):
# NOTE: having two root-ish methods is temporary. When the feature flag is
# removed, there should only be one, which combines co-assignment and
# queuing. Eventually, special-casing root tasks might be removed entirely,
# with better heuristics.
if math.isinf(self.WORKER_SATURATION):
if not (ws := self.decide_worker_rootish_queuing_disabled(ts)):
if not (ws := self._decide_worker_rootish_queuing_disabled(ts)):
return {ts.key: "no-worker"}, {}, {}
else:
if not (ws := self.decide_worker_rootish_queuing_enabled()):
if not (ws := self._decide_worker_rootish_queuing_enabled()):
return {ts.key: "queued"}, {}, {}
else:
if not (ws := self.decide_worker_non_rootish(ts)):
if not (ws := self._decide_worker_non_rootish(ts)):
return {ts.key: "no-worker"}, {}, {}

return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
Expand Down Expand Up @@ -2789,7 +2789,7 @@
assert not ts.actor, f"Actors can't be queued: {ts}"
assert ts in self.queued

if ws := self.decide_worker_rootish_queuing_enabled():
if ws := self._decide_worker_rootish_queuing_enabled():
self.queued.discard(ts)
return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
# If no worker, task just stays `queued`
Expand Down Expand Up @@ -2914,7 +2914,7 @@
# Assigning Tasks to Workers #
##############################

def is_rootish(self, ts: TaskState) -> bool:
def _is_rootish(self, ts: TaskState) -> bool:
"""
Whether ``ts`` is a root or root-like task.

Expand Down Expand Up @@ -8328,7 +8328,7 @@
return {}


def decide_worker(
def _decide_worker(
ts: TaskState,
all_workers: set[WorkerState],
valid_workers: set[WorkerState] | None,
Expand Down Expand Up @@ -8364,7 +8364,7 @@
candidates = valid_workers
if not candidates:
if ts.loose_restrictions:
return decide_worker(ts, all_workers, None, objective)
return _decide_worker(ts, all_workers, None, objective)

Check warning on line 8367 in distributed/scheduler.py

View check run for this annotation

Codecov / codecov/patch

distributed/scheduler.py#L8367

Added line #L8367 was not covered by tests

if not candidates:
return None
Expand Down
16 changes: 8 additions & 8 deletions distributed/tests/test_scheduler.py
Expand Up @@ -286,19 +286,19 @@ async def test_override_is_rootish(c, s):
await async_poll_for(lambda: "x" in s.tasks, timeout=5)
ts_x = s.tasks["x"]
assert ts_x._rootish is None
assert s.is_rootish(ts_x)
assert s._is_rootish(ts_x)

ts_x._rootish = False
assert not s.is_rootish(ts_x)
assert not s._is_rootish(ts_x)

y = c.submit(lambda y: y + 1, 1, key="y", workers=["not-existing"])
await async_poll_for(lambda: "y" in s.tasks, timeout=5)
ts_y = s.tasks["y"]
assert ts_y._rootish is None
assert not s.is_rootish(ts_y)
assert not s._is_rootish(ts_y)

ts_y._rootish = True
assert s.is_rootish(ts_y)
assert s._is_rootish(ts_y)


@pytest.mark.skipif(
Expand Down Expand Up @@ -344,14 +344,14 @@ async def test_decide_worker_rootish_while_last_worker_is_retiring(c, s, a):
# guarantee that all the y tasks are already on the scheduler. Only
# after at least 5 have been registered, will the task be flagged as
# rootish
while "y-2" not in s.tasks or not s.is_rootish(s.tasks["y-2"]):
while "y-2" not in s.tasks or not s._is_rootish(s.tasks["y-2"]):
await asyncio.sleep(0.01)

# - y-2 has no restrictions
# - TaskGroup(y) has more than 4 tasks (total_nthreads * 2)
# - TaskGroup(y) has less than 5 dependency groups
# - TaskGroup(y) has less than 5 dependency tasks
assert s.is_rootish(s.tasks["y-2"])
assert s._is_rootish(s.tasks["y-2"])

await evx[0].set()
await wait_for_state("y-0", "processing", s)
Expand Down Expand Up @@ -4427,9 +4427,9 @@ def assert_rootish():
# Just to verify our assumptions in case the definition changes. This is
# currently a bit brittle
if rootish:
assert all(s.is_rootish(s.tasks[k]) for k in keys)
assert all(s._is_rootish(s.tasks[k]) for k in keys)
else:
assert not any(s.is_rootish(s.tasks[k]) for k in keys)
assert not any(s._is_rootish(s.tasks[k]) for k in keys)

f1 = submit_tasks()
# Make sure that the worker is properly saturated
Expand Down
6 changes: 3 additions & 3 deletions docs/source/scheduling-policies.rst
Expand Up @@ -130,11 +130,11 @@ functions in ``scheduler.py``.

.. autosummary:: decide_worker

.. autosummary:: Scheduler.decide_worker_non_rootish
.. autosummary:: Scheduler._decide_worker_non_rootish

.. autosummary:: Scheduler.decide_worker_rootish_queuing_disabled
.. autosummary:: Scheduler._decide_worker_rootish_queuing_disabled

.. autosummary:: Scheduler.decide_worker_rootish_queuing_enabled
.. autosummary:: Scheduler._decide_worker_rootish_queuing_enabled

.. autosummary:: Scheduler.worker_objective
Comment on lines -133 to 139
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should nuke this entire document. definitely the reference to the methods

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't care either way. If the interested user wants to read through those implementation details, sure, have at it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...though I do see the challenge of keeping this up-to-date (which I'm not sure it currently is, and frankly, I'm also not going to read it in full and compare it to the status quo today)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this a follow-up issue in case someone else has strong opinions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Expand Down