Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide stack for suspended coro in test timeout #5446

Merged
merged 6 commits into from Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
Expand Up @@ -420,7 +420,7 @@ def func(scheduler):
@gen_cluster(
nthreads=[], config={"distributed.scheduler.blocked-handlers": ["test-handler"]}
)
def test_scheduler_init_pulls_blocked_handlers_from_config(s):
async def test_scheduler_init_pulls_blocked_handlers_from_config(s):
assert s.blocked_handlers == ["test-handler"]


Expand Down
42 changes: 21 additions & 21 deletions distributed/tests/test_utils_test.py
Expand Up @@ -106,27 +106,6 @@ def assert_config():
await c.run_on_scheduler(assert_config)


@gen_cluster(client=True)
def test_gen_cluster_legacy_implicit(c, s, a, b):
assert isinstance(c, Client)
assert isinstance(s, Scheduler)
for w in [a, b]:
assert isinstance(w, Worker)
assert s.nthreads == {w.address: w.nthreads for w in [a, b]}
assert (yield c.submit(lambda: 123)) == 123


@gen_cluster(client=True)
@gen.coroutine
def test_gen_cluster_legacy_explicit(c, s, a, b):
assert isinstance(c, Client)
assert isinstance(s, Scheduler)
for w in [a, b]:
assert isinstance(w, Worker)
assert s.nthreads == {w.address: w.nthreads for w in [a, b]}
assert (yield c.submit(lambda: 123)) == 123


@pytest.mark.skip(reason="This hangs on travis")
def test_gen_cluster_cleans_up_client(loop):
import dask.context
Expand Down Expand Up @@ -373,3 +352,24 @@ async def ping_pong():
assert await write_queue.get() == (b.address, {"op": "ping", "reply": True})
write_event.set()
assert await fut == "pong"


@pytest.mark.slow()
def test_provide_stack_on_timeout():
sleep_time = 30

async def inner_test(c, s, a, b):
await asyncio.sleep(sleep_time)

# If this timeout is too small, the cluster setup/teardown might take too
# long and the timeout error we'll receive will be different
test = gen_cluster(client=True, timeout=2)(inner_test)

start = time()
with pytest.raises(asyncio.TimeoutError) as exc:
test()
end = time()
assert "inner_test" in str(exc)
assert "await asyncio.sleep(sleep_time)" in str(exc)
# ensure the task was properly
assert end - start < sleep_time / 2
21 changes: 17 additions & 4 deletions distributed/utils_test.py
Expand Up @@ -919,7 +919,7 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture

def _(func):
if not iscoroutinefunction(func):
func = gen.coroutine(func)
raise RuntimeError("gen_cluster only works for coroutine functions.")

@functools.wraps(func)
def test_func(*outer_args, **kwargs):
Expand Down Expand Up @@ -964,11 +964,24 @@ async def coro():
)
args = [c] + args
try:
future = func(*args, *outer_args, **kwargs)
future = asyncio.wait_for(future, timeout)
result = await future
coro = func(*args, *outer_args, **kwargs)
task = asyncio.create_task(coro)

coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
result = await coro2
if s.validate:
s.validate_state()
except asyncio.TimeoutError as e:
assert task
buffer = io.StringIO()
# This stack indicates where the coro/test is suspended
task.print_stack(file=buffer)
task.cancel()
while not task.cancelled():
await asyncio.sleep(0.01)
raise TimeoutError(
f"Test timeout after {timeout}s.\n{buffer.getvalue()}"
) from e
finally:
if client and c.status not in ("closing", "closed"):
await c._close(fast=s.status == Status.closed)
Expand Down