diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index affa58702c..7f2e7f942b 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -420,7 +420,7 @@ def func(scheduler): @gen_cluster( nthreads=[], config={"distributed.scheduler.blocked-handlers": ["test-handler"]} ) -def test_scheduler_init_pulls_blocked_handlers_from_config(s): +async def test_scheduler_init_pulls_blocked_handlers_from_config(s): assert s.blocked_handlers == ["test-handler"] diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 0de9c7793c..50a27570df 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -106,27 +106,6 @@ def assert_config(): await c.run_on_scheduler(assert_config) -@gen_cluster(client=True) -def test_gen_cluster_legacy_implicit(c, s, a, b): - assert isinstance(c, Client) - assert isinstance(s, Scheduler) - for w in [a, b]: - assert isinstance(w, Worker) - assert s.nthreads == {w.address: w.nthreads for w in [a, b]} - assert (yield c.submit(lambda: 123)) == 123 - - -@gen_cluster(client=True) -@gen.coroutine -def test_gen_cluster_legacy_explicit(c, s, a, b): - assert isinstance(c, Client) - assert isinstance(s, Scheduler) - for w in [a, b]: - assert isinstance(w, Worker) - assert s.nthreads == {w.address: w.nthreads for w in [a, b]} - assert (yield c.submit(lambda: 123)) == 123 - - @pytest.mark.skip(reason="This hangs on travis") def test_gen_cluster_cleans_up_client(loop): import dask.context @@ -373,3 +352,24 @@ async def ping_pong(): assert await write_queue.get() == (b.address, {"op": "ping", "reply": True}) write_event.set() assert await fut == "pong" + + +@pytest.mark.slow() +def test_provide_stack_on_timeout(): + sleep_time = 30 + + async def inner_test(c, s, a, b): + await asyncio.sleep(sleep_time) + + # If this timeout is too small, the cluster setup/teardown might take too + # long and the timeout error we'll receive will be different + test = gen_cluster(client=True, timeout=2)(inner_test) + + start = time() + with pytest.raises(asyncio.TimeoutError) as exc: + test() + end = time() + assert "inner_test" in str(exc) + assert "await asyncio.sleep(sleep_time)" in str(exc) + # ensure the task was properly + assert end - start < sleep_time / 2 diff --git a/distributed/utils_test.py b/distributed/utils_test.py index c622a2faff..d5a4a07f53 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -919,7 +919,7 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture def _(func): if not iscoroutinefunction(func): - func = gen.coroutine(func) + raise RuntimeError("gen_cluster only works for coroutine functions.") @functools.wraps(func) def test_func(*outer_args, **kwargs): @@ -964,11 +964,24 @@ async def coro(): ) args = [c] + args try: - future = func(*args, *outer_args, **kwargs) - future = asyncio.wait_for(future, timeout) - result = await future + coro = func(*args, *outer_args, **kwargs) + task = asyncio.create_task(coro) + + coro2 = asyncio.wait_for(asyncio.shield(task), timeout) + result = await coro2 if s.validate: s.validate_state() + except asyncio.TimeoutError as e: + assert task + buffer = io.StringIO() + # This stack indicates where the coro/test is suspended + task.print_stack(file=buffer) + task.cancel() + while not task.cancelled(): + await asyncio.sleep(0.01) + raise TimeoutError( + f"Test timeout after {timeout}s.\n{buffer.getvalue()}" + ) from e finally: if client and c.status not in ("closing", "closed"): await c._close(fast=s.status == Status.closed)