diff --git a/doc/whatsnew/fragments/3899.bugfix b/doc/whatsnew/fragments/3899.bugfix
new file mode 100644
index 0000000000..ed0ad38594
--- /dev/null
+++ b/doc/whatsnew/fragments/3899.bugfix
@@ -0,0 +1,4 @@
+Pylint will no longer deadlock if a parallel job is killed; it will
+fail immediately instead.
+
+Closes #3899
diff --git a/pylint/lint/parallel.py b/pylint/lint/parallel.py
index 8d0053108b..9730914b75 100644
--- a/pylint/lint/parallel.py
+++ b/pylint/lint/parallel.py
@@ -23,10 +23,15 @@
 except ImportError:
     multiprocessing = None  # type: ignore[assignment]

+try:
+    from concurrent.futures import ProcessPoolExecutor
+except ImportError:
+    ProcessPoolExecutor = None  # type: ignore[assignment,misc]
+
 if TYPE_CHECKING:
     from pylint.lint import PyLinter

-# PyLinter object used by worker processes when checking files using multiprocessing
+# PyLinter object used by worker processes when checking files using parallel mode
 # should only be used by the worker processes
 _worker_linter: PyLinter | None = None

@@ -34,8 +39,7 @@
 def _worker_initialize(
     linter: bytes, arguments: None | str | Sequence[str] = None
 ) -> None:
-    """Function called to initialize a worker for a Process within a multiprocessing
-    Pool.
+    """Function called to initialize a worker for a Process within a concurrent Pool.

     :param linter: A linter-class (PyLinter) instance pickled with dill
     :param arguments: File or module name(s) to lint and to be added to sys.path
@@ -137,9 +141,9 @@ def check_parallel(
     # is identical to the linter object here. This is required so that
     # a custom PyLinter object can be used.
     initializer = functools.partial(_worker_initialize, arguments=arguments)
-    with multiprocessing.Pool(
-        jobs, initializer=initializer, initargs=[dill.dumps(linter)]
-    ) as pool:
+    with ProcessPoolExecutor(
+        max_workers=jobs, initializer=initializer, initargs=(dill.dumps(linter),)
+    ) as executor:
         linter.open()
         all_stats = []
         all_mapreduce_data: defaultdict[
@@ -158,7 +162,7 @@ def check_parallel(
             stats,
             msg_status,
             mapreduce_data,
-        ) in pool.imap_unordered(_worker_check_single_file, files):
+        ) in executor.map(_worker_check_single_file, files):
             linter.file_state.base_name = base_name
             linter.file_state._is_base_filestate = False
             linter.set_current_module(module, file_path)
@@ -168,8 +172,5 @@ def check_parallel(
             all_mapreduce_data[worker_idx].append(mapreduce_data)
             linter.msg_status |= msg_status

-    pool.close()
-    pool.join()
-
     _merge_mapreduce_data(linter, all_mapreduce_data)
     linter.stats = merge_stats([linter.stats] + all_stats)
diff --git a/pylint/lint/run.py b/pylint/lint/run.py
index b8923aac5f..6a6d3a05da 100644
--- a/pylint/lint/run.py
+++ b/pylint/lint/run.py
@@ -30,6 +30,11 @@
 except ImportError:
     multiprocessing = None  # type: ignore[assignment]

+try:
+    from concurrent.futures import ProcessPoolExecutor
+except ImportError:
+    ProcessPoolExecutor = None  # type: ignore[assignment,misc]
+

 def _query_cpu() -> int | None:
     """Try to determine number of CPUs allotted in a docker container.
@@ -185,9 +190,9 @@ def __init__(
             )
             sys.exit(32)
         if linter.config.jobs > 1 or linter.config.jobs == 0:
-            if multiprocessing is None:
+            if ProcessPoolExecutor is None:
                 print(
-                    "Multiprocessing library is missing, fallback to single process",
+                    "concurrent.futures module is missing, fallback to single process",
                     file=sys.stderr,
                 )
                 linter.set_option("jobs", 1)
diff --git a/tests/test_check_parallel.py b/tests/test_check_parallel.py
index 15b59304c0..96e67517ec 100644
--- a/tests/test_check_parallel.py
+++ b/tests/test_check_parallel.py
@@ -9,8 +9,9 @@
 from __future__ import annotations

 import argparse
-import multiprocessing
 import os
+from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures.process import BrokenProcessPool
 from pickle import PickleError

 import dill
@@ -182,10 +183,10 @@ def test_worker_initialize_pickling(self) -> None:
         """
         linter = PyLinter(reporter=Reporter())
         linter.attribute = argparse.ArgumentParser()  # type: ignore[attr-defined]
-        with multiprocessing.Pool(
-            2, initializer=worker_initialize, initargs=[dill.dumps(linter)]
-        ) as pool:
-            pool.imap_unordered(print, [1, 2])
+        with ProcessPoolExecutor(
+            max_workers=2, initializer=worker_initialize, initargs=(dill.dumps(linter),)
+        ) as executor:
+            executor.map(print, [1, 2])

     def test_worker_check_single_file_uninitialised(self) -> None:
         pylint.lint.parallel._worker_linter = None
@@ -577,3 +578,27 @@ def test_map_reduce(self, num_files: int, num_jobs: int, num_checkers: int) -> N
         assert str(stats_single_proc.by_msg) == str(
             stats_check_parallel.by_msg
         ), "Single-proc and check_parallel() should return the same thing"
+
+    @pytest.mark.timeout(5)
+    def test_no_deadlock_due_to_initializer_error(self) -> None:
+        """Tests that an error in the initializer for the parallel jobs doesn't
+        lead to a deadlock.
+        """
+        linter = PyLinter(reporter=Reporter())
+
+        linter.register_checker(SequentialTestChecker(linter))
+
+        # Create a dummy file, the actual contents of which will be ignored by the
+        # registered test checkers, but it will trigger at least a single job to be run.
+        single_file_container = _gen_file_datas(count=1)
+
+        # The error in the initializer should trigger a BrokenProcessPool exception
+        with pytest.raises(BrokenProcessPool):
+            check_parallel(
+                linter,
+                jobs=1,
+                files=iter(single_file_container),
+                # This will trigger an exception in the initializer for the parallel jobs
+                # because arguments has to be an Iterable.
+                arguments=1,  # type: ignore[arg-type]
+            )
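
A note for reviewers, not part of the patch: the fix works because the two pool implementations react differently when a worker vanishes. multiprocessing.Pool.imap_unordered blocks forever waiting for a result that will never arrive, while ProcessPoolExecutor notices the dead process and raises BrokenProcessPool in the parent. A minimal standalone sketch of that behavior follows; the _kill_self helper is made up for the demo, and os.kill with SIGKILL assumes a POSIX system.

import os
import signal
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def _kill_self(item: int) -> int:
    """Simulate a parallel job being killed mid-run (e.g. by the OOM killer)."""
    os.kill(os.getpid(), signal.SIGKILL)
    return item  # never reached


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as executor:
        try:
            # map() is lazy; iterating forces the results and surfaces the error.
            list(executor.map(_kill_self, [1, 2]))
        except BrokenProcessPool:
            # Fails fast; the multiprocessing.Pool equivalent hangs at this point.
            print("worker died, pool is broken")

Running this prints "worker died, pool is broken" almost immediately, whereas the pre-patch multiprocessing.Pool version of the same loop hangs until externally killed.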
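The new test_no_deadlock_due_to_initializer_error exercises a second failure path: an initializer that raises. A hedged sketch of why the executor makes that safe, with a made-up _bad_initializer standing in for _worker_initialize receiving bad arguments: ProcessPoolExecutor marks the pool broken once the initializer fails, so the next map() call raises BrokenProcessPool, whereas multiprocessing.Pool keeps respawning workers whose initializer raises, which is how the original hang manifested.

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def _bad_initializer() -> None:
    # Stands in for _worker_initialize() blowing up on a bad `arguments` value.
    raise RuntimeError("simulated initializer failure")


if __name__ == "__main__":
    try:
        with ProcessPoolExecutor(max_workers=1, initializer=_bad_initializer) as executor:
            list(executor.map(str, [1, 2]))
    except BrokenProcessPool:
        # The broken pool is reported instead of silently respawning workers.
        print("initializer failed, pool is broken")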