Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Avoid is_python_shutting_down #8492

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 7 additions & 4 deletions distributed/client.py
Expand Up @@ -100,7 +100,6 @@
format_dashboard_link,
has_keyword,
import_term,
is_python_shutting_down,
log_errors,
nbytes,
sync,
Expand Down Expand Up @@ -207,6 +206,8 @@
Client: Creates futures
"""

_is_finalizing: staticmethod[[], bool] = staticmethod(sys.is_finalizing)

_cb_executor = None
_cb_executor_pid = None

Expand Down Expand Up @@ -507,7 +508,7 @@
except AttributeError:
# Occasionally we see this error when shutting down the client
# https://github.com/dask/distributed/issues/4305
if not is_python_shutting_down():
if not self._is_finalizing():

Check warning on line 511 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L511

Added line #L511 was not covered by tests
raise
except RuntimeError: # closed event loop
pass
Expand Down Expand Up @@ -830,6 +831,8 @@
distributed.LocalCluster:
"""

_is_finalizing: staticmethod[[], bool] = staticmethod(sys.is_finalizing)

_instances: ClassVar[weakref.WeakSet[Client]] = weakref.WeakSet()

_default_event_handlers = {"print": _handle_print, "warn": _handle_warn}
Expand Down Expand Up @@ -1553,7 +1556,7 @@
try:
msgs = await self.scheduler_comm.comm.read()
except CommClosedError:
if is_python_shutting_down():
if self._is_finalizing():
return
if self.status == "running":
if self.cluster and self.cluster.status in (
Expand Down Expand Up @@ -1786,7 +1789,7 @@

assert self.status == "closed"

if not is_python_shutting_down():
if not self._is_finalizing():
self._loop_runner.stop()

async def _shutdown(self):
Expand Down
11 changes: 8 additions & 3 deletions distributed/comm/inproc.py
Expand Up @@ -4,6 +4,7 @@
import itertools
import logging
import os
import sys
import threading
import weakref
from collections import deque, namedtuple
Expand All @@ -14,7 +15,7 @@
from distributed.comm.core import BaseListener, Comm, CommClosedError, Connector
from distributed.comm.registry import Backend, backends
from distributed.protocol import nested_deserialize
from distributed.utils import get_ip, is_python_shutting_down
from distributed.utils import get_ip

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -187,9 +188,13 @@ def _get_finalizer(self):
r = repr(self)

def finalize(
read_q=self._read_q, write_q=self._write_q, write_loop=self._write_loop, r=r
read_q=self._read_q,
write_q=self._write_q,
write_loop=self._write_loop,
is_finalizing=sys.is_finalizing,
r=r,
):
if read_q.peek(None) is _EOF or is_python_shutting_down():
if read_q.peek(None) is _EOF or is_finalizing():
return
logger.warning(f"Closing dangling queue in {r}")
write_loop.add_callback(write_q.put_nowait, _EOF)
Expand Down
7 changes: 4 additions & 3 deletions distributed/core.py
Expand Up @@ -54,7 +54,6 @@
get_traceback,
has_keyword,
import_file,
is_python_shutting_down,
iscoroutinefunction,
offload,
recursive_to_dict,
Expand Down Expand Up @@ -319,6 +318,8 @@ class Server:

"""

_is_finalizing: staticmethod[[], bool] = staticmethod(sys.is_finalizing)

default_ip: ClassVar[str] = ""
default_port: ClassVar[int] = 0

Expand Down Expand Up @@ -900,7 +901,7 @@ async def _handle_comm(self, comm: Comm) -> None:
msg = await comm.read()
logger.debug("Message from %r: %s", address, msg)
except OSError as e:
if not is_python_shutting_down():
if not self._is_finalizing():
logger.debug(
"Lost connection to %r while reading message: %s."
" Last operation: %s",
Expand Down Expand Up @@ -1004,7 +1005,7 @@ async def _handle_comm(self, comm: Comm) -> None:

finally:
del self._comms[comm]
if not is_python_shutting_down() and not comm.closed():
if not self._is_finalizing() and not comm.closed():
try:
comm.abort()
except Exception as e:
Expand Down
3 changes: 1 addition & 2 deletions distributed/scheduler.py
Expand Up @@ -118,7 +118,6 @@
TimeoutError,
format_dashboard_link,
get_fileno_limit,
is_python_shutting_down,
key_split_group,
log_errors,
offload,
Expand Down Expand Up @@ -5607,7 +5606,7 @@ async def add_client(
if not comm.closed():
self.client_comms[client].send({"op": "stream-closed"})
try:
if not is_python_shutting_down():
if not self._is_finalizing():
await self.client_comms[client].close()
del self.client_comms[client]
if self.status == Status.running:
Expand Down
3 changes: 1 addition & 2 deletions distributed/worker.py
Expand Up @@ -101,7 +101,6 @@
get_ip,
has_arg,
in_async_call,
is_python_shutting_down,
iscoroutinefunction,
json_load_robust,
log_errors,
Expand Down Expand Up @@ -1632,7 +1631,7 @@ def _close(executor, wait):
# weird deadlocks particularly if the task that is executing in
# the thread is waiting for a server reply, e.g. when using
# worker clients, semaphores, etc.
if is_python_shutting_down():
if self._is_finalizing():
# If we're shutting down there is no need to wait for daemon
# threads to finish
_close(executor=executor, wait=False)
Expand Down