Vendor loky 3.2.0 (#1333)
ogrisel committed Sep 14, 2022
1 parent bdf47e9 commit 4a314b1
Showing 5 changed files with 51 additions and 19 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
@@ -34,6 +34,9 @@ Development version

- Vendor cloudpickle 2.2.0 which adds support for PyPy 3.8+.

- Vendor loky 3.2.0 which fixes a bug that could leak worker processes in
  nested loky parallel calls.
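
  A minimal illustration (not part of the commit; plain joblib public API)
  of the nested loky parallelism that could previously leak worker
  processes::

      from joblib import Parallel, delayed

      def inner(x):
          return x ** 2

      def outer(i):
          # The inner call runs inside a loky worker process; before loky
          # 3.2.0 this nesting could leave workers behind after the outer
          # executor shut down.
          return Parallel(n_jobs=2, backend="loky")(
              delayed(inner)(x) for x in range(3))

      if __name__ == "__main__":
          results = Parallel(n_jobs=2, backend="loky")(
              delayed(outer)(i) for i in range(4))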

Release 1.1.0
--------------

2 changes: 1 addition & 1 deletion joblib/externals/loky/__init__.py
@@ -29,4 +29,4 @@
"wrap_non_picklable_objects", "set_loky_pickler"]


__version__ = '3.1.0'
__version__ = '3.2.0'
10 changes: 8 additions & 2 deletions joblib/externals/loky/backend/synchronize.py
@@ -95,8 +95,14 @@ def _after_fork(obj):

@staticmethod
def _cleanup(name):
sem_unlink(name)
resource_tracker.unregister(name, "semlock")
try:
sem_unlink(name)
except FileNotFoundError:
# Already unlinked, possibly by user code: ignore and make sure to
# unregister the semaphore from the resource tracker.
pass
finally:
resource_tracker.unregister(name, "semlock")

def _make_methods(self):
self.acquire = self._semlock.acquire
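
A standalone sketch (not loky's code; cleanup, unlink and unregister are
illustrative stand-ins for _cleanup, sem_unlink and
resource_tracker.unregister) of the idempotent-cleanup pattern above: the
missing-file error is swallowed, while the finally block guarantees the
resource tracker entry is dropped either way:

    def cleanup(name, unlink, unregister):
        try:
            unlink(name)
        except FileNotFoundError:
            # Already unlinked (e.g. by user code): nothing left to remove.
            pass
        finally:
            # Runs on success and on the swallowed error alike, so the
            # tracker always forgets the semaphore.
            unregister(name, "semlock")
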
50 changes: 38 additions & 12 deletions joblib/externals/loky/process_executor.py
@@ -68,7 +68,7 @@
import itertools
import traceback
import threading
from time import time
from time import time, sleep
import multiprocessing as mp
from functools import partial
from pickle import PicklingError
@@ -184,8 +184,9 @@ def _python_exit():
global _global_shutdown
_global_shutdown = True
items = list(_threads_wakeups.items())
mp.util.debug("Interpreter shutting down. Waking up "
f"executor_manager_thread {items}")
if len(items) > 0:
mp.util.debug("Interpreter shutting down. Waking up "
f"executor_manager_thread {items}")
for _, (shutdown_lock, thread_wakeup) in items:
with shutdown_lock:
thread_wakeup.wakeup()
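
For context, a hypothetical standalone version of this interpreter-shutdown
hook, with threading.Event standing in for loky's wakeup helper (all names
here are assumed, not loky's internals):

    import threading

    _threads_wakeups = {}  # manager thread -> (shutdown_lock, wakeup event)

    def _python_exit():
        items = list(_threads_wakeups.items())
        if len(items) > 0:  # the guard added above: only log when non-empty
            print(f"Interpreter shutting down. Waking up {items}")
        for _, (shutdown_lock, thread_wakeup) in items:
            with shutdown_lock:  # avoid racing a concurrent shutdown
                thread_wakeup.set()
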
@@ -360,8 +361,8 @@ def _process_worker(call_queue, result_queue, initializer, initargs,
to by the worker.
initializer: A callable initializer, or None
initargs: A tuple of args for the initializer
process_management_lock: A ctx.Lock avoiding worker timeout while some
workers are being spawned.
processes_management_lock: A ctx.Lock avoiding worker timeout while
some workers are being spawned.
timeout: maximum time to wait for a new item in the call_queue. If that
time expires, the worker will shut down.
worker_exit_lock: Lock to avoid flagging the executor as broken on
@@ -409,11 +410,20 @@ def _process_worker(call_queue, result_queue, initializer, initargs,
mp.util.debug('Exiting with code 1')
sys.exit(1)
if call_item is None:
# Notify queue management thread about clean worker shutdown
# Notify queue management thread about worker shutdown
result_queue.put(pid)
with worker_exit_lock:
is_clean = worker_exit_lock.acquire(True, timeout=30)

# Early-notify any loky executor running in this worker process
# (nested parallelism) that this process is about to shut down, to
# avoid a deadlock waiting indefinitely for the worker to finish.
_python_exit()

if is_clean:
mp.util.debug('Exited cleanly')
return
else:
mp.util.info('Main process did not release worker_exit')
return
try:
r = call_item()
except BaseException as e:
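
A minimal sketch (assumed names, with threading.Lock standing in for the
shared ctx lock; not loky's internals) of the timed-acquire handshake added
above: the parent holds a per-worker exit lock while the worker must stay
alive, and at shutdown the worker waits a bounded 30 seconds for its
release instead of blocking forever:

    import threading

    worker_exit_lock = threading.Lock()
    worker_exit_lock.acquire()  # parent side: held until workers may exit

    def worker_shutdown_path():
        # Worker side: the bounded wait avoids a deadlock if the parent
        # never releases the lock (e.g. it is itself stuck shutting down).
        is_clean = worker_exit_lock.acquire(True, timeout=30)
        if is_clean:
            print("Exited cleanly")
        else:
            print("Main process did not release worker_exit")
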
@@ -776,22 +786,36 @@ def shutdown_workers(self):
with self.processes_management_lock:
n_children_to_stop = 0
for p in list(self.processes.values()):
mp.util.debug(f"releasing worker exit lock on {p.name}")
p._worker_exit_lock.release()
n_children_to_stop += 1

mp.util.debug(f"found {n_children_to_stop} processes to stop")

# Send the right number of sentinels, to make sure all children are
# properly terminated. Do it with a mechanism that avoids hanging on a
# full queue when all workers have already shut down.
n_sentinels_sent = 0
cooldown_time = 0.001
while (n_sentinels_sent < n_children_to_stop
and self.get_n_children_alive() > 0):
for _ in range(n_children_to_stop - n_sentinels_sent):
try:
self.call_queue.put_nowait(None)
n_sentinels_sent += 1
except queue.Full:
except queue.Full as e:
if cooldown_time > 10.0:
raise e
mp.util.info(
    "full call_queue prevented sending all sentinels at "
    "once, waiting..."
)
sleep(cooldown_time)
cooldown_time *= 2
break

mp.util.debug(f"sent {n_sentinels_sent} sentinels to the call queue")
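
A simplified, runnable sketch of the backoff loop above (function name
assumed; the get_n_children_alive liveness check is omitted): put_nowait
one sentinel per child, and on queue.Full sleep for an exponentially
growing cooldown, giving up once it exceeds 10 seconds:

    import queue
    from time import sleep

    def send_sentinels(call_queue, n_children_to_stop):
        n_sentinels_sent = 0
        cooldown_time = 0.001
        while n_sentinels_sent < n_children_to_stop:
            try:
                call_queue.put_nowait(None)
                n_sentinels_sent += 1
            except queue.Full:
                if cooldown_time > 10.0:
                    raise  # give up after roughly 10 s of doubled waits
                sleep(cooldown_time)
                cooldown_time *= 2
        return n_sentinels_sent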

def join_executor_internals(self):
self.shutdown_workers()

@@ -814,12 +838,14 @@ def join_executor_internals(self):

# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
mp.util.debug("joining processes")
for p in list(self.processes.values()):
active_processes = list(self.processes.values())
mp.util.debug(f"joining {len(active_processes)} processes")
for p in active_processes:
mp.util.debug(f"joining process {p.name}")
p.join()

mp.util.debug("executor management thread clean shutdown of worker "
f"processes: {list(self.processes)}")
f"processes: {active_processes}")

def get_n_children_alive(self):
# This is an upper bound on the number of children alive.
5 changes: 1 addition & 4 deletions joblib/test/test_parallel.py
@@ -215,10 +215,7 @@ def _assert_warning_nested(backend, inner_n_jobs, expected):
@with_multiprocessing
@parametrize('parent_backend,child_backend,expected', [
('loky', 'multiprocessing', True),
# XXX: loky nested under loky causes pytest 7+ to freeze after tests end
# when using warnings.catch_warnings:
# deadlock happens in loky/process_executor.py", line 193, in _python_exit
# ('loky', 'loky', False),
('loky', 'loky', False),
('multiprocessing', 'multiprocessing', True),
('multiprocessing', 'loky', True),
('threading', 'multiprocessing', True),
