diff --git a/CHANGES.rst b/CHANGES.rst
index bded2b4dd..1c6c90c2b 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -34,8 +34,9 @@ Development version
 
 - Vendor cloudpickle 2.2.0 which adds support for PyPy 3.8+.
 
-- Vendor loky 3.2.0 which fixes a bug with leaking processes in case of
-  nested loky parallel calls.
+- Vendor loky 3.3.0 which fixes a bug with leaking processes in case of
+  nested loky parallel calls and more reliably spawns the correct
+  number of reusable workers.
 
 Release 1.1.0
 --------------
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index a3c1e40bc..3b58a4d3c 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -55,7 +55,7 @@ jobs:
         # dependencies.
         imageName: 'ubuntu-latest'
         PYTHON_VERSION: "3.10"
-        EXTRA_CONDA_PACKAGES: "numpy=1.22 distributed=2022.2.0"
+        EXTRA_CONDA_PACKAGES: "numpy=1.23 distributed=2022.2.0"
       linux_py37_distributed:
         imageName: 'ubuntu-latest'
         PYTHON_VERSION: "3.7"
@@ -63,7 +63,7 @@ jobs:
       linux_py310_cython:
         imageName: 'ubuntu-latest'
         PYTHON_VERSION: "3.10"
-        EXTRA_CONDA_PACKAGES: "numpy=1.22"
+        EXTRA_CONDA_PACKAGES: "numpy=1.23"
         CYTHON: "true"
       linux_py37_no_multiprocessing_no_lzma:
         imageName: 'ubuntu-latest'
@@ -78,12 +78,12 @@ jobs:
       windows_py310:
         imageName: "windows-latest"
         PYTHON_VERSION: "3.10"
-        EXTRA_CONDA_PACKAGES: "numpy=1.22"
+        EXTRA_CONDA_PACKAGES: "numpy=1.23"
      macos_py310:
         imageName: "macos-latest"
         PYTHON_VERSION: "3.10"
-        EXTRA_CONDA_PACKAGES: "numpy=1.22"
+        EXTRA_CONDA_PACKAGES: "numpy=1.23"
       macos_py37_no_numpy:
         imageName: "macos-latest"
         PYTHON_VERSION: "3.7"
diff --git a/joblib/externals/loky/__init__.py b/joblib/externals/loky/__init__.py
index cd56e26c2..fd2008d78 100644
--- a/joblib/externals/loky/__init__.py
+++ b/joblib/externals/loky/__init__.py
@@ -29,4 +29,4 @@
            "wrap_non_picklable_objects", "set_loky_pickler"]
 
 
-__version__ = '3.2.0'
+__version__ = '3.3.0'
diff --git a/joblib/externals/loky/process_executor.py b/joblib/externals/loky/process_executor.py
index 62b0a2ea8..4e3e819ce 100644
--- a/joblib/externals/loky/process_executor.py
+++ b/joblib/externals/loky/process_executor.py
@@ -678,9 +678,12 @@ def process_result_item(self, result_item):
             with self.processes_management_lock:
                 p = self.processes.pop(result_item, None)
 
-            # p can be None is the executor is concurrently shutting down.
+            # p can be None if the executor is concurrently shutting down.
             if p is not None:
                 p._worker_exit_lock.release()
+                mp.util.debug(
+                    f"joining {p.name} when processing {p.pid} as result_item"
+                )
                 p.join()
                 del p
 
@@ -699,7 +702,8 @@
                         "executor. This can be caused by a too short worker "
                         "timeout or by a memory leak.", UserWarning
                     )
-                    executor._adjust_process_count()
+                    with executor._processes_management_lock:
+                        executor._adjust_process_count()
                 executor = None
         else:
             # Received a _ResultItem so mark the future as completed.
@@ -837,15 +841,23 @@ def join_executor_internals(self):
             self.thread_wakeup.close()
 
         # If .join() is not called on the created processes then
-        # some ctx.Queue methods may deadlock on Mac OS X.
-        active_processes = list(self.processes.values())
-        mp.util.debug(f"joining {len(active_processes)} processes")
-        for p in active_processes:
-            mp.util.debug(f"joining process {p.name}")
-            p.join()
+        # some ctx.Queue methods may deadlock on macOS.
+        with self.processes_management_lock:
+            mp.util.debug(f"joining {len(self.processes)} processes")
+            n_joined_processes = 0
+            while True:
+                try:
+                    pid, p = self.processes.popitem()
+                    mp.util.debug(f"joining process {p.name} with pid {pid}")
+                    p.join()
+                    n_joined_processes += 1
+                except KeyError:
+                    break
 
-        mp.util.debug("executor management thread clean shutdown of worker "
-                      f"processes: {active_processes}")
+        mp.util.debug(
+            "executor management thread clean shutdown of "
+            f"{n_joined_processes} workers"
+        )
 
     def get_n_children_alive(self):
         # This is an upper bound on the number of children alive.
@@ -1082,7 +1094,7 @@ def _start_executor_manager_thread(self):
                                                              _python_exit)
 
     def _adjust_process_count(self):
-        for _ in range(len(self._processes), self._max_workers):
+        while len(self._processes) < self._max_workers:
             worker_exit_lock = self._context.BoundedSemaphore(1)
             args = (self._call_queue, self._result_queue, self._initializer,
                     self._initargs, self._processes_management_lock,
@@ -1098,7 +1110,10 @@ def _adjust_process_count(self):
             p._worker_exit_lock = worker_exit_lock
             p.start()
             self._processes[p.pid] = p
-        mp.util.debug(f'Adjust process count : {self._processes}')
+        mp.util.debug(
+            f"Adjusted process count to {self._max_workers}: "
+            f"{[(p.name, pid) for pid, p in self._processes.items()]}"
+        )
 
     def _ensure_executor_running(self):
         """ensures all workers and management thread are running
diff --git a/joblib/test/test_memmapping.py b/joblib/test/test_memmapping.py
index 9c495ef2f..bdc825f06 100644
--- a/joblib/test/test_memmapping.py
+++ b/joblib/test/test_memmapping.py
@@ -613,29 +613,25 @@ def test_many_parallel_calls_on_same_object(backend):
             delayed(return_slice_of_data)(data, 0, 20)
             for _ in range(10)
         )
-        slice_of_data = Parallel(
-            n_jobs=2, max_nbytes=1, backend='{b}')(
-            delayed(return_slice_of_data)(data, 0, 20)
-            for _ in range(10)
-        )
     '''.format(b=backend)
-
-    for _ in range(3):
-        env = os.environ.copy()
-        env['PYTHONPATH'] = os.path.dirname(__file__)
-        p = subprocess.Popen([sys.executable, '-c', cmd],
-                             stderr=subprocess.PIPE,
-                             stdout=subprocess.PIPE, env=env)
-        p.wait()
-        out, err = p.communicate()
-        assert p.returncode == 0, err
-        assert out == b''
-        if sys.version_info[:3] not in [(3, 8, 0), (3, 8, 1)]:
-            # In early versions of Python 3.8, a reference leak
-            # https://github.com/cloudpipe/cloudpickle/issues/327, holds
-            # references to pickled objects, generating race condition during
-            # cleanup finalizers of joblib and noisy resource_tracker outputs.
-            assert b'resource_tracker' not in err
+    env = os.environ.copy()
+    env['PYTHONPATH'] = os.path.dirname(__file__)
+    p = subprocess.Popen(
+        [sys.executable, '-c', cmd],
+        stderr=subprocess.PIPE,
+        stdout=subprocess.PIPE,
+        env=env,
+    )
+    p.wait()
+    out, err = p.communicate()
+    assert p.returncode == 0, err
+    assert out == b''
+    if sys.version_info[:3] not in [(3, 8, 0), (3, 8, 1)]:
+        # In early versions of Python 3.8, a reference leak
+        # https://github.com/cloudpipe/cloudpickle/issues/327, holds
+        # references to pickled objects, generating race condition during
+        # cleanup finalizers of joblib and noisy resource_tracker outputs.
+        assert b'resource_tracker' not in err
 
 
 @with_numpy
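
Note on the _adjust_process_count change above: replacing the precomputed
range(len(self._processes), self._max_workers) with
while len(self._processes) < self._max_workers means the number of missing
workers is re-evaluated on every iteration, so workers that are reaped
concurrently (for example after a timeout) no longer leave the executor short
of its target. The standalone Python sketch below illustrates that difference
with a plain dict standing in for the executor's pid-to-process registry; it
is not loky code, and every name in it (adjust_with_range, adjust_with_while,
make_spawner, one_shot_drop) is made up for illustration.

# Toy sketch (not loky's implementation) of the range-vs-while behaviour.

def adjust_with_range(registry, max_workers, spawn, on_spawn):
    # The number of missing workers is computed once, up front.
    for _ in range(len(registry), max_workers):
        pid, proc = spawn()
        registry[pid] = proc
        on_spawn(registry)  # a concurrent removal may happen here


def adjust_with_while(registry, max_workers, spawn, on_spawn):
    # The live worker count is re-checked before every spawn.
    while len(registry) < max_workers:
        pid, proc = spawn()
        registry[pid] = proc
        on_spawn(registry)


def make_spawner():
    state = {"pid": 0}

    def spawn():
        state["pid"] += 1
        return state["pid"], f"worker-{state['pid']}"

    return spawn


def one_shot_drop():
    state = {"done": False}

    def drop(registry):
        # Simulate a single worker timing out and being reaped while the
        # adjustment loop is still running.
        if not state["done"] and registry:
            registry.pop(next(iter(registry)))
            state["done"] = True

    return drop


if __name__ == "__main__":
    short, full = {}, {}
    adjust_with_range(short, 4, make_spawner(), one_shot_drop())
    adjust_with_while(full, 4, make_spawner(), one_shot_drop())
    print(len(short), len(full))  # prints "3 4": only the while loop reaches 4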