diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index affa58702c..7f2e7f942b 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -420,7 +420,7 @@ def func(scheduler): @gen_cluster( nthreads=[], config={"distributed.scheduler.blocked-handlers": ["test-handler"]} ) -def test_scheduler_init_pulls_blocked_handlers_from_config(s): +async def test_scheduler_init_pulls_blocked_handlers_from_config(s): assert s.blocked_handlers == ["test-handler"] diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index db289f75d9..2ee91d8d36 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -106,27 +106,6 @@ def assert_config(): await c.run_on_scheduler(assert_config) -@gen_cluster(client=True) -def test_gen_cluster_legacy_implicit(c, s, a, b): - assert isinstance(c, Client) - assert isinstance(s, Scheduler) - for w in [a, b]: - assert isinstance(w, Worker) - assert s.nthreads == {w.address: w.nthreads for w in [a, b]} - assert (yield c.submit(lambda: 123)) == 123 - - -@gen_cluster(client=True) -@gen.coroutine -def test_gen_cluster_legacy_explicit(c, s, a, b): - assert isinstance(c, Client) - assert isinstance(s, Scheduler) - for w in [a, b]: - assert isinstance(w, Worker) - assert s.nthreads == {w.address: w.nthreads for w in [a, b]} - assert (yield c.submit(lambda: 123)) == 123 - - @pytest.mark.skip(reason="This hangs on travis") def test_gen_cluster_cleans_up_client(loop): import dask.context diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 9770bd2cd7..8584345c41 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -919,7 +919,7 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture def _(func): if not iscoroutinefunction(func): - func = gen.coroutine(func) + raise RuntimeError("gen_cluster only works for coroutine functions.") @functools.wraps(func) def test_func(*outer_args, **kwargs): @@ -964,11 +964,11 @@ async def coro(): ) args = [c] + args try: - future = func(*args, *outer_args, **kwargs) - task = asyncio.create_task(future) + coro = func(*args, *outer_args, **kwargs) + task = asyncio.create_task(coro) - future = asyncio.wait_for(asyncio.shield(task), timeout) - result = await future + coro2 = asyncio.wait_for(asyncio.shield(task), timeout) + result = await coro2 if s.validate: s.validate_state() except asyncio.TimeoutError: @@ -981,7 +981,7 @@ async def coro(): while not task.cancelled(): await asyncio.sleep(0.01) raise TimeoutError( - f"Test timeout.\n{buffer.getvalue()}" + f"Test timeout after {timeout}s.\n{buffer.getvalue()}" ) from None finally: if client and c.status not in ("closing", "closed"):