RuntimeError: cannot schedule new futures after shutdown when using external Kubernetes cluster #707

Open
Artimi opened this issue May 3, 2023 · 4 comments

@Artimi
Contributor

Artimi commented May 3, 2023

Describe the issue:

Hi,

I'm encountering the following error when trying to run Dask jobs on an external Kubernetes cluster:

RuntimeError: cannot schedule new futures after shutdown

However, when I use a local Kubernetes cluster like rancher-desktop, everything works as expected without any RuntimeError.

I suspect that the issue might be related to the getaddrinfo call, which utilizes ThreadPoolExecutor for asynchronous operation. It seems that the default ThreadPoolExecutor might already be closed by the time it's called, leading to the error.
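
To illustrate where this message comes from, here is a minimal standard-library sketch. It only shows that submitting to a ThreadPoolExecutor after an explicit shutdown() raises the same RuntimeError; it does not reproduce the KubeCluster teardown, and the helper name submit_after_shutdown is made up for the illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def submit_after_shutdown() -> str:
    """Return the error message raised when submitting to a closed executor."""
    executor = ThreadPoolExecutor(max_workers=1)
    executor.shutdown(wait=True)  # after this, the executor accepts no new work
    try:
        executor.submit(print, "never runs")
    except RuntimeError as exc:
        return str(exc)
    return "no error"


message = submit_after_shutdown()
print(message)
```

asyncio's loop.getaddrinfo goes through run_in_executor, as the traceback below shows, so a DNS lookup made after that executor has been shut down fails in the same way.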

Minimal Complete Verifiable Example:

from dask_kubernetes.operator import KubeCluster
import time
import random
import joblib


def square(x):
    time.sleep(random.expovariate(1.5))
    return x**2


def main():
    cluster = KubeCluster(
        name="my-dask-cluster",
        image="ghcr.io/dask/dask:2023.3.2-py3.11",
        env={"EXTRA_PIP_PACKAGES": "joblib"},
    )
    print("Cluster created")
    cluster.scale(1)
    client = cluster.get_client()
    print("Client", client)
    joblib.parallel_backend(
        "dask", client=client, pure=False, wait_for_workers_timeout=60
    )

    results = joblib.Parallel(n_jobs=2)(
        joblib.delayed(square)(arg) for arg in range(10)
    )
    print(results)


if __name__ == "__main__":
    main()

This results in

$ python k8s_cluster.py
╭─────────────────── Creating KubeCluster 'my-dask-cluster' ───────────────────╮
│                                                                              │
│   DaskCluster                                                      Running   │
│   Scheduler Pod                                                    Running   │
│   Scheduler Service                                                Created   │
│   Default Worker Group                                             Created   │
│                                                                              │
│ ⠏ Getting dashboard URL                                                      │
╰──────────────────────────────────────────────────────────────────────────────╯
Cluster created
/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/distributed/client.py:1381: VersionMismatchWarning: Mismatched versions found

+-------------+----------------+----------------+---------+
| Package     | Client         | Scheduler      | Workers |
+-------------+----------------+----------------+---------+
| distributed | 2023.3.2.1     | 2023.3.2       | None    |
| lz4         | None           | 4.3.2          | None    |
| python      | 3.11.3.final.0 | 3.11.0.final.0 | None    |
+-------------+----------------+----------------+---------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
Client <Client: 'tcp://10.18.148.16:8786' processes=0 threads=0, memory=0 B>
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Exception ignored in atexit callback: <function reap_clusters at 0x10cc33ba0>
Traceback (most recent call last):
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 1056, in reap_clusters
    asyncio.run(_reap_clusters())
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 1054, in _reap_clusters
    cluster.close(timeout=10)
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 702, in close
    return self.sync(self._close, timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/distributed/utils.py", line 349, in sync
    return sync(
           ^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/distributed/utils.py", line 416, in sync
    raise exc.with_traceback(tb)
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/distributed/utils.py", line 389, in f
    result = yield future
             ^^^^^^^^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 710, in _close
    await custom_objects_api.delete_namespaced_custom_object(
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/kubernetes_asyncio/client/api_client.py", line 185, in __call_api
    response_data = await self.request(
                    ^^^^^^^^^^^^^^^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 220, in DELETE
    return (await self.request("DELETE", url,
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 177, in request
    r = await self.pool_manager.request(**args)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/aiohttp/client.py", line 536, in _request
    conn = await self._connector.connect(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/aiohttp/connector.py", line 540, in connect
    proto = await self._create_connection(req, traces, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/aiohttp/connector.py", line 901, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/aiohttp/connector.py", line 1152, in _create_direct_connection
    hosts = await asyncio.shield(host_resolved)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/aiohttp/connector.py", line 874, in _resolve_host
    addrs = await self._resolver.resolve(host, port, family=self._family)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/Artimi/playground/dask/.venv/lib/python3.11/site-packages/aiohttp/resolver.py", line 33, in resolve
    infos = await self._loop.getaddrinfo(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 867, in getaddrinfo
    return await self.run_in_executor(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 829, in run_in_executor
    executor.submit(func, *args), loop=self)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

Anything else we need to know?:

Environment:

  • Dask version: 2023.3.2
  • Python version: 3.11
  • Operating System: macOS (host); ghcr.io/dask/dask:2023.3.2-py3.11 (image)
  • Install method (conda, pip, source): pip
@jacobtomlinson
Member

jacobtomlinson commented May 4, 2023

Thanks for raising this. It definitely looks related to the cluster-reaping finalizer using things that may have already been shut down.

In the short term a workaround would be to either manually close the cluster yourself, or to use KubeCluster in a context manager.

def main():
    cluster = KubeCluster(
        name="my-dask-cluster",
        image="ghcr.io/dask/dask:2023.3.2-py3.11",
        env={"EXTRA_PIP_PACKAGES": "joblib"},
    )
    print("Cluster created")
    cluster.scale(1)
    client = cluster.get_client()
    print("Client", client)
    joblib.parallel_backend(
        "dask", client=client, pure=False, wait_for_workers_timeout=60
    )

    results = joblib.Parallel(n_jobs=2)(
        joblib.delayed(square)(arg) for arg in range(10)
    )
    print(results)
+   client.close()
+   cluster.close()
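
The context-manager variant mentioned above could look roughly like the following. This is a minimal sketch under assumptions: run_with_cluster is a hypothetical helper, not part of dask_kubernetes, and it assumes that both the cluster and the client returned by cluster.get_client() can be used in a with statement, so both are closed before the function returns rather than at interpreter exit.

```python
from typing import Any, Callable


def run_with_cluster(
    make_cluster: Callable[[], Any],
    work: Callable[[Any], Any],
) -> Any:
    """Run work(client), closing the client and then the cluster on the way out.

    make_cluster is any zero-argument callable returning a cluster object,
    for example:
        lambda: KubeCluster(name="my-dask-cluster",
                            image="ghcr.io/dask/dask:2023.3.2-py3.11")
    """
    with make_cluster() as cluster:
        with cluster.get_client() as client:
            # Everything that needs the cluster happens inside these blocks,
            # so cleanup does not depend on atexit callbacks.
            return work(client)
```

With this shape, the joblib calls from main() would go inside the work callable, and nothing is left for the atexit-time reaper to clean up.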

@Artimi
Contributor Author

Artimi commented May 5, 2023

Thank you for the quick response! Yes, this helps in this example.
Unfortunately, in the real code we also use atexit to close the client, and now the cluster as well, but it ends with the same exception:

def main():
    cluster = KubeCluster(
        name="my-dask-cluster",
        image="ghcr.io/dask/dask:2023.3.2-py3.11",
        namespace="ana-ixian",
        env={"EXTRA_PIP_PACKAGES": "joblib"},
        shutdown_on_close=True,
    )
    print("Cluster created")
    cluster.scale(1)
    client = cluster.get_client()
    print("Client", client)
    joblib.parallel_backend(
        "dask", client=client, pure=False, wait_for_workers_timeout=60
    )

    results = joblib.Parallel(n_jobs=2)(
        joblib.delayed(square)(arg) for arg in range(10)
    )
    print(results)
    client.close()
    atexit.register(cluster.close)

Exception

Exception ignored in atexit callback: <bound method KubeCluster.close of KubeCluster(my-dask-cluster, 'tcp://localhost:57174', workers=1, threads=16, memory=125.80 GiB)>
Traceback (most recent call last):
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 702, in close
    return self.sync(self._close, timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/distributed/utils.py", line 349, in sync
    return sync(
           ^^^^^
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/distributed/utils.py", line 416, in sync
    raise exc.with_traceback(tb)
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/distributed/utils.py", line 389, in f
    result = yield future
             ^^^^^^^^^^^^
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 710, in _close
    await custom_objects_api.delete_namespaced_custom_object(
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/kubernetes_asyncio/client/api_client.py", line 185, in __call_api
    response_data = await self.request(
                    ^^^^^^^^^^^^^^^^^^^
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 220, in DELETE
    return (await self.request("DELETE", url,
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 177, in request
    r = await self.pool_manager.request(**args)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/aiohttp/client.py", line 536, in _request
    conn = await self._connector.connect(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/aiohttp/connector.py", line 540, in connect
    proto = await self._create_connection(req, traces, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/aiohttp/connector.py", line 901, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/aiohttp/connector.py", line 1152, in _create_direct_connection
    hosts = await asyncio.shield(host_resolved)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/aiohttp/connector.py", line 874, in _resolve_host
    addrs = await self._resolver.resolve(host, port, family=self._family)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/petrsebek/playground/dask/.venv/lib/python3.11/site-packages/aiohttp/resolver.py", line 33, in resolve
    infos = await self._loop.getaddrinfo(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 867, in getaddrinfo
    return await self.run_in_executor(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 829, in run_in_executor
    executor.submit(func, *args), loop=self)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

I guess we just can't use atexit for the cleanup here.

@Artimi
Contributor Author

Artimi commented May 5, 2023

I think we could bypass the ThreadedResolver and use aiohttp's AsyncResolver instead, which should remove the problem because DNS resolution would no longer go through a thread pool.

However, to use it I would need to pass my own connector to aiohttp.ClientSession (see aio-libs/aiohttp#2228 (comment)), and that's not possible because kubernetes_asyncio creates the session on its own: https://github.com/tomplus/kubernetes_asyncio/blob/master/kubernetes_asyncio/client/rest.py#L63

@jacobtomlinson
Member

> I guess we just can't use atexit for the cleanup here.

It's a little risky because you can't guarantee that asyncio things will work correctly at that point. I would generally advise against what you are doing.

However, another workaround could be to use the asyncio API here and use asyncio-atexit to ensure things get closed before the event loop closes.

import asyncio
from dask_kubernetes.operator import KubeCluster
import time
import random
import joblib
import asyncio_atexit


def square(x):
    time.sleep(random.expovariate(1.5))
    return x**2


async def main():
    cluster = KubeCluster(
        name="my-dask-cluster",
        image="ghcr.io/dask/dask:2023.3.2-py3.11",
        env={"EXTRA_PIP_PACKAGES": "joblib"},
        asynchronous=True,
    )
    print("Cluster created")
    await cluster.scale(1)
    client = cluster.get_client()  # Not sure how this will behave off the top of my head
    print("Client", client)
    joblib.parallel_backend(
        "dask", client=client, pure=False, wait_for_workers_timeout=60
    )

    results = joblib.Parallel(n_jobs=2)(
        joblib.delayed(square)(arg) for arg in range(10)
    )
    print(results)
    asyncio_atexit.register(cluster.close)


if __name__ == "__main__":
    asyncio.run(main())
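
The underlying idea, running cleanup while the event loop and its default executor are still alive, can also be sketched with the standard library alone using try/finally inside the coroutine. This is a hedged illustration, not the asyncio-atexit implementation: close_cluster is a hypothetical stand-in for awaiting cluster.close(), and it uses run_in_executor(None, ...) because that is the same default-executor path that loop.getaddrinfo takes in the tracebacks above.

```python
import asyncio

events = []


async def close_cluster() -> None:
    # Hypothetical stand-in for `await cluster.close()`. It touches the
    # loop's default ThreadPoolExecutor, like getaddrinfo does.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, lambda: "closed")
    events.append(result)


async def main() -> None:
    try:
        events.append("work done")
    finally:
        # Runs before asyncio.run() shuts the loop and its executor down.
        await close_cluster()


asyncio.run(main())
print(events)
```

Because the cleanup is awaited inside main(), it completes before asyncio.run() tears down the loop, so the executor is still usable when it is needed.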
