
Avoid hanging forever after a parallel job was killed #7834

Merged: 10 commits, Dec 12, 2022
4 changes: 4 additions & 0 deletions doc/whatsnew/fragments/3899.bugfix
@@ -0,0 +1,4 @@
Pylint will no longer deadlock if a parallel job is killed; it will fail
immediately instead.

Closes #3899
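
The change works because ProcessPoolExecutor fails fast when a worker process dies unexpectedly: pending results raise BrokenProcessPool instead of blocking forever, as multiprocessing.Pool could. The following is a minimal standalone sketch (not part of the PR diff) of that behaviour; the helper name _die is illustrative only.

import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def _die(_):
    # Simulate a worker being killed abruptly (e.g. by the OOM killer).
    os._exit(1)

if __name__ == "__main__":
    try:
        with ProcessPoolExecutor(max_workers=1) as executor:
            list(executor.map(_die, [0]))
    except BrokenProcessPool:
        print("worker died; the executor failed fast instead of hanging")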
12 changes: 5 additions & 7 deletions pylint/lint/parallel.py
@@ -8,6 +8,7 @@
import warnings
from collections import defaultdict
from collections.abc import Iterable, Sequence
from concurrent.futures import ProcessPoolExecutor
from typing import TYPE_CHECKING, Any

import dill
@@ -137,9 +138,9 @@ def check_parallel(
# is identical to the linter object here. This is required so that
# a custom PyLinter object can be used.
initializer = functools.partial(_worker_initialize, arguments=arguments)
with multiprocessing.Pool(
jobs, initializer=initializer, initargs=[dill.dumps(linter)]
) as pool:
with ProcessPoolExecutor(
max_workers=jobs, initializer=initializer, initargs=(dill.dumps(linter),)
) as executor:
linter.open()
all_stats = []
all_mapreduce_data: defaultdict[
@@ -158,7 +159,7 @@ def check_parallel(
stats,
msg_status,
mapreduce_data,
) in pool.imap_unordered(_worker_check_single_file, files):
) in executor.map(_worker_check_single_file, files):
linter.file_state.base_name = base_name
linter.file_state._is_base_filestate = False
linter.set_current_module(module, file_path)
@@ -168,8 +169,5 @@ def check_parallel(
all_mapreduce_data[worker_idx].append(mapreduce_data)
linter.msg_status |= msg_status

pool.close()
pool.join()

_merge_mapreduce_data(linter, all_mapreduce_data)
linter.stats = merge_stats([linter.stats] + all_stats)
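
The diff above keeps the existing pattern of serialising the linter with dill and rebuilding it in each worker via initializer/initargs, only swapping multiprocessing.Pool for ProcessPoolExecutor (and pool.imap_unordered for executor.map, which yields results in input order). A simplified sketch of that initializer pattern, with a plain dict standing in for the linter and hypothetical helper names _init and _check:

import dill
from concurrent.futures import ProcessPoolExecutor

_worker_obj = None

def _init(blob):
    # Each worker process rebuilds its own copy of the serialised object.
    global _worker_obj
    _worker_obj = dill.loads(blob)

def _check(path):
    return f"checked {path} with {_worker_obj['name']}"

if __name__ == "__main__":
    obj = {"name": "linter"}
    with ProcessPoolExecutor(
        max_workers=2, initializer=_init, initargs=(dill.dumps(obj),)
    ) as executor:
        print(list(executor.map(_check, ["a.py", "b.py"])))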
35 changes: 30 additions & 5 deletions tests/test_check_parallel.py
@@ -9,8 +9,9 @@
from __future__ import annotations

import argparse
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from pickle import PickleError

import dill
@@ -182,10 +183,10 @@ def test_worker_initialize_pickling(self) -> None:
"""
linter = PyLinter(reporter=Reporter())
linter.attribute = argparse.ArgumentParser() # type: ignore[attr-defined]
with multiprocessing.Pool(
2, initializer=worker_initialize, initargs=[dill.dumps(linter)]
) as pool:
pool.imap_unordered(print, [1, 2])
with ProcessPoolExecutor(
max_workers=2, initializer=worker_initialize, initargs=(dill.dumps(linter),)
) as executor:
executor.map(print, [1, 2])

def test_worker_check_single_file_uninitialised(self) -> None:
pylint.lint.parallel._worker_linter = None
@@ -577,3 +578,27 @@ def test_map_reduce(self, num_files: int, num_jobs: int, num_checkers: int) -> None:
assert str(stats_single_proc.by_msg) == str(
stats_check_parallel.by_msg
), "Single-proc and check_parallel() should return the same thing"

@pytest.mark.timeout(1)
def test_no_deadlock_due_to_initializer_error(self) -> None:
"""Tests that an error in the initializer for the parallel jobs doesn't
lead to a deadlock.
"""
linter = PyLinter(reporter=Reporter())

linter.register_checker(SequentialTestChecker(linter))

        # Create a dummy file; its actual contents will be ignored by the
        # registered test checkers, but it triggers at least a single job to be run.
single_file_container = _gen_file_datas(count=1)

# The error in the initializer should trigger a BrokenProcessPool exception
with pytest.raises(BrokenProcessPool):
check_parallel(
linter,
jobs=1,
files=iter(single_file_container),
                # This will trigger an exception in the initializer for the parallel
                # jobs because `arguments` has to be an Iterable.
                arguments=1,  # type: ignore[arg-type]
)
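
The test above exercises the second failure mode covered by the switch: when the worker initializer raises, ProcessPoolExecutor marks the pool as broken and surfaces BrokenProcessPool to the caller rather than blocking. A minimal standalone sketch (not from the PR, with a hypothetical _bad_init helper) of that behaviour:

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def _bad_init():
    # A failing initializer breaks the pool before any job can run.
    raise RuntimeError("initializer failed")

if __name__ == "__main__":
    try:
        with ProcessPoolExecutor(max_workers=1, initializer=_bad_init) as executor:
            list(executor.map(str, [1]))
    except BrokenProcessPool:
        print("pool broke during initialisation; no deadlock")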