From 5d9577a40a32eb48d14e26159ae62e424b1fa418 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Sat, 11 Dec 2021 21:56:49 +0000 Subject: [PATCH] to_dict changes --- distributed/utils_test.py | 68 ++++++++++++++++++++++++++++----------- distributed/worker.py | 23 ++++++++++--- 2 files changed, 67 insertions(+), 24 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index cc1e3994124..4bb31d7d1cf 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -40,6 +40,7 @@ ssl = None # type: ignore import pytest +import yaml from tlz import assoc, memoize, merge from tornado import gen from tornado.ioloop import IOLoop @@ -49,6 +50,7 @@ from distributed.comm.tcp import TCP from . import system +from . import versions as version_module from .client import Client, _global_clients, default_client from .comm import Comm from .compatibility import WINDOWS @@ -990,26 +992,14 @@ async def coro(): # This stack indicates where the coro/test is suspended task.print_stack(file=buffer) - if client: - assert c + if cluster_dump_directory: try: - if cluster_dump_directory: - os.makedirs( - cluster_dump_directory, exist_ok=True - ) - filename = os.path.join( - cluster_dump_directory, func.__name__ - ) - fut = c.dump_cluster_state( - filename, - # Test dumps should be small enough that - # there is no need for a compressed - # binary representation and readability - # is more important - format="yaml", - ) - assert fut is not None - await fut + await dump_cluster_state( + s, + ws, + output_dir=cluster_dump_directory, + func_name=func.__name__, + ) except Exception: print( f"Exception {sys.exc_info()} while trying to " @@ -1096,6 +1086,46 @@ def get_unclosed(): return _ +async def dump_cluster_state( + s: Scheduler, ws: list[ServerNode], output_dir: str, func_name: str +) -> None: + """A variant of Client.dump_cluster_state, which does not expect on any of the below + to work: + + - individual Workers + - Client->Scheduler comms + - 
Scheduler->Worker comms (unless using Nannies)
    """
    scheduler_info = s._to_dict()
    versions_info = version_module.get_versions()

    if not ws or isinstance(ws[0], Worker):
        workers_info = {w.address: w._to_dict() for w in ws}
    else:
        # Variant of s.broadcast() that deals with unresponsive workers
        async def safe_broadcast(addr: str) -> dict:
            try:
                return await s.broadcast(msg={"op": "dump_state"}, workers=[addr])
            except Exception:
                msg = f"Exception {sys.exc_info()} while trying to dump worker state"
                return {addr: msg}

        workers_info = merge(
            await asyncio.gather(*(safe_broadcast(w.address) for w in ws))
        )

    state = {
        "scheduler": scheduler_info,
        "workers": workers_info,
        "versions": versions_info,
    }
    os.makedirs(output_dir, exist_ok=True)
    fname = os.path.join(output_dir, func_name) + ".yaml"
    with open(fname, "w") as fh:
        yaml.dump(state, fh)
    print(f"Dumped cluster state to {fname}")


def raises(func, exc=Exception):
    try:
        func()
diff --git a/distributed/worker.py b/distributed/worker.py
index 46cc45eac14..dc198e878ad 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -127,6 +127,8 @@

 SerializedTask = namedtuple("SerializedTask", ["function", "args", "kwargs", "task"])

+_taskstate_to_dict_guard = False
+

 class InvalidTransition(Exception):
     pass

@@ -226,7 +228,7 @@ def get_nbytes(self) -> int:
         nbytes = self.nbytes
         return nbytes if nbytes is not None else DEFAULT_DATA_SIZE

-    def _to_dict(self, *, exclude: Container[str] = ()) -> dict:
+    def _to_dict(self, *, exclude: Container[str] = ()) -> dict | str:
        """
        A very verbose dictionary representation for debugging purposes.
        Not type stable and not inteded for roundtrips.
@@ -241,10 +243,21 @@ def _to_dict(self, *, exclude: Container[str] = ()) -> dict: -------- Client.dump_cluster_state """ - return recursive_to_dict( - {k: v for k, v in self.__dict__.items() if k not in exclude}, - exclude=exclude, - ) + # When a task references another task, just print the task repr. All tasks + # should neatly appear under Worker.tasks. This also prevents a RecursionError + # during particularly heavy loads, which have been observed to happen whenever + # there's an acyclic dependency chain of ~200+ tasks. + global _taskstate_to_dict_guard + if _taskstate_to_dict_guard: + return repr(self) + _taskstate_to_dict_guard = True + try: + return recursive_to_dict( + {k: v for k, v in self.__dict__.items() if k not in exclude}, + exclude=exclude, + ) + finally: + _taskstate_to_dict_guard = False def is_protected(self) -> bool: return self.state in PROCESSING or any(