From aa2384f5bba23d2d3ecd669d8956b418df96d5cf Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 21 Oct 2021 15:09:15 +0200 Subject: [PATCH 1/6] Provide stack for suspended coro in test timeout --- distributed/tests/test_utils_test.py | 18 ++++++++++++++++++ distributed/utils_test.py | 14 +++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 0de9c7793c..db289f75d9 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -373,3 +373,21 @@ async def ping_pong(): assert await write_queue.get() == (b.address, {"op": "ping", "reply": True}) write_event.set() assert await fut == "pong" + + +def test_provide_stack_on_timeout(): + sleep_time = 30 + + async def inner_test(c, s, a, b): + await asyncio.sleep(sleep_time) + + test = gen_cluster(client=True, timeout=0.5)(inner_test) + + start = time() + with pytest.raises(asyncio.TimeoutError) as exc: + test() + end = time() + assert "inner_test" in str(exc) + assert "await asyncio.sleep(sleep_time)" in str(exc) + # ensure the task was properly + assert end - start < sleep_time / 2 diff --git a/distributed/utils_test.py b/distributed/utils_test.py index c622a2faff..e4d4265256 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -965,10 +965,22 @@ async def coro(): args = [c] + args try: future = func(*args, *outer_args, **kwargs) - future = asyncio.wait_for(future, timeout) + task = asyncio.create_task(future) + + future = asyncio.wait_for(asyncio.shield(task), timeout) result = await future if s.validate: s.validate_state() + except asyncio.TimeoutError: + assert task + buffer = io.StringIO() + # This stack indicates where the coro/test is + # suspended + task.print_stack(file=buffer) + task.cancel() + while not task.cancelled(): + await asyncio.sleep(0.01) + raise TimeoutError(f"Test timeout.\n{buffer.getvalue()}") finally: if client and c.status not in ("closing", "closed"): await c._close(fast=s.status == Status.closed) From 5d1dedb808ebb2f67c0b7520efc92dca177f9dcc Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 21 Oct 2021 15:13:27 +0200 Subject: [PATCH 2/6] prune exception cause --- distributed/utils_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index e4d4265256..9770bd2cd7 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -980,7 +980,9 @@ async def coro(): task.cancel() while not task.cancelled(): await asyncio.sleep(0.01) - raise TimeoutError(f"Test timeout.\n{buffer.getvalue()}") + raise TimeoutError( + f"Test timeout.\n{buffer.getvalue()}" + ) from None finally: if client and c.status not in ("closing", "closed"): await c._close(fast=s.status == Status.closed) From c3ba9f1e62426ead26dbd6462f182df0c91a6f36 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 21 Oct 2021 15:16:54 +0200 Subject: [PATCH 3/6] Add timeout in seconds to exc message --- distributed/tests/test_scheduler.py | 2 +- distributed/tests/test_utils_test.py | 21 --------------------- distributed/utils_test.py | 12 ++++++------ 3 files changed, 7 insertions(+), 28 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index affa58702c..7f2e7f942b 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -420,7 +420,7 @@ def func(scheduler): @gen_cluster( nthreads=[], config={"distributed.scheduler.blocked-handlers": ["test-handler"]} ) -def test_scheduler_init_pulls_blocked_handlers_from_config(s): +async def test_scheduler_init_pulls_blocked_handlers_from_config(s): assert s.blocked_handlers == ["test-handler"] diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index db289f75d9..2ee91d8d36 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -106,27 +106,6 @@ def assert_config(): await c.run_on_scheduler(assert_config) -@gen_cluster(client=True) -def test_gen_cluster_legacy_implicit(c, s, a, b): - assert isinstance(c, Client) - assert isinstance(s, Scheduler) - for w in [a, b]: - assert isinstance(w, Worker) - assert s.nthreads == {w.address: w.nthreads for w in [a, b]} - assert (yield c.submit(lambda: 123)) == 123 - - -@gen_cluster(client=True) -@gen.coroutine -def test_gen_cluster_legacy_explicit(c, s, a, b): - assert isinstance(c, Client) - assert isinstance(s, Scheduler) - for w in [a, b]: - assert isinstance(w, Worker) - assert s.nthreads == {w.address: w.nthreads for w in [a, b]} - assert (yield c.submit(lambda: 123)) == 123 - - @pytest.mark.skip(reason="This hangs on travis") def test_gen_cluster_cleans_up_client(loop): import dask.context diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 9770bd2cd7..8584345c41 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -919,7 +919,7 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture def _(func): if not iscoroutinefunction(func): - func = gen.coroutine(func) + raise RuntimeError("gen_cluster only works for coroutine functions.") @functools.wraps(func) def test_func(*outer_args, **kwargs): @@ -964,11 +964,11 @@ async def coro(): ) args = [c] + args try: - future = func(*args, *outer_args, **kwargs) - task = asyncio.create_task(future) + coro = func(*args, *outer_args, **kwargs) + task = asyncio.create_task(coro) - future = asyncio.wait_for(asyncio.shield(task), timeout) - result = await future + coro2 = asyncio.wait_for(asyncio.shield(task), timeout) + result = await coro2 if s.validate: s.validate_state() except asyncio.TimeoutError: @@ -981,7 +981,7 @@ async def coro(): while not task.cancelled(): await asyncio.sleep(0.01) raise TimeoutError( - f"Test timeout.\n{buffer.getvalue()}" + f"Test timeout after {timeout}s.\n{buffer.getvalue()}" ) from None finally: if client and c.status not in ("closing", "closed"): From 3b6107b480eaccf7c8d8cf847a262a7194412c4b Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 21 Oct 2021 15:58:17 +0100 Subject: [PATCH 4/6] Update distributed/utils_test.py --- distributed/utils_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 8584345c41..1ee7ceb361 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -974,8 +974,7 @@ async def coro(): except asyncio.TimeoutError: assert task buffer = io.StringIO() - # This stack indicates where the coro/test is - # suspended + # This stack indicates where the coro/test is suspended task.print_stack(file=buffer) task.cancel() while not task.cancelled(): From cfa233ce9d75a67ca0aa2858b4560ce825bd7c67 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 21 Oct 2021 18:00:38 +0200 Subject: [PATCH 5/6] Include exception context and raise timeout --- distributed/tests/test_utils_test.py | 5 ++++- distributed/utils_test.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 2ee91d8d36..50a27570df 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -354,13 +354,16 @@ async def ping_pong(): assert await fut == "pong" +@pytest.mark.slow() def test_provide_stack_on_timeout(): sleep_time = 30 async def inner_test(c, s, a, b): await asyncio.sleep(sleep_time) - test = gen_cluster(client=True, timeout=0.5)(inner_test) + # If this timeout is too small, the cluster setup/teardown might take too + # long and the timeout error we'll receive will be different + test = gen_cluster(client=True, timeout=2)(inner_test) start = time() with pytest.raises(asyncio.TimeoutError) as exc: diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 1ee7ceb361..a355f381b9 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -981,7 +981,7 @@ async def coro(): await asyncio.sleep(0.01) raise TimeoutError( f"Test timeout after {timeout}s.\n{buffer.getvalue()}" - ) from None + ) finally: if client and c.status not in ("closing", "closed"): await c._close(fast=s.status == Status.closed) From 94705cbfdb217ee1274499ca6db8e144f9ffb021 Mon Sep 17 00:00:00 2001 From: Florian Jetter Date: Thu, 21 Oct 2021 18:13:36 +0200 Subject: [PATCH 6/6] Apply suggestions from code review Co-authored-by: crusaderky --- distributed/utils_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index a355f381b9..d5a4a07f53 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -971,7 +971,7 @@ async def coro(): result = await coro2 if s.validate: s.validate_state() - except asyncio.TimeoutError: + except asyncio.TimeoutError as e: assert task buffer = io.StringIO() # This stack indicates where the coro/test is suspended @@ -981,7 +981,7 @@ async def coro(): await asyncio.sleep(0.01) raise TimeoutError( f"Test timeout after {timeout}s.\n{buffer.getvalue()}" - ) + ) from e finally: if client and c.status not in ("closing", "closed"): await c._close(fast=s.status == Status.closed)