From defd3a42076284a8dc50f607741d8099e6e067a9 Mon Sep 17 00:00:00 2001
From: Daniel
Date: Mon, 12 Dec 2022 23:34:46 +0100
Subject: [PATCH] Avoid hanging forever after a parallel job was killed (#7834)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Replace multiprocessing.Pool with concurrent.futures.ProcessPoolExecutor
  to avoid deadlocks.

If a process in a multiprocessing.Pool terminates uncleanly (for example,
due to an out-of-memory kill or a segmentation fault), the pool silently
replaces that process, but the work it was assigned is never done, causing
pylint to hang indefinitely. concurrent.futures.ProcessPoolExecutor raises
a BrokenProcessPool exception in that case, so pylint fails fast instead
of hanging.

Co-authored-by: Daniël van Noord <13665637+DanielNoord@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
(cherry picked from commit 5eca8ec5947a0d45e588bded0e910a986d4dd77b)
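
For illustration, a minimal standalone sketch of the behavioral difference
(not part of this patch; the crash() helper is a hypothetical stand-in for
a worker process killed by the OS):

    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures.process import BrokenProcessPool

    def crash(_: int) -> int:
        # Die without cleanup, like an OOM kill or a segmentation fault.
        os._exit(1)

    if __name__ == "__main__":
        # multiprocessing.Pool would silently respawn the dead worker here,
        # and pool.imap_unordered(crash, [1]) would never yield a result,
        # hanging the consumer forever.
        with ProcessPoolExecutor(max_workers=1) as executor:
            try:
                list(executor.map(crash, [1]))
            except BrokenProcessPool:
                print("worker died; pool is broken, failing fast")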
---
 doc/whatsnew/fragments/3899.bugfix |  4 ++++
 pylint/lint/parallel.py            | 21 +++++++++---------
 pylint/lint/run.py                 |  9 ++++++--
 tests/test_check_parallel.py       | 35 +++++++++++++++++++++++++-----
 4 files changed, 52 insertions(+), 17 deletions(-)
 create mode 100644 doc/whatsnew/fragments/3899.bugfix

diff --git a/doc/whatsnew/fragments/3899.bugfix b/doc/whatsnew/fragments/3899.bugfix
new file mode 100644
index 0000000000..ed0ad38594
--- /dev/null
+++ b/doc/whatsnew/fragments/3899.bugfix
@@ -0,0 +1,4 @@
+Pylint will no longer deadlock if a parallel job is killed, but will
+fail immediately instead.
+
+Closes #3899
diff --git a/pylint/lint/parallel.py b/pylint/lint/parallel.py
index 646d269943..8de083655d 100644
--- a/pylint/lint/parallel.py
+++ b/pylint/lint/parallel.py
@@ -23,10 +23,15 @@
 except ImportError:
     multiprocessing = None  # type: ignore[assignment]
 
+try:
+    from concurrent.futures import ProcessPoolExecutor
+except ImportError:
+    ProcessPoolExecutor = None  # type: ignore[assignment,misc]
+
 if TYPE_CHECKING:
     from pylint.lint import PyLinter
 
-# PyLinter object used by worker processes when checking files using multiprocessing
+# PyLinter object used by worker processes when checking files using parallel mode
 # should only be used by the worker processes
 _worker_linter: PyLinter | None = None
 
@@ -34,8 +39,7 @@
 def _worker_initialize(
     linter: bytes, arguments: None | str | Sequence[str] = None
 ) -> None:
-    """Function called to initialize a worker for a Process within a multiprocessing
-    Pool.
+    """Function called to initialize a worker for a Process within a concurrent Pool.
 
     :param linter: A linter-class (PyLinter) instance pickled with dill
     :param arguments: File or module name(s) to lint and to be added to sys.path
@@ -137,9 +141,9 @@ def check_parallel(
     # is identical to the linter object here. This is required so that
     # a custom PyLinter object can be used.
     initializer = functools.partial(_worker_initialize, arguments=arguments)
-    with multiprocessing.Pool(
-        jobs, initializer=initializer, initargs=[dill.dumps(linter)]
-    ) as pool:
+    with ProcessPoolExecutor(
+        max_workers=jobs, initializer=initializer, initargs=(dill.dumps(linter),)
+    ) as executor:
         linter.open()
         all_stats = []
         all_mapreduce_data: defaultdict[
@@ -158,7 +162,7 @@ def check_parallel(
             stats,
             msg_status,
             mapreduce_data,
-        ) in pool.imap_unordered(_worker_check_single_file, files):
+        ) in executor.map(_worker_check_single_file, files):
             linter.file_state.base_name = base_name
             linter.file_state._is_base_filestate = False
             linter.set_current_module(module, file_path)
@@ -168,8 +172,5 @@ def check_parallel(
             all_mapreduce_data[worker_idx].append(mapreduce_data)
             linter.msg_status |= msg_status
 
-        pool.close()
-        pool.join()
-
     _merge_mapreduce_data(linter, all_mapreduce_data)
     linter.stats = merge_stats([linter.stats] + all_stats)
diff --git a/pylint/lint/run.py b/pylint/lint/run.py
index b8923aac5f..6a6d3a05da 100644
--- a/pylint/lint/run.py
+++ b/pylint/lint/run.py
@@ -30,6 +30,11 @@
 except ImportError:
     multiprocessing = None  # type: ignore[assignment]
 
+try:
+    from concurrent.futures import ProcessPoolExecutor
+except ImportError:
+    ProcessPoolExecutor = None  # type: ignore[assignment,misc]
+
 
 def _query_cpu() -> int | None:
     """Try to determine number of CPUs allotted in a docker container.
@@ -185,9 +190,9 @@ def __init__(
             )
             sys.exit(32)
         if linter.config.jobs > 1 or linter.config.jobs == 0:
-            if multiprocessing is None:
+            if ProcessPoolExecutor is None:
                 print(
-                    "Multiprocessing library is missing, fallback to single process",
+                    "concurrent.futures module is missing, fallback to single process",
                     file=sys.stderr,
                 )
                 linter.set_option("jobs", 1)
diff --git a/tests/test_check_parallel.py b/tests/test_check_parallel.py
index a7fb5c158f..3856f9add0 100644
--- a/tests/test_check_parallel.py
+++ b/tests/test_check_parallel.py
@@ -9,8 +9,9 @@
 from __future__ import annotations
 
 import argparse
-import multiprocessing
 import os
+from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures.process import BrokenProcessPool
 from pickle import PickleError
 
 import dill
@@ -182,10 +183,10 @@ def test_worker_initialize_pickling(self) -> None:
         """
         linter = PyLinter(reporter=Reporter())
         linter.attribute = argparse.ArgumentParser()  # type: ignore[attr-defined]
-        with multiprocessing.Pool(
-            2, initializer=worker_initialize, initargs=[dill.dumps(linter)]
-        ) as pool:
-            pool.imap_unordered(print, [1, 2])
+        with ProcessPoolExecutor(
+            max_workers=2, initializer=worker_initialize, initargs=(dill.dumps(linter),)
+        ) as executor:
+            executor.map(print, [1, 2])
 
     def test_worker_check_single_file_uninitialised(self) -> None:
         pylint.lint.parallel._worker_linter = None
@@ -571,3 +572,27 @@ def test_map_reduce(self, num_files, num_jobs, num_checkers):
         assert str(stats_single_proc.by_msg) == str(
             stats_check_parallel.by_msg
         ), "Single-proc and check_parallel() should return the same thing"
+
+    @pytest.mark.timeout(5)
+    def test_no_deadlock_due_to_initializer_error(self) -> None:
+        """Tests that an error in the initializer for the parallel jobs doesn't
+        lead to a deadlock.
+        """
+        linter = PyLinter(reporter=Reporter())
+
+        linter.register_checker(SequentialTestChecker(linter))
+
+        # Create a dummy file, the actual contents of which will be ignored by the
+        # registered test checkers, but it will trigger at least a single job to be run.
+        single_file_container = _gen_file_datas(count=1)
+
+        # The error in the initializer should trigger a BrokenProcessPool exception
+        with pytest.raises(BrokenProcessPool):
+            check_parallel(
+                linter,
+                jobs=1,
+                files=iter(single_file_container),
+                # This will trigger an exception in the initializer for the parallel jobs
+                # because arguments has to be an Iterable.
+                arguments=1,  # type: ignore[arg-type]
+            )