ConnectionClosedError during Dask Cluster Creation with k8s #878

Open
terrykong opened this issue Apr 2, 2024 · 1 comment

@terrykong

Describe the issue:

When creating a cluster with dask_kubernetes, where kr8s is pinned to 0.9.0, I see a ConnectionClosedError. The cluster still gets created when I check with kubectl, so I presume this happens after the manifest is submitted to the API server.
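
Since the behaviour depends on which kr8s version ends up installed, it can help to confirm what is actually in the venv before reproducing. A minimal sketch using only the standard library (the helper name `installed_versions` is made up for illustration, and the distribution names `dask-kubernetes` and `kr8s` are assumed to match what pip reports):

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            # Not installed in the active environment.
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions(["dask-kubernetes", "kr8s"]).items():
        print(f"{name}: {ver or 'not installed'}")
```

Run inside the activated venv, this prints one line per package, which makes it easy to attach the exact versions to a report.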

Minimal Complete Verifiable Example:

Setting up my environment

python3 -m venv venv
source venv/bin/activate
pip install dask_kubernetes==2024.4.0

This will error:

python -c 'from dask_kubernetes.operator.kubecluster import KubeCluster as K; K()'
Task exception was never retrieved
future: <Task finished name='Task-23' coro=<PortForward._sync_sockets() done, defined at /tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.ConnectionClosedError: TCP socket closed
    +------------------------------------
Task exception was never retrieved
future: <Task finished name='Task-21' coro=<PortForward._sync_sockets() done, defined at /tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.ConnectionClosedError: TCP socket closed
╭─────────────── Creating KubeCluster 'dask-terryk-e81a4dc3-a' ────────────────╮
│                                                                              │
│   DaskCluster                                                      Running   │
│   Scheduler Pod                                                    Running   │
│   Scheduler Service                                                Created   │
│   Default Worker Group                                             Created   │
│                                                                              │
│ ⠴ Waiting for scheduler service                                              │
╰──────────────────────────────────────────────────────────────────────────────╯
Task exception was never retrieved
future: <Task finished name='Task-56' coro=<PortForward._sync_sockets() done, defined at /tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.ConnectionClosedError: TCP socket closed
    +------------------------------------
Exception ignored in atexit callback: <function reap_clusters at 0x7f4406bb7250>
Traceback (most recent call last):
  File "/tmp/venv/lib/python3.10/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 1033, in reap_clusters
    asyncio.run(_reap_clusters())
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/tmp/venv/lib/python3.10/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 1031, in _reap_clusters
    cluster.close(timeout=10)
  File "/tmp/venv/lib/python3.10/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 700, in close
    return self.sync(self._close, timeout=timeout)
  File "/tmp/venv/lib/python3.10/site-packages/distributed/utils.py", line 358, in sync
    return sync(
  File "/tmp/venv/lib/python3.10/site-packages/distributed/utils.py", line 434, in sync
    raise error
  File "/tmp/venv/lib/python3.10/site-packages/distributed/utils.py", line 408, in f
    result = yield future
  File "/tmp/venv/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/tmp/venv/lib/python3.10/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 706, in _close
    cluster = await DaskCluster.get(self.name, namespace=self.namespace)
  File "/tmp/venv/lib/python3.10/site-packages/kr8s/_objects.py", line 177, in get
    resources = await api._get(
  File "/tmp/venv/lib/python3.10/site-packages/kr8s/_api.py", line 332, in _get
    async with self._get_kind(
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/contextlib.py", line 199, in __aenter__
    return await anext(self.gen)
  File "/tmp/venv/lib/python3.10/site-packages/kr8s/_api.py", line 261, in _get_kind
    async with self.call_api(
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/contextlib.py", line 199, in __aenter__
    return await anext(self.gen)
  File "/tmp/venv/lib/python3.10/site-packages/kr8s/_api.py", line 132, in call_api
    response = await self._session.request(**kwargs)
  File "/tmp/venv/lib/python3.10/site-packages/httpx/_client.py", line 1574, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/tmp/venv/lib/python3.10/site-packages/httpx/_client.py", line 1661, in send
    response = await self._send_handling_auth(
  File "/tmp/venv/lib/python3.10/site-packages/httpx/_client.py", line 1689, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/tmp/venv/lib/python3.10/site-packages/httpx/_client.py", line 1726, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/tmp/venv/lib/python3.10/site-packages/httpx/_client.py", line 1763, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/tmp/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 373, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
    raise exc from None
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 196, in handle_async_request
    response = await connection.handle_async_request(
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 99, in handle_async_request
    raise exc
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 76, in handle_async_request
    stream = await self._connect(request)
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 122, in _connect
    stream = await self._network_backend.connect_tcp(**kwargs)
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_backends/auto.py", line 30, in connect_tcp
    return await self._backend.connect_tcp(
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_backends/anyio.py", line 116, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "/tmp/venv/lib/python3.10/site-packages/anyio/_core/_sockets.py", line 195, in connect_tcp
    gai_res = await getaddrinfo(
  File "/tmp/venv/lib/python3.10/site-packages/anyio/_core/_sockets.py", line 573, in getaddrinfo
    gai_res = await get_async_backend().getaddrinfo(
  File "/tmp/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 2370, in getaddrinfo
    return await get_running_loop().getaddrinfo(
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/asyncio/base_events.py", line 863, in getaddrinfo
    return await self.run_in_executor(
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/asyncio/base_events.py", line 821, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
Task exception was never retrieved
future: <Task finished name='Task-44' coro=<PortForward._sync_sockets() done, defined at /tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.ConnectionClosedError: TCP socket closed
    +------------------------------------
Task exception was never retrieved
future: <Task finished name='Task-46' coro=<PortForward._sync_sockets() done, defined at /tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.ConnectionClosedError: TCP socket closed
    +------------------------------------

But if I make the following modifications to my environment:

pip install -U kr8s==0.14.1
ls -1 venv/lib/python*/site-packages/dask_kubernetes/operator/_objects.py | xargs sed -i 's/self._refresh()/self.refresh()/g'
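
For anyone without GNU sed (on macOS, for example, `sed -i` expects a backup-suffix argument), the same local patch can be sketched in Python. This is only a workaround sketch under the same assumptions as the one-liner: the function names are hypothetical, and the glob pattern simply mirrors the path passed to `ls`.

```python
from pathlib import Path


def patch_refresh_calls(source: str) -> str:
    """Replace calls to the underscore-prefixed `_refresh()` with `refresh()`."""
    return source.replace("self._refresh()", "self.refresh()")


def patch_installed_objects(venv: str = "venv") -> list[Path]:
    """Rewrite dask_kubernetes/operator/_objects.py in place; return patched files."""
    pattern = "lib/python*/site-packages/dask_kubernetes/operator/_objects.py"
    patched = []
    for path in Path(venv).glob(pattern):
        original = path.read_text()
        updated = patch_refresh_calls(original)
        if updated != original:
            # Only touch the file when something actually changed.
            path.write_text(updated)
            patched.append(path)
    return patched


if __name__ == "__main__":
    for path in patch_installed_objects():
        print(f"patched {path}")
```

Like the sed version, this edits the installed package directly, so reinstalling dask_kubernetes reverts it.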

Then everything is fine:

python -c 'from dask_kubernetes.operator.kubecluster import KubeCluster as K; K()'
╭─────────────── Creating KubeCluster 'dask-terryk-c760868d-e' ────────────────╮
│                                                                              │
│   DaskCluster                                                      Running   │
│   Scheduler Pod                                                    Running   │
│   Scheduler Service                                                Created   │
│   Default Worker Group                                             Created   │
│                                                                              │
│ ⠼ Getting dashboard URL                                                      │
╰──────────────────────────────────────────────────────────────────────────────╯

Anything else we need to know?:
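
The RuntimeError at the end of the atexit traceback looks like a separate symptom from the ConnectionClosedError noise: reap_clusters runs as an atexit callback, and by then the thread pool that asyncio uses for getaddrinfo no longer accepts work. The standalone sketch below reproduces only that last step with a plain ThreadPoolExecutor. It does not involve dask_kubernetes or kr8s, and the helper name is made up.

```python
from concurrent.futures import ThreadPoolExecutor


def submit_after_shutdown():
    """Return the error message raised when submitting to a shut-down executor."""
    executor = ThreadPoolExecutor(max_workers=1)
    executor.shutdown(wait=True)
    try:
        # Any submission after shutdown() is rejected before the callable runs.
        executor.submit(print, "never runs")
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(submit_after_shutdown())
```

The message matches the last line of the traceback above, which suggests the cluster cleanup was attempted too late in shutdown rather than failing against the API server.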

Environment:

  • Dask version: 2024.4.0 (dask-kubernetes)
  • Python version: 3.10.12
  • Operating System: Ubuntu 20.04.6 LTS
  • Install method (conda, pip, source): pip
@jacobtomlinson
Member

jacobtomlinson commented Apr 3, 2024

Yeah, this is a known noisy warning that was fixed in kr8s. This will be closed by #853.
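
For context on why this is noise rather than a failure: asyncio reports "Task exception was never retrieved" when a task finishes with an exception that nothing ever awaits or inspects. A minimal sketch of that mechanism, using a stand-in exception class rather than kr8s itself (all names below are illustrative, and this is not the kr8s fix):

```python
import asyncio
import gc


class ConnectionClosedError(Exception):
    """Stand-in for the kr8s exception, defined here to keep the sketch self-contained."""


async def failing_forward():
    raise ConnectionClosedError("TCP socket closed")


async def run_task(retrieve: bool) -> list:
    """Run a failing background task and collect what asyncio reports about it."""
    reported = []
    loop = asyncio.get_running_loop()
    # Capture messages instead of letting asyncio log them to stderr.
    loop.set_exception_handler(lambda _loop, ctx: reported.append(ctx["message"]))

    task = asyncio.create_task(failing_forward())
    await asyncio.sleep(0)  # give the task a chance to run and fail
    if retrieve:
        task.exception()  # inspecting the exception marks it as retrieved
    del task  # drop the last reference so the task can be finalized
    gc.collect()
    return reported


if __name__ == "__main__":
    print(asyncio.run(run_task(retrieve=False)))
    print(asyncio.run(run_task(retrieve=True)))
```

The report only appears when the failed task is discarded without anyone looking at its exception, which is why the cluster itself still comes up fine.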
