From fc1baf7f04c1334ea0f7ffbc8962c3463ad91704 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Wed, 23 Nov 2022 18:58:10 +0100 Subject: [PATCH 01/10] Replace multiprocessing.pool with concurrent.futures.ProcessPoolExecutor to avoid deadlocks. In a multiprocessing.pool, if a process terminates in a non-clean fashion (for example, due to OOM or a segmentation fault), the pool will silently replace said process, but the work that the process was supposed to do will never be done, causing pylint to hang indefinitely. The concurrent.futures.ProcessPoolExecutor will raise a BrokenProcessPool exception in that case, avoiding the hang. --- pylint/lint/parallel.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pylint/lint/parallel.py b/pylint/lint/parallel.py index 8d0053108b..d1dd07f0cf 100644 --- a/pylint/lint/parallel.py +++ b/pylint/lint/parallel.py @@ -8,6 +8,7 @@ import warnings from collections import defaultdict from collections.abc import Iterable, Sequence +from concurrent.futures import ProcessPoolExecutor from typing import TYPE_CHECKING, Any import dill @@ -137,9 +138,9 @@ def check_parallel( # is identical to the linter object here. This is required so that # a custom PyLinter object can be used. initializer = functools.partial(_worker_initialize, arguments=arguments) - with multiprocessing.Pool( - jobs, initializer=initializer, initargs=[dill.dumps(linter)] - ) as pool: + with ProcessPoolExecutor( + max_workers=jobs, initializer=initializer, initargs=(dill.dumps(linter)) + ) as executor: linter.open() all_stats = [] all_mapreduce_data: defaultdict[ @@ -158,7 +159,7 @@ def check_parallel( stats, msg_status, mapreduce_data, - ) in pool.imap_unordered(_worker_check_single_file, files): + ) in executor.map(_worker_check_single_file, files): linter.file_state.base_name = base_name linter.file_state._is_base_filestate = False linter.set_current_module(module, file_path) @@ -168,8 +169,5 @@ def check_parallel( all_mapreduce_data[worker_idx].append(mapreduce_data) linter.msg_status |= msg_status - pool.close() - pool.join() - _merge_mapreduce_data(linter, all_mapreduce_data) linter.stats = merge_stats([linter.stats] + all_stats) From d5e17d86b57dcf13456c7415299eb57258c75319 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 24 Nov 2022 11:59:36 +0100 Subject: [PATCH 02/10] Fix initargs parameter to be a tuple. --- pylint/lint/parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pylint/lint/parallel.py b/pylint/lint/parallel.py index d1dd07f0cf..1acdfe550a 100644 --- a/pylint/lint/parallel.py +++ b/pylint/lint/parallel.py @@ -139,7 +139,7 @@ def check_parallel( # a custom PyLinter object can be used. 
initializer = functools.partial(_worker_initialize, arguments=arguments) with ProcessPoolExecutor( - max_workers=jobs, initializer=initializer, initargs=(dill.dumps(linter)) + max_workers=jobs, initializer=initializer, initargs=(dill.dumps(linter),) ) as executor: linter.open() all_stats = [] From 26577726ea9b048fd6da1fa2241857930c1250ff Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 24 Nov 2022 20:12:07 +0100 Subject: [PATCH 03/10] add whatsnew fragment --- doc/whatsnew/fragments/3899.bugfix | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 doc/whatsnew/fragments/3899.bugfix diff --git a/doc/whatsnew/fragments/3899.bugfix b/doc/whatsnew/fragments/3899.bugfix new file mode 100644 index 0000000000..ed0ad38594 --- /dev/null +++ b/doc/whatsnew/fragments/3899.bugfix @@ -0,0 +1,4 @@ +Pylint will no longer deadlock if a parallel job is killed but fail +immediately instead. + +Closes #3899 From 2e47049a16b21232c37bd7ab0b00950fdd3793a4 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Mon, 28 Nov 2022 12:05:48 +0100 Subject: [PATCH 04/10] Add a test that an error in the initializer for the parallel jobs, doesn't lead to a deadlock. --- tests/test_check_parallel.py | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/tests/test_check_parallel.py b/tests/test_check_parallel.py index 15b59304c0..e3cd0e43f6 100644 --- a/tests/test_check_parallel.py +++ b/tests/test_check_parallel.py @@ -9,8 +9,9 @@ from __future__ import annotations import argparse -import multiprocessing import os +from concurrent.futures import ProcessPoolExecutor +from concurrent.futures.process import BrokenProcessPool from pickle import PickleError import dill @@ -182,10 +183,10 @@ def test_worker_initialize_pickling(self) -> None: """ linter = PyLinter(reporter=Reporter()) linter.attribute = argparse.ArgumentParser() # type: ignore[attr-defined] - with multiprocessing.Pool( - 2, initializer=worker_initialize, initargs=[dill.dumps(linter)] - ) as pool: - pool.imap_unordered(print, [1, 2]) + with ProcessPoolExecutor( + max_workers=2, initializer=worker_initialize, initargs=(dill.dumps(linter),) + ) as executor: + executor.map(print, [1, 2]) def test_worker_check_single_file_uninitialised(self) -> None: pylint.lint.parallel._worker_linter = None @@ -577,3 +578,27 @@ def test_map_reduce(self, num_files: int, num_jobs: int, num_checkers: int) -> N assert str(stats_single_proc.by_msg) == str( stats_check_parallel.by_msg ), "Single-proc and check_parallel() should return the same thing" + + @pytest.mark.timeout(1) + def test_no_deadlock_due_to_initializer_error(self) -> None: + """Tests that an error in the initializer for the parallel jobs doesn't + lead to a deadlock. + """ + linter = PyLinter(reporter=Reporter()) + + linter.register_checker(SequentialTestChecker(linter)) + + # Create a dummy file, the actual contents of which will be ignored by the + # register test checkers, but it will trigger at least a single-job to be run. + single_file_container = _gen_file_datas(count=1) + + # The error in the initializer should trigger a BrokenProcessPool exception + with pytest.raises(BrokenProcessPool): + check_parallel( + linter, + jobs=1, + files=iter(single_file_container), + # This will trigger an exception in the initializer for the parallel jobs + # because arguments has to be an Iterable. 
+ arguments=1, # type: ignore[arg-type] + ) From 236e5e918f82ad5ce9a4dda736b2567bd03e3fb8 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Mon, 5 Dec 2022 10:20:43 +0100 Subject: [PATCH 05/10] Increase timeout for parallel initializer test to avoid flakiness. --- tests/test_check_parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_check_parallel.py b/tests/test_check_parallel.py index e3cd0e43f6..96e67517ec 100644 --- a/tests/test_check_parallel.py +++ b/tests/test_check_parallel.py @@ -579,7 +579,7 @@ def test_map_reduce(self, num_files: int, num_jobs: int, num_checkers: int) -> N stats_check_parallel.by_msg ), "Single-proc and check_parallel() should return the same thing" - @pytest.mark.timeout(1) + @pytest.mark.timeout(5) def test_no_deadlock_due_to_initializer_error(self) -> None: """Tests that an error in the initializer for the parallel jobs doesn't lead to a deadlock. From 61b83427af296e0142691dbd8ee9cce5d892f0c0 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Tue, 6 Dec 2022 12:28:29 +0100 Subject: [PATCH 06/10] Guard concurrent.futures imports, because they are not available in Emscripten and WASI. --- pylint/lint/parallel.py | 6 +++++- tests/test_check_parallel.py | 9 +++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/pylint/lint/parallel.py b/pylint/lint/parallel.py index 1acdfe550a..6acc65932e 100644 --- a/pylint/lint/parallel.py +++ b/pylint/lint/parallel.py @@ -8,7 +8,6 @@ import warnings from collections import defaultdict from collections.abc import Iterable, Sequence -from concurrent.futures import ProcessPoolExecutor from typing import TYPE_CHECKING, Any import dill @@ -24,6 +23,11 @@ except ImportError: multiprocessing = None # type: ignore[assignment] +try: + from concurrent.futures import ProcessPoolExecutor +except ImportError: + ProcessPoolExecutor = None # type: ignore[assignment,misc] + if TYPE_CHECKING: from pylint.lint import PyLinter diff --git a/tests/test_check_parallel.py b/tests/test_check_parallel.py index 96e67517ec..2f49da37ac 100644 --- a/tests/test_check_parallel.py +++ b/tests/test_check_parallel.py @@ -10,8 +10,6 @@ import argparse import os -from concurrent.futures import ProcessPoolExecutor -from concurrent.futures.process import BrokenProcessPool from pickle import PickleError import dill @@ -29,6 +27,13 @@ from pylint.typing import FileItem from pylint.utils import LinterStats, ModuleStats +try: + from concurrent.futures import ProcessPoolExecutor + from concurrent.futures.process import BrokenProcessPool +except ImportError: + ProcessPoolExecutor = None # type: ignore[assignment,misc] + BrokenProcessPool = None # type: ignore[assignment,misc] + def _gen_file_data(idx: int = 0) -> FileItem: """Generates a file to use as a stream.""" From a52cb6c0135fa1b948fd27d7affae04965369f7b Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 8 Dec 2022 23:33:38 +0100 Subject: [PATCH 07/10] Check availability of concurrent.futures module instead of multiprocessing for single process fallback. 
--- pylint/lint/run.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pylint/lint/run.py b/pylint/lint/run.py index b8923aac5f..6a6d3a05da 100644 --- a/pylint/lint/run.py +++ b/pylint/lint/run.py @@ -30,6 +30,11 @@ except ImportError: multiprocessing = None # type: ignore[assignment] +try: + from concurrent.futures import ProcessPoolExecutor +except ImportError: + ProcessPoolExecutor = None # type: ignore[assignment,misc] + def _query_cpu() -> int | None: """Try to determine number of CPUs allotted in a docker container. @@ -185,9 +190,9 @@ def __init__( ) sys.exit(32) if linter.config.jobs > 1 or linter.config.jobs == 0: - if multiprocessing is None: + if ProcessPoolExecutor is None: print( - "Multiprocessing library is missing, fallback to single process", + "concurrent.futures module is missing, fallback to single process", file=sys.stderr, ) linter.set_option("jobs", 1) From 30af13a035803f5e93768494598d8b9a299988b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Noord?= <13665637+DanielNoord@users.noreply.github.com> Date: Sat, 10 Dec 2022 12:22:23 +0100 Subject: [PATCH 08/10] Small fixes --- pylint/lint/parallel.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pylint/lint/parallel.py b/pylint/lint/parallel.py index 6acc65932e..9730914b75 100644 --- a/pylint/lint/parallel.py +++ b/pylint/lint/parallel.py @@ -31,7 +31,7 @@ if TYPE_CHECKING: from pylint.lint import PyLinter -# PyLinter object used by worker processes when checking files using multiprocessing +# PyLinter object used by worker processes when checking files using parallel mode # should only be used by the worker processes _worker_linter: PyLinter | None = None @@ -39,8 +39,7 @@ def _worker_initialize( linter: bytes, arguments: None | str | Sequence[str] = None ) -> None: - """Function called to initialize a worker for a Process within a multiprocessing - Pool. + """Function called to initialize a worker for a Process within a concurrent Pool. 
:param linter: A linter-class (PyLinter) instance pickled with dill :param arguments: File or module name(s) to lint and to be added to sys.path From e164bead20401e233e55ea20a6e647a67dfdf61f Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 12 Dec 2022 12:06:02 +0100 Subject: [PATCH 09/10] Update tests/test_check_parallel.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Daniƫl van Noord <13665637+DanielNoord@users.noreply.github.com> --- tests/test_check_parallel.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/test_check_parallel.py b/tests/test_check_parallel.py index 2f49da37ac..3ed73827f8 100644 --- a/tests/test_check_parallel.py +++ b/tests/test_check_parallel.py @@ -27,12 +27,8 @@ from pylint.typing import FileItem from pylint.utils import LinterStats, ModuleStats -try: - from concurrent.futures import ProcessPoolExecutor - from concurrent.futures.process import BrokenProcessPool -except ImportError: - ProcessPoolExecutor = None # type: ignore[assignment,misc] - BrokenProcessPool = None # type: ignore[assignment,misc] +from concurrent.futures import ProcessPoolExecutor +from concurrent.futures.process import BrokenProcessPool def _gen_file_data(idx: int = 0) -> FileItem: From 69729dbede2eb0b5dc5c5a6870b63189a50e4d8e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 12 Dec 2022 11:07:53 +0000 Subject: [PATCH 10/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_check_parallel.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_check_parallel.py b/tests/test_check_parallel.py index 3ed73827f8..96e67517ec 100644 --- a/tests/test_check_parallel.py +++ b/tests/test_check_parallel.py @@ -10,6 +10,8 @@ import argparse import os +from concurrent.futures import ProcessPoolExecutor +from concurrent.futures.process import BrokenProcessPool from pickle import PickleError import dill @@ -27,9 +29,6 @@ from pylint.typing import FileItem from pylint.utils import LinterStats, ModuleStats -from concurrent.futures import ProcessPoolExecutor -from concurrent.futures.process import BrokenProcessPool - def _gen_file_data(idx: int = 0) -> FileItem: """Generates a file to use as a stream."""
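
Beyond the patches above, a minimal standalone sketch of the failure mode PATCH 01/10 describes (illustrative only, not part of the series; _init_worker, _crash and the greeting value are made-up names rather than pylint code): a worker that terminates in a non-clean fashion leaves the ProcessPoolExecutor broken, so consuming its results raises BrokenProcessPool promptly, whereas multiprocessing.Pool.imap_unordered would silently replace the dead process and never yield the missing result. The one-element initargs tuple mirrors the fix from PATCH 02/10.

import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def _init_worker(greeting: str) -> None:
    # initargs is unpacked positionally, hence the one-element tuple
    # (dill.dumps(linter),) in the patch rather than (dill.dumps(linter)).
    print(f"{greeting} from worker {os.getpid()}", flush=True)


def _crash(_: int) -> None:
    # Simulate a non-clean termination (OOM kill, segfault, ...) by exiting
    # the worker process without cleanup or exception propagation.
    os._exit(1)


if __name__ == "__main__":
    with ProcessPoolExecutor(
        max_workers=1, initializer=_init_worker, initargs=("hello",)
    ) as executor:
        try:
            # Consuming the map() iterator surfaces the broken pool instead of
            # blocking forever on a result that will never arrive.
            list(executor.map(_crash, [1]))
        except BrokenProcessPool:
            print("worker died abruptly; executor raised BrokenProcessPool")

Run directly, this prints the initializer greeting and then the except branch, exiting promptly. The new test_no_deadlock_due_to_initializer_error test (guarded by pytest.mark.timeout) leans on the same mechanism: an initializer that raises likewise leaves the pool broken, so check_parallel fails fast instead of hanging.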