Skip to content

Commit

Permalink
CI test the future loky-3.3.0 branch (#1338)
Browse files Browse the repository at this point in the history
  • Loading branch information
ogrisel committed Sep 16, 2022
1 parent 8aca6f4 commit cea26ff
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 41 deletions.
5 changes: 3 additions & 2 deletions CHANGES.rst
Expand Up @@ -34,8 +34,9 @@ Development version

- Vendor cloudpickle 2.2.0 which adds support for PyPy 3.8+.

- Vendor loky 3.2.0 which fixes a bug with leaking processes in case of
nested loky parallel calls.
- Vendor loky 3.3.0 which fixes a bug with leaking processes in case of
nested loky parallel calls and more reliably spawns the correct
number of reusable workers.

Release 1.1.0
--------------
Expand Down
8 changes: 4 additions & 4 deletions azure-pipelines.yml
Expand Up @@ -55,15 +55,15 @@ jobs:
# dependencies.
imageName: 'ubuntu-latest'
PYTHON_VERSION: "3.10"
EXTRA_CONDA_PACKAGES: "numpy=1.22 distributed=2022.2.0"
EXTRA_CONDA_PACKAGES: "numpy=1.23 distributed=2022.2.0"
linux_py37_distributed:
imageName: 'ubuntu-latest'
PYTHON_VERSION: "3.7"
EXTRA_CONDA_PACKAGES: "numpy=1.15 distributed=2.13"
linux_py310_cython:
imageName: 'ubuntu-latest'
PYTHON_VERSION: "3.10"
EXTRA_CONDA_PACKAGES: "numpy=1.22"
EXTRA_CONDA_PACKAGES: "numpy=1.23"
CYTHON: "true"
linux_py37_no_multiprocessing_no_lzma:
imageName: 'ubuntu-latest'
Expand All @@ -78,12 +78,12 @@ jobs:
windows_py310:
imageName: "windows-latest"
PYTHON_VERSION: "3.10"
EXTRA_CONDA_PACKAGES: "numpy=1.22"
EXTRA_CONDA_PACKAGES: "numpy=1.23"

macos_py310:
imageName: "macos-latest"
PYTHON_VERSION: "3.10"
EXTRA_CONDA_PACKAGES: "numpy=1.22"
EXTRA_CONDA_PACKAGES: "numpy=1.23"
macos_py37_no_numpy:
imageName: "macos-latest"
PYTHON_VERSION: "3.7"
Expand Down
2 changes: 1 addition & 1 deletion joblib/externals/loky/__init__.py
Expand Up @@ -29,4 +29,4 @@
"wrap_non_picklable_objects", "set_loky_pickler"]


__version__ = '3.2.0'
__version__ = '3.3.0'
39 changes: 27 additions & 12 deletions joblib/externals/loky/process_executor.py
Expand Up @@ -678,9 +678,12 @@ def process_result_item(self, result_item):
with self.processes_management_lock:
p = self.processes.pop(result_item, None)

# p can be None is the executor is concurrently shutting down.
# p can be None if the executor is concurrently shutting down.
if p is not None:
p._worker_exit_lock.release()
mp.util.debug(
f"joining {p.name} when processing {p.pid} as result_item"
)
p.join()
del p

Expand All @@ -699,7 +702,8 @@ def process_result_item(self, result_item):
"executor. This can be caused by a too short worker "
"timeout or by a memory leak.", UserWarning
)
executor._adjust_process_count()
with executor._processes_management_lock:
executor._adjust_process_count()
executor = None
else:
# Received a _ResultItem so mark the future as completed.
Expand Down Expand Up @@ -837,15 +841,23 @@ def join_executor_internals(self):
self.thread_wakeup.close()

# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
active_processes = list(self.processes.values())
mp.util.debug(f"joining {len(active_processes)} processes")
for p in active_processes:
mp.util.debug(f"joining process {p.name}")
p.join()
# some ctx.Queue methods may deadlock on macOS.
with self.processes_management_lock:
mp.util.debug(f"joining {len(self.processes)} processes")
n_joined_processes = 0
while True:
try:
pid, p = self.processes.popitem()
mp.util.debug(f"joining process {p.name} with pid {pid}")
p.join()
n_joined_processes += 1
except KeyError:
break

mp.util.debug("executor management thread clean shutdown of worker "
f"processes: {active_processes}")
mp.util.debug(
"executor management thread clean shutdown of "
f"{n_joined_processes} workers"
)

def get_n_children_alive(self):
# This is an upper bound on the number of children alive.
Expand Down Expand Up @@ -1082,7 +1094,7 @@ def _start_executor_manager_thread(self):
_python_exit)

def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
while len(self._processes) < self._max_workers:
worker_exit_lock = self._context.BoundedSemaphore(1)
args = (self._call_queue, self._result_queue, self._initializer,
self._initargs, self._processes_management_lock,
Expand All @@ -1098,7 +1110,10 @@ def _adjust_process_count(self):
p._worker_exit_lock = worker_exit_lock
p.start()
self._processes[p.pid] = p
mp.util.debug(f'Adjust process count : {self._processes}')
mp.util.debug(
f"Adjusted process count to {self._max_workers}: "
f"{[(p.name, pid) for pid, p in self._processes.items()]}"
)

def _ensure_executor_running(self):
"""ensures all workers and management thread are running
Expand Down
40 changes: 18 additions & 22 deletions joblib/test/test_memmapping.py
Expand Up @@ -613,29 +613,25 @@ def test_many_parallel_calls_on_same_object(backend):
delayed(return_slice_of_data)(data, 0, 20)
for _ in range(10)
)
slice_of_data = Parallel(
n_jobs=2, max_nbytes=1, backend='{b}')(
delayed(return_slice_of_data)(data, 0, 20)
for _ in range(10)
)
'''.format(b=backend)

for _ in range(3):
env = os.environ.copy()
env['PYTHONPATH'] = os.path.dirname(__file__)
p = subprocess.Popen([sys.executable, '-c', cmd],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE, env=env)
p.wait()
out, err = p.communicate()
assert p.returncode == 0, err
assert out == b''
if sys.version_info[:3] not in [(3, 8, 0), (3, 8, 1)]:
# In early versions of Python 3.8, a reference leak
# https://github.com/cloudpipe/cloudpickle/issues/327, holds
# references to pickled objects, generating race condition during
# cleanup finalizers of joblib and noisy resource_tracker outputs.
assert b'resource_tracker' not in err
env = os.environ.copy()
env['PYTHONPATH'] = os.path.dirname(__file__)
p = subprocess.Popen(
[sys.executable, '-c', cmd],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
env=env,
)
p.wait()
out, err = p.communicate()
assert p.returncode == 0, err
assert out == b''
if sys.version_info[:3] not in [(3, 8, 0), (3, 8, 1)]:
# In early versions of Python 3.8, a reference leak
# https://github.com/cloudpipe/cloudpickle/issues/327, holds
# references to pickled objects, generating race condition during
# cleanup finalizers of joblib and noisy resource_tracker outputs.
assert b'resource_tracker' not in err


@with_numpy
Expand Down

0 comments on commit cea26ff

Please sign in to comment.