Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Scheduling Protocol #1061

Merged
merged 1 commit into from Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 15 additions & 10 deletions src/xdist/dsession.py
Expand Up @@ -15,6 +15,7 @@
from xdist.scheduler import LoadGroupScheduling
from xdist.scheduler import LoadScheduling
from xdist.scheduler import LoadScopeScheduling
from xdist.scheduler import Scheduling
from xdist.scheduler import WorkStealingScheduling
from xdist.workermanage import NodeManager

Expand Down Expand Up @@ -97,17 +98,21 @@ def pytest_collection(self):
return True

@pytest.hookimpl(trylast=True)
def pytest_xdist_make_scheduler(self, config, log):
def pytest_xdist_make_scheduler(self, config, log) -> Scheduling | None:
dist = config.getvalue("dist")
schedulers = {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not using the dict because I don't want to include the __init__ in the protocol (if that's even possible...)

"each": EachScheduling,
"load": LoadScheduling,
"loadscope": LoadScopeScheduling,
"loadfile": LoadFileScheduling,
"loadgroup": LoadGroupScheduling,
"worksteal": WorkStealingScheduling,
}
return schedulers[dist](config, log)
if dist == "each":
return EachScheduling(config, log)
if dist == "load":
return LoadScheduling(config, log)
if dist == "loadscope":
return LoadScopeScheduling(config, log)
if dist == "loadfile":
return LoadFileScheduling(config, log)
if dist == "loadgroup":
return LoadGroupScheduling(config, log)
if dist == "worksteal":
return WorkStealingScheduling(config, log)
return None
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a semantic change here -- if dist is not one of the builtin schedulers, this hook now returns None instead of raising a KeyError before. This seems better to me, allowing other hookimpls (e.g. wrappers) to handle this more gracefully.


@pytest.hookimpl
def pytest_runtestloop(self):
Expand Down
1 change: 1 addition & 0 deletions src/xdist/scheduler/__init__.py
Expand Up @@ -3,4 +3,5 @@
from xdist.scheduler.loadfile import LoadFileScheduling as LoadFileScheduling
from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling
from xdist.scheduler.loadscope import LoadScopeScheduling as LoadScopeScheduling
from xdist.scheduler.protocol import Scheduling as Scheduling
from xdist.scheduler.worksteal import WorkStealingScheduling as WorkStealingScheduling
3 changes: 3 additions & 0 deletions src/xdist/scheduler/each.py
Expand Up @@ -103,6 +103,9 @@ def mark_test_complete(self, node, item_index, duration=0):
def mark_test_pending(self, item):
raise NotImplementedError()

def remove_pending_tests_from_node(self, node, indices):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some schedulers were missing this method that is used by DSession, if guess it's not called for them so raising NotImplementedError.

raise NotImplementedError()

def remove_node(self, node):
# KeyError if we didn't get an add_node() yet
pending = self.node2pending.pop(node)
Expand Down
3 changes: 3 additions & 0 deletions src/xdist/scheduler/load.py
Expand Up @@ -160,6 +160,9 @@ def mark_test_pending(self, item):
for node in self.node2pending:
self.check_schedule(node)

def remove_pending_tests_from_node(self, node, indices):
raise NotImplementedError()

def check_schedule(self, node, duration=0):
"""Maybe schedule new items on the node.

Expand Down
3 changes: 3 additions & 0 deletions src/xdist/scheduler/loadscope.py
Expand Up @@ -244,6 +244,9 @@ def mark_test_complete(self, node, item_index, duration=0):
def mark_test_pending(self, item):
raise NotImplementedError()

def remove_pending_tests_from_node(self, node, indices):
raise NotImplementedError()

def _assign_work_unit(self, node):
"""Assign a work unit to a node."""
assert self.workqueue
Expand Down
47 changes: 47 additions & 0 deletions src/xdist/scheduler/protocol.py
@@ -0,0 +1,47 @@
from __future__ import annotations

from typing import Protocol
from typing import Sequence

from xdist.workermanage import WorkerController


class Scheduling(Protocol):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor, up to you: I think a more appropriate name would be Scheduler (a subject), "sheduling" is a verb AFAIK. But I understand also that all the implementations use Scheduling in their name, so on one hand it makes sense to name this Scheduling, on the other hand, "two wrongs don't make a right".

Up to you really. 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I don't know why the existing schedulers are called Scheduling but I don't think we can safely change them at this point. And then I think being consistent is better.

@property
def nodes(self) -> list[WorkerController]: ...

@property
def collection_is_completed(self) -> bool: ...

@property
def tests_finished(self) -> bool: ...

@property
def has_pending(self) -> bool: ...

def add_node(self, node: WorkerController) -> None: ...

def add_node_collection(
self,
node: WorkerController,
collection: Sequence[str],
) -> None: ...

def mark_test_complete(
self,
node: WorkerController,
item_index: int,
duration: float = 0,
) -> None: ...

def mark_test_pending(self, item: str) -> None: ...

def remove_pending_tests_from_node(
self,
node: WorkerController,
indices: Sequence[int],
) -> None: ...

def remove_node(self, node: WorkerController) -> str | None: ...

def schedule(self) -> None: ...