Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aioloop-proxy usage draft #312

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.rst
Expand Up @@ -247,6 +247,12 @@ automatically to *async* test functions.
.. |pytestmark| replace:: ``pytestmark``
.. _pytestmark: http://doc.pytest.org/en/latest/example/markers.html#marking-whole-classes-or-modules

Timeout protection
------------------

Sometime tests can work much slowly than expected or even hang.


Note about unittest
-------------------

Expand Down
93 changes: 93 additions & 0 deletions pytest_asyncio/_runner.py
@@ -0,0 +1,93 @@
import asyncio
import contextlib
from typing import Awaitable, List, Optional, TypeVar

import aioloop_proxy
import pytest

_R = TypeVar("_R")


class Runner:
__slots__ = ("_loop", "_node", "_children", "_loop_proxy_cm")

def __init__(self, node: pytest.Item, loop: asyncio.AbstractEventLoop) -> None:
self._node = node
# children nodes that uses asyncio
# the list can be reset if the current node re-assigns the loop
self._children: List[Runner] = []
self._loop_proxy_cm: Optional[
"contextlib.AbstractContextManager[asyncio.AbstractEventLoop]"
] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._set_loop(loop)

@classmethod
def install(
cls, request: pytest.FixtureRequest, loop: asyncio.AbstractEventLoop
) -> None:
node = request.node
runner = getattr(node, "_asyncio_runner", None)
if runner is None:
runner = cls(node, loop)
node._asyncio_runner = runner
else:
# parametrized non-function scope loop was recalculated
# with other params of precessors
runner._set_loop(loop)
request.addfinalizer(runner._uninstall)

@classmethod
def uninstall(cls, request: pytest.FixtureRequest) -> None:
node = request.node
runner = getattr(node, "_asyncio_runner", None)
assert runner is not None
runner._uninstall()

@classmethod
def get(cls, node: pytest.Item) -> "Runner":
runner = getattr(node, "_asyncio_runner", None)
if runner is not None:
return runner
parent_node = node.parent
if parent_node is None:
# should never happen if pytest_fixture_setup works correctly
raise RuntimeError("Cannot find a node with installed loop")
parent_runner = cls.get(parent_node)
runner = cls(node, parent_runner._loop)
node._asyncio_runner = runner
node.addfinalizer(runner._uninstall)
return runner

def run_test(self, coro: Awaitable[None]) -> None:
task = asyncio.ensure_future(coro, loop=self._loop)
try:
self.run(task)
except BaseException:
# run_until_complete doesn't get the result from exceptions
# that are not subclasses of `Exception`. Consume all
# exceptions to prevent asyncio's warning from logging.
if task.done() and not task.cancelled():
task.exception()
raise

def run(self, coro: Awaitable[_R]) -> _R:
return self._loop.run_until_complete(coro)

def _set_loop(self, loop: asyncio.AbstractEventLoop) -> None:
assert loop is not None
if self._loop_proxy_cm is not None:
self._loop_proxy_cm.__exit__(None, None, None)
self._loop_proxy_cm = aioloop_proxy.proxy(loop)
self._loop = self._loop_proxy_cm.__enter__()
# cleanup children runners, recreate them on the next run
for child in self._children:
child._uninstall()
self._children.clear()

def _uninstall(self) -> None:
if self._loop_proxy_cm is not None:
self._loop_proxy_cm.__exit__(None, None, None)
self._loop_proxy_cm = None
self._loop = None
delattr(self._node, "_asyncio_runner")
36 changes: 17 additions & 19 deletions pytest_asyncio/plugin.py
Expand Up @@ -20,17 +20,19 @@
Set,
TypeVar,
Union,
cast,
overload,
)

import pytest

from ._runner import Runner

if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal


_R = TypeVar("_R")

_ScopeName = Literal["session", "package", "module", "class", "function"]
Expand Down Expand Up @@ -265,6 +267,7 @@ def _wrap_asyncgen(func: Callable[..., AsyncIterator[_R]]) -> Callable[..., _R]:
def _asyncgen_fixture_wrapper(
event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
) -> _R:
runner = Runner.get(request.node)
gen_obj = func(**_add_kwargs(func, kwargs, event_loop, request))

async def setup() -> _R:
Expand All @@ -284,9 +287,9 @@ async def async_finalizer() -> None:
msg += "Yield only once."
raise ValueError(msg)

event_loop.run_until_complete(async_finalizer())
runner.run(async_finalizer())

result = event_loop.run_until_complete(setup())
result = runner.run(setup())
request.addfinalizer(finalizer)
return result

Expand All @@ -298,11 +301,13 @@ def _wrap_async(func: Callable[..., Awaitable[_R]]) -> Callable[..., _R]:
def _async_fixture_wrapper(
event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
) -> _R:
runner = Runner.get(request.node)

async def setup() -> _R:
res = await func(**_add_kwargs(func, kwargs, event_loop, request))
return res

return event_loop.run_until_complete(setup())
return runner.run(setup())

return _async_fixture_wrapper

Expand Down Expand Up @@ -366,8 +371,11 @@ def pytest_fixture_setup(
) -> Optional[object]:
"""Adjust the event loop policy when an event loop is produced."""
if fixturedef.argname == "event_loop":
# a marker for future runners lookup
# The lookup doesn't go deeper than a node with this marker set.
outcome = yield
loop = outcome.get_result()
Runner.install(request, loop)
policy = asyncio.get_event_loop_policy()
try:
old_loop = policy.get_event_loop()
Expand All @@ -392,19 +400,18 @@ def pytest_pyfunc_call(pyfuncitem: pytest.Function) -> Optional[object]:
"""
marker = pyfuncitem.get_closest_marker("asyncio")
if marker is not None:
funcargs: Dict[str, object] = pyfuncitem.funcargs # type: ignore[name-defined]
loop = cast(asyncio.AbstractEventLoop, funcargs["event_loop"])
runner = Runner.get(pyfuncitem)
if _is_hypothesis_test(pyfuncitem.obj):
pyfuncitem.obj.hypothesis.inner_test = wrap_in_sync(
pyfuncitem,
pyfuncitem.obj.hypothesis.inner_test,
_loop=loop,
runner,
)
else:
pyfuncitem.obj = wrap_in_sync(
pyfuncitem,
pyfuncitem.obj,
_loop=loop,
runner,
)
yield

Expand All @@ -416,7 +423,7 @@ def _is_hypothesis_test(function: Any) -> bool:
def wrap_in_sync(
pyfuncitem: pytest.Function,
func: Callable[..., Awaitable[Any]],
_loop: asyncio.AbstractEventLoop,
runner: Runner,
):
"""Return a sync wrapper around an async function executing it in the
current event loop."""
Expand All @@ -442,16 +449,7 @@ def inner(**kwargs):
)
)
return
task = asyncio.ensure_future(coro, loop=_loop)
try:
_loop.run_until_complete(task)
except BaseException:
# run_until_complete doesn't get the result from exceptions
# that are not subclasses of `Exception`. Consume all
# exceptions to prevent asyncio's warning from logging.
if task.done() and not task.cancelled():
task.exception()
raise
runner.run_test(coro)

inner._raw_test_func = func # type: ignore[attr-defined]
return inner
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Expand Up @@ -38,6 +38,7 @@ setup_requires =
setuptools_scm >= 6.2

install_requires =
aioloop-proxy >= 0.0.13
pytest >= 6.1.0
typing-extensions >= 4.0; python_version < "3.8"

Expand All @@ -46,6 +47,7 @@ testing =
coverage==6.2
hypothesis >= 5.7.1
flaky >= 3.5.0
pytest-timeout == 2.1.0
mypy == 0.931

[options.entry_points]
Expand Down
Expand Up @@ -54,6 +54,13 @@ async def port_afinalizer():
current_loop = asyncio.get_event_loop_policy().get_event_loop()
current_loop.run_until_complete(port_afinalizer())

worker = asyncio.ensure_future(asyncio.sleep(0.2))
# nested loop proxy is different than explicitly set by
# loop_policy.set_event_loop().
# It is really not a big problem because async functions
# always have the correct 'get_running_loop()' already.
worker = asyncio.ensure_future(
asyncio.sleep(0.2),
loop=asyncio.get_event_loop_policy().get_event_loop(),
)
request.addfinalizer(functools.partial(port_finalizer, worker))
return True
11 changes: 11 additions & 0 deletions tests/conftest.py
@@ -1,5 +1,6 @@
import asyncio

import aioloop_proxy
import pytest

pytest_plugins = "pytester"
Expand Down Expand Up @@ -30,3 +31,13 @@ def factory():
return unused_tcp_port_factory()

return factory


@pytest.fixture(scope="session")
def original_loop():
def func(loop_proxy: aioloop_proxy.LoopProxy) -> asyncio.AbstractEventLoop:
while isinstance(loop_proxy, aioloop_proxy.LoopProxy):
loop_proxy = loop_proxy.get_parent_loop()
return loop_proxy

return func
6 changes: 4 additions & 2 deletions tests/multiloop/test_alternative_loops.py
Expand Up @@ -5,10 +5,12 @@


@pytest.mark.asyncio
async def test_for_custom_loop():
async def test_for_custom_loop(original_loop):
"""This test should be executed using the custom loop."""
await asyncio.sleep(0.01)
assert type(asyncio.get_event_loop()).__name__ == "CustomSelectorLoop"
assert (
type(original_loop(asyncio.get_event_loop())).__name__ == "CustomSelectorLoop"
)


@pytest.mark.asyncio
Expand Down
Expand Up @@ -5,13 +5,13 @@


@pytest.mark.asyncio
async def test_uses_loop_provided_by_custom_policy():
async def test_uses_loop_provided_by_custom_policy(original_loop):
"""Asserts that test cases use the event loop
provided by the custom event loop policy"""
assert type(asyncio.get_event_loop()).__name__ == "TestEventLoop"
assert type(original_loop(asyncio.get_event_loop())).__name__ == "TestEventLoop"


@pytest.mark.asyncio
async def test_custom_policy_is_not_overwritten():
async def test_custom_policy_is_not_overwritten(original_loop):
"""Asserts that any custom event loop policy stays the same across test cases"""
assert type(asyncio.get_event_loop()).__name__ == "TestEventLoop"
assert type(original_loop(asyncio.get_event_loop())).__name__ == "TestEventLoop"
7 changes: 5 additions & 2 deletions tests/sessionloop/test_session_loops.py
Expand Up @@ -5,10 +5,13 @@


@pytest.mark.asyncio
async def test_for_custom_loop():
async def test_for_custom_loop(original_loop):
"""This test should be executed using the custom loop."""
await asyncio.sleep(0.01)
assert type(asyncio.get_event_loop()).__name__ == "CustomSelectorLoopSession"
assert (
type(original_loop(asyncio.get_event_loop())).__name__
== "CustomSelectorLoopSession"
)


@pytest.mark.asyncio
Expand Down
2 changes: 1 addition & 1 deletion tests/test_asyncio_fixture.py
Expand Up @@ -36,6 +36,6 @@ async def fixture_with_params(request):


@pytest.mark.asyncio
async def test_fixture_with_params(fixture_with_params):
async def test_fixture_with_params(request, fixture_with_params):
await asyncio.sleep(0)
assert fixture_with_params % 2 == 0