Skip to content

Commit

Permalink
Add timeout in seconds to exc message
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Oct 21, 2021
1 parent 5d1dedb commit c3ba9f1
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 28 deletions.
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
Expand Up @@ -420,7 +420,7 @@ def func(scheduler):
@gen_cluster(
nthreads=[], config={"distributed.scheduler.blocked-handlers": ["test-handler"]}
)
def test_scheduler_init_pulls_blocked_handlers_from_config(s):
async def test_scheduler_init_pulls_blocked_handlers_from_config(s):
assert s.blocked_handlers == ["test-handler"]


Expand Down
21 changes: 0 additions & 21 deletions distributed/tests/test_utils_test.py
Expand Up @@ -106,27 +106,6 @@ def assert_config():
await c.run_on_scheduler(assert_config)


@gen_cluster(client=True)
def test_gen_cluster_legacy_implicit(c, s, a, b):
assert isinstance(c, Client)
assert isinstance(s, Scheduler)
for w in [a, b]:
assert isinstance(w, Worker)
assert s.nthreads == {w.address: w.nthreads for w in [a, b]}
assert (yield c.submit(lambda: 123)) == 123


@gen_cluster(client=True)
@gen.coroutine
def test_gen_cluster_legacy_explicit(c, s, a, b):
assert isinstance(c, Client)
assert isinstance(s, Scheduler)
for w in [a, b]:
assert isinstance(w, Worker)
assert s.nthreads == {w.address: w.nthreads for w in [a, b]}
assert (yield c.submit(lambda: 123)) == 123


@pytest.mark.skip(reason="This hangs on travis")
def test_gen_cluster_cleans_up_client(loop):
import dask.context
Expand Down
12 changes: 6 additions & 6 deletions distributed/utils_test.py
Expand Up @@ -919,7 +919,7 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture

def _(func):
if not iscoroutinefunction(func):
func = gen.coroutine(func)
raise RuntimeError("gen_cluster only works for coroutine functions.")

@functools.wraps(func)
def test_func(*outer_args, **kwargs):
Expand Down Expand Up @@ -964,11 +964,11 @@ async def coro():
)
args = [c] + args
try:
future = func(*args, *outer_args, **kwargs)
task = asyncio.create_task(future)
coro = func(*args, *outer_args, **kwargs)
task = asyncio.create_task(coro)

future = asyncio.wait_for(asyncio.shield(task), timeout)
result = await future
coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
result = await coro2
if s.validate:
s.validate_state()
except asyncio.TimeoutError:
Expand All @@ -981,7 +981,7 @@ async def coro():
while not task.cancelled():
await asyncio.sleep(0.01)
raise TimeoutError(
f"Test timeout.\n{buffer.getvalue()}"
f"Test timeout after {timeout}s.\n{buffer.getvalue()}"
) from None
finally:
if client and c.status not in ("closing", "closed"):
Expand Down

0 comments on commit c3ba9f1

Please sign in to comment.