
[Backport maintenance/2.15.x] Avoid hanging forever after a parallel job was killed #7930

Merged

Changes from all commits
4 changes: 4 additions & 0 deletions doc/whatsnew/fragments/3899.bugfix
@@ -0,0 +1,4 @@
+Pylint will no longer deadlock if a parallel job is killed but will fail
+immediately instead.
+
+Closes #3899
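
For context, a minimal standalone sketch (not part of this diff; POSIX-only; hypothetical names) of the failure mode being fixed: when a worker is killed, for example by the kernel OOM killer, `multiprocessing.Pool.imap_unordered` waits forever for the lost result, whereas `concurrent.futures.ProcessPoolExecutor` detects the dead process and raises `BrokenProcessPool`:

```python
# Hypothetical repro sketch, not part of this PR. POSIX-only (SIGKILL).
import os
import signal
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def die(_: int) -> None:
    # Simulate a parallel job being killed (e.g. by the OOM killer).
    os.kill(os.getpid(), signal.SIGKILL)


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as executor:
        try:
            list(executor.map(die, [0]))
        except BrokenProcessPool:
            print("worker died -> failing fast instead of hanging")
```
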
21 changes: 11 additions & 10 deletions pylint/lint/parallel.py
@@ -23,19 +23,23 @@
 except ImportError:
     multiprocessing = None  # type: ignore[assignment]
 
+try:
+    from concurrent.futures import ProcessPoolExecutor
+except ImportError:
+    ProcessPoolExecutor = None  # type: ignore[assignment,misc]
+
 if TYPE_CHECKING:
     from pylint.lint import PyLinter
 
-# PyLinter object used by worker processes when checking files using multiprocessing
+# PyLinter object used by worker processes when checking files using parallel mode
 # should only be used by the worker processes
 _worker_linter: PyLinter | None = None
 
 
 def _worker_initialize(
     linter: bytes, arguments: None | str | Sequence[str] = None
 ) -> None:
-    """Function called to initialize a worker for a Process within a multiprocessing
-    Pool.
+    """Function called to initialize a worker for a Process within a concurrent Pool.
 
     :param linter: A linter-class (PyLinter) instance pickled with dill
     :param arguments: File or module name(s) to lint and to be added to sys.path
@@ -137,9 +141,9 @@ def check_parallel(
     # is identical to the linter object here. This is required so that
     # a custom PyLinter object can be used.
     initializer = functools.partial(_worker_initialize, arguments=arguments)
-    with multiprocessing.Pool(
-        jobs, initializer=initializer, initargs=[dill.dumps(linter)]
-    ) as pool:
+    with ProcessPoolExecutor(
+        max_workers=jobs, initializer=initializer, initargs=(dill.dumps(linter),)
+    ) as executor:
         linter.open()
         all_stats = []
         all_mapreduce_data: defaultdict[
@@ -158,7 +162,7 @@
             stats,
             msg_status,
             mapreduce_data,
-        ) in pool.imap_unordered(_worker_check_single_file, files):
+        ) in executor.map(_worker_check_single_file, files):
             linter.file_state.base_name = base_name
             linter.file_state._is_base_filestate = False
             linter.set_current_module(module, file_path)
@@ -168,8 +172,5 @@
             all_mapreduce_data[worker_idx].append(mapreduce_data)
             linter.msg_status |= msg_status
 
-    pool.close()
-    pool.join()
-
     _merge_mapreduce_data(linter, all_mapreduce_data)
     linter.stats = merge_stats([linter.stats] + all_stats)
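
A note on the swap above (my reading, not stated in the diff): `executor.map` yields results in input order rather than completion order as `pool.imap_unordered` did, a worker exception is re-raised in the parent when its result is consumed, and exiting the `with` block shuts the executor down, which is why the explicit `pool.close()`/`pool.join()` calls are gone. A minimal stdlib sketch of the ordering difference:

```python
# Minimal stdlib sketch, no pylint specifics.
from concurrent.futures import ProcessPoolExecutor


def square(n: int) -> int:
    return n * n


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Results arrive in input order (9, 1, 4), regardless of which
        # worker finishes first -- unlike Pool.imap_unordered.
        print(list(executor.map(square, [3, 1, 2])))
```
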
9 changes: 7 additions & 2 deletions pylint/lint/run.py
@@ -30,6 +30,11 @@
 except ImportError:
     multiprocessing = None  # type: ignore[assignment]
 
+try:
+    from concurrent.futures import ProcessPoolExecutor
+except ImportError:
+    ProcessPoolExecutor = None  # type: ignore[assignment,misc]
+
 
 def _query_cpu() -> int | None:
     """Try to determine number of CPUs allotted in a docker container.
@@ -185,9 +190,9 @@ def __init__(
             )
             sys.exit(32)
         if linter.config.jobs > 1 or linter.config.jobs == 0:
-            if multiprocessing is None:
+            if ProcessPoolExecutor is None:
                 print(
-                    "Multiprocessing library is missing, fallback to single process",
+                    "concurrent.futures module is missing, fallback to single process",
                     file=sys.stderr,
                 )
                 linter.set_option("jobs", 1)
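
The new guard mirrors the existing `multiprocessing` import guard. Roughly, the fallback amounts to this sketch (standalone, hypothetical variables, not the actual `Run.__init__` code):

```python
# Hedged sketch of the fallback logic above, with hypothetical names.
import sys

try:
    from concurrent.futures import ProcessPoolExecutor
except ImportError:  # e.g. a platform without multiprocessing support
    ProcessPoolExecutor = None  # type: ignore[assignment,misc]

jobs = 4  # pretend --jobs=4 was passed
if jobs > 1 or jobs == 0:
    if ProcessPoolExecutor is None:
        print(
            "concurrent.futures module is missing, fallback to single process",
            file=sys.stderr,
        )
        jobs = 1  # stands in for linter.set_option("jobs", 1)
```
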
35 changes: 30 additions & 5 deletions tests/test_check_parallel.py
@@ -9,8 +9,9 @@
 from __future__ import annotations
 
 import argparse
-import multiprocessing
 import os
+from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures.process import BrokenProcessPool
 from pickle import PickleError
 
 import dill
@@ -182,10 +183,10 @@ def test_worker_initialize_pickling(self) -> None:
         """
         linter = PyLinter(reporter=Reporter())
         linter.attribute = argparse.ArgumentParser()  # type: ignore[attr-defined]
-        with multiprocessing.Pool(
-            2, initializer=worker_initialize, initargs=[dill.dumps(linter)]
-        ) as pool:
-            pool.imap_unordered(print, [1, 2])
+        with ProcessPoolExecutor(
+            max_workers=2, initializer=worker_initialize, initargs=(dill.dumps(linter),)
+        ) as executor:
+            executor.map(print, [1, 2])
 
     def test_worker_check_single_file_uninitialised(self) -> None:
         pylint.lint.parallel._worker_linter = None
@@ -571,3 +572,27 @@ def test_map_reduce(self, num_files, num_jobs, num_checkers):
         assert str(stats_single_proc.by_msg) == str(
             stats_check_parallel.by_msg
         ), "Single-proc and check_parallel() should return the same thing"
+
+    @pytest.mark.timeout(5)
+    def test_no_deadlock_due_to_initializer_error(self) -> None:
+        """Tests that an error in the initializer for the parallel jobs doesn't
+        lead to a deadlock.
+        """
+        linter = PyLinter(reporter=Reporter())
+
+        linter.register_checker(SequentialTestChecker(linter))
+
+        # Create a dummy file, the actual contents of which will be ignored by the
+        # registered test checkers, but it will trigger at least a single job to run.
+        single_file_container = _gen_file_datas(count=1)
+
+        # The error in the initializer should trigger a BrokenProcessPool exception
+        with pytest.raises(BrokenProcessPool):
+            check_parallel(
+                linter,
+                jobs=1,
+                files=iter(single_file_container),
+                # This will trigger an exception in the initializer for the parallel
+                # jobs because arguments has to be an Iterable.
+                arguments=1,  # type: ignore[arg-type]
+            )