
distributed.comm.tests.test_ucx_config.test_ucx_config_w_env_var flaky #5229

Closed
fjetter opened this issue Aug 18, 2021 · 12 comments · Fixed by rapidsai/ucx-py#994 or #8272
Labels: flaky test (Intermittent failures on CI)

Comments

@fjetter
Member

fjetter commented Aug 18, 2021

I noticed that the UCX test distributed.comm.tests.test_ucx_config.test_ucx_config_w_env_var occasionally fails.

E.g. https://gpuci.gpuopenanalytics.com/job/dask/job/distributed/job/prb/job/distributed-prb/234/CUDA_VER=11.2,LINUX_VER=ubuntu18.04,PYTHON_VER=3.8,RAPIDS_VER=21.10/testReport/junit/distributed.comm.tests/test_ucx_config/test_ucx_config_w_env_var/

Traceback
self = <Client: scheduler='ucx://172.17.0.3:13339'>

    async def _handle_report(self):
        """Listen to scheduler"""
        with log_errors():
            try:
                while True:
                    if self.scheduler_comm is None:
                        break
                    try:
>                       msgs = await self.scheduler_comm.comm.read()

distributed/client.py:1222: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <UCX Client->Scheduler local=None remote=ucx://172.17.0.3:13339>
deserializers = ('cuda', 'dask', 'pickle', 'error')

    async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
        with log_errors():
            if self.closed():
                raise CommClosedError("Endpoint is closed -- unable to read message")
    
            if deserializers is None:
                deserializers = ("cuda", "dask", "pickle", "error")
    
            try:
                # Recv meta data
    
                # Recv close flag and number of frames (_Bool, int64)
                msg = host_array(struct.calcsize("?Q"))
                await self.ep.recv(msg)
                (shutdown, nframes) = struct.unpack("?Q", msg)
    
                if shutdown:  # The writer is closing the connection
>                   raise CommClosedError("Connection closed by writer")
E                   distributed.comm.core.CommClosedError: Connection closed by writer

distributed/comm/ucx.py:255: CommClosedError

During handling of the above exception, another exception occurred:

cleanup = None
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f872c175550>
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7f87b936b2e0>

    def test_ucx_config_w_env_var(cleanup, loop, monkeypatch):
        size = "1000.00 MB"
        monkeypatch.setenv("DASK_RMM__POOL_SIZE", size)
    
        dask.config.refresh()
    
        port = "13339"
        sched_addr = f"ucx://{HOST}:{port}"
    
        with popen(
            ["dask-scheduler", "--no-dashboard", "--protocol", "ucx", "--port", port]
        ) as sched:
            with popen(
                [
                    "dask-worker",
                    sched_addr,
                    "--no-dashboard",
                    "--protocol",
                    "ucx",
                    "--no-nanny",
                ]
            ) as w:
                with Client(sched_addr, loop=loop, timeout=10) as c:
                    while not c.scheduler_info()["workers"]:
                        sleep(0.1)
    
                    # Check for RMM pool resource type
                    rmm_resource = c.run_on_scheduler(
                        rmm.mr.get_current_device_resource_type
                    )
                    assert rmm_resource == rmm.mr.PoolMemoryResource
    
                    worker_addr = list(c.scheduler_info()["workers"])[0]
                    worker_rmm_usage = c.run(rmm.mr.get_current_device_resource_type)
                    rmm_resource = worker_rmm_usage[worker_addr]
>                   assert rmm_resource == rmm.mr.PoolMemoryResource

distributed/comm/tests/test_ucx_config.py:118: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:1187: in __exit__
    self.close()
distributed/client.py:1430: in close
    sync(self.loop, self._close, fast=True, callback_timeout=timeout)
distributed/utils.py:325: in sync
    raise exc.with_traceback(tb)
distributed/utils.py:308: in f
    result[0] = yield future
/opt/conda/envs/dask/lib/python3.8/site-packages/tornado/gen.py:762: in run
    value = future.result()
/opt/conda/envs/dask/lib/python3.8/asyncio/tasks.py:494: in wait_for
    return fut.result()
distributed/client.py:1339: in _close
    await asyncio.wait_for(
/opt/conda/envs/dask/lib/python3.8/asyncio/tasks.py:494: in wait_for
    return fut.result()
distributed/client.py:1228: in _handle_report
    await self._reconnect()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Client: scheduler='ucx://172.17.0.3:13339'>

    async def _reconnect(self):
        with log_errors():
>           assert self.scheduler_comm.comm.closed()
E           AssertionError

distributed/client.py:1053: AssertionError

cc @dask/gpu
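
For reference, the header being decoded at the top of that traceback is a two-field struct: a close flag (_Bool) followed by the number of frames (uint64), packed as "?Q". A minimal sketch of that framing is below; the helper names are made up for illustration, and only the struct format comes from the traceback.

    import struct

    # Sketch of the "?Q" header from the traceback above. The helper names are
    # illustrative; they are not Distributed's actual functions.

    def encode_header(shutdown: bool, nframes: int) -> bytes:
        # A writer that is closing the connection sends shutdown=True and no
        # frames; otherwise it sends the frame count for the message body.
        return struct.pack("?Q", shutdown, nframes)

    def decode_header(buf: bytes):
        shutdown, nframes = struct.unpack("?Q", buf)
        return shutdown, nframes

    print(decode_header(encode_header(True, 0)))   # (True, 0)  -> "Connection closed by writer"
    print(decode_header(encode_header(False, 3)))  # (False, 3) -> read 3 frames

This is why a peer closing the connection mid-read surfaces on the client as CommClosedError rather than a clean shutdown.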

@pentschev
Member

Thanks for reporting, @fjetter. I'll see if I can reproduce and fix it on my end.

@jrbourbeau
Member

It looks like test_ucx_config_w_env_var has started to fail again. For an example, see this gpuCI build from #5327.

@jakirkham
Member

It looks like there are some other things happening in that CI build. In particular, 'subscribe-topic' appears to have been added in PR #5217 from 8 days ago, and it shows up in the stderr logging output. Are we sure there isn't a different bug here? @fjetter, do you know what's going on here?

14:31:10 distributed.core - ERROR - 'subscribe-topic'
14:31:10 Traceback (most recent call last):
14:31:10   File "/opt/conda/envs/dask/lib/python3.8/site-packages/distributed/core.py", line 570, in handle_stream
14:31:10     handler = self.stream_handlers[op]
14:31:10 KeyError: 'subscribe-topic'
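
For context on that error: handle_stream looks up each message's "op" in a dict of handlers, so an op that one side sends but the other side never registered (e.g. because of a client/scheduler version mismatch) fails exactly like this. A rough, self-contained sketch of that dispatch shape, assumed from the traceback rather than taken from the actual distributed.core implementation:

    # Simplified dispatch shape; the real handlers and message format differ.
    stream_handlers = {
        "task-finished": lambda msg: print("task finished:", msg),
        # no "subscribe-topic" handler registered on this (older) side
    }

    def handle_stream(msgs):
        for msg in msgs:
            op = msg.pop("op")
            handler = stream_handlers[op]   # KeyError if the op is unknown
            handler(msg)

    try:
        handle_stream([{"op": "subscribe-topic", "topic": "ucx"}])
    except KeyError as exc:
        print("unhandled stream op:", exc)  # KeyError: 'subscribe-topic'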

@fjetter
Member Author

fjetter commented Sep 28, 2021

I would need more than five lines of traceback. My first guess would be that this is a version mismatch, but I need more context information.

@pentschev
Member

You can take a look at the gpuCI build link that @jrbourbeau posted above, specifically lines 1699-1751. But indeed there's a mismatched-version error starting at line 1820 of the same build. Maybe this was just a temporary failure; could you confirm whether it has happened again since then, @jrbourbeau?

@jrbourbeau
Member

could you confirm whether it has happened again since then, @jrbourbeau?

I don't recall seeing this failure recently, but I also haven't been tracking too closely. I'll keep an eye out over the next few days.

@fjetter
Member Author

fjetter commented Oct 26, 2021

I got another one here: https://gpuci.gpuopenanalytics.com/job/dask/job/distributed/job/prb/job/distributed-prb/707/CUDA_VER=11.2,LINUX_VER=ubuntu18.04,PYTHON_VER=3.8,RAPIDS_VER=21.12/testReport/junit/distributed.comm.tests/test_ucx_config/test_ucx_config_w_env_var/

This time it failed with a different exception:

    async def connect(self, address: str, deserialize=True, **connection_args) -> UCX:
        logger.debug("UCXConnector.connect: %s", address)
        ip, port = parse_host_port(address)
        init_once()
        try:
            ep = await ucx_create_endpoint(ip, port)
>       except (ucp.exceptions.UCXCloseError, ucp.exceptions.UCXCanceled,) + (
            getattr(ucp.exceptions, "UCXConnectionReset", ()),
            getattr(ucp.exceptions, "UCXNotConnected", ()),
        ):
E       TypeError: catching classes that do not inherit from BaseException is not allowed

over in #5441
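
The TypeError comes from the getattr fallbacks: when UCXConnectionReset or UCXNotConnected doesn't exist in the installed ucx-py, getattr(..., ()) puts an empty tuple inside the except clause's tuple, and Python refuses to catch something that isn't an exception class. One way to avoid that, sketched below with a stand-in module so it runs on its own (not necessarily what #5441 ended up doing), is to build the tuple from only the names that resolve to real exception classes:

    # Stand-in for ucp.exceptions in an older ucx-py where some classes are
    # missing; only here to make the sketch self-contained and runnable.
    class exceptions:
        class UCXCloseError(Exception): ...
        class UCXCanceled(Exception): ...
        # UCXConnectionReset / UCXNotConnected intentionally absent

    _names = ("UCXCloseError", "UCXCanceled", "UCXConnectionReset", "UCXNotConnected")
    _catch = tuple(
        exc
        for exc in (getattr(exceptions, name, None) for name in _names)
        if isinstance(exc, type) and issubclass(exc, BaseException)
    )

    try:
        raise exceptions.UCXCanceled("endpoint creation canceled")
    except _catch as err:
        print("caught:", type(err).__name__)   # works even with the missing classes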

@pentschev
Member

@fjetter, have you seen this more recently as well? I can't reproduce it locally, and that build was from a time when UCX-Py may have been broken; that was already fixed in rapidsai/ucx-py#801. With that said, I believe this should no longer be an issue, but please ping me again if it shows up.

@charlesbluca
Member

Bumping this issue because I ran into a timeout on one of my gpuCI runs, which seems to have been caused by this test:

16:20:04 distributed/comm/tests/test_ucx_config.py::test_ucx_config PASSED        [ 10%]
16:25:04 distributed/comm/tests/test_ucx_config.py::test_ucx_config_w_env_var
16:25:04 +++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
16:25:04
16:25:04 ~~~~~~~~~~~~~~~~~~~~~~ Stack of IO loop (140418690385664) ~~~~~~~~~~~~~~~~~~~~~~
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 890, in _bootstrap
16:25:04     self._bootstrap_inner()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 932, in _bootstrap_inner
16:25:04     self.run()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 870, in run
16:25:04     self._target(*self._args, **self._kwargs)
16:25:04   File "/workspace/distributed/utils.py", line 443, in run_loop
16:25:04     loop.start()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/tornado/platform/asyncio.py", line 199, in start
16:25:04     self.asyncio_loop.run_forever()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
16:25:04     self._run_once()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
16:25:04     event_list = self._selector.select(timeout)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/selectors.py", line 468, in select
16:25:04     fd_event_list = self._selector.poll(timeout, max_ev)
16:25:04
16:25:04 ~~~~~~~~~~~~~~~~ Stack of TCP-Executor-155-1 (140419415742208) ~~~~~~~~~~~~~~~~~
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 890, in _bootstrap
16:25:04     self._bootstrap_inner()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 932, in _bootstrap_inner
16:25:04     self.run()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 870, in run
16:25:04     self._target(*self._args, **self._kwargs)
16:25:04   File "/workspace/distributed/threadpoolexecutor.py", line 51, in _worker
16:25:04     task = work_queue.get(timeout=1)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/queue.py", line 179, in get
16:25:04     self.not_empty.wait(remaining)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 306, in wait
16:25:04     gotit = waiter.acquire(True, timeout)
16:25:04
16:25:04 ~~~~~~~~~~~~~~~~ Stack of TCP-Executor-155-0 (140416652572416) ~~~~~~~~~~~~~~~~~
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 890, in _bootstrap
16:25:04     self._bootstrap_inner()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 932, in _bootstrap_inner
16:25:04     self.run()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 870, in run
16:25:04     self._target(*self._args, **self._kwargs)
16:25:04   File "/workspace/distributed/threadpoolexecutor.py", line 51, in _worker
16:25:04     task = work_queue.get(timeout=1)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/queue.py", line 179, in get
16:25:04     self.not_empty.wait(remaining)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 306, in wait
16:25:04     gotit = waiter.acquire(True, timeout)
16:25:04
16:25:04 ~~~~~~~~~~~~~~~~~~ Stack of Dask-Offload_0 (140416358958848) ~~~~~~~~~~~~~~~~~~~
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 890, in _bootstrap
16:25:04     self._bootstrap_inner()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 932, in _bootstrap_inner
16:25:04     self.run()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 870, in run
16:25:04     self._target(*self._args, **self._kwargs)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/concurrent/futures/thread.py", line 78, in _worker
16:25:04     work_item = work_queue.get(block=True)
16:25:04
16:25:04 ~~~~~~~~~~~~~~~~~~~~ Stack of MainThread (140421816862528) ~~~~~~~~~~~~~~~~~~~~~
16:25:04   File "/opt/conda/envs/dask/bin/py.test", line 11, in <module>
16:25:04     sys.exit(console_main())
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/config/__init__.py", line 185, in console_main
16:25:04     code = main()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/config/__init__.py", line 162, in main
16:25:04     ret: Union[ExitCode, int] = config.hook.pytest_cmdline_main(
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_hooks.py", line 265, in __call__
16:25:04     return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_manager.py", line 80, in _hookexec
16:25:04     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_callers.py", line 39, in _multicall
16:25:04     res = hook_impl.function(*args)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/main.py", line 316, in pytest_cmdline_main
16:25:04     return wrap_session(config, _main)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/main.py", line 269, in wrap_session
16:25:04     session.exitstatus = doit(config, session) or 0
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/main.py", line 323, in _main
16:25:04     config.hook.pytest_runtestloop(session=session)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_hooks.py", line 265, in __call__
16:25:04     return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_manager.py", line 80, in _hookexec
16:25:04     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_callers.py", line 39, in _multicall
16:25:04     res = hook_impl.function(*args)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/main.py", line 348, in pytest_runtestloop
16:25:04     item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_hooks.py", line 265, in __call__
16:25:04     return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_manager.py", line 80, in _hookexec
16:25:04     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_callers.py", line 39, in _multicall
16:25:04     res = hook_impl.function(*args)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pytest_rerunfailures.py", line 497, in pytest_runtest_protocol
16:25:04     reports = runtestprotocol(item, nextitem=nextitem, log=False)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/runner.py", line 126, in runtestprotocol
16:25:04     reports.append(call_and_report(item, "call", log))
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/runner.py", line 215, in call_and_report
16:25:04     call = call_runtest_hook(item, when, **kwds)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/runner.py", line 254, in call_runtest_hook
16:25:04     return CallInfo.from_call(
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/runner.py", line 311, in from_call
16:25:04     result: Optional[TResult] = func()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/runner.py", line 255, in <lambda>
16:25:04     lambda: ihook(item=item, **kwds), when=when, reraise=reraise
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_hooks.py", line 265, in __call__
16:25:04     return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_manager.py", line 80, in _hookexec
16:25:04     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_callers.py", line 39, in _multicall
16:25:04     res = hook_impl.function(*args)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/runner.py", line 162, in pytest_runtest_call
16:25:04     item.runtest()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/python.py", line 1641, in runtest
16:25:04     self.ihook.pytest_pyfunc_call(pyfuncitem=self)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_hooks.py", line 265, in __call__
16:25:04     return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_manager.py", line 80, in _hookexec
16:25:04     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/pluggy/_callers.py", line 39, in _multicall
16:25:04     res = hook_impl.function(*args)
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/site-packages/_pytest/python.py", line 183, in pytest_pyfunc_call
16:25:04     result = testfunction(**testargs)
16:25:04   File "/workspace/distributed/comm/tests/test_ucx_config.py", line 122, in test_ucx_config_w_env_var
16:25:04     with Client(sched_addr, loop=loop, timeout=10) as c:
16:25:04   File "/workspace/distributed/client.py", line 923, in __init__
16:25:04     self.start(timeout=timeout)
16:25:04   File "/workspace/distributed/client.py", line 1074, in start
16:25:04     self._loop_runner.start()
16:25:04   File "/workspace/distributed/utils.py", line 415, in start
16:25:04     self._start_unlocked()
16:25:04   File "/workspace/distributed/utils.py", line 449, in _start_unlocked
16:25:04     thread = threading.Thread(target=run_loop, name="IO loop")
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 799, in __init__
16:25:04     self._started = Event()
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 506, in __init__
16:25:04     self._cond = Condition(Lock())
16:25:04   File "/opt/conda/envs/dask/lib/python3.8/threading.py", line 233, in __init__
16:25:04     self._release_save = lock._release_save
16:25:04   File "/workspace/distributed/client.py", line 1339, in __del__
16:25:04     self.close()
16:25:04   File "/workspace/distributed/client.py", line 1584, in close
16:25:04     self._loop_runner.stop()
16:25:04   File "/workspace/distributed/utils.py", line 479, in stop
16:25:04     with self._lock:
16:25:04
16:25:04 +++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
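
The MainThread stack suggests the hang is a lock being re-entered from a finalizer: Client.__del__ fires during garbage collection while the loop-runner machinery already holds its (non-reentrant) lock, and stop() then blocks on with self._lock:. Below is a tiny illustration of that general hazard; it uses plain threading rather than Distributed's LoopRunner, and it only *tries* the lock so it terminates instead of deadlocking.

    import gc
    import threading

    _lock = threading.Lock()            # non-reentrant, like a plain threading.Lock

    class Leaky:
        def __init__(self):
            self._cycle = self          # reference cycle: only gc.collect() frees it

        def __del__(self):
            # In the stack above this is Client.__del__ -> close() ->
            # LoopRunner.stop() -> `with self._lock:`. Here we only try the
            # lock so the example prints instead of hanging.
            got = _lock.acquire(blocking=False)
            print("lock available from __del__?", got)
            if got:
                _lock.release()

    Leaky()                             # becomes garbage, kept alive only by its cycle

    with _lock:                         # simulate start() holding the lock...
        gc.collect()                    # ...while GC runs the finalizer -> prints False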

@jakirkham
Member

Also saw the same thing as Charles here

@crusaderky
Collaborator

This is failing pretty much all the time now. I'm xfailing it for the time being.

@pentschev
Member

rapidsai/ucx-py#994 will hopefully fix the root of this issue.

rapids-bot bot pushed a commit to rapidsai/ucx-py that referenced this issue Oct 11, 2023
After some debugging of Distributed tests with UCX, it was observed that `exchange_peer_info` sometimes hangs indefinitely, specifically when executing `stream_recv` on the client side. The cause is unknown but is believed to be messages being lost, either when multiple stream messages are transferred simultaneously among various endpoints or when the receiving end takes too long to launch `stream_recv`; see #509 for a similar issue related to the stream API. Adding a timeout doesn't allow recovery, but it at least allows a UCX-Py client to retry upon failure to establish the endpoint.

This change seems to resolve dask/distributed#5229; at least it isn't reproducible locally with this change.

Additionally, do a roundtrip message transfer for `test_send_recv_am`, which should resolve #797; that failure seems to be caused by checking received messages too early, before they are actually received by the listener. A roundtrip ensures the client receives the reply and thus prevents us from checking for a transfer that hasn't completed yet.

Also ensure that the listener is closed before validating the `test_close_callback` conditions, which was also flaky.

Finally, ensure we close the loop in the test fixture, preventing `DeprecationWarning`s from pytest-asyncio, which currently closes unclosed event loops but will stop doing so in future releases.

Closes #797

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Ray Douglass (https://github.com/raydouglass)
  - Charles Blackmon-Luca (https://github.com/charlesbluca)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #994
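
For readers skimming the thread: the fix described above boils down to bounding the peer-info exchange with a timeout so a lost stream message fails fast and the client can retry the connection. A rough asyncio sketch of that retry-on-timeout pattern is below; the function names, timeout, and attempt count are illustrative assumptions, not the UCX-Py API or its defaults.

    import asyncio
    from typing import Optional

    async def create_endpoint(ip: str, port: int) -> str:
        # Placeholder for endpoint creation + peer-info exchange, which can
        # occasionally hang in stream_recv.
        await asyncio.sleep(0.01)
        return f"endpoint {ip}:{port}"

    async def connect_with_retry(ip, port, timeout=5.0, attempts=3) -> str:
        last_exc: Optional[BaseException] = None
        for _ in range(attempts):
            try:
                # Bound the handshake so a lost message fails fast instead of
                # hanging the whole test suite, then retry from scratch.
                return await asyncio.wait_for(create_endpoint(ip, port), timeout=timeout)
            except asyncio.TimeoutError as exc:
                last_exc = exc
        raise ConnectionError(f"no endpoint after {attempts} attempts") from last_exc

    print(asyncio.run(connect_with_retry("172.17.0.3", 13339)))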