Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test_worker_reconnects_mid_compute_multiple_states_on_scheduler flaky #5377

Closed
jrbourbeau opened this issue Oct 1, 2021 · 3 comments
Closed
Labels
flaky test Intermittent failures on CI.

Comments

@jrbourbeau
Copy link
Member

jrbourbeau commented Oct 1, 2021

distributed/tests/test_worker.py::test_worker_reconnects_mid_compute_multiple_states_on_scheduler has recently started failing sporadically with an asyncio.TimeoutError. See, for example, this CI build on main.

Full traceback:
=================================== FAILURES ===================================
_______ test_worker_reconnects_mid_compute_multiple_states_on_scheduler ________

outer_args = (), kwargs = {}, result = None
coro = <function gen_cluster.<locals>._.<locals>.test_func.<locals>.coro at 0x7f4f961a7710>

    @functools.wraps(func)
    def test_func(*outer_args, **kwargs):
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
    
            async def coro():
                with dask.config.set(config):
                    s = False
                    for _ in range(60):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster: "
                                f"{e.__class__.__name__}: {e}; retrying",
                                exc_info=True,
                            )
                            await asyncio.sleep(1)
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs,
                        )
                        args = [c] + args
                    try:
                        future = func(*args, *outer_args, **kwargs)
                        future = asyncio.wait_for(future, timeout)
                        result = await future
                        if s.validate:
                            s.validate_state()
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == Status.closed)
                        await end_cluster(s, workers)
                        await asyncio.wait_for(cleanup_global_workers(), 1)
    
                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)
    
                    def get_unclosed():
                        return [c for c in Comm._instances if not c.closed()] + [
                            c
                            for c in _global_clients.values()
                            if c.status != "closed"
                        ]
    
                    try:
                        start = time()
                        while time() < start + 60:
                            gc.collect()
                            if not get_unclosed():
                                break
                            await asyncio.sleep(0.05)
                        else:
                            if allow_unclosed:
                                print(f"Unclosed Comms: {get_unclosed()}")
                            else:
                                raise RuntimeError("Unclosed Comms", get_unclosed())
                    finally:
                        Comm._instances.clear()
                        _global_clients.clear()
    
                    return result
    
            result = loop.run_sync(
>               coro, timeout=timeout * 2 if timeout else timeout
            )

distributed/utils_test.py:995: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/share/miniconda3/envs/dask-distributed/lib/python3.7/site-packages/tornado/ioloop.py:576: in run_sync
    return future_cell[0].result()
distributed/utils_test.py:953: in coro
    result = await future
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fut = <Task cancelled coro=<test_worker_reconnects_mid_compute_multiple_states_on_scheduler() done, defined at /home/runner/work/distributed/distributed/distributed/tests/test_worker.py:2525>>
timeout = 30

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
    
        Coroutine will be wrapped in Task.
    
        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().
    
        If the wait is cancelled, the task is also cancelled.
    
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_event_loop()
    
        if timeout is None:
            return await fut
    
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
    
            if fut.done():
                return fut.result()
    
            fut.cancel()
            raise futures.TimeoutError()
    
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
    
        fut = ensure_future(fut, loop=loop)
Exception ignored in: <function Client.__del__ at 0x7f4ffdd6f5f0>
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/client.py", line 1185, in __del__
    self.close()
  File "/home/runner/work/distributed/distributed/distributed/client.py", line 1398, in close
    if self.asynchronous:
  File "/home/runner/work/distributed/distributed/distributed/client.py", line 808, in asynchronous
    return self._asynchronous and self.loop is IOLoop.current()
AttributeError: 'Client' object has no attribute '_asynchronous'
        fut.add_done_callback(cb)
    
        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except futures.CancelledError:
                fut.remove_done_callback(cb)
                fut.cancel()
                raise
    
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
>               raise futures.TimeoutError()
E               concurrent.futures._base.TimeoutError

/usr/share/miniconda3/envs/dask-distributed/lib/python3.7/asyncio/tasks.py:449: TimeoutError
@jrbourbeau
Copy link
Member Author

Closed via #5436

@fjetter
Copy link
Member

fjetter commented Oct 22, 2021

This is still happening, see https://github.com/dask/distributed/runs/3969235806?check_suite_focus=true

I traced this back to payload messages being dropped in a BatchedSend when a worker reconnects. A PR is upcoming.

@fjetter
Copy link
Member

fjetter commented Feb 16, 2022

Duplicate of #5621

@fjetter fjetter marked this as a duplicate of #5621 Feb 16, 2022
@fjetter fjetter closed this as completed Feb 16, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
flaky test Intermittent failures on CI.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants