distributed/tests/test_cancelled_state.py failures #5527

Closed
jrbourbeau opened this issue Nov 18, 2021 · 0 comments · Fixed by #5528
Labels: bug (Something is broken)

@jrbourbeau
Member

We recently merged #5525, #5503, and #5507, which contain various deadlock-related fixes. Tests passed in each individual PR (apart from a few flaky tests). However, both distributed/tests/test_cancelled_state.py::test_executing_cancelled_error and distributed/tests/test_cancelled_state.py::test_flight_cancelled_error, which were added in #5503, are failing consistently on main with an asyncio.TimeoutError (I can also reproduce this locally). This suggests that, while each PR may have been fine on its own, the three do not interact well with each other.

See this CI build for an example of the test failures. I've included the full tracebacks below, though note that they aren't very insightful.
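As a reading aid for the tracebacks, here is a minimal sketch of the `asyncio.wait_for(asyncio.shield(task), timeout)` pattern that the test harness uses. It is not the actual test code: `never_finishes` and `run_with_timeout` are hypothetical names, and the polling loop only stands in for a test body that waits on a state which is never reached. It illustrates why the timeout surfaces as an `asyncio.TimeoutError` while the shielded task itself is still pending and has to be cancelled separately.

```python
import asyncio


async def never_finishes():
    # Hypothetical stand-in for a test body that keeps polling for a
    # condition that never becomes true.
    while True:
        await asyncio.sleep(0.01)


async def run_with_timeout(timeout):
    task = asyncio.create_task(never_finishes())
    try:
        # shield() keeps wait_for's cancellation away from `task`: on
        # timeout only the outer shield future is cancelled.
        await asyncio.wait_for(asyncio.shield(task), timeout)
    except asyncio.TimeoutError:
        # The inner task is still alive here, so its stack can be inspected.
        still_pending = not task.done()
        # Cancel the inner task explicitly and wait for it to finish.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        return still_pending, task.cancelled()
    return False, False


result = asyncio.run(run_with_timeout(0.05))
print(result)
```

Because the task survives the timeout, the harness can print its stack, which is how the tracebacks below report where the test was suspended.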

Full tracebacks:
=================================== FAILURES ===================================
________________________ test_executing_cancelled_error ________________________

fut = <Future cancelled>, timeout = 30

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
    
        Coroutine will be wrapped in Task.
    
        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().
    
        If the wait is cancelled, the task is also cancelled.
    
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
    
        if timeout is None:
            return await fut
    
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
    
            if fut.done():
                return fut.result()
    
            await _cancel_and_wait(fut, loop=loop)
            try:
                fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
            else:
                raise exceptions.TimeoutError()
    
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
    
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
    
        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                if fut.done():
                    return fut.result()
                else:
                    fut.remove_done_callback(cb)
                    # We must ensure that the task is not running
                    # after wait_for() returns.
                    # See https://bugs.python.org/issue32751
                    await _cancel_and_wait(fut, loop=loop)
                    raise
    
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
                # In case task cancellation failed with some
                # exception, we should re-raise it
                # See https://bugs.python.org/issue40607
                try:
>                   fut.result()
E                   asyncio.exceptions.CancelledError

/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py:492: CancelledError

The above exception was the direct cause of the following exception:

    async def coro():
        with dask.config.set(config):
            s = False
            for _ in range(60):
                try:
                    s, ws = await start_cluster(
                        nthreads,
                        scheduler,
                        loop,
                        security=security,
                        Worker=Worker,
                        scheduler_kwargs=scheduler_kwargs,
                        worker_kwargs=worker_kwargs,
                    )
                except Exception as e:
                    logger.error(
                        "Failed to start gen_cluster: "
                        f"{e.__class__.__name__}: {e}; retrying",
                        exc_info=True,
                    )
                    await asyncio.sleep(1)
                else:
                    workers[:] = ws
                    args = [s] + workers
                    break
            if s is False:
                raise Exception("Could not start cluster")
            if client:
                c = await Client(
                    s.address,
                    loop=loop,
                    security=security,
                    asynchronous=True,
                    **client_kwargs,
                )
                args = [c] + args
            try:
                coro = func(*args, *outer_args, **kwargs)
                task = asyncio.create_task(coro)
    
                coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
>               result = await coro2

distributed/utils_test.py:975: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fut = <Future cancelled>, timeout = 30

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
    
        Coroutine will be wrapped in Task.
    
        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().
    
        If the wait is cancelled, the task is also cancelled.
    
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
    
        if timeout is None:
            return await fut
    
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
    
            if fut.done():
                return fut.result()
    
            await _cancel_and_wait(fut, loop=loop)
            try:
                fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
            else:
                raise exceptions.TimeoutError()
    
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
    
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
    
        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                if fut.done():
                    return fut.result()
                else:
                    fut.remove_done_callback(cb)
                    # We must ensure that the task is not running
                    # after wait_for() returns.
                    # See https://bugs.python.org/issue32751
                    await _cancel_and_wait(fut, loop=loop)
                    raise
    
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
                # In case task cancellation failed with some
                # exception, we should re-raise it
                # See https://bugs.python.org/issue40607
                try:
                    fut.result()
                except exceptions.CancelledError as exc:
>                   raise exceptions.TimeoutError() from exc
E                   asyncio.exceptions.TimeoutError

/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py:494: TimeoutError

The above exception was the direct cause of the following exception:

outer_args = (), kwargs = {}, result = None
coro = <function gen_cluster.<locals>._.<locals>.test_func.<locals>.coro at 0x7f0762dad430>

    @functools.wraps(func)
    def test_func(*outer_args, **kwargs):
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
    
            async def coro():
                with dask.config.set(config):
                    s = False
                    for _ in range(60):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster: "
                                f"{e.__class__.__name__}: {e}; retrying",
                                exc_info=True,
                            )
                            await asyncio.sleep(1)
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs,
                        )
                        args = [c] + args
                    try:
                        coro = func(*args, *outer_args, **kwargs)
                        task = asyncio.create_task(coro)
    
                        coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
                        result = await coro2
                        if s.validate:
                            s.validate_state()
                    except asyncio.TimeoutError as e:
                        assert task
                        buffer = io.StringIO()
                        # This stack indicates where the coro/test is suspended
                        task.print_stack(file=buffer)
    
                        if client:
                            assert c
                            try:
                                if cluster_dump_directory:
                                    if not os.path.exists(cluster_dump_directory):
                                        os.makedirs(cluster_dump_directory)
                                    filename = os.path.join(
                                        cluster_dump_directory, func.__name__
                                    )
                                    fut = c.dump_cluster_state(
                                        filename,
                                        # Test dumps should be small enough that
                                        # there is no need for a compressed
                                        # binary representation and readability
                                        # is more important
                                        format="yaml",
                                    )
                                    assert fut is not None
                                    await fut
                            except Exception:
                                print(
                                    f"Exception {sys.exc_info()} while trying to dump cluster state."
                                )
    
                        task.cancel()
                        while not task.cancelled():
                            await asyncio.sleep(0.01)
                        raise TimeoutError(
                            f"Test timeout after {timeout}s.\n{buffer.getvalue()}"
                        ) from e
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == Status.closed)
                        await end_cluster(s, workers)
                        await asyncio.wait_for(cleanup_global_workers(), 1)
    
                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)
    
                    def get_unclosed():
                        return [c for c in Comm._instances if not c.closed()] + [
                            c
                            for c in _global_clients.values()
                            if c.status != "closed"
                        ]
    
                    try:
                        start = time()
                        while time() < start + 60:
                            gc.collect()
                            if not get_unclosed():
                                break
                            await asyncio.sleep(0.05)
                        else:
                            if allow_unclosed:
                                print(f"Unclosed Comms: {get_unclosed()}")
                            else:
                                raise RuntimeError("Unclosed Comms", get_unclosed())
                    finally:
                        Comm._instances.clear()
                        _global_clients.clear()
    
                    return result
    
>           result = loop.run_sync(
                coro, timeout=timeout * 2 if timeout else timeout
            )

distributed/utils_test.py:1052: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/ioloop.py:530: in run_sync
    return future_cell[0].result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    async def coro():
        with dask.config.set(config):
            s = False
            for _ in range(60):
                try:
                    s, ws = await start_cluster(
                        nthreads,
                        scheduler,
                        loop,
                        security=security,
                        Worker=Worker,
                        scheduler_kwargs=scheduler_kwargs,
                        worker_kwargs=worker_kwargs,
                    )
                except Exception as e:
                    logger.error(
                        "Failed to start gen_cluster: "
                        f"{e.__class__.__name__}: {e}; retrying",
                        exc_info=True,
                    )
                    await asyncio.sleep(1)
                else:
                    workers[:] = ws
                    args = [s] + workers
                    break
            if s is False:
                raise Exception("Could not start cluster")
            if client:
                c = await Client(
                    s.address,
                    loop=loop,
                    security=security,
                    asynchronous=True,
                    **client_kwargs,
                )
                args = [c] + args
            try:
                coro = func(*args, *outer_args, **kwargs)
                task = asyncio.create_task(coro)
    
                coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
                result = await coro2
                if s.validate:
                    s.validate_state()
            except asyncio.TimeoutError as e:
                assert task
                buffer = io.StringIO()
                # This stack indicates where the coro/test is suspended
                task.print_stack(file=buffer)
    
                if client:
                    assert c
                    try:
                        if cluster_dump_directory:
                            if not os.path.exists(cluster_dump_directory):
                                os.makedirs(cluster_dump_directory)
                            filename = os.path.join(
                                cluster_dump_directory, func.__name__
                            )
                            fut = c.dump_cluster_state(
                                filename,
                                # Test dumps should be small enough that
                                # there is no need for a compressed
                                # binary representation and readability
                                # is more important
                                format="yaml",
                            )
                            assert fut is not None
                            await fut
                    except Exception:
                        print(
                            f"Exception {sys.exc_info()} while trying to dump cluster state."
                        )
    
                task.cancel()
                while not task.cancelled():
                    await asyncio.sleep(0.01)
>               raise TimeoutError(
                    f"Test timeout after {timeout}s.\n{buffer.getvalue()}"
                ) from e
E               asyncio.exceptions.TimeoutError: Test timeout after 30s.
E               Stack for <Task pending name='Task-64578' coro=<test_executing_cancelled_error() running at /home/runner/work/distributed/distributed/distributed/tests/test_cancelled_state.py:163> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0762704b80>()]>> (most recent call last):
E                 File "/home/runner/work/distributed/distributed/distributed/tests/test_cancelled_state.py", line 163, in test_executing_cancelled_error
E                   await asyncio.sleep(0.01)

distributed/utils_test.py:1011: TimeoutError
----------------------------- Captured stderr call -----------------------------
distributed.worker - WARNING - Compute Failed
Function:  wait_and_raise
args:      ()
kwargs:    {}
Exception: 'RuntimeError()'

_________________________ test_flight_cancelled_error __________________________

fut = <Future cancelled>, timeout = 30

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
    
        Coroutine will be wrapped in Task.
    
        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().
    
        If the wait is cancelled, the task is also cancelled.
    
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
    
        if timeout is None:
            return await fut
    
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
    
            if fut.done():
                return fut.result()
    
            await _cancel_and_wait(fut, loop=loop)
            try:
                fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
            else:
                raise exceptions.TimeoutError()
    
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
    
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
    
        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                if fut.done():
                    return fut.result()
                else:
                    fut.remove_done_callback(cb)
                    # We must ensure that the task is not running
                    # after wait_for() returns.
                    # See https://bugs.python.org/issue32751
                    await _cancel_and_wait(fut, loop=loop)
                    raise
    
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
                # In case task cancellation failed with some
                # exception, we should re-raise it
                # See https://bugs.python.org/issue40607
                try:
>                   fut.result()
E                   asyncio.exceptions.CancelledError

/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py:492: CancelledError

The above exception was the direct cause of the following exception:

    async def coro():
        with dask.config.set(config):
            s = False
            for _ in range(60):
                try:
                    s, ws = await start_cluster(
                        nthreads,
                        scheduler,
                        loop,
                        security=security,
                        Worker=Worker,
                        scheduler_kwargs=scheduler_kwargs,
                        worker_kwargs=worker_kwargs,
                    )
                except Exception as e:
                    logger.error(
                        "Failed to start gen_cluster: "
                        f"{e.__class__.__name__}: {e}; retrying",
                        exc_info=True,
                    )
                    await asyncio.sleep(1)
                else:
                    workers[:] = ws
                    args = [s] + workers
                    break
            if s is False:
                raise Exception("Could not start cluster")
            if client:
                c = await Client(
                    s.address,
                    loop=loop,
                    security=security,
                    asynchronous=True,
                    **client_kwargs,
                )
                args = [c] + args
            try:
                coro = func(*args, *outer_args, **kwargs)
                task = asyncio.create_task(coro)
    
                coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
>               result = await coro2

distributed/utils_test.py:975: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fut = <Future cancelled>, timeout = 30

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
    
        Coroutine will be wrapped in Task.
    
        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().
    
        If the wait is cancelled, the task is also cancelled.
    
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
    
        if timeout is None:
            return await fut
    
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
    
            if fut.done():
                return fut.result()
    
            await _cancel_and_wait(fut, loop=loop)
            try:
                fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
            else:
                raise exceptions.TimeoutError()
    
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
    
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
    
        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                if fut.done():
                    return fut.result()
                else:
                    fut.remove_done_callback(cb)
                    # We must ensure that the task is not running
                    # after wait_for() returns.
                    # See https://bugs.python.org/issue32751
                    await _cancel_and_wait(fut, loop=loop)
                    raise
    
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
                # In case task cancellation failed with some
                # exception, we should re-raise it
                # See https://bugs.python.org/issue40607
                try:
                    fut.result()
                except exceptions.CancelledError as exc:
>                   raise exceptions.TimeoutError() from exc
E                   asyncio.exceptions.TimeoutError

/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py:494: TimeoutError

The above exception was the direct cause of the following exception:

outer_args = (), kwargs = {}, result = None
coro = <function gen_cluster.<locals>._.<locals>.test_func.<locals>.coro at 0x7f0762b23310>

    @functools.wraps(func)
    def test_func(*outer_args, **kwargs):
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
    
            async def coro():
                with dask.config.set(config):
                    s = False
                    for _ in range(60):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster: "
                                f"{e.__class__.__name__}: {e}; retrying",
                                exc_info=True,
                            )
                            await asyncio.sleep(1)
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs,
                        )
                        args = [c] + args
                    try:
                        coro = func(*args, *outer_args, **kwargs)
                        task = asyncio.create_task(coro)
    
                        coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
                        result = await coro2
                        if s.validate:
                            s.validate_state()
                    except asyncio.TimeoutError as e:
                        assert task
                        buffer = io.StringIO()
                        # This stack indicates where the coro/test is suspended
                        task.print_stack(file=buffer)
    
                        if client:
                            assert c
                            try:
                                if cluster_dump_directory:
                                    if not os.path.exists(cluster_dump_directory):
                                        os.makedirs(cluster_dump_directory)
                                    filename = os.path.join(
                                        cluster_dump_directory, func.__name__
                                    )
                                    fut = c.dump_cluster_state(
                                        filename,
                                        # Test dumps should be small enough that
                                        # there is no need for a compressed
                                        # binary representation and readability
                                        # is more important
                                        format="yaml",
                                    )
                                    assert fut is not None
                                    await fut
                            except Exception:
                                print(
                                    f"Exception {sys.exc_info()} while trying to dump cluster state."
                                )
    
                        task.cancel()
                        while not task.cancelled():
                            await asyncio.sleep(0.01)
                        raise TimeoutError(
                            f"Test timeout after {timeout}s.\n{buffer.getvalue()}"
                        ) from e
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == Status.closed)
                        await end_cluster(s, workers)
                        await asyncio.wait_for(cleanup_global_workers(), 1)
    
                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)
    
                    def get_unclosed():
                        return [c for c in Comm._instances if not c.closed()] + [
                            c
                            for c in _global_clients.values()
                            if c.status != "closed"
                        ]
    
                    try:
                        start = time()
                        while time() < start + 60:
                            gc.collect()
                            if not get_unclosed():
                                break
                            await asyncio.sleep(0.05)
                        else:
                            if allow_unclosed:
                                print(f"Unclosed Comms: {get_unclosed()}")
                            else:
                                raise RuntimeError("Unclosed Comms", get_unclosed())
                    finally:
                        Comm._instances.clear()
                        _global_clients.clear()
    
                    return result
    
>           result = loop.run_sync(
                coro, timeout=timeout * 2 if timeout else timeout
            )

distributed/utils_test.py:1052: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/ioloop.py:530: in run_sync
    return future_cell[0].result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    async def coro():
        with dask.config.set(config):
            s = False
            for _ in range(60):
                try:
                    s, ws = await start_cluster(
                        nthreads,
                        scheduler,
                        loop,
                        security=security,
                        Worker=Worker,
                        scheduler_kwargs=scheduler_kwargs,
                        worker_kwargs=worker_kwargs,
                    )
                except Exception as e:
                    logger.error(
                        "Failed to start gen_cluster: "
                        f"{e.__class__.__name__}: {e}; retrying",
                        exc_info=True,
                    )
                    await asyncio.sleep(1)
                else:
                    workers[:] = ws
                    args = [s] + workers
                    break
            if s is False:
                raise Exception("Could not start cluster")
            if client:
                c = await Client(
                    s.address,
                    loop=loop,
                    security=security,
                    asynchronous=True,
                    **client_kwargs,
                )
                args = [c] + args
            try:
                coro = func(*args, *outer_args, **kwargs)
                task = asyncio.create_task(coro)
    
                coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
                result = await coro2
                if s.validate:
                    s.validate_state()
            except asyncio.TimeoutError as e:
                assert task
                buffer = io.StringIO()
                # This stack indicates where the coro/test is suspended
                task.print_stack(file=buffer)
    
                if client:
                    assert c
                    try:
                        if cluster_dump_directory:
                            if not os.path.exists(cluster_dump_directory):
                                os.makedirs(cluster_dump_directory)
                            filename = os.path.join(
                                cluster_dump_directory, func.__name__
                            )
                            fut = c.dump_cluster_state(
                                filename,
                                # Test dumps should be small enough that
                                # there is no need for a compressed
                                # binary representation and readability
                                # is more important
                                format="yaml",
                            )
                            assert fut is not None
                            await fut
                    except Exception:
                        print(
                            f"Exception {sys.exc_info()} while trying to dump cluster state."
                        )
    
                task.cancel()
                while not task.cancelled():
                    await asyncio.sleep(0.01)
>               raise TimeoutError(
                    f"Test timeout after {timeout}s.\n{buffer.getvalue()}"
                ) from e
E               asyncio.exceptions.TimeoutError: Test timeout after 30s.
E               Stack for <Task pending name='Task-64948' coro=<test_flight_cancelled_error() running at /home/runner/work/distributed/distributed/distributed/tests/test_cancelled_state.py:208> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f075f63eeb0>()]>> (most recent call last):
E                 File "/home/runner/work/distributed/distributed/distributed/tests/test_cancelled_state.py", line 208, in test_flight_cancelled_error
E                   await asyncio.sleep(0.01)

distributed/utils_test.py:1011: TimeoutError
----------------------------- Captured stderr call -----------------------------
distributed.worker - ERROR - 
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 2914, in gather_dep
    response = await get_data_from_worker(
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/unittest/mock.py", line 2165, in _execute_mock_call
    result = await effect(*args, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/tests/test_cancelled_state.py", line 187, in wait_and_raise
    raise RuntimeError()
RuntimeError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/utils.py", line 653, in log_errors
    yield
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 2914, in gather_dep
    response = await get_data_from_worker(
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/unittest/mock.py", line 2165, in _execute_mock_call
    result = await effect(*args, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/tests/test_cancelled_state.py", line 187, in wait_and_raise
    raise RuntimeError()
RuntimeError
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f075df18dc0>>, <Task finished name='Task-64956' coro=<Worker.gather_dep() done, defined at /home/runner/work/distributed/distributed/distributed/worker.py:2858> exception=RuntimeError()>)
Traceback (most recent call last):
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 2914, in gather_dep
    response = await get_data_from_worker(
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/unittest/mock.py", line 2165, in _execute_mock_call
    result = await effect(*args, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/tests/test_cancelled_state.py", line 187, in wait_and_raise
    raise RuntimeError()
RuntimeError
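
For reference, the timeout handling visible in the `coro` frame above (shielded task, `wait_for`, stack dump, then cancel) can be reproduced in isolation. The following is a minimal standalone sketch, not the actual `gen_cluster` implementation: `never_finishes`, `run_with_timeout`, and `demo` are hypothetical names introduced only for illustration, and the polling loop is an assumed stand-in for the test body that is stuck at `await asyncio.sleep(0.01)`.

```python
import asyncio
import io


async def never_finishes():
    # Hypothetical stand-in for a test body that polls for a condition
    # that never becomes true.
    while True:
        await asyncio.sleep(0.01)


async def run_with_timeout(coro, timeout):
    task = asyncio.create_task(coro)
    try:
        # shield() stops wait_for() from cancelling the task on timeout,
        # so the task is still pending and its stack can be inspected.
        return await asyncio.wait_for(asyncio.shield(task), timeout)
    except asyncio.TimeoutError as e:
        buffer = io.StringIO()
        # Record where the task is currently suspended.
        task.print_stack(file=buffer)
        # Cancel explicitly and wait until the cancellation has landed.
        task.cancel()
        while not task.cancelled():
            await asyncio.sleep(0.01)
        raise TimeoutError(
            f"Test timeout after {timeout}s.\n{buffer.getvalue()}"
        ) from e


def demo():
    # Returns the timeout message, or None if no timeout occurred.
    try:
        asyncio.run(run_with_timeout(never_finishes(), 0.1))
    except TimeoutError as exc:
        return str(exc)
    return None
```

Calling `demo()` returns a message that begins with the timeout notice and is followed by the suspended task's stack, which is the same shape as the `Stack for <Task pending ...>` lines in the failure above. The stack only shows where the test is waiting, not why the awaited condition never becomes true, which is why the tracebacks are not very insightful on their own.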

cc @fjetter
