Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CI test the future loky-3.3.0 branch #1338

Merged
merged 12 commits into from Sep 16, 2022
5 changes: 3 additions & 2 deletions CHANGES.rst
Expand Up @@ -34,8 +34,9 @@ Development version

- Vendor cloudpickle 2.2.0 which adds support for PyPy 3.8+.

- Vendor loky 3.2.0 which fixes a bug with leaking processes in case of
nested loky parallel calls.
- Vendor loky 3.3.0 which fixes a bug with leaking processes in case of
nested loky parallel calls and more reliably spawns the correct
number of reusable workers.

Release 1.1.0
--------------
Expand Down
8 changes: 4 additions & 4 deletions azure-pipelines.yml
Expand Up @@ -55,15 +55,15 @@ jobs:
# dependencies.
imageName: 'ubuntu-latest'
PYTHON_VERSION: "3.10"
EXTRA_CONDA_PACKAGES: "numpy=1.22 distributed=2022.2.0"
EXTRA_CONDA_PACKAGES: "numpy=1.23 distributed=2022.2.0"
linux_py37_distributed:
imageName: 'ubuntu-latest'
PYTHON_VERSION: "3.7"
EXTRA_CONDA_PACKAGES: "numpy=1.15 distributed=2.13"
linux_py310_cython:
imageName: 'ubuntu-latest'
PYTHON_VERSION: "3.10"
EXTRA_CONDA_PACKAGES: "numpy=1.22"
EXTRA_CONDA_PACKAGES: "numpy=1.23"
CYTHON: "true"
linux_py37_no_multiprocessing_no_lzma:
imageName: 'ubuntu-latest'
Expand All @@ -78,12 +78,12 @@ jobs:
windows_py310:
imageName: "windows-latest"
PYTHON_VERSION: "3.10"
EXTRA_CONDA_PACKAGES: "numpy=1.22"
EXTRA_CONDA_PACKAGES: "numpy=1.23"

macos_py310:
imageName: "macos-latest"
PYTHON_VERSION: "3.10"
EXTRA_CONDA_PACKAGES: "numpy=1.22"
EXTRA_CONDA_PACKAGES: "numpy=1.23"
macos_py37_no_numpy:
imageName: "macos-latest"
PYTHON_VERSION: "3.7"
Expand Down
2 changes: 1 addition & 1 deletion joblib/externals/loky/__init__.py
Expand Up @@ -29,4 +29,4 @@
"wrap_non_picklable_objects", "set_loky_pickler"]


__version__ = '3.2.0'
__version__ = '3.3.0'
39 changes: 27 additions & 12 deletions joblib/externals/loky/process_executor.py
Expand Up @@ -678,9 +678,12 @@ def process_result_item(self, result_item):
with self.processes_management_lock:
p = self.processes.pop(result_item, None)

# p can be None is the executor is concurrently shutting down.
# p can be None if the executor is concurrently shutting down.
if p is not None:
p._worker_exit_lock.release()
mp.util.debug(
f"joining {p.name} when processing {p.pid} as result_item"
)
p.join()
del p

Expand All @@ -699,7 +702,8 @@ def process_result_item(self, result_item):
"executor. This can be caused by a too short worker "
"timeout or by a memory leak.", UserWarning
)
executor._adjust_process_count()
with executor._processes_management_lock:
executor._adjust_process_count()
executor = None
else:
# Received a _ResultItem so mark the future as completed.
Expand Down Expand Up @@ -837,15 +841,23 @@ def join_executor_internals(self):
self.thread_wakeup.close()

# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
active_processes = list(self.processes.values())
mp.util.debug(f"joining {len(active_processes)} processes")
for p in active_processes:
mp.util.debug(f"joining process {p.name}")
p.join()
# some ctx.Queue methods may deadlock on macOS.
with self.processes_management_lock:
mp.util.debug(f"joining {len(self.processes)} processes")
n_joined_processes = 0
while True:
try:
pid, p = self.processes.popitem()
mp.util.debug(f"joining process {p.name} with pid {pid}")
p.join()
n_joined_processes += 1
except KeyError:
break

mp.util.debug("executor management thread clean shutdown of worker "
f"processes: {active_processes}")
mp.util.debug(
"executor management thread clean shutdown of "
f"{n_joined_processes} workers"
)

def get_n_children_alive(self):
# This is an upper bound on the number of children alive.
Expand Down Expand Up @@ -1082,7 +1094,7 @@ def _start_executor_manager_thread(self):
_python_exit)

def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
while len(self._processes) < self._max_workers:
worker_exit_lock = self._context.BoundedSemaphore(1)
args = (self._call_queue, self._result_queue, self._initializer,
self._initargs, self._processes_management_lock,
Expand All @@ -1098,7 +1110,10 @@ def _adjust_process_count(self):
p._worker_exit_lock = worker_exit_lock
p.start()
self._processes[p.pid] = p
mp.util.debug(f'Adjust process count : {self._processes}')
mp.util.debug(
f"Adjusted process count to {self._max_workers}: "
f"{[(p.name, pid) for pid, p in self._processes.items()]}"
)

def _ensure_executor_running(self):
"""ensures all workers and management thread are running
Expand Down
40 changes: 18 additions & 22 deletions joblib/test/test_memmapping.py
Expand Up @@ -613,29 +613,25 @@ def test_many_parallel_calls_on_same_object(backend):
delayed(return_slice_of_data)(data, 0, 20)
for _ in range(10)
)
slice_of_data = Parallel(
n_jobs=2, max_nbytes=1, backend='{b}')(
delayed(return_slice_of_data)(data, 0, 20)
for _ in range(10)
)
'''.format(b=backend)

for _ in range(3):
env = os.environ.copy()
env['PYTHONPATH'] = os.path.dirname(__file__)
p = subprocess.Popen([sys.executable, '-c', cmd],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE, env=env)
p.wait()
out, err = p.communicate()
assert p.returncode == 0, err
assert out == b''
if sys.version_info[:3] not in [(3, 8, 0), (3, 8, 1)]:
# In early versions of Python 3.8, a reference leak
# https://github.com/cloudpipe/cloudpickle/issues/327, holds
# references to pickled objects, generating race condition during
# cleanup finalizers of joblib and noisy resource_tracker outputs.
assert b'resource_tracker' not in err
env = os.environ.copy()
env['PYTHONPATH'] = os.path.dirname(__file__)
p = subprocess.Popen(
[sys.executable, '-c', cmd],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
env=env,
)
p.wait()
out, err = p.communicate()
assert p.returncode == 0, err
assert out == b''
if sys.version_info[:3] not in [(3, 8, 0), (3, 8, 1)]:
# In early versions of Python 3.8, a reference leak
# https://github.com/cloudpipe/cloudpickle/issues/327, holds
# references to pickled objects, generating race condition during
# cleanup finalizers of joblib and noisy resource_tracker outputs.
assert b'resource_tracker' not in err


@with_numpy
Expand Down