From 8695e7f53e0f4c3d6751ca953d79ed544d643f22 Mon Sep 17 00:00:00 2001 From: Olivier Grisel Date: Wed, 23 Feb 2022 10:12:02 +0100 Subject: [PATCH 1/2] Sync loky 3.1.0 --- joblib/externals/loky/__init__.py | 17 +- joblib/externals/loky/_base.py | 611 +----------------- joblib/externals/loky/backend/__init__.py | 14 +- .../loky/backend/_posix_reduction.py | 50 +- joblib/externals/loky/backend/_posix_wait.py | 105 --- .../externals/loky/backend/_win_reduction.py | 133 ++-- joblib/externals/loky/backend/_win_wait.py | 58 -- joblib/externals/loky/backend/compat.py | 41 -- joblib/externals/loky/backend/compat_posix.py | 13 - joblib/externals/loky/backend/compat_win32.py | 46 -- joblib/externals/loky/backend/context.py | 301 ++++----- joblib/externals/loky/backend/fork_exec.py | 16 +- joblib/externals/loky/backend/managers.py | 51 -- .../loky/backend/popen_loky_posix.py | 291 ++++----- .../loky/backend/popen_loky_win32.py | 47 +- joblib/externals/loky/backend/process.py | 62 +- joblib/externals/loky/backend/queues.py | 85 +-- joblib/externals/loky/backend/reduction.py | 129 +--- .../loky/backend/resource_tracker.py | 103 ++- joblib/externals/loky/backend/semlock.py | 274 -------- joblib/externals/loky/backend/spawn.py | 24 +- joblib/externals/loky/backend/synchronize.py | 78 ++- joblib/externals/loky/backend/utils.py | 184 +++--- joblib/externals/loky/cloudpickle_wrapper.py | 21 +- joblib/externals/loky/initializers.py | 10 +- joblib/externals/loky/process_executor.py | 149 ++--- joblib/externals/loky/reusable_executor.py | 49 +- 27 files changed, 689 insertions(+), 2273 deletions(-) delete mode 100644 joblib/externals/loky/backend/_posix_wait.py delete mode 100644 joblib/externals/loky/backend/_win_wait.py delete mode 100644 joblib/externals/loky/backend/compat.py delete mode 100644 joblib/externals/loky/backend/compat_posix.py delete mode 100644 joblib/externals/loky/backend/compat_win32.py delete mode 100644 joblib/externals/loky/backend/managers.py delete mode 100644 joblib/externals/loky/backend/semlock.py diff --git a/joblib/externals/loky/__init__.py b/joblib/externals/loky/__init__.py index d46e59f41..5a26c0335 100644 --- a/joblib/externals/loky/__init__.py +++ b/joblib/externals/loky/__init__.py @@ -3,11 +3,18 @@ :class:`ProcessPoolExecutor` and a function :func:`get_reusable_executor` which hide the pool management under the hood. """ -from ._base import Executor, Future -from ._base import wait, as_completed -from ._base import TimeoutError, CancelledError -from ._base import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION +from concurrent.futures import ( + ALL_COMPLETED, + FIRST_COMPLETED, + FIRST_EXCEPTION, + CancelledError, + Executor, + TimeoutError, + as_completed, + wait, +) +from ._base import Future from .backend.context import cpu_count from .backend.reduction import set_loky_pickler from .reusable_executor import get_reusable_executor @@ -22,4 +29,4 @@ "wrap_non_picklable_objects", "set_loky_pickler"] -__version__ = '3.0.0' +__version__ = '3.1.0' diff --git a/joblib/externals/loky/_base.py b/joblib/externals/loky/_base.py index 92422bbf3..cd8f34100 100644 --- a/joblib/externals/loky/_base.py +++ b/joblib/externals/loky/_base.py @@ -1,5 +1,5 @@ ############################################################################### -# Backport concurrent.futures for python2.7/3.3 +# Modification of concurrent.futures.Future # # author: Thomas Moreau and Olivier Grisel # @@ -10,618 +10,19 @@ # Copyright 2009 Brian Quinlan. All Rights Reserved. 
# Licensed to PSF under a Contributor Agreement. -import sys -import time -import logging -import threading -import collections - - -if sys.version_info[:2] >= (3, 3): - - from concurrent.futures import wait, as_completed - from concurrent.futures import TimeoutError, CancelledError - from concurrent.futures import Executor, Future as _BaseFuture - - from concurrent.futures import FIRST_EXCEPTION - from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED - - from concurrent.futures._base import LOGGER - from concurrent.futures._base import PENDING, RUNNING, CANCELLED - from concurrent.futures._base import CANCELLED_AND_NOTIFIED, FINISHED -else: - - FIRST_COMPLETED = 'FIRST_COMPLETED' - FIRST_EXCEPTION = 'FIRST_EXCEPTION' - ALL_COMPLETED = 'ALL_COMPLETED' - _AS_COMPLETED = '_AS_COMPLETED' - - # Possible future states (for internal use by the futures package). - PENDING = 'PENDING' - RUNNING = 'RUNNING' - # The future was cancelled by the user... - CANCELLED = 'CANCELLED' - # ...and _Waiter.add_cancelled() was called by a worker. - CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' - FINISHED = 'FINISHED' - - _FUTURE_STATES = [ - PENDING, - RUNNING, - CANCELLED, - CANCELLED_AND_NOTIFIED, - FINISHED - ] - - _STATE_TO_DESCRIPTION_MAP = { - PENDING: "pending", - RUNNING: "running", - CANCELLED: "cancelled", - CANCELLED_AND_NOTIFIED: "cancelled", - FINISHED: "finished" - } - - # Logger for internal use by the futures package. - LOGGER = logging.getLogger("concurrent.futures") - - class Error(Exception): - """Base class for all future-related exceptions.""" - pass - - class CancelledError(Error): - """The Future was cancelled.""" - pass - - class TimeoutError(Error): - """The operation exceeded the given deadline.""" - pass - - class _Waiter(object): - """Provides the event that wait() and as_completed() block on.""" - def __init__(self): - self.event = threading.Event() - self.finished_futures = [] - - def add_result(self, future): - self.finished_futures.append(future) - - def add_exception(self, future): - self.finished_futures.append(future) - - def add_cancelled(self, future): - self.finished_futures.append(future) - - class _AsCompletedWaiter(_Waiter): - """Used by as_completed().""" - - def __init__(self): - super(_AsCompletedWaiter, self).__init__() - self.lock = threading.Lock() - - def add_result(self, future): - with self.lock: - super(_AsCompletedWaiter, self).add_result(future) - self.event.set() - - def add_exception(self, future): - with self.lock: - super(_AsCompletedWaiter, self).add_exception(future) - self.event.set() - - def add_cancelled(self, future): - with self.lock: - super(_AsCompletedWaiter, self).add_cancelled(future) - self.event.set() - - class _FirstCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_COMPLETED).""" - - def add_result(self, future): - super(_FirstCompletedWaiter, self).add_result(future) - self.event.set() - - def add_exception(self, future): - super(_FirstCompletedWaiter, self).add_exception(future) - self.event.set() - - def add_cancelled(self, future): - super(_FirstCompletedWaiter, self).add_cancelled(future) - self.event.set() - - class _AllCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" - - def __init__(self, num_pending_calls, stop_on_exception): - self.num_pending_calls = num_pending_calls - self.stop_on_exception = stop_on_exception - self.lock = threading.Lock() - super(_AllCompletedWaiter, self).__init__() - - def _decrement_pending_calls(self): - with self.lock: - 
self.num_pending_calls -= 1 - if not self.num_pending_calls: - self.event.set() - - def add_result(self, future): - super(_AllCompletedWaiter, self).add_result(future) - self._decrement_pending_calls() - - def add_exception(self, future): - super(_AllCompletedWaiter, self).add_exception(future) - if self.stop_on_exception: - self.event.set() - else: - self._decrement_pending_calls() - - def add_cancelled(self, future): - super(_AllCompletedWaiter, self).add_cancelled(future) - self._decrement_pending_calls() - - class _AcquireFutures(object): - """A context manager that does an ordered acquire of Future conditions. - """ - - def __init__(self, futures): - self.futures = sorted(futures, key=id) - - def __enter__(self): - for future in self.futures: - future._condition.acquire() - - def __exit__(self, *args): - for future in self.futures: - future._condition.release() - - def _create_and_install_waiters(fs, return_when): - if return_when == _AS_COMPLETED: - waiter = _AsCompletedWaiter() - elif return_when == FIRST_COMPLETED: - waiter = _FirstCompletedWaiter() - else: - pending_count = sum( - f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] - for f in fs) - - if return_when == FIRST_EXCEPTION: - waiter = _AllCompletedWaiter(pending_count, - stop_on_exception=True) - elif return_when == ALL_COMPLETED: - waiter = _AllCompletedWaiter(pending_count, - stop_on_exception=False) - else: - raise ValueError("Invalid return condition: %r" % return_when) - - for f in fs: - f._waiters.append(waiter) - - return waiter - - def as_completed(fs, timeout=None): - """An iterator over the given futures that yields each as it completes. - - Args: - fs: The sequence of Futures (possibly created by different - Executors) to iterate over. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator that yields the given Futures as they complete - (finished or cancelled). If any given Futures are duplicated, they - will be returned once. - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - """ - if timeout is not None: - end_time = timeout + time.time() - - fs = set(fs) - with _AcquireFutures(fs): - finished = set( - f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - pending = fs - finished - waiter = _create_and_install_waiters(fs, _AS_COMPLETED) - - try: - for future in finished: - yield future - - while pending: - if timeout is None: - wait_timeout = None - else: - wait_timeout = end_time - time.time() - if wait_timeout < 0: - raise TimeoutError('%d (of %d) futures unfinished' % ( - len(pending), len(fs))) - - waiter.event.wait(wait_timeout) - - with waiter.lock: - finished = waiter.finished_futures - waiter.finished_futures = [] - waiter.event.clear() - - for future in finished: - yield future - pending.remove(future) - - finally: - for f in fs: - with f._condition: - f._waiters.remove(waiter) - - DoneAndNotDoneFutures = collections.namedtuple( - 'DoneAndNotDoneFutures', 'done not_done') - - def wait(fs, timeout=None, return_when=ALL_COMPLETED): - """Wait for the futures in the given sequence to complete. - - Args: - fs: The sequence of Futures (possibly created by different - Executors) to wait upon. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when this function should return. The - options are: - - FIRST_COMPLETED - Return when any future finishes or is - cancelled. 
- FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises an exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are - cancelled. - - Returns: - A named 2-tuple of sets. The first set, named 'done', contains the - futures that completed (is finished or cancelled) before the wait - completed. The second set, named 'not_done', contains uncompleted - futures. - """ - with _AcquireFutures(fs): - done = set(f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - not_done = set(fs) - done - - if (return_when == FIRST_COMPLETED) and done: - return DoneAndNotDoneFutures(done, not_done) - elif (return_when == FIRST_EXCEPTION) and done: - if any(f for f in done - if not f.cancelled() and f.exception() is not None): - return DoneAndNotDoneFutures(done, not_done) - - if len(done) == len(fs): - return DoneAndNotDoneFutures(done, not_done) - - waiter = _create_and_install_waiters(fs, return_when) - - waiter.event.wait(timeout) - for f in fs: - with f._condition: - f._waiters.remove(waiter) - - done.update(waiter.finished_futures) - return DoneAndNotDoneFutures(done, set(fs) - done) - - class _BaseFuture(object): - """Represents the result of an asynchronous computation.""" - - def __init__(self): - """Initializes the future. Should not be called by clients.""" - self._condition = threading.Condition() - self._state = PENDING - self._result = None - self._exception = None - self._waiters = [] - self._done_callbacks = [] - - def __repr__(self): - with self._condition: - if self._state == FINISHED: - if self._exception: - return '<%s at %#x state=%s raised %s>' % ( - self.__class__.__name__, - id(self), - _STATE_TO_DESCRIPTION_MAP[self._state], - self._exception.__class__.__name__) - else: - return '<%s at %#x state=%s returned %s>' % ( - self.__class__.__name__, - id(self), - _STATE_TO_DESCRIPTION_MAP[self._state], - self._result.__class__.__name__) - return '<%s at %#x state=%s>' % ( - self.__class__.__name__, - id(self), - _STATE_TO_DESCRIPTION_MAP[self._state]) - - def cancel(self): - """Cancel the future if possible. - - Returns True if the future was cancelled, False otherwise. A future - cannot be cancelled if it is running or has already completed. - """ - with self._condition: - if self._state in [RUNNING, FINISHED]: - return False - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - return True - - self._state = CANCELLED - self._condition.notify_all() - - self._invoke_callbacks() - return True - - def cancelled(self): - """Return True if the future was cancelled.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] - - def running(self): - """Return True if the future is currently executing.""" - with self._condition: - return self._state == RUNNING - - def done(self): - """Return True of the future was cancelled or finished executing. - """ - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, - FINISHED] - - def __get_result(self): - if self._exception: - raise self._exception - else: - return self._result - - def add_done_callback(self, fn): - """Attaches a callable that will be called when the future finishes. - - Args: - fn: A callable that will be called with this future as its only - argument when the future completes or is cancelled. The - callable will always be called by a thread in the same - process in which it was added. 
If the future has already - completed or been cancelled then the callable will be - called immediately. These callables are called in the order - that they were added. - """ - with self._condition: - if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, - FINISHED]: - self._done_callbacks.append(fn) - return - fn(self) - - def result(self, timeout=None): - """Return the result of the call that the future represents. - - Args: - timeout: The number of seconds to wait for the result if the - future isn't done. If None, then there is no limit on the - wait time. - - Returns: - The result of the call that the future represents. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the - given timeout. - Exception: If the call raised then that exception will be - raised. - """ - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - else: - raise TimeoutError() - - def exception(self, timeout=None): - """Return the exception raised by the call that the future - represents. - - Args: - timeout: The number of seconds to wait for the exception if the - future isn't done. If None, then there is no limit on the - wait time. - - Returns: - The exception raised by the call that the future represents or - None if the call completed without raising. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the - given timeout. - """ - - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - else: - raise TimeoutError() - - # The following methods should only be used by Executors and in tests. - def set_running_or_notify_cancel(self): - """Mark the future as running or process any cancel notifications. - - Should only be used by Executor implementations and unit tests. - - If the future has been cancelled (cancel() was called and returned - True) then any threads waiting on the future completing (though - calls to as_completed() or wait()) are notified and False is - returned. - - If the future was not cancelled then it is put in the running state - (future calls to running() will return True) and True is returned. - - This method should be called by Executor implementations before - executing the work associated with this future. If this method - returns False then the work should not be executed. - - Returns: - False if the Future was cancelled, True otherwise. - - Raises: - RuntimeError: if this method was already called or if - set_result() or set_exception() was called. - """ - with self._condition: - if self._state == CANCELLED: - self._state = CANCELLED_AND_NOTIFIED - for waiter in self._waiters: - waiter.add_cancelled(self) - # self._condition.notify_all() is not necessary because - # self.cancel() triggers a notification. 
- return False - elif self._state == PENDING: - self._state = RUNNING - return True - else: - LOGGER.critical('Future %s in unexpected state: %s', - id(self), - self._state) - raise RuntimeError('Future in unexpected state') - - def set_result(self, result): - """Sets the return value of work associated with the future. - - Should only be used by Executor implementations and unit tests. - """ - with self._condition: - self._result = result - self._state = FINISHED - for waiter in self._waiters: - waiter.add_result(self) - self._condition.notify_all() - self._invoke_callbacks() - - def set_exception(self, exception): - """Sets the result of the future as being the given exception. - - Should only be used by Executor implementations and unit tests. - """ - with self._condition: - self._exception = exception - self._state = FINISHED - for waiter in self._waiters: - waiter.add_exception(self) - self._condition.notify_all() - self._invoke_callbacks() - - class Executor(object): - """This is an abstract base class for concrete asynchronous executors. - """ - - def submit(self, fn, *args, **kwargs): - """Submits a callable to be executed with the given arguments. - - Schedules the callable to be executed as fn(*args, **kwargs) and - returns a Future instance representing the execution of the - callable. - - Returns: - A Future representing the given call. - """ - raise NotImplementedError() - - def map(self, fn, *iterables, **kwargs): - """Returns an iterator equivalent to map(fn, iter). - - Args: - fn: A callable that will take as many arguments as there are - passed iterables. - timeout: The maximum number of seconds to wait. If None, then - there is no limit on the wait time. - chunksize: The size of the chunks the iterable will be broken - into before being passed to a child process. This argument - is only used by ProcessPoolExecutor; it is ignored by - ThreadPoolExecutor. - - Returns: - An iterator equivalent to: map(func, *iterables) but the calls - may be evaluated out-of-order. - - Raises: - TimeoutError: If the entire result iterator could not be - generated before the given timeout. - Exception: If fn(*args) raises for any values. - """ - timeout = kwargs.get('timeout') - if timeout is not None: - end_time = timeout + time.time() - - fs = [self.submit(fn, *args) for args in zip(*iterables)] - - # Yield must be hidden in closure so that the futures are submitted - # before the first iterator value is required. - def result_iterator(): - try: - for future in fs: - if timeout is None: - yield future.result() - else: - yield future.result(end_time - time.time()) - finally: - for future in fs: - future.cancel() - return result_iterator() - - def shutdown(self, wait=True): - """Clean-up the resources associated with the Executor. - - It is safe to call this method several times. Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by - the executor have been reclaimed. - """ - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown(wait=True) - return False +from concurrent.futures import Future as _BaseFuture +from concurrent.futures._base import LOGGER # To make loky._base.Future instances awaitable by concurrent.futures.wait, # derive our custom Future class from _BaseFuture. _invoke_callback is the only # modification made to this class in loky. 
+# TODO investigate why using `concurrent.futures.Future` directly does not +# always work in our test suite. class Future(_BaseFuture): def _invoke_callbacks(self): for callback in self._done_callbacks: try: callback(self) except BaseException: - LOGGER.exception('exception calling callback for %r', self) + LOGGER.exception(f'exception calling callback for {self!r}') diff --git a/joblib/externals/loky/backend/__init__.py b/joblib/externals/loky/backend/__init__.py index a65ce0e8b..c31023cc5 100644 --- a/joblib/externals/loky/backend/__init__.py +++ b/joblib/externals/loky/backend/__init__.py @@ -1,16 +1,14 @@ import os -import sys +from multiprocessing import synchronize from .context import get_context -if sys.version_info > (3, 4): - def _make_name(): - name = '/loky-%i-%s' % (os.getpid(), next(synchronize.SemLock._rand)) - return name +def _make_name(): + return f'/loky-{os.getpid()}-{next(synchronize.SemLock._rand)}' - # monkey patch the name creation for multiprocessing - from multiprocessing import synchronize - synchronize.SemLock._make_name = staticmethod(_make_name) + +# monkey patch the name creation for multiprocessing +synchronize.SemLock._make_name = staticmethod(_make_name) __all__ = ["get_context"] diff --git a/joblib/externals/loky/backend/_posix_reduction.py b/joblib/externals/loky/backend/_posix_reduction.py index e0e394d3c..e9f34ed56 100644 --- a/joblib/externals/loky/backend/_posix_reduction.py +++ b/joblib/externals/loky/backend/_posix_reduction.py @@ -7,18 +7,12 @@ # * Add adapted reduction for LokyProcesses and socket/Connection # import os -import sys import socket import _socket +from multiprocessing.connection import Connection +from multiprocessing.context import get_spawning_popen from .reduction import register -from .context import get_spawning_popen - -if sys.version_info >= (3, 3): - from multiprocessing.connection import Connection -else: - from _multiprocessing import Connection - HAVE_SEND_HANDLE = (hasattr(socket, 'CMSG_LEN') and hasattr(socket, 'SCM_RIGHTS') and @@ -26,8 +20,7 @@ def _mk_inheritable(fd): - if sys.version_info[:2] > (3, 3): - os.set_inheritable(fd, True) + os.set_inheritable(fd, True) return fd @@ -36,7 +29,7 @@ def DupFd(fd): popen_obj = get_spawning_popen() if popen_obj is not None: return popen_obj.DupFd(popen_obj.duplicate_for_child(fd)) - elif HAVE_SEND_HANDLE and sys.version_info[:2] > (3, 3): + elif HAVE_SEND_HANDLE: from multiprocessing import resource_sharer return resource_sharer.DupFd(fd) else: @@ -46,31 +39,26 @@ def DupFd(fd): ) -if sys.version_info[:2] != (3, 3): - def _reduce_socket(s): - df = DupFd(s.fileno()) - return _rebuild_socket, (df, s.family, s.type, s.proto) +def _reduce_socket(s): + df = DupFd(s.fileno()) + return _rebuild_socket, (df, s.family, s.type, s.proto) - def _rebuild_socket(df, family, type, proto): - fd = df.detach() - return socket.fromfd(fd, family, type, proto) -else: - from multiprocessing.reduction import reduce_socket as _reduce_socket +def _rebuild_socket(df, family, type, proto): + fd = df.detach() + return socket.fromfd(fd, family, type, proto) -register(socket.socket, _reduce_socket) -register(_socket.socket, _reduce_socket) +def rebuild_connection(df, readable, writable): + fd = df.detach() + return Connection(fd, readable, writable) -if sys.version_info[:2] != (3, 3): - def reduce_connection(conn): - df = DupFd(conn.fileno()) - return rebuild_connection, (df, conn.readable, conn.writable) - def rebuild_connection(df, readable, writable): - fd = df.detach() - return Connection(fd, 
readable, writable) -else: - from multiprocessing.reduction import reduce_connection +def reduce_connection(conn): + df = DupFd(conn.fileno()) + return rebuild_connection, (df, conn.readable, conn.writable) + +register(socket.socket, _reduce_socket) +register(_socket.socket, _reduce_socket) register(Connection, reduce_connection) diff --git a/joblib/externals/loky/backend/_posix_wait.py b/joblib/externals/loky/backend/_posix_wait.py deleted file mode 100644 index d935882dc..000000000 --- a/joblib/externals/loky/backend/_posix_wait.py +++ /dev/null @@ -1,105 +0,0 @@ -############################################################################### -# Compat for wait function on UNIX based system -# -# author: Thomas Moreau and Olivier Grisel -# -# adapted from multiprocessing/connection.py (17/02/2017) -# * Backport wait function to python2.7 -# - -import platform -import select -import socket -import errno -SYSTEM = platform.system() - -try: - import ctypes -except ImportError: # pragma: no cover - ctypes = None # noqa - -if SYSTEM == 'Darwin' and ctypes is not None: - from ctypes.util import find_library - libSystem = ctypes.CDLL(find_library('libSystem.dylib')) - CoreServices = ctypes.CDLL(find_library('CoreServices'), - use_errno=True) - mach_absolute_time = libSystem.mach_absolute_time - mach_absolute_time.restype = ctypes.c_uint64 - absolute_to_nanoseconds = CoreServices.AbsoluteToNanoseconds - absolute_to_nanoseconds.restype = ctypes.c_uint64 - absolute_to_nanoseconds.argtypes = [ctypes.c_uint64] - - def monotonic(): - return absolute_to_nanoseconds(mach_absolute_time()) * 1e-9 - -elif SYSTEM == 'Linux' and ctypes is not None: - # from stackoverflow: - # questions/1205722/how-do-i-get-monotonic-time-durations-in-python - import ctypes - import os - - CLOCK_MONOTONIC = 1 # see - - class timespec(ctypes.Structure): - _fields_ = [ - ('tv_sec', ctypes.c_long), - ('tv_nsec', ctypes.c_long), - ] - - librt = ctypes.CDLL('librt.so.1', use_errno=True) - clock_gettime = librt.clock_gettime - clock_gettime.argtypes = [ - ctypes.c_int, ctypes.POINTER(timespec), - ] - - def monotonic(): # noqa - t = timespec() - if clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(t)) != 0: - errno_ = ctypes.get_errno() - raise OSError(errno_, os.strerror(errno_)) - return t.tv_sec + t.tv_nsec * 1e-9 -else: # pragma: no cover - from time import time as monotonic - - -if hasattr(select, 'poll'): - def _poll(fds, timeout): - if timeout is not None: - timeout = int(timeout * 1000) # timeout is in milliseconds - fd_map = {} - pollster = select.poll() - for fd in fds: - pollster.register(fd, select.POLLIN) - if hasattr(fd, 'fileno'): - fd_map[fd.fileno()] = fd - else: - fd_map[fd] = fd - ls = [] - for fd, event in pollster.poll(timeout): - if event & select.POLLNVAL: # pragma: no cover - raise ValueError('invalid file descriptor %i' % fd) - ls.append(fd_map[fd]) - return ls -else: - def _poll(fds, timeout): - return select.select(fds, [], [], timeout)[0] - - -def wait(object_list, timeout=None): - ''' - Wait till an object in object_list is ready/readable. - Returns list of those objects which are ready/readable. 
- ''' - if timeout is not None: - if timeout <= 0: - return _poll(object_list, 0) - else: - deadline = monotonic() + timeout - while True: - try: - return _poll(object_list, timeout) - except (OSError, IOError, socket.error) as e: # pragma: no cover - if e.errno != errno.EINTR: - raise - if timeout is not None: - timeout = deadline - monotonic() diff --git a/joblib/externals/loky/backend/_win_reduction.py b/joblib/externals/loky/backend/_win_reduction.py index 142e6e7c8..7f50c9f61 100644 --- a/joblib/externals/loky/backend/_win_reduction.py +++ b/joblib/externals/loky/backend/_win_reduction.py @@ -7,93 +7,54 @@ # * Add adapted reduction for LokyProcesses and socket/PipeConnection # import os -import sys import socket -from .reduction import register - - -if sys.platform == 'win32': - if sys.version_info[:2] < (3, 3): - from _multiprocessing import PipeConnection - else: - import _winapi - from multiprocessing.connection import PipeConnection - - -if sys.version_info[:2] >= (3, 4) and sys.platform == 'win32': - class DupHandle(object): - def __init__(self, handle, access, pid=None): - # duplicate handle for process with given pid - if pid is None: - pid = os.getpid() - proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) - try: - self._handle = _winapi.DuplicateHandle( - _winapi.GetCurrentProcess(), - handle, proc, access, False, 0) - finally: - _winapi.CloseHandle(proc) - self._access = access - self._pid = pid - - def detach(self): - # retrieve handle from process which currently owns it - if self._pid == os.getpid(): - return self._handle - proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, - self._pid) - try: - return _winapi.DuplicateHandle( - proc, self._handle, _winapi.GetCurrentProcess(), - self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) - finally: - _winapi.CloseHandle(proc) - - def reduce_pipe_connection(conn): - access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | - (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) - dh = DupHandle(conn.fileno(), access) - return rebuild_pipe_connection, (dh, conn.readable, conn.writable) +import _winapi +from multiprocessing.connection import PipeConnection +from multiprocessing.reduction import _reduce_socket - def rebuild_pipe_connection(dh, readable, writable): - from multiprocessing.connection import PipeConnection - handle = dh.detach() - return PipeConnection(handle, readable, writable) - register(PipeConnection, reduce_pipe_connection) - -elif sys.platform == 'win32': - # Older Python versions - from multiprocessing.reduction import reduce_pipe_connection - register(PipeConnection, reduce_pipe_connection) - - -if sys.version_info[:2] < (3, 3) and sys.platform == 'win32': - from _multiprocessing import win32 - from multiprocessing.reduction import reduce_handle, rebuild_handle - close = win32.CloseHandle - - def fromfd(handle, family, type_, proto=0): - s = socket.socket(family, type_, proto, fileno=handle) - if s.__class__ is not socket.socket: - s = socket.socket(_sock=s) - return s - - def reduce_socket(s): - if not hasattr(socket, "fromfd"): - raise TypeError("sockets cannot be pickled on this system.") - reduced_handle = reduce_handle(s.fileno()) - return _rebuild_socket, (reduced_handle, s.family, s.type, s.proto) +from .reduction import register - def _rebuild_socket(reduced_handle, family, type_, proto): - handle = rebuild_handle(reduced_handle) - s = fromfd(handle, family, type_, proto) - close(handle) - return s - register(socket.socket, reduce_socket) -elif sys.version_info[:2] < (3, 
4): - from multiprocessing.reduction import reduce_socket - register(socket.socket, reduce_socket) -else: - from multiprocessing.reduction import _reduce_socket - register(socket.socket, _reduce_socket) +class DupHandle: + def __init__(self, handle, access, pid=None): + # duplicate handle for process with given pid + if pid is None: + pid = os.getpid() + proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) + try: + self._handle = _winapi.DuplicateHandle( + _winapi.GetCurrentProcess(), + handle, proc, access, False, 0) + finally: + _winapi.CloseHandle(proc) + self._access = access + self._pid = pid + + def detach(self): + # retrieve handle from process which currently owns it + if self._pid == os.getpid(): + return self._handle + proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, + self._pid) + try: + return _winapi.DuplicateHandle( + proc, self._handle, _winapi.GetCurrentProcess(), + self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) + finally: + _winapi.CloseHandle(proc) + + +def rebuild_pipe_connection(dh, readable, writable): + handle = dh.detach() + return PipeConnection(handle, readable, writable) + + +def reduce_pipe_connection(conn): + access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | + (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) + dh = DupHandle(conn.fileno(), access) + return rebuild_pipe_connection, (dh, conn.readable, conn.writable) + + +register(PipeConnection, reduce_pipe_connection) +register(socket.socket, _reduce_socket) diff --git a/joblib/externals/loky/backend/_win_wait.py b/joblib/externals/loky/backend/_win_wait.py deleted file mode 100644 index 73271316d..000000000 --- a/joblib/externals/loky/backend/_win_wait.py +++ /dev/null @@ -1,58 +0,0 @@ -############################################################################### -# Compat for wait function on Windows system -# -# author: Thomas Moreau and Olivier Grisel -# -# adapted from multiprocessing/connection.py (17/02/2017) -# * Backport wait function to python2.7 -# - -import ctypes -import sys -from time import sleep - - -if sys.platform == 'win32' and sys.version_info[:2] < (3, 3): - from _subprocess import WaitForSingleObject, WAIT_OBJECT_0 - - try: - from time import monotonic - except ImportError: - # Backward old for crappy old Python that did not have cross-platform - # monotonic clock by default. - - # TODO: do we want to add support for cygwin at some point? See: - # https://github.com/atdt/monotonic/blob/master/monotonic.py - GetTickCount64 = ctypes.windll.kernel32.GetTickCount64 - GetTickCount64.restype = ctypes.c_ulonglong - - def monotonic(): - """Monotonic clock, cannot go backward.""" - return GetTickCount64() / 1000.0 - - def wait(handles, timeout=None): - """Backward compat for python2.7 - - This function wait for either: - * one connection is ready for read, - * one process handle has exited or got killed, - * timeout is reached. Note that this function has a precision of 2 - msec. 
- """ - if timeout is not None: - deadline = monotonic() + timeout - - while True: - # We cannot use select as in windows it only support sockets - ready = [] - for h in handles: - if type(h) in [int, long]: - if WaitForSingleObject(h, 0) == WAIT_OBJECT_0: - ready += [h] - elif h.poll(0): - ready.append(h) - if len(ready) > 0: - return ready - sleep(.001) - if timeout is not None and deadline - monotonic() <= 0: - return [] diff --git a/joblib/externals/loky/backend/compat.py b/joblib/externals/loky/backend/compat.py deleted file mode 100644 index aa406c6cf..000000000 --- a/joblib/externals/loky/backend/compat.py +++ /dev/null @@ -1,41 +0,0 @@ -############################################################################### -# Compat file to import the correct modules for each platform and python -# version. -# -# author: Thomas Moreau and Olivier grisel -# -import sys - -PY3 = sys.version_info[:2] >= (3, 3) - -if PY3: - import queue -else: - import Queue as queue - -if sys.version_info >= (3, 4): - from multiprocessing.process import BaseProcess -else: - from multiprocessing.process import Process as BaseProcess - -# Platform specific compat -if sys.platform == "win32": - from .compat_win32 import wait -else: - from .compat_posix import wait - - -def set_cause(exc, cause): - exc.__cause__ = cause - - if not PY3: - # Preformat message here. - if exc.__cause__ is not None: - exc.args = ("{}\n\nThis was caused directly by {}".format( - exc.args if len(exc.args) != 1 else exc.args[0], - str(exc.__cause__)),) - - return exc - - -__all__ = ["queue", "BaseProcess", "set_cause", "wait"] diff --git a/joblib/externals/loky/backend/compat_posix.py b/joblib/externals/loky/backend/compat_posix.py deleted file mode 100644 index c8e4e4a43..000000000 --- a/joblib/externals/loky/backend/compat_posix.py +++ /dev/null @@ -1,13 +0,0 @@ -# flake8: noqa -############################################################################### -# Compat file to load the correct wait function -# -# author: Thomas Moreau and Olivier grisel -# -import sys - -# Compat wait -if sys.version_info < (3, 3): - from ._posix_wait import wait -else: - from multiprocessing.connection import wait diff --git a/joblib/externals/loky/backend/compat_win32.py b/joblib/externals/loky/backend/compat_win32.py deleted file mode 100644 index 5df15f55f..000000000 --- a/joblib/externals/loky/backend/compat_win32.py +++ /dev/null @@ -1,46 +0,0 @@ -# flake8: noqa: F401 -import sys -import numbers - -if sys.platform == "win32": - # Avoid import error by code introspection tools such as test runners - # trying to import this module while running on non-Windows systems. 
- - # Compat Popen - if sys.version_info[:2] >= (3, 4): - from multiprocessing.popen_spawn_win32 import Popen - else: - from multiprocessing.forking import Popen - - # wait compat - if sys.version_info[:2] < (3, 3): - from ._win_wait import wait - else: - from multiprocessing.connection import wait - - # Compat _winapi - if sys.version_info[:2] >= (3, 4): - import _winapi - else: - import os - import msvcrt - if sys.version_info[:2] < (3, 3): - import _subprocess as win_api - from _multiprocessing import win32 - else: - import _winapi as win_api - - class _winapi: - CreateProcess = win_api.CreateProcess - - @staticmethod - def CloseHandle(h): - if isinstance(h, numbers.Integral): - # Cast long to int for 64-bit Python 2.7 under Windows - h = int(h) - if sys.version_info[:2] < (3, 3): - if not isinstance(h, int): - h = h.Detach() - win32.CloseHandle(h) - else: - win_api.CloseHandle(h) diff --git a/joblib/externals/loky/backend/context.py b/joblib/externals/loky/backend/context.py index 76f6520d3..7e551688b 100644 --- a/joblib/externals/loky/backend/context.py +++ b/joblib/externals/loky/backend/context.py @@ -1,6 +1,5 @@ ############################################################################### -# Basic context management with LokyContext and provides -# compat for UNIX 2.7 and 3.3 +# Basic context management with LokyContext # # author: Thomas Moreau and Olivier Grisel # @@ -8,88 +7,46 @@ # * Create a context ensuring loky uses only objects that are compatible # * Add LokyContext to the list of context of multiprocessing so loky can be # used with multiprocessing.set_start_method -# * Add some compat function for python2.7 and 3.3. +# * Implement a CFS-aware amd physical-core aware cpu_count function. # -from __future__ import division - import os import sys +import math import subprocess import traceback import warnings import multiprocessing as mp - +from multiprocessing import get_context as mp_get_context +from multiprocessing.context import BaseContext from .process import LokyProcess, LokyInitMainProcess -START_METHODS = ['loky', 'loky_init_main'] +START_METHODS = ['loky', 'loky_init_main', 'spawn'] +if sys.platform != 'win32': + START_METHODS += ['fork', 'forkserver'] + _DEFAULT_START_METHOD = None # Cache for the number of physical cores to avoid repeating subprocess calls. # It should not change during the lifetime of the program. physical_cores_cache = None -if sys.version_info[:2] >= (3, 4): - from multiprocessing import get_context as mp_get_context - from multiprocessing.context import assert_spawning, set_spawning_popen - from multiprocessing.context import get_spawning_popen, BaseContext - - START_METHODS += ['spawn'] - if sys.platform != 'win32': - START_METHODS += ['fork', 'forkserver'] - - def get_context(method=None): - # Try to overload the default context - method = method or _DEFAULT_START_METHOD or "loky" - if method == "fork": - # If 'fork' is explicitly requested, warn user about potential - # issues. - warnings.warn("`fork` start method should not be used with " - "`loky` as it does not respect POSIX. Try using " - "`spawn` or `loky` instead.", UserWarning) - try: - context = mp_get_context(method) - except ValueError: - raise ValueError("Unknown context '{}'. Value should be in {}." 
- .format(method, START_METHODS)) - - return context - -else: - if sys.platform != 'win32': - import threading - # Mechanism to check that the current thread is spawning a process - _tls = threading.local() - popen_attr = 'spawning_popen' - else: - from multiprocessing.forking import Popen - _tls = Popen._tls - popen_attr = 'process_handle' - - BaseContext = object - - def get_spawning_popen(): - return getattr(_tls, popen_attr, None) - - def set_spawning_popen(popen): - setattr(_tls, popen_attr, popen) - - def assert_spawning(obj): - if get_spawning_popen() is None: - raise RuntimeError( - '%s objects should only be shared between processes' - ' through inheritance' % type(obj).__name__ - ) - - def get_context(method=None): - method = method or _DEFAULT_START_METHOD or 'loky' - if method == "loky": - return LokyContext() - elif method == "loky_init_main": - return LokyInitMainContext() - else: - raise ValueError("Unknown context '{}'. Value should be in {}." - .format(method, START_METHODS)) + +def get_context(method=None): + # Try to overload the default context + method = method or _DEFAULT_START_METHOD or "loky" + if method == "fork": + # If 'fork' is explicitly requested, warn user about potential issues. + warnings.warn("`fork` start method should not be used with " + "`loky` as it does not respect POSIX. Try using " + "`spawn` or `loky` instead.", UserWarning) + try: + return mp_get_context(method) + except ValueError: + raise ValueError( + f"Unknown context '{method}'. Value should be in " + f"{START_METHODS}." + ) def set_start_method(method, force=False): @@ -97,8 +54,9 @@ def set_start_method(method, force=False): if _DEFAULT_START_METHOD is not None and not force: raise RuntimeError('context has already been set') assert method is None or method in START_METHODS, ( - "'{}' is not a valid start_method. It should be in {}" - .format(method, START_METHODS)) + f"'{method}' is not a valid start_method. It should be in " + f"{START_METHODS}" + ) _DEFAULT_START_METHOD = method @@ -114,8 +72,8 @@ def cpu_count(only_physical_cores=False): * the number of CPUs in the system, as given by ``multiprocessing.cpu_count``; * the CPU affinity settings of the current process - (available with Python 3.4+ on some Unix systems); - * CFS scheduler CPU bandwidth limit (available on Linux only, typically + (available on some Unix systems); + * Cgroup CPU bandwidth limit (available on Linux only, typically set by docker and similar container orchestration systems); * the value of the LOKY_MAX_CPU_COUNT environment variable if defined. and is given as the minimum of these constraints. @@ -123,81 +81,95 @@ def cpu_count(only_physical_cores=False): If ``only_physical_cores`` is True, return the number of physical cores instead of the number of logical cores (hyperthreading / SMT). Note that this option is not enforced if the number of usable cores is controlled in - any other way such as: process affinity, restricting CFS scheduler policy + any other way such as: process affinity, Cgroup restricted CPU bandwidth or the LOKY_MAX_CPU_COUNT environment variable. If the number of physical cores is not found, return the number of logical cores. - + It is also always larger or equal to 1. 
""" - # TODO: use os.cpu_count when dropping python 2 support - try: - cpu_count_mp = mp.cpu_count() - except NotImplementedError: - cpu_count_mp = 1 - - cpu_count_user = _cpu_count_user(cpu_count_mp) - aggregate_cpu_count = min(cpu_count_mp, cpu_count_user) - - if only_physical_cores: - cpu_count_physical, exception = _count_physical_cores() - if cpu_count_user < cpu_count_mp: - # Respect user setting - cpu_count = max(cpu_count_user, 1) - elif cpu_count_physical == "not found": - # Fallback to default behavior - if exception is not None: - # warns only the first time - warnings.warn( - "Could not find the number of physical cores for the " - "following reason:\n" + str(exception) + "\n" - "Returning the number of logical cores instead. You can " - "silence this warning by setting LOKY_MAX_CPU_COUNT to " - "the number of cores you want to use.") - if sys.version_info >= (3, 5): - # TODO remove the version check when dropping py2 support - traceback.print_tb(exception.__traceback__) - - cpu_count = max(aggregate_cpu_count, 1) - else: - return cpu_count_physical + # Note: os.cpu_count() is allowed to return None in its docstring + os_cpu_count = os.cpu_count() or 1 + + cpu_count_user = _cpu_count_user(os_cpu_count) + aggregate_cpu_count = max(min(os_cpu_count, cpu_count_user), 1) + + if not only_physical_cores: + return aggregate_cpu_count + + if cpu_count_user < os_cpu_count: + # Respect user setting + return max(cpu_count_user, 1) + + cpu_count_physical, exception = _count_physical_cores() + if cpu_count_physical != "not found": + return cpu_count_physical + + # Fallback to default behavior + if exception is not None: + # warns only the first time + warnings.warn( + "Could not find the number of physical cores for the " + f"following reason:\n{exception}\n" + "Returning the number of logical cores instead. You can " + "silence this warning by setting LOKY_MAX_CPU_COUNT to " + "the number of cores you want to use.") + traceback.print_tb(exception.__traceback__) + + return aggregate_cpu_count + + +def _cpu_count_cgroup(os_cpu_count): + # Cgroup CPU bandwidth limit available in Linux since 2.6 kernel + cpu_max_fname = "/sys/fs/cgroup/cpu.max" + cfs_quota_fname = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us" + cfs_period_fname = "/sys/fs/cgroup/cpu/cpu.cfs_period_us" + if os.path.exists(cpu_max_fname): + # cgroup v2 + # https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html + with open(cpu_max_fname) as fh: + cpu_quota_us, cpu_period_us = fh.read().strip().split() + elif os.path.exists(cfs_quota_fname) and os.path.exists(cfs_period_fname): + # cgroup v1 + # https://www.kernel.org/doc/html/latest/scheduler/sched-bwc.html#management + with open(cfs_quota_fname) as fh: + cpu_quota_us = fh.read().strip() + with open(cfs_period_fname) as fh: + cpu_period_us = fh.read().strip() else: - cpu_count = max(aggregate_cpu_count, 1) + # No Cgroup CPU bandwidth limit (e.g. 
non-Linux platform) + cpu_quota_us = "max" + cpu_period_us = 100_000 # unused, for consistency with default values - return cpu_count + if cpu_quota_us == "max": + # No active Cgroup quota on a Cgroup-capable platform + return os_cpu_count + else: + cpu_quota_us = int(cpu_quota_us) + cpu_period_us = int(cpu_period_us) + if cpu_quota_us > 0 and cpu_period_us > 0: + return math.ceil(cpu_quota_us / cpu_period_us) + else: # pragma: no cover + # Setting a negative cpu_quota_us value is a valid way to disable + # cgroup CPU bandwith limits + return os_cpu_count -def _cpu_count_user(cpu_count_mp): +def _cpu_count_user(os_cpu_count): """Number of user defined available CPUs""" - import math - # Number of available CPUs given affinity settings - cpu_count_affinity = cpu_count_mp + cpu_count_affinity = os_cpu_count if hasattr(os, 'sched_getaffinity'): try: cpu_count_affinity = len(os.sched_getaffinity(0)) except NotImplementedError: pass - # CFS scheduler CPU bandwidth limit - # available in Linux since 2.6 kernel - cpu_count_cfs = cpu_count_mp - cfs_quota_fname = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us" - cfs_period_fname = "/sys/fs/cgroup/cpu/cpu.cfs_period_us" - if os.path.exists(cfs_quota_fname) and os.path.exists(cfs_period_fname): - with open(cfs_quota_fname, 'r') as fh: - cfs_quota_us = int(fh.read()) - with open(cfs_period_fname, 'r') as fh: - cfs_period_us = int(fh.read()) - - if cfs_quota_us > 0 and cfs_period_us > 0: - # Make sure this quantity is an int as math.ceil returns a - # float in python2.7. (See issue #165) - cpu_count_cfs = int(math.ceil(cfs_quota_us / cfs_period_us)) + cpu_count_cgroup = _cpu_count_cgroup(os_cpu_count) # User defined soft-limit passed as a loky specific environment variable. - cpu_count_loky = int(os.environ.get('LOKY_MAX_CPU_COUNT', cpu_count_mp)) + cpu_count_loky = int(os.environ.get('LOKY_MAX_CPU_COUNT', os_cpu_count)) - return min(cpu_count_affinity, cpu_count_cfs, cpu_count_loky) + return min(cpu_count_affinity, cpu_count_cgroup, cpu_count_loky) def _count_physical_cores(): @@ -219,39 +191,39 @@ def _count_physical_cores(): try: if sys.platform == "linux": cpu_info = subprocess.run( - "lscpu --parse=core".split(" "), capture_output=True) - cpu_info = cpu_info.stdout.decode("utf-8").splitlines() + "lscpu --parse=core".split(), capture_output=True, text=True) + cpu_info = cpu_info.stdout.splitlines() cpu_info = {line for line in cpu_info if not line.startswith("#")} cpu_count_physical = len(cpu_info) elif sys.platform == "win32": cpu_info = subprocess.run( - "wmic CPU Get NumberOfCores /Format:csv".split(" "), - capture_output=True) - cpu_info = cpu_info.stdout.decode('utf-8').splitlines() + "wmic CPU Get NumberOfCores /Format:csv".split(), + capture_output=True, text=True) + cpu_info = cpu_info.stdout.splitlines() cpu_info = [l.split(",")[1] for l in cpu_info if (l and l != "Node,NumberOfCores")] cpu_count_physical = sum(map(int, cpu_info)) elif sys.platform == "darwin": cpu_info = subprocess.run( - "sysctl -n hw.physicalcpu".split(" "), capture_output=True) - cpu_info = cpu_info.stdout.decode('utf-8') + "sysctl -n hw.physicalcpu".split(), + capture_output=True, text=True) + cpu_info = cpu_info.stdout cpu_count_physical = int(cpu_info) else: - raise NotImplementedError( - "unsupported platform: {}".format(sys.platform)) + raise NotImplementedError(f"unsupported platform: {sys.platform}") # if cpu_count_physical < 1, we did not find a valid value if cpu_count_physical < 1: raise ValueError( - "found {} physical cores < 1".format(cpu_count_physical)) - + 
f"found {cpu_count_physical} physical cores < 1") + except Exception as e: exception = e cpu_count_physical = "not found" # Put the result in cache physical_cores_cache = cpu_count_physical - + return cpu_count_physical, exception @@ -272,44 +244,10 @@ def SimpleQueue(self, reducers=None): from .queues import SimpleQueue return SimpleQueue(reducers=reducers, ctx=self.get_context()) - if sys.version_info[:2] < (3, 4): - """Compat for python2.7/3.3 for necessary methods in Context""" - def get_context(self): - return self - - def get_start_method(self): - return self._name - - def Pipe(self, duplex=True): - '''Returns two connection object connected by a pipe''' - return mp.Pipe(duplex) - - if sys.platform != "win32": - """Use the compat Manager for python2.7/3.3 on UNIX to avoid - relying on fork processes - """ - def Manager(self): - """Returns a manager object""" - from .managers import LokyManager - m = LokyManager() - m.start() - return m - else: - """Compat for context on Windows and python2.7/3.3. Using regular - multiprocessing objects as it does not rely on fork. - """ - from multiprocessing import synchronize - Semaphore = staticmethod(synchronize.Semaphore) - BoundedSemaphore = staticmethod(synchronize.BoundedSemaphore) - Lock = staticmethod(synchronize.Lock) - RLock = staticmethod(synchronize.RLock) - Condition = staticmethod(synchronize.Condition) - Event = staticmethod(synchronize.Event) - Manager = staticmethod(mp.Manager) - if sys.platform != "win32": """For Unix platform, use our custom implementation of synchronize - relying on ctypes to interface with pthread semaphores. + ensuring that we use the loky.backend.resource_tracker to clean-up + the semaphores in case of a worker crash. """ def Semaphore(self, value=1): """Returns a semaphore object""" @@ -352,7 +290,7 @@ class LokyInitMainContext(LokyContext): functions and variable used from main should be out of this block. This mimics the default behavior of multiprocessing under Windows and the - behavior of the ``spawn`` start method on a posix system for python3.4+. + behavior of the ``spawn`` start method on a posix system. 
For more details, see the end of the following section of python doc https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming """ @@ -360,8 +298,7 @@ class LokyInitMainContext(LokyContext): Process = LokyInitMainProcess -if sys.version_info > (3, 4): - """Register loky context so it works with multiprocessing.get_context""" - ctx_loky = LokyContext() - mp.context._concrete_contexts['loky'] = ctx_loky - mp.context._concrete_contexts['loky_init_main'] = LokyInitMainContext() +# Register loky context so it works with multiprocessing.get_context +ctx_loky = LokyContext() +mp.context._concrete_contexts['loky'] = ctx_loky +mp.context._concrete_contexts['loky_init_main'] = LokyInitMainContext() diff --git a/joblib/externals/loky/backend/fork_exec.py b/joblib/externals/loky/backend/fork_exec.py index cfb68dc4e..211d1835a 100644 --- a/joblib/externals/loky/backend/fork_exec.py +++ b/joblib/externals/loky/backend/fork_exec.py @@ -7,24 +7,20 @@ import os import sys -if sys.platform == "darwin" and sys.version_info < (3, 3): - FileNotFoundError = OSError - def close_fds(keep_fds): # pragma: no cover """Close all the file descriptors except those in keep_fds.""" # Make sure to keep stdout and stderr open for logging purpose - keep_fds = set(keep_fds).union([1, 2]) + keep_fds = {*keep_fds, 1, 2} # We try to retrieve all the open fds try: - open_fds = set(int(fd) for fd in os.listdir('/proc/self/fd')) + open_fds = {int(fd) for fd in os.listdir('/proc/self/fd')} except FileNotFoundError: import resource max_nfds = resource.getrlimit(resource.RLIMIT_NOFILE)[0] - open_fds = set(fd for fd in range(3, max_nfds)) - open_fds.add(0) + open_fds = {*range(max_nfds)} for i in open_fds - keep_fds: try: @@ -34,11 +30,9 @@ def close_fds(keep_fds): # pragma: no cover def fork_exec(cmd, keep_fds, env=None): - # copy the environment variables to set in the child process - env = {} if env is None else env - child_env = os.environ.copy() - child_env.update(env) + env = env or {} + child_env = {**os.environ, **env} pid = os.fork() if pid == 0: # pragma: no cover diff --git a/joblib/externals/loky/backend/managers.py b/joblib/externals/loky/backend/managers.py deleted file mode 100644 index 081f8976e..000000000 --- a/joblib/externals/loky/backend/managers.py +++ /dev/null @@ -1,51 +0,0 @@ -############################################################################### -# compat for UNIX 2.7 and 3.3 -# Manager with LokyContext server. -# This avoids having a Manager using fork and breaks the fd. 
-# -# author: Thomas Moreau and Olivier Grisel -# -# based on multiprocessing/managers.py (17/02/2017) -# * Overload the start method to use LokyContext and launch a loky subprocess -# - -import multiprocessing as mp -from multiprocessing.managers import SyncManager, State -from .process import LokyProcess as Process - - -class LokyManager(SyncManager): - def start(self, initializer=None, initargs=()): - '''Spawn a server process for this manager object''' - assert self._state.value == State.INITIAL - - if (initializer is not None - and not hasattr(initializer, '__call__')): - raise TypeError('initializer must be a callable') - - # pipe over which we will retrieve address of server - reader, writer = mp.Pipe(duplex=False) - - # spawn process which runs a server - self._process = Process( - target=type(self)._run_server, - args=(self._registry, self._address, bytes(self._authkey), - self._serializer, writer, initializer, initargs), - ) - ident = ':'.join(str(i) for i in self._process._identity) - self._process.name = type(self).__name__ + '-' + ident - self._process.start() - - # get address of server - writer.close() - self._address = reader.recv() - reader.close() - - # register a finalizer - self._state.value = State.STARTED - self.shutdown = mp.util.Finalize( - self, type(self)._finalize_manager, - args=(self._process, self._address, self._authkey, - self._state, self._Client), - exitpriority=0 - ) diff --git a/joblib/externals/loky/backend/popen_loky_posix.py b/joblib/externals/loky/backend/popen_loky_posix.py index 970dead0b..37a73172e 100644 --- a/joblib/externals/loky/backend/popen_loky_posix.py +++ b/joblib/externals/loky/backend/popen_loky_posix.py @@ -8,173 +8,141 @@ import signal import pickle from io import BytesIO - -from . import reduction, spawn -from .context import get_spawning_popen, set_spawning_popen from multiprocessing import util, process +from multiprocessing.connection import wait +from multiprocessing.context import set_spawning_popen -if sys.version_info[:2] < (3, 3): - ProcessLookupError = OSError - -if sys.platform != "win32": - from . import resource_tracker - - -__all__ = [] - -if sys.platform != "win32": - # - # Wrapper for an fd used while launching a process - # - - class _DupFd(object): - def __init__(self, fd): - self.fd = reduction._mk_inheritable(fd) - - def detach(self): - return self.fd - - # - # Start child process using subprocess.Popen - # - - __all__.append('Popen') - - class Popen(object): - method = 'loky' - DupFd = _DupFd - - def __init__(self, process_obj): - sys.stdout.flush() - sys.stderr.flush() - self.returncode = None - self._fds = [] - self._launch(process_obj) - - if sys.version_info < (3, 4): - @classmethod - def duplicate_for_child(cls, fd): - popen = get_spawning_popen() - popen._fds.append(fd) - return reduction._mk_inheritable(fd) - - else: - def duplicate_for_child(self, fd): - self._fds.append(fd) - return reduction._mk_inheritable(fd) - - def poll(self, flag=os.WNOHANG): - if self.returncode is None: - while True: - try: - pid, sts = os.waitpid(self.pid, flag) - except OSError: - # Child process not yet created. 
See #1731717 - # e.errno == errno.ECHILD == 10 - return None - else: - break - if pid == self.pid: - if os.WIFSIGNALED(sts): - self.returncode = -os.WTERMSIG(sts) - else: - assert os.WIFEXITED(sts) - self.returncode = os.WEXITSTATUS(sts) - return self.returncode - - def wait(self, timeout=None): - if sys.version_info < (3, 3): - import time - if timeout is None: - return self.poll(0) - deadline = time.time() + timeout - delay = 0.0005 - while 1: - res = self.poll() - if res is not None: - break - remaining = deadline - time.time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, 0.05) - time.sleep(delay) - return res - - if self.returncode is None: - if timeout is not None: - from multiprocessing.connection import wait - if not wait([self.sentinel], timeout): - return None - # This shouldn't block if wait() returned successfully. - return self.poll(os.WNOHANG if timeout == 0.0 else 0) - return self.returncode - - def terminate(self): - if self.returncode is None: - try: - os.kill(self.pid, signal.SIGTERM) - except ProcessLookupError: - pass - except OSError: - if self.wait(timeout=0.1) is None: - raise +from . import reduction, resource_tracker, spawn - def _launch(self, process_obj): - tracker_fd = resource_tracker._resource_tracker.getfd() +__all__ = ['Popen'] - fp = BytesIO() - set_spawning_popen(self) - try: - prep_data = spawn.get_preparation_data( - process_obj._name, - getattr(process_obj, "init_main_module", True)) - reduction.dump(prep_data, fp) - reduction.dump(process_obj, fp) - finally: - set_spawning_popen(None) +# +# Wrapper for an fd used while launching a process +# - try: - parent_r, child_w = os.pipe() - child_r, parent_w = os.pipe() - # for fd in self._fds: - # _mk_inheritable(fd) - - cmd_python = [sys.executable] - cmd_python += ['-m', self.__module__] - cmd_python += ['--process-name', str(process_obj.name)] - cmd_python += ['--pipe', - str(reduction._mk_inheritable(child_r))] - reduction._mk_inheritable(child_w) - reduction._mk_inheritable(tracker_fd) - self._fds.extend([child_r, child_w, tracker_fd]) - if sys.version_info >= (3, 8) and os.name == 'posix': - mp_tracker_fd = prep_data['mp_tracker_args']['fd'] - self.duplicate_for_child(mp_tracker_fd) - - from .fork_exec import fork_exec - pid = fork_exec(cmd_python, self._fds, env=process_obj.env) - util.debug("launched python with pid {} and cmd:\n{}" - .format(pid, cmd_python)) - self.sentinel = parent_r - - method = 'getbuffer' - if not hasattr(fp, method): - method = 'getvalue' - with os.fdopen(parent_w, 'wb') as f: - f.write(getattr(fp, method)()) - self.pid = pid - finally: - if parent_r is not None: - util.Finalize(self, os.close, (parent_r,)) - for fd in (child_r, child_w): - if fd is not None: - os.close(fd) +class _DupFd: + def __init__(self, fd): + self.fd = reduction._mk_inheritable(fd) - @staticmethod - def thread_is_spawning(): - return True + def detach(self): + return self.fd + + +# +# Start child process using subprocess.Popen +# + +class Popen: + method = 'loky' + DupFd = _DupFd + + def __init__(self, process_obj): + sys.stdout.flush() + sys.stderr.flush() + self.returncode = None + self._fds = [] + self._launch(process_obj) + + def duplicate_for_child(self, fd): + self._fds.append(fd) + return reduction._mk_inheritable(fd) + + def poll(self, flag=os.WNOHANG): + if self.returncode is None: + while True: + try: + pid, sts = os.waitpid(self.pid, flag) + except OSError: + # Child process not yet created. 
See #1731717 + # e.errno == errno.ECHILD == 10 + return None + else: + break + if pid == self.pid: + if os.WIFSIGNALED(sts): + self.returncode = -os.WTERMSIG(sts) + else: + assert os.WIFEXITED(sts) + self.returncode = os.WEXITSTATUS(sts) + return self.returncode + + def wait(self, timeout=None): + if self.returncode is None: + if timeout is not None: + if not wait([self.sentinel], timeout): + return None + # This shouldn't block if wait() returned successfully. + return self.poll(os.WNOHANG if timeout == 0.0 else 0) + return self.returncode + + def terminate(self): + if self.returncode is None: + try: + os.kill(self.pid, signal.SIGTERM) + except ProcessLookupError: + pass + except OSError: + if self.wait(timeout=0.1) is None: + raise + + def _launch(self, process_obj): + + tracker_fd = resource_tracker._resource_tracker.getfd() + + fp = BytesIO() + set_spawning_popen(self) + try: + prep_data = spawn.get_preparation_data( + process_obj._name, + getattr(process_obj, "init_main_module", True)) + reduction.dump(prep_data, fp) + reduction.dump(process_obj, fp) + + finally: + set_spawning_popen(None) + + try: + parent_r, child_w = os.pipe() + child_r, parent_w = os.pipe() + # for fd in self._fds: + # _mk_inheritable(fd) + + cmd_python = [sys.executable] + cmd_python += ['-m', self.__module__] + cmd_python += ['--process-name', str(process_obj.name)] + cmd_python += ['--pipe', str(reduction._mk_inheritable(child_r))] + reduction._mk_inheritable(child_w) + reduction._mk_inheritable(tracker_fd) + self._fds += [child_r, child_w, tracker_fd] + if sys.version_info >= (3, 8) and os.name == 'posix': + mp_tracker_fd = prep_data['mp_tracker_args']['fd'] + self.duplicate_for_child(mp_tracker_fd) + + from .fork_exec import fork_exec + pid = fork_exec(cmd_python, self._fds, env=process_obj.env) + util.debug( + f"launched python with pid {pid} and cmd:\n{cmd_python}" + ) + self.sentinel = parent_r + + method = 'getbuffer' + if not hasattr(fp, method): + method = 'getvalue' + with os.fdopen(parent_w, 'wb') as f: + f.write(getattr(fp, method)()) + self.pid = pid + finally: + if parent_r is not None: + util.Finalize(self, os.close, (parent_r,)) + for fd in (child_r, child_w): + if fd is not None: + os.close(fd) + + @staticmethod + def thread_is_spawning(): + return True if __name__ == '__main__': @@ -187,8 +155,7 @@ def thread_is_spawning(): args = parser.parse_args() - info = dict() - + info = {} exitcode = 1 try: with os.fdopen(args.pipe, 'rb') as from_parent: @@ -203,7 +170,7 @@ def thread_is_spawning(): exitcode = process_obj._bootstrap() except Exception: print('\n\n' + '-' * 80) - print('{} failed with traceback: '.format(args.process_name)) + print(f'{args.process_name} failed with traceback: ') print('-' * 80) import traceback print(traceback.format_exc()) diff --git a/joblib/externals/loky/backend/popen_loky_win32.py b/joblib/externals/loky/backend/popen_loky_win32.py index e8c4fd070..e2702a724 100644 --- a/joblib/externals/loky/backend/popen_loky_win32.py +++ b/joblib/externals/loky/backend/popen_loky_win32.py @@ -1,24 +1,15 @@ import os import sys +import msvcrt +import _winapi from pickle import load from multiprocessing import process, util +from multiprocessing.context import get_spawning_popen, set_spawning_popen +from multiprocessing.popen_spawn_win32 import Popen as _Popen +from multiprocessing.reduction import duplicate -from . import spawn -from . import reduction -from .context import get_spawning_popen, set_spawning_popen +from . 
import reduction, spawn -if sys.platform == "win32": - # Avoid import error by code introspection tools such as test runners - # trying to import this module while running on non-Windows systems. - import msvcrt - from .compat_win32 import _winapi - from .compat_win32 import Popen as _Popen - from .reduction import duplicate -else: - _Popen = object - -if sys.version_info[:2] < (3, 3): - from os import fdopen as open __all__ = ['Popen'] @@ -26,10 +17,6 @@ # # -TERMINATE = 0x10000 -WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) -WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") - def _path_eq(p1, p2): return p1 == p2 or os.path.normcase(p1) == os.path.normcase(p2) @@ -61,13 +48,12 @@ def __init__(self, process_obj): os.close(rfd) cmd = get_command_line(parent_pid=os.getpid(), pipe_handle=rhandle) - cmd = ' '.join('"%s"' % x for x in cmd) + cmd = ' '.join(f'"{x}"' for x in cmd) python_exe = spawn.get_executable() # copy the environment variables to set in the child process - child_env = os.environ.copy() - child_env.update(process_obj.env) + child_env = {**os.environ, **process_obj.env} # bpo-35797: When running in a venv, we bypass the redirect # executor and launch our base Python. @@ -87,7 +73,7 @@ def __init__(self, process_obj): # the cleaner multiprocessing.reduction.steal_handle should # be used instead. inherit = True - hp, ht, pid, tid = _winapi.CreateProcess( + hp, ht, pid, _ = _winapi.CreateProcess( python_exe, cmd, None, None, inherit, 0, child_env, None, None) @@ -105,22 +91,19 @@ def __init__(self, process_obj): # send information to child set_spawning_popen(self) - if sys.version_info[:2] < (3, 4): - Popen._tls.process_handle = int(hp) try: reduction.dump(prep_data, to_child) reduction.dump(process_obj, to_child) finally: set_spawning_popen(None) - if sys.version_info[:2] < (3, 4): - del Popen._tls.process_handle except IOError as exc: # IOError 22 happens when the launched subprocess terminated before # wfd.close is called. Thus we can safely ignore it. 
if exc.errno != 22: raise - util.debug("While starting {}, ignored a IOError 22" - .format(process_obj._name)) + util.debug( + f"While starting {process_obj._name}, ignored a IOError 22" + ) def duplicate_for_child(self, handle): assert self is get_spawning_popen() @@ -132,12 +115,12 @@ def get_command_line(pipe_handle, **kwds): Returns prefix of command line used for spawning a child process ''' if getattr(sys, 'frozen', False): - return ([sys.executable, '--multiprocessing-fork', pipe_handle]) + return [sys.executable, '--multiprocessing-fork', pipe_handle] else: prog = 'from joblib.externals.loky.backend.popen_loky_win32 import main; main()' opts = util._args_from_interpreter_flags() - return [spawn.get_executable()] + opts + [ - '-c', prog, '--multiprocessing-fork', pipe_handle] + return [spawn.get_executable(), *opts, + '-c', prog, '--multiprocessing-fork', pipe_handle] def is_forking(argv): diff --git a/joblib/externals/loky/backend/process.py b/joblib/externals/loky/backend/process.py index 30a20c061..eafde66d0 100644 --- a/joblib/externals/loky/backend/process.py +++ b/joblib/externals/loky/backend/process.py @@ -4,11 +4,10 @@ # authors: Thomas Moreau and Olivier Grisel # # based on multiprocessing/process.py (17/02/2017) -# * Add some compatibility function for python2.7 and 3.3 # -import os import sys -from .compat import BaseProcess +from multiprocessing.context import assert_spawning +from multiprocessing.process import BaseProcess class LokyProcess(BaseProcess): @@ -17,15 +16,9 @@ class LokyProcess(BaseProcess): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None, init_main_module=False, env=None): - if sys.version_info < (3, 3): - super(LokyProcess, self).__init__( - group=group, target=target, name=name, args=args, - kwargs=kwargs) - self.daemon = daemon - else: - super(LokyProcess, self).__init__( - group=group, target=target, name=name, args=args, - kwargs=kwargs, daemon=daemon) + super().__init__( + group=group, target=target, name=name, args=args, + kwargs=kwargs, daemon=daemon) self.env = {} if env is None else env self.authkey = self.authkey self.init_main_module = init_main_module @@ -38,55 +31,13 @@ def _Popen(process_obj): from .popen_loky_posix import Popen return Popen(process_obj) - if sys.version_info < (3, 3): - def start(self): - ''' - Start child process - ''' - from multiprocessing.process import _current_process, _cleanup - assert self._popen is None, 'cannot start a process twice' - assert self._parent_pid == os.getpid(), \ - 'can only start a process object created by current process' - _cleanup() - self._popen = self._Popen(self) - self._sentinel = self._popen.sentinel - _current_process._children.add(self) - - @property - def sentinel(self): - ''' - Return a file descriptor (Unix) or handle (Windows) suitable for - waiting for process termination. 
- ''' - try: - return self._sentinel - except AttributeError: - raise ValueError("process not started") - - if sys.version_info < (3, 4): - @property - def authkey(self): - return self._authkey - - @authkey.setter - def authkey(self, authkey): - ''' - Set authorization key of process - ''' - self._authkey = AuthenticationKey(authkey) - - def _bootstrap(self): - from .context import set_start_method - set_start_method(self._start_method) - super(LokyProcess, self)._bootstrap() - class LokyInitMainProcess(LokyProcess): _start_method = 'loky_init_main' def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None): - super(LokyInitMainProcess, self).__init__( + super().__init__( group=group, target=target, name=name, args=args, kwargs=kwargs, daemon=daemon, init_main_module=True) @@ -97,7 +48,6 @@ def __init__(self, group=None, target=None, name=None, args=(), class AuthenticationKey(bytes): def __reduce__(self): - from .context import assert_spawning try: assert_spawning(self) except RuntimeError: diff --git a/joblib/externals/loky/backend/queues.py b/joblib/externals/loky/backend/queues.py index 567146429..4113b89fb 100644 --- a/joblib/externals/loky/backend/queues.py +++ b/joblib/externals/loky/backend/queues.py @@ -4,8 +4,6 @@ # authors: Thomas Moreau, Olivier Grisel # # based on multiprocessing/queues.py (16/02/2017) -# * Add some compatibility function for python2.7 and 3.3 and makes sure -# it uses the right synchronization primitive. # * Add some custom reducers for the Queues/SimpleQueue to tweak the # pickling process. (overload Queue._feed/SimpleQueue.put) # @@ -14,16 +12,16 @@ import errno import weakref import threading - from multiprocessing import util -from multiprocessing import connection -from multiprocessing.synchronize import SEM_VALUE_MAX -from multiprocessing.queues import Full -from multiprocessing.queues import _sentinel, Queue as mp_Queue -from multiprocessing.queues import SimpleQueue as mp_SimpleQueue +from multiprocessing.queues import ( + Full, + Queue as mp_Queue, + SimpleQueue as mp_SimpleQueue, + _sentinel, +) +from multiprocessing.context import assert_spawning -from .reduction import loads, dumps -from .context import assert_spawning, get_context +from .reduction import dumps __all__ = ['Queue', 'SimpleQueue', 'Full'] @@ -32,33 +30,7 @@ class Queue(mp_Queue): def __init__(self, maxsize=0, reducers=None, ctx=None): - - if sys.version_info[:2] >= (3, 4): - super().__init__(maxsize=maxsize, ctx=ctx) - else: - if maxsize <= 0: - # Can raise ImportError (see issues #3770 and #23400) - maxsize = SEM_VALUE_MAX - if ctx is None: - ctx = get_context() - self._maxsize = maxsize - self._reader, self._writer = connection.Pipe(duplex=False) - self._rlock = ctx.Lock() - self._opid = os.getpid() - if sys.platform == 'win32': - self._wlock = None - else: - self._wlock = ctx.Lock() - self._sem = ctx.BoundedSemaphore(maxsize) - - # For use by concurrent.futures - self._ignore_epipe = False - - self._after_fork() - - if sys.platform != 'win32': - util.register_after_fork(self, Queue._after_fork) - + super().__init__(maxsize=maxsize, ctx=ctx) self._reducers = reducers # Use custom queue set/get state to be able to reduce the custom reducers @@ -133,7 +105,7 @@ def _feed(buffer, notempty, send_bytes, writelock, close, reducers, else: wacquire = None - while 1: + while True: try: nacquire() try: @@ -142,7 +114,7 @@ def _feed(buffer, notempty, send_bytes, writelock, close, reducers, finally: nrelease() try: - while 1: + while True: obj = bpopleft() if obj 
is sentinel: util.debug('feeder thread got sentinel -- exiting') @@ -171,7 +143,7 @@ def _feed(buffer, notempty, send_bytes, writelock, close, reducers, # We ignore errors which happen after the process has # started to cleanup. if util.is_exiting(): - util.info('error in queue thread: %s', e) + util.info(f'error in queue thread: {e}') return else: queue_sem.release() @@ -185,31 +157,13 @@ def _on_queue_feeder_error(self, e, obj): import traceback traceback.print_exc() - if sys.version_info[:2] < (3, 4): - # Compat for python2.7/3.3 that use _send instead of _send_bytes - def _after_fork(self): - super(Queue, self)._after_fork() - self._send_bytes = self._writer.send_bytes - class SimpleQueue(mp_SimpleQueue): def __init__(self, reducers=None, ctx=None): - if sys.version_info[:2] >= (3, 4): - super().__init__(ctx=ctx) - else: - # Use the context to create the sync objects for python2.7/3.3 - if ctx is None: - ctx = get_context() - self._reader, self._writer = connection.Pipe(duplex=False) - self._rlock = ctx.Lock() - self._poll = self._reader.poll - if sys.platform == 'win32': - self._wlock = None - else: - self._wlock = ctx.Lock() - - # Add possibility to use custom reducers + super().__init__(ctx=ctx) + + # Add possiblity to use custom reducers self._reducers = reducers def close(self): @@ -226,15 +180,6 @@ def __setstate__(self, state): (self._reader, self._writer, self._reducers, self._rlock, self._wlock) = state - if sys.version_info[:2] < (3, 4): - # For python2.7/3.3, overload get to avoid creating deadlocks with - # unpickling errors. - def get(self): - with self._rlock: - res = self._reader.recv_bytes() - # unserialize the data after having released the lock - return loads(res) - # Overload put to use our customizable reducer def put(self, obj): # serialize the data before acquiring the lock diff --git a/joblib/externals/loky/backend/reduction.py b/joblib/externals/loky/backend/reduction.py index 4a2407c53..f1ee394bb 100644 --- a/joblib/externals/loky/backend/reduction.py +++ b/joblib/externals/loky/backend/reduction.py @@ -8,71 +8,28 @@ # * Add CustomizableLokyPickler to allow customizing pickling process # on the fly. # +import copyreg import io -import os -import sys import functools -from multiprocessing import util import types -try: - # Python 2 compat - from cPickle import loads as pickle_loads -except ImportError: - from pickle import loads as pickle_loads - import copyreg - -from pickle import HIGHEST_PROTOCOL - -if sys.platform == "win32": - if sys.version_info[:2] > (3, 3): - from multiprocessing.reduction import duplicate - else: - from multiprocessing.forking import duplicate +import sys +import os +from multiprocessing import util +from pickle import loads, HIGHEST_PROTOCOL ############################################################################### # Enable custom pickling in Loky. -# To allow instance customization of the pickling process, we use 2 classes. -# _ReducerRegistry gives module level customization and CustomizablePickler -# permits to use instance base custom reducers. Only CustomizablePickler -# should be used. - -class _ReducerRegistry(object): - """Registry for custom reducers. - HIGHEST_PROTOCOL is selected by default as this pickler is used - to pickle ephemeral datastructures for interprocess communication - hence no backward compatibility is required. +_dispatch_table = {} - """ - - # We override the pure Python pickler as its the only way to be able to - # customize the dispatch table without side effects in Python 2.6 - # to 3.2. 
For Python 3.3+ leverage the new dispatch_table - # feature from http://bugs.python.org/issue14166 that makes it possible - # to use the C implementation of the Pickler which is faster. - - dispatch_table = {} - - @classmethod - def register(cls, type, reduce_func): - """Attach a reducer function to a given type in the dispatch table.""" - if sys.version_info < (3,): - # Python 2 pickler dispatching is not explicitly customizable. - # Let us use a closure to workaround this limitation. - def dispatcher(cls, obj): - reduced = reduce_func(obj) - cls.save_reduce(obj=obj, *reduced) - cls.dispatch_table[type] = dispatcher - else: - cls.dispatch_table[type] = reduce_func +def register(type_, reduce_function): + _dispatch_table[type_] = reduce_function ############################################################################### # Registers extra pickling routines to improve picklization for loky -register = _ReducerRegistry.register - # make methods picklable def _reduce_method(m): @@ -157,15 +114,16 @@ def set_loky_pickler(loky_pickler=None): loky_pickler_cls = module_pickle.Pickler except (ImportError, AttributeError) as e: extra_info = ("\nThis error occurred while setting loky_pickler to" - " '{}', as required by the env variable LOKY_PICKLER" - " or the function set_loky_pickler." - .format(loky_pickler)) + f" '{loky_pickler}', as required by the env variable " + "LOKY_PICKLER or the function set_loky_pickler.") e.args = (e.args[0] + extra_info,) + e.args[1:] e.msg = e.args[0] raise e - util.debug("Using '{}' for serialization." - .format(loky_pickler if loky_pickler else "cloudpickle")) + util.debug( + f"Using '{loky_pickler if loky_pickler else 'cloudpickle'}' for " + "serialization." + ) class CustomizablePickler(loky_pickler_cls): _loky_pickler_cls = loky_pickler_cls @@ -195,43 +153,32 @@ def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL): loky_pickler_cls.__init__(self, writer, protocol=protocol) if reducers is None: reducers = {} - if sys.version_info < (3,): - self.dispatch = loky_pickler_cls.dispatch.copy() - self.dispatch.update(_ReducerRegistry.dispatch_table) + + if hasattr(self, "dispatch_table"): + # Force a copy that we will update without mutating the + # any class level defined dispatch_table. + loky_dt = dict(self.dispatch_table) else: - if hasattr(self, "dispatch_table"): - # Force a copy that we will update without mutating the - # any class level defined dispatch_table. - loky_dt = dict(self.dispatch_table) - else: - # Use standard reducers as bases - loky_dt = copyreg.dispatch_table.copy() - - # Register loky specific reducers - loky_dt.update(_ReducerRegistry.dispatch_table) - - # Set the new dispatch table, taking care of the fact that we - # need to use the member_descriptor when we inherit from a - # subclass of the C implementation of the Pickler base class - # with an class level dispatch_table attribute. - self._set_dispatch_table(loky_dt) - - # Register custom reducers + # Use standard reducers as bases + loky_dt = copyreg.dispatch_table.copy() + + # Register loky specific reducers + loky_dt.update(_dispatch_table) + + # Set the new dispatch table, taking care of the fact that we + # need to use the member_descriptor when we inherit from a + # subclass of the C implementation of the Pickler base class + # with an class level dispatch_table attribute. 
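# A minimal usage sketch, assuming a hypothetical user-defined `Point` class:
# a reducer passed to the module-level `register()` helper defined above lands
# in `_dispatch_table`, which is merged into the dispatch table installed just
# below, so it is honored by every dump()/dumps() made with loky's pickler.
from joblib.externals.loky.backend.reduction import register, dumps, loads

class Point:
    # hypothetical class used only for this sketch
    def __init__(self, x, y):
        self.x, self.y = x, y

def _reduce_point(p):
    # rebuild the instance from its two coordinates when unpickling
    return Point, (p.x, p.y)

register(Point, _reduce_point)           # recorded in _dispatch_table
assert loads(dumps(Point(1, 2))).x == 1  # reducer applied through dispatch_table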
+ self._set_dispatch_table(loky_dt) + + # Register the reducers for type, reduce_func in reducers.items(): self.register(type, reduce_func) def register(self, type, reduce_func): """Attach a reducer function to a given type in the dispatch table. """ - if sys.version_info < (3,): - # Python 2 pickler dispatching is not explicitly customizable. - # Let us use a closure to workaround this limitation. - def dispatcher(self, obj): - reduced = reduce_func(obj) - self.save_reduce(obj=obj, *reduced) - self.dispatch[type] = dispatcher - else: - self.dispatch_table[type] = reduce_func + self.dispatch_table[type] = reduce_func _LokyPickler = CustomizablePickler _loky_pickler_name = loky_pickler @@ -251,13 +198,6 @@ def get_loky_pickler(): set_loky_pickler() -def loads(buf): - # Compat for python2.7 version - if sys.version_info < (3, 3) and isinstance(buf, io.BytesIO): - buf = buf.getvalue() - return pickle_loads(buf) - - def dump(obj, file, reducers=None, protocol=None): '''Replacement for pickle.dump() using _LokyPickler.''' global _LokyPickler @@ -269,12 +209,11 @@ def dumps(obj, reducers=None, protocol=None): buf = io.BytesIO() dump(obj, buf, reducers=reducers, protocol=protocol) - if sys.version_info < (3, 3): - return buf.getvalue() return buf.getbuffer() __all__ = ["dump", "dumps", "loads", "register", "set_loky_pickler"] if sys.platform == "win32": + from multiprocessing.reduction import duplicate __all__ += ["duplicate"] diff --git a/joblib/externals/loky/backend/resource_tracker.py b/joblib/externals/loky/backend/resource_tracker.py index 151df4046..d84504e14 100644 --- a/joblib/externals/loky/backend/resource_tracker.py +++ b/joblib/externals/loky/backend/resource_tracker.py @@ -6,9 +6,13 @@ # # adapted from multiprocessing/semaphore_tracker.py (17/02/2017) # * include custom spawnv_passfds to start the process -# * use custom unlink from our own SemLock implementation # * add some VERBOSE logging # +# TODO: multiprocessing.resource_tracker was contributed to Python 3.8 so +# once loky drops support for Python 3.7 it might be possible to stop +# maintaining this loky-specific fork. As a consequence, it might also be +# possible to stop maintaining the loky.backend.synchronize fork of +# multiprocessing.synchronize. # # On Unix we run a server process which keeps track of unlinked @@ -36,7 +40,7 @@ # Note that this behavior differs from CPython's resource_tracker, which only # implements list of shared resources, and not a proper refcounting scheme. # Also, CPython's resource tracker will only attempt to cleanup those shared -# resources once all procsses connected to the resource tracker have exited. +# resources once all procsses connected to the resouce tracker have exited. import os @@ -45,23 +49,16 @@ import signal import warnings import threading +from _multiprocessing import sem_unlink +from multiprocessing import util from . 
import spawn -from multiprocessing import util if sys.platform == "win32": - from .compat_win32 import _winapi - from .reduction import duplicate + import _winapi import msvcrt + from multiprocessing.reduction import duplicate -try: - from _multiprocessing import sem_unlink -except ImportError: - from .semlock import sem_unlink - -if sys.version_info < (3,): - BrokenPipeError = OSError - from os import fdopen as open __all__ = ['ensure_running', 'register', 'unregister'] @@ -80,7 +77,7 @@ VERBOSE = False -class ResourceTracker(object): +class ResourceTracker: def __init__(self): self._lock = threading.Lock() @@ -118,7 +115,7 @@ def ensure_running(self): self._pid = None warnings.warn('resource_tracker: process died unexpectedly, ' - 'relaunching. Some folders/semaphores might ' + 'relaunching. Some folders/sempahores might ' 'leak.') fds_to_pass = [] @@ -133,22 +130,13 @@ def ensure_running(self): os.close(r) r = _r - cmd = 'from {} import main; main({}, {})'.format( - main.__module__, r, VERBOSE) + cmd = f'from {main.__module__} import main; main({r}, {VERBOSE})' try: fds_to_pass.append(r) # process will out live us, so no need to wait on pid exe = spawn.get_executable() - args = [exe] + util._args_from_interpreter_flags() - # In python 3.3, there is a bug which put `-RRRRR..` instead of - # `-R` in args. Replace it to get the correct flags. - # See https://github.com/python/cpython/blob/3.3/Lib/subprocess.py#L488 - if sys.version_info[:2] <= (3, 3): - import re - for i in range(1, len(args)): - args[i] = re.sub("-R+", "-R", args[i]) - args += ['-c', cmd] - util.debug("launching resource tracker: {}".format(args)) + args = [exe, *util._args_from_interpreter_flags(), '-c', cmd] + util.debug(f"launching resource tracker: {args}") # bpo-33613: Register a signal mask that will block the # signals. This signal mask will be inherited by the child # that is going to be spawned and will protect the child from a @@ -201,11 +189,11 @@ def maybe_unlink(self, name, rtype): self._send("MAYBE_UNLINK", name, rtype) def _send(self, cmd, name, rtype): - msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') if len(name) > 512: # posix guarantees that writes to a pipe of less than PIPE_BUF # bytes are atomic, and that PIPE_BUF >= 512 raise ValueError('name too long') + msg = f'{cmd}:{name}:{rtype}\n'.encode('ascii') nbytes = os.write(self._fd, msg) assert nbytes == len(msg) @@ -239,7 +227,7 @@ def main(fd, verbose=0): if verbose: util.debug("Main resource tracker is running") - registry = {rtype: dict() for rtype in _CLEANUP_FUNCS.keys()} + registry = {rtype: {} for rtype in _CLEANUP_FUNCS.keys()} try: # keep track of registered/unregistered resources if sys.platform == "win32": @@ -261,10 +249,11 @@ def main(fd, verbose=0): if rtype not in _CLEANUP_FUNCS: raise ValueError( - 'Cannot register {} for automatic cleanup: ' - 'unknown resource type ({}). Resource type should ' - 'be one of the following: {}'.format( - name, rtype, list(_CLEANUP_FUNCS.keys()))) + f'Cannot register {name} for automatic cleanup: ' + f'unknown resource type ({rtype}). 
Resource type ' + 'should be one of the following: ' + f'{list(_CLEANUP_FUNCS.keys())}' + ) if cmd == 'REGISTER': if name not in registry[rtype]: @@ -274,37 +263,40 @@ def main(fd, verbose=0): if verbose: util.debug( - "[ResourceTracker] incremented refcount of {} " - "{} (current {})".format( - rtype, name, registry[rtype][name])) + "[ResourceTracker] incremented refcount of " + f"{rtype} {name} " + f"(current {registry[rtype][name]})" + ) elif cmd == 'UNREGISTER': del registry[rtype][name] if verbose: util.debug( - "[ResourceTracker] unregister {} {}: " - "registry({})".format(name, rtype, len(registry))) + f"[ResourceTracker] unregister {name} {rtype}: " + f"registry({len(registry)})" + ) elif cmd == 'MAYBE_UNLINK': registry[rtype][name] -= 1 if verbose: util.debug( - "[ResourceTracker] decremented refcount of {} " - "{} (current {})".format( - rtype, name, registry[rtype][name])) + "[ResourceTracker] decremented refcount of " + f"{rtype} {name} " + f"(current {registry[rtype][name]})" + ) if registry[rtype][name] == 0: del registry[rtype][name] try: if verbose: util.debug( - "[ResourceTracker] unlink {}" - .format(name)) + f"[ResourceTracker] unlink {name}" + ) _CLEANUP_FUNCS[rtype](name) except Exception as e: warnings.warn( - 'resource_tracker: %s: %r' % (name, e)) + f'resource_tracker: {name}: {e!r}') else: - raise RuntimeError('unrecognized command %r' % cmd) + raise RuntimeError(f'unrecognized command {cmd!r}') except BaseException: try: sys.excepthook(*sys.exc_info()) @@ -315,9 +307,11 @@ def main(fd, verbose=0): def _unlink_resources(rtype_registry, rtype): if rtype_registry: try: - warnings.warn('resource_tracker: There appear to be %d ' - 'leaked %s objects to clean up at shutdown' % - (len(rtype_registry), rtype)) + warnings.warn( + 'resource_tracker: There appear to be ' + f'{len(rtype_registry)} leaked {rtype} objects to ' + 'clean up at shutdown' + ) except Exception: pass for name in rtype_registry: @@ -327,10 +321,9 @@ def _unlink_resources(rtype_registry, rtype): try: _CLEANUP_FUNCS[rtype](name) if verbose: - util.debug("[ResourceTracker] unlink {}" - .format(name)) + util.debug(f"[ResourceTracker] unlink {name}") except Exception as e: - warnings.warn('resource_tracker: %s: %r' % (name, e)) + warnings.warn(f'resource_tracker: {name}: {e!r}') for rtype, rtype_registry in registry.items(): if rtype == "folder": @@ -361,18 +354,16 @@ def spawnv_passfds(path, args, passfds): errpipe_read, errpipe_write = os.pipe() try: from .reduction import _mk_inheritable - _pass = [] - for fd in passfds: - _pass += [_mk_inheritable(fd)] from .fork_exec import fork_exec + _pass = [_mk_inheritable(fd) for fd in passfds] return fork_exec(args, _pass) finally: os.close(errpipe_read) os.close(errpipe_write) else: - cmd = ' '.join('"%s"' % x for x in args) + cmd = ' '.join(f'"{x}"' for x in args) try: - hp, ht, pid, tid = _winapi.CreateProcess( + _, ht, pid, _ = _winapi.CreateProcess( path, cmd, None, None, True, 0, None, None, None) _winapi.CloseHandle(ht) except BaseException: diff --git a/joblib/externals/loky/backend/semlock.py b/joblib/externals/loky/backend/semlock.py deleted file mode 100644 index 2d35f6a27..000000000 --- a/joblib/externals/loky/backend/semlock.py +++ /dev/null @@ -1,274 +0,0 @@ -############################################################################### -# Ctypes implementation for posix semaphore. 
-# -# author: Thomas Moreau and Olivier Grisel -# -# adapted from cpython/Modules/_multiprocessing/semaphore.c (17/02/2017) -# * use ctypes to access pthread semaphores and provide a full python -# semaphore management. -# * For OSX, as no sem_getvalue is not implemented, Semaphore with value > 1 -# are not guaranteed to work. -# * Only work with LokyProcess on posix -# -import os -import sys -import time -import errno -import ctypes -import tempfile -import threading -from ctypes.util import find_library - -# As we need to use ctypes return types for semlock object, failure value -# needs to be cast to proper python value. Unix failure convention is to -# return 0, whereas OSX returns -1 -SEM_FAILURE = ctypes.c_void_p(0).value -if sys.platform == 'darwin': - SEM_FAILURE = ctypes.c_void_p(-1).value - -# Semaphore types -RECURSIVE_MUTEX = 0 -SEMAPHORE = 1 - -# Semaphore constants -SEM_OFLAG = ctypes.c_int(os.O_CREAT | os.O_EXCL) -SEM_PERM = ctypes.c_int(384) - - -class timespec(ctypes.Structure): - _fields_ = [("tv_sec", ctypes.c_long), ("tv_nsec", ctypes.c_long)] - - -if sys.platform != 'win32': - pthread = ctypes.CDLL(find_library('pthread'), use_errno=True) - pthread.sem_open.restype = ctypes.c_void_p - pthread.sem_close.argtypes = [ctypes.c_void_p] - pthread.sem_wait.argtypes = [ctypes.c_void_p] - pthread.sem_trywait.argtypes = [ctypes.c_void_p] - pthread.sem_post.argtypes = [ctypes.c_void_p] - pthread.sem_getvalue.argtypes = [ctypes.c_void_p, ctypes.c_void_p] - pthread.sem_unlink.argtypes = [ctypes.c_char_p] - if sys.platform != "darwin": - pthread.sem_timedwait.argtypes = [ctypes.c_void_p, - ctypes.POINTER(timespec)] - -try: - from threading import get_ident -except ImportError: - def get_ident(): - return threading.current_thread().ident - - -if sys.version_info[:2] < (3, 3): - class FileExistsError(OSError): - pass - - class FileNotFoundError(OSError): - pass - - -def sem_unlink(name): - if pthread.sem_unlink(name.encode('ascii')) < 0: - raiseFromErrno() - - -def _sem_open(name, value=None): - """ Construct or retrieve a semaphore with the given name - - If value is None, try to retrieve an existing named semaphore. - Else create a new semaphore with the given value - """ - if value is None: - handle = pthread.sem_open(ctypes.c_char_p(name), 0) - else: - handle = pthread.sem_open(ctypes.c_char_p(name), SEM_OFLAG, SEM_PERM, - ctypes.c_int(value)) - - if handle == SEM_FAILURE: - e = ctypes.get_errno() - if e == errno.EEXIST: - raise FileExistsError("a semaphore named %s already exists" % name) - elif e == errno.ENOENT: - raise FileNotFoundError('cannot find semaphore named %s' % name) - elif e == errno.ENOSYS: - raise NotImplementedError('No semaphore implementation on this ' - 'system') - else: - raiseFromErrno() - - return handle - - -def _sem_timedwait(handle, timeout): - t_start = time.time() - if sys.platform != "darwin": - sec = int(timeout) - tv_sec = int(t_start) - nsec = int(1e9 * (timeout - sec) + .5) - tv_nsec = int(1e9 * (t_start - tv_sec) + .5) - deadline = timespec(sec+tv_sec, nsec+tv_nsec) - deadline.tv_sec += int(deadline.tv_nsec / 1000000000) - deadline.tv_nsec %= 1000000000 - return pthread.sem_timedwait(handle, ctypes.pointer(deadline)) - - # PERFORMANCE WARNING - # No sem_timedwait on OSX so we implement our own method. 
This method can - # degrade performances has the wait can have a latency up to 20 msecs - deadline = t_start + timeout - delay = 0 - now = time.time() - while True: - # Poll the sem file - res = pthread.sem_trywait(handle) - if res == 0: - return 0 - else: - e = ctypes.get_errno() - if e != errno.EAGAIN: - raiseFromErrno() - - # check for timeout - now = time.time() - if now > deadline: - ctypes.set_errno(errno.ETIMEDOUT) - return -1 - - # calculate how much time left and check the delay is not too long - # -- maximum is 20 msecs - difference = (deadline - now) - delay = min(delay, 20e-3, difference) - - # Sleep and increase delay - time.sleep(delay) - delay += 1e-3 - - -class SemLock(object): - """ctypes wrapper to the unix semaphore""" - - _rand = tempfile._RandomNameSequence() - - def __init__(self, kind, value, maxvalue, name=None, unlink_now=False): - self.count = 0 - self.ident = 0 - self.kind = kind - self.maxvalue = maxvalue - self.name = name - self.handle = _sem_open(self.name.encode('ascii'), value) - - def __del__(self): - try: - res = pthread.sem_close(self.handle) - assert res == 0, "Issue while closing semaphores" - except AttributeError: - pass - - def _is_mine(self): - return self.count > 0 and get_ident() == self.ident - - def acquire(self, block=True, timeout=None): - if self.kind == RECURSIVE_MUTEX and self._is_mine(): - self.count += 1 - return True - - if block and timeout is None: - res = pthread.sem_wait(self.handle) - elif not block or timeout <= 0: - res = pthread.sem_trywait(self.handle) - else: - res = _sem_timedwait(self.handle, timeout) - if res < 0: - e = ctypes.get_errno() - if e == errno.EINTR: - return None - elif e in [errno.EAGAIN, errno.ETIMEDOUT]: - return False - raiseFromErrno() - self.count += 1 - self.ident = get_ident() - return True - - def release(self): - if self.kind == RECURSIVE_MUTEX: - assert self._is_mine(), ( - "attempt to release recursive lock not owned by thread") - if self.count > 1: - self.count -= 1 - return - assert self.count == 1 - else: - if sys.platform == 'darwin': - # Handle broken get_value for mac ==> only Lock will work - # as sem_get_value do not work properly - if self.maxvalue == 1: - if pthread.sem_trywait(self.handle) < 0: - e = ctypes.get_errno() - if e != errno.EAGAIN: - raise OSError(e, errno.errorcode[e]) - else: - if pthread.sem_post(self.handle) < 0: - raiseFromErrno() - else: - raise ValueError( - "semaphore or lock released too many times") - else: - import warnings - warnings.warn("semaphore are broken on OSX, release might " - "increase its maximal value", RuntimeWarning) - else: - value = self._get_value() - if value >= self.maxvalue: - raise ValueError( - "semaphore or lock released too many times") - - if pthread.sem_post(self.handle) < 0: - raiseFromErrno() - - self.count -= 1 - - def _get_value(self): - value = ctypes.pointer(ctypes.c_int(-1)) - if pthread.sem_getvalue(self.handle, value) < 0: - raiseFromErrno() - return value.contents.value - - def _count(self): - return self.count - - def _is_zero(self): - if sys.platform == 'darwin': - # Handle broken get_value for mac ==> only Lock will work - # as sem_get_value do not work properly - if pthread.sem_trywait(self.handle) < 0: - e = ctypes.get_errno() - if e == errno.EAGAIN: - return True - raise OSError(e, errno.errorcode[e]) - else: - if pthread.sem_post(self.handle) < 0: - raiseFromErrno() - return False - else: - value = ctypes.pointer(ctypes.c_int(-1)) - if pthread.sem_getvalue(self.handle, value) < 0: - raiseFromErrno() - return 
value.contents.value == 0 - - def _after_fork(self): - self.count = 0 - - @staticmethod - def _rebuild(handle, kind, maxvalue, name): - self = SemLock.__new__(SemLock) - self.count = 0 - self.ident = 0 - self.kind = kind - self.maxvalue = maxvalue - self.name = name - self.handle = _sem_open(name.encode('ascii')) - return self - - -def raiseFromErrno(): - e = ctypes.get_errno() - raise OSError(e, errno.errorcode[e]) diff --git a/joblib/externals/loky/backend/spawn.py b/joblib/externals/loky/backend/spawn.py index d4a6a8c9e..3a9cc2dd1 100644 --- a/joblib/externals/loky/backend/spawn.py +++ b/joblib/externals/loky/backend/spawn.py @@ -18,7 +18,7 @@ WINSERVICE = False else: import msvcrt - from .reduction import duplicate + from multiprocessing.reduction import duplicate WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") @@ -65,19 +65,12 @@ def get_preparation_data(name, init_main_module=True): ) # Send sys_path and make sure the current directory will not be changed - sys_path = [p for p in sys.path] - try: - i = sys_path.index('') - except ValueError: - pass - else: - sys_path[i] = process.ORIGINAL_DIR - d['sys_path'] = sys_path + d['sys_path'] = [p if p != '' else process.ORIGINAL_DIR for p in sys.path] # Make sure to pass the information if the multiprocessing logger is active if util._logger is not None: d['log_level'] = util._logger.getEffectiveLevel() - if len(util._logger.handlers) > 0: + if util._logger.handlers: h = util._logger.handlers[0] d['log_fmt'] = h.formatter._fmt @@ -105,7 +98,7 @@ def get_preparation_data(name, init_main_module=True): _resource_tracker as mp_resource_tracker ) # multiprocessing's resource_tracker must be running before loky - # process is created (otherwise the child won't be able to use it if it + # process is created (othewise the child won't be able to use it if it # is created later on) mp_resource_tracker.ensure_running() d["mp_tracker_args"] = { @@ -129,8 +122,6 @@ def get_preparation_data(name, init_main_module=True): process.ORIGINAL_DIR is not None): main_path = os.path.join(process.ORIGINAL_DIR, main_path) d['init_main_from_path'] = os.path.normpath(main_path) - # Compat for python2.7 - d['main_path'] = d['init_main_from_path'] return d @@ -249,10 +240,3 @@ def _fixup_main_from_path(main_path): run_name="__mp_main__") main_module.__dict__.update(main_content) sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module - - -def import_main_path(main_path): - ''' - Set sys.modules['__main__'] to module at main_path - ''' - _fixup_main_from_path(main_path) diff --git a/joblib/externals/loky/backend/synchronize.py b/joblib/externals/loky/backend/synchronize.py index 6db77b0de..a6aefa464 100644 --- a/joblib/externals/loky/backend/synchronize.py +++ b/joblib/externals/loky/backend/synchronize.py @@ -5,10 +5,12 @@ # # adapted from multiprocessing/synchronize.py (17/02/2017) # * Remove ctx argument for compatibility reason -# * Implementation of Condition/Event are necessary for compatibility -# with python2.7/3.3, Barrier should be reimplemented to for those -# version (but it is not used in loky). +# * Registers a cleanup function with the loky resource_tracker to remove the +# semaphore when the process dies instead. # +# TODO: investigate which Python version is required to be able to use +# multiprocessing.resource_tracker and therefore multiprocessing.synchronize +# instead of a loky-specific fork. 
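# A hedged sketch of the clean-up hand-off described in the header above: the
# real call sites live in the SemLock class further down, and the module-level
# register/unregister helpers and the "semlock" resource type are assumed to
# mirror multiprocessing.resource_tracker. A named semaphore is registered so
# that the tracker process can sem_unlink() it if every owner dies without
# releasing it.
import os
from joblib.externals.loky.backend import resource_tracker

name = f'/loky-{os.getpid()}-sketch'          # same shape as SemLock._make_name()
resource_tracker.register(name, "semlock")    # tracker refcount for `name` -> 1
resource_tracker.unregister(name, "semlock")  # tracker drops the entry again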
import os import sys @@ -16,11 +18,10 @@ import threading import _multiprocessing from time import time as _time +from multiprocessing import process, util +from multiprocessing.context import assert_spawning -from .context import assert_spawning from . import resource_tracker -from multiprocessing import process -from multiprocessing import util __all__ = [ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event' @@ -29,26 +30,19 @@ # raise ImportError for platforms lacking a working sem_open implementation. # See issue 3770 try: - if sys.version_info < (3, 4): - from .semlock import SemLock as _SemLock - from .semlock import sem_unlink - else: - from _multiprocessing import SemLock as _SemLock - from _multiprocessing import sem_unlink -except (ImportError): + from _multiprocessing import SemLock as _SemLock + from _multiprocessing import sem_unlink +except ImportError: raise ImportError("This platform lacks a functioning sem_open" + " implementation, therefore, the required" + " synchronization primitives needed will not" + " function, see issue 3770.") -if sys.version_info[:2] < (3, 3): - FileExistsError = OSError - # # Constants # -RECURSIVE_MUTEX, SEMAPHORE = list(range(2)) +RECURSIVE_MUTEX, SEMAPHORE = range(2) SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX @@ -56,7 +50,7 @@ # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock` # -class SemLock(object): +class SemLock: _rand = tempfile._RandomNameSequence() @@ -65,7 +59,7 @@ def __init__(self, kind, value, maxvalue, name=None): unlink_now = False if name is None: # Try to find an unused name for the SemLock instance. - for i in range(100): + for _ in range(100): try: self._semlock = _SemLock( kind, value, maxvalue, SemLock._make_name(), unlink_now @@ -81,8 +75,10 @@ def __init__(self, kind, value, maxvalue, name=None): kind, value, maxvalue, name, unlink_now ) self.name = name - util.debug('created semlock with handle %s and name "%s"' - % (self._semlock.handle, self.name)) + util.debug( + f'created semlock with handle {self._semlock.handle} and name ' + f'"{self.name}"' + ) self._make_methods() @@ -120,14 +116,15 @@ def __getstate__(self): def __setstate__(self, state): self._semlock = _SemLock._rebuild(*state) - util.debug('recreated blocker with handle %r and name "%s"' - % (state[0], state[3])) + util.debug( + f'recreated blocker with handle {state[0]!r} and name "{state[3]}"' + ) self._make_methods() @staticmethod def _make_name(): # OSX does not support long names for semaphores - return '/loky-%i-%s' % (os.getpid(), next(SemLock._rand)) + return f'/loky-{os.getpid()}-{next(SemLock._rand)}' # @@ -149,7 +146,7 @@ def __repr__(self): value = self._semlock._get_value() except Exception: value = 'unknown' - return '<%s(value=%s)>' % (self.__class__.__name__, value) + return f'<{self.__class__.__name__}(value={value})>' # @@ -166,8 +163,10 @@ def __repr__(self): value = self._semlock._get_value() except Exception: value = 'unknown' - return '<%s(value=%s, maxvalue=%s)>' % \ - (self.__class__.__name__, value, self._semlock.maxvalue) + return ( + f'<{self.__class__.__name__}(value={value}, ' + f'maxvalue={self._semlock.maxvalue})>' + ) # @@ -177,14 +176,14 @@ def __repr__(self): class Lock(SemLock): def __init__(self): - super(Lock, self).__init__(SEMAPHORE, 1, 1) + super().__init__(SEMAPHORE, 1, 1) def __repr__(self): try: if self._semlock._is_mine(): name = process.current_process().name if threading.current_thread().name != 'MainThread': - name += '|' + threading.current_thread().name + 
name = f'{name}|{threading.current_thread().name}' elif self._semlock._get_value() == 1: name = 'None' elif self._semlock._count() > 0: @@ -193,7 +192,7 @@ def __repr__(self): name = 'SomeOtherProcess' except Exception: name = 'unknown' - return '<%s(owner=%s)>' % (self.__class__.__name__, name) + return f'<{self.__class__.__name__}(owner={name})>' # @@ -203,14 +202,14 @@ def __repr__(self): class RLock(SemLock): def __init__(self): - super(RLock, self).__init__(RECURSIVE_MUTEX, 1, 1) + super().__init__(RECURSIVE_MUTEX, 1, 1) def __repr__(self): try: if self._semlock._is_mine(): name = process.current_process().name if threading.current_thread().name != 'MainThread': - name += '|' + threading.current_thread().name + name = f'{name}|{threading.current_thread().name}' count = self._semlock._count() elif self._semlock._get_value() == 1: name, count = 'None', 0 @@ -220,14 +219,14 @@ def __repr__(self): name, count = 'SomeOtherProcess', 'nonzero' except Exception: name, count = 'unknown', 'unknown' - return '<%s(%s, %s)>' % (self.__class__.__name__, name, count) + return f'<{self.__class__.__name__}({name}, {count})>' # # Condition variable # -class Condition(object): +class Condition: def __init__(self, lock=None): self._lock = lock or RLock() @@ -262,8 +261,7 @@ def __repr__(self): self._woken_count._semlock._get_value()) except Exception: num_waiters = 'unknown' - return '<%s(%s, %s)>' % (self.__class__.__name__, - self._lock, num_waiters) + return f'<{self.__class__.__name__}({self._lock}, {num_waiters})>' def wait(self, timeout=None): assert self._lock._semlock._is_mine(), \ @@ -274,7 +272,7 @@ def wait(self, timeout=None): # release lock count = self._lock._semlock._count() - for i in range(count): + for _ in range(count): self._lock.release() try: @@ -285,7 +283,7 @@ def wait(self, timeout=None): self._woken_count.release() # reacquire lock - for i in range(count): + for _ in range(count): self._lock.acquire() def notify(self): @@ -321,7 +319,7 @@ def notify_all(self): sleepers += 1 if sleepers: - for i in range(sleepers): + for _ in range(sleepers): self._woken_count.acquire() # wait for a sleeper to wake # rezero wait_semaphore in case some timeouts just happened @@ -351,7 +349,7 @@ def wait_for(self, predicate, timeout=None): # Event # -class Event(object): +class Event: def __init__(self): self._cond = Condition(Lock()) diff --git a/joblib/externals/loky/backend/utils.py b/joblib/externals/loky/backend/utils.py index 5342204c0..2956614e4 100644 --- a/joblib/externals/loky/backend/utils.py +++ b/joblib/externals/loky/backend/utils.py @@ -4,120 +4,129 @@ import errno import signal import warnings -import threading import subprocess +import traceback try: import psutil except ImportError: psutil = None -WIN32 = sys.platform == "win32" - - -def _flag_current_thread_clean_exit(): - """Put a ``_clean_exit`` flag on the current thread""" - thread = threading.current_thread() - thread._clean_exit = True +def kill_process_tree(process, use_psutil=True): + """Terminate process and its descendants with SIGKILL""" + if use_psutil and psutil is not None: + _kill_process_tree_with_psutil(process) + else: + _kill_process_tree_without_psutil(process) def recursive_terminate(process, use_psutil=True): - if use_psutil and psutil is not None: - _recursive_terminate_with_psutil(process) - else: - _recursive_terminate_without_psutil(process) + warnings.warn( + "recursive_terminate is deprecated in loky 3.2, use kill_process_tree" + "instead", + DeprecationWarning, + ) + kill_process_tree(process, 
use_psutil=use_psutil) -def _recursive_terminate_with_psutil(process, retries=5): +def _kill_process_tree_with_psutil(process): try: - children = psutil.Process(process.pid).children(recursive=True) + descendants = psutil.Process(process.pid).children(recursive=True) except psutil.NoSuchProcess: return - # Kill the children in reverse order to avoid killing the parents before - # the children in cases where there are more processes nested. - for child in children[::-1]: + # Kill the descendants in reverse order to avoid killing the parents before + # the descendant in cases where there are more processes nested. + for descendant in descendants[::-1]: try: - child.kill() + descendant.kill() except psutil.NoSuchProcess: pass - process.terminate() + try: + psutil.Process(process.pid).kill() + except psutil.NoSuchProcess: + pass process.join() -def _recursive_terminate_without_psutil(process): - """Terminate a process and its descendants. - """ +def _kill_process_tree_without_psutil(process): + """Terminate a process and its descendants.""" try: - _recursive_terminate(process.pid) - except OSError as e: - warnings.warn("Failed to kill subprocesses on this platform. Please" - "install psutil: https://github.com/giampaolo/psutil") - # In case we cannot introspect the children, we fall back to the - # classic Process.terminate. - process.terminate() + if sys.platform == "win32": + _windows_taskkill_process_tree(process.pid) + else: + _posix_recursive_kill(process.pid) + except Exception: # pragma: no cover + details = traceback.format_exc() + warnings.warn( + "Failed to kill subprocesses on this platform. Please install" + "psutil: https://github.com/giampaolo/psutil\n" + f"Details:\n{details}" + ) + # In case we cannot introspect or kill the descendants, we fall back to + # only killing the main process. + # + # Note: on Windows, process.kill() is an alias for process.terminate() + # which in turns calls the Win32 API function TerminateProcess(). + process.kill() process.join() -def _recursive_terminate(pid): - """Recursively kill the descendants of a process before killing it. - """ +def _windows_taskkill_process_tree(pid): + # On windows, the taskkill function with option `/T` terminate a given + # process pid and its children. + try: + subprocess.check_output( + ["taskkill", "/F", "/T", "/PID", str(pid)], stderr=None + ) + except subprocess.CalledProcessError as e: + # In Windows, taskkill returns 128, 255 for no process found. + if e.returncode not in [128, 255]: + # Let's raise to let the caller log the error details in a + # warning and only kill the root process. + raise # pragma: no cover + + +def _kill(pid): + # Not all systems (e.g. Windows) have a SIGKILL, but the C specification + # mandates a SIGTERM signal. While Windows is handled specifically above, + # let's try to be safe for other hypothetic platforms that only have + # SIGTERM without SIGKILL. + kill_signal = getattr(signal, 'SIGKILL', signal.SIGTERM) + try: + os.kill(pid, kill_signal) + except OSError as e: + # if OSError is raised with [Errno 3] no such process, the process + # is already terminated, else, raise the error and let the top + # level function raise a warning and retry to kill the process. + if e.errno != errno.ESRCH: + raise # pragma: no cover - if sys.platform == "win32": - # On windows, the taskkill function with option `/T` terminate a given - # process pid and its children. 
- try: - subprocess.check_output( - ["taskkill", "/F", "/T", "/PID", str(pid)], - stderr=None) - except subprocess.CalledProcessError as e: - # In windows, taskkill return 1 for permission denied and 128, 255 - # for no process found. - if e.returncode not in [1, 128, 255]: - raise - elif e.returncode == 1: - # Try to kill the process without its descendants if taskkill - # was denied permission. If this fails too, with an error - # different from process not found, let the top level function - # raise a warning and retry to kill the process. - try: - os.kill(pid, signal.SIGTERM) - except OSError as e: - if e.errno != errno.ESRCH: - raise - else: - try: - children_pids = subprocess.check_output( - ["pgrep", "-P", str(pid)], - stderr=None - ) - except subprocess.CalledProcessError as e: - # `ps` returns 1 when no child process has been found - if e.returncode == 1: - children_pids = b'' - else: - raise - - # Decode the result, split the cpid and remove the trailing line - children_pids = children_pids.decode().split('\n')[:-1] - for cpid in children_pids: - cpid = int(cpid) - _recursive_terminate(cpid) +def _posix_recursive_kill(pid): + """Recursively kill the descendants of a process before killing it.""" + try: + children_pids = subprocess.check_output( + ["pgrep", "-P", str(pid)], stderr=None, text=True + ) + except subprocess.CalledProcessError as e: + # `ps` returns 1 when no child process has been found + if e.returncode == 1: + children_pids = '' + else: + raise # pragma: no cover - try: - os.kill(pid, signal.SIGTERM) - except OSError as e: - # if OSError is raised with [Errno 3] no such process, the process - # is already terminated, else, raise the error and let the top - # level function raise a warning and retry to kill the process. - if e.errno != errno.ESRCH: - raise + # Decode the result, split the cpid and remove the trailing line + for cpid in children_pids.splitlines(): + cpid = int(cpid) + _posix_recursive_kill(cpid) + + _kill(pid) def get_exitcodes_terminated_worker(processes): - """Return a formatted string with the exitcodes of terminated workers. + """Return a formated string with the exitcodes of terminated workers. If necessary, wait (up to .25s) for the system to correctly set the exitcode of one terminated worker. @@ -129,7 +138,7 @@ def get_exitcodes_terminated_worker(processes): # the terminated worker. 
exitcodes = [p.exitcode for p in list(processes.values()) if p.exitcode is not None] - while len(exitcodes) == 0 and patience > 0: + while not exitcodes and patience > 0: patience -= 1 exitcodes = [p.exitcode for p in list(processes.values()) if p.exitcode is not None] @@ -140,7 +149,7 @@ def get_exitcodes_terminated_worker(processes): def _format_exitcodes(exitcodes): """Format a list of exit code with names of the signals if possible""" - str_exitcodes = ["{}({})".format(_get_exitcode_name(e), e) + str_exitcodes = [f"{_get_exitcode_name(e)}({e})" for e in exitcodes if e is not None] return "{" + ", ".join(str_exitcodes) + "}" @@ -154,14 +163,7 @@ def _get_exitcode_name(exitcode): if exitcode < 0: try: import signal - if sys.version_info > (3, 5): - return signal.Signals(-exitcode).name - - # construct an inverse lookup table - for v, k in signal.__dict__.items(): - if (v.startswith('SIG') and not v.startswith('SIG_') and - k == -exitcode): - return v + return signal.Signals(-exitcode).name except ValueError: return "UNKNOWN" elif exitcode != 255: diff --git a/joblib/externals/loky/cloudpickle_wrapper.py b/joblib/externals/loky/cloudpickle_wrapper.py index 7d2cd35c2..0b187e84e 100644 --- a/joblib/externals/loky/cloudpickle_wrapper.py +++ b/joblib/externals/loky/cloudpickle_wrapper.py @@ -1,17 +1,12 @@ import inspect from functools import partial +from joblib.externals.cloudpickle import dumps, loads -try: - from joblib.externals.cloudpickle import dumps, loads - cloudpickle = True -except ImportError: - cloudpickle = False +WRAP_CACHE = {} -WRAP_CACHE = dict() - -class CloudpickledObjectWrapper(object): +class CloudpickledObjectWrapper: def __init__(self, obj, keep_wrapper=False): self._obj = obj self._keep_wrapper = keep_wrapper @@ -24,7 +19,7 @@ def __reduce__(self): return _reconstruct_wrapper, (_pickled_object, self._keep_wrapper) def __getattr__(self, attr): - # Ensure that the wrapped object can be used seamlessly as the + # Ensure that the wrapped object can be used seemlessly as the # previous object. if attr not in ['_obj', '_keep_wrapper']: return getattr(self._obj, attr) @@ -52,9 +47,6 @@ def _reconstruct_wrapper(_pickled_object, keep_wrapper): def _wrap_objects_when_needed(obj): # Function to introspect an object and decide if it should be wrapped or # not. - if not cloudpickle: - return obj - need_wrap = "__main__" in getattr(obj, "__module__", "") if isinstance(obj, partial): return partial( @@ -92,11 +84,6 @@ def wrap_non_picklable_objects(obj, keep_wrapper=True): objects in the main scripts and to implement __reduce__ functions for complex classes. """ - if not cloudpickle: - raise ImportError("could not from joblib.externals import cloudpickle. Please install " - "cloudpickle to allow extended serialization. " - "(`pip install cloudpickle`).") - # If obj is a class, create a CloudpickledClassWrapper which instantiates # the object internally and wrap it directly in a CloudpickledObjectWrapper if inspect.isclass(obj): diff --git a/joblib/externals/loky/initializers.py b/joblib/externals/loky/initializers.py index b734c29e7..cc4b7b17c 100644 --- a/joblib/externals/loky/initializers.py +++ b/joblib/externals/loky/initializers.py @@ -23,12 +23,11 @@ def _make_viztracer_initializer_and_initargs(): except Exception as e: # In case viztracer's API evolve, we do not want to crash loky but # we want to know about it to be able to update loky. 
- warnings.warn("Unable to introspect viztracer state: {}" - .format(e)) + warnings.warn(f"Unable to introspect viztracer state: {e}") return None, () -class _ChainedInitializer(): +class _ChainedInitializer: """Compound worker initializer This is meant to be used in conjunction with _chain_initializers to @@ -55,7 +54,7 @@ def _chain_initializers(initializer_and_args): filtered_initializers.append(initializer) filtered_initargs.append(initargs) - if len(filtered_initializers) == 0: + if not filtered_initializers: return None, () elif len(filtered_initializers) == 1: return filtered_initializers[0], filtered_initargs[0] @@ -66,8 +65,7 @@ def _chain_initializers(initializer_and_args): def _prepare_initializer(initializer, initargs): if initializer is not None and not callable(initializer): raise TypeError( - "initializer must be a callable, got: {!r}" - .format(initializer) + f"initializer must be a callable, got: {initializer!r}" ) # Introspect runtime to determine if we need to propagate the viztracer diff --git a/joblib/externals/loky/process_executor.py b/joblib/externals/loky/process_executor.py index ffa02041e..19d951f8c 100644 --- a/joblib/externals/loky/process_executor.py +++ b/joblib/externals/loky/process_executor.py @@ -4,7 +4,6 @@ # author: Thomas Moreau and Olivier Grisel # # adapted from concurrent/futures/process_pool_executor.py (17/02/2017) -# * Backport for python2.7/3.3, # * Add an extra management thread to detect executor_manager_thread failures, # * Improve the shutdown process to avoid deadlocks, # * Add timeout for workers, @@ -62,6 +61,7 @@ import os import gc import sys +import queue import struct import weakref import warnings @@ -72,28 +72,19 @@ import multiprocessing as mp from functools import partial from pickle import PicklingError +from concurrent.futures import Executor +from concurrent.futures._base import LOGGER +from concurrent.futures.process import BrokenProcessPool as _BPPException +from multiprocessing.connection import wait -from . import _base +from ._base import Future from .backend import get_context -from .backend.compat import queue -from .backend.compat import wait -from .backend.compat import set_cause from .backend.context import cpu_count from .backend.queues import Queue, SimpleQueue from .backend.reduction import set_loky_pickler, get_loky_pickler_name -from .backend.utils import recursive_terminate, get_exitcodes_terminated_worker +from .backend.utils import kill_process_tree, get_exitcodes_terminated_worker from .initializers import _prepare_initializer -try: - from concurrent.futures.process import BrokenProcessPool as _BPPException -except ImportError: - _BPPException = RuntimeError - - -# Compatibility for python2.7 -if sys.version_info[0] == 2: - ProcessLookupError = OSError - # Mechanism to prevent infinite process spawning. When a worker of a # ProcessPoolExecutor nested in MAX_DEPTH Executor tries to create a new @@ -117,7 +108,7 @@ def _get_memory_usage(pid, force_gc=False): gc.collect() mem_size = Process(pid).memory_info().rss - mp.util.debug('psutil return memory size: {}'.format(mem_size)) + mp.util.debug(f'psutil return memory size: {mem_size}') return mem_size except ImportError: @@ -137,12 +128,7 @@ def close(self): def wakeup(self): if not self._closed: - if sys.platform == "win32" and sys.version_info[:2] < (3, 4): - # Compat for python2.7 on windows, where poll return false for - # b"" messages. Use the slightly larger message b"0". 
- self._writer.send_bytes(b"0") - else: - self._writer.send_bytes(b"") + self._writer.send_bytes(b"") def clear(self): if not self._closed: @@ -150,7 +136,7 @@ def clear(self): self._reader.recv_bytes() -class _ExecutorFlags(object): +class _ExecutorFlags: """necessary references to maintain executor states without preventing gc It permits to keep the information needed by executor_manager_thread @@ -199,7 +185,7 @@ def _python_exit(): _global_shutdown = True items = list(_threads_wakeups.items()) mp.util.debug("Interpreter shutting down. Waking up " - "executor_manager_thread {}".format(items)) + f"executor_manager_thread {items}") for _, (shutdown_lock, thread_wakeup) in items: with shutdown_lock: thread_wakeup.wakeup() @@ -227,7 +213,7 @@ class _RemoteTraceback(Exception): """Embed stringification of remote traceback in local traceback """ def __init__(self, tb=None): - self.tb = '\n"""\n{}"""'.format(tb) + self.tb = f'\n"""\n{tb}"""' def __str__(self): return self.tb @@ -249,11 +235,11 @@ def __reduce__(self): def _rebuild_exc(exc, tb): - exc = set_cause(exc, _RemoteTraceback(tb)) + exc.__cause__ = _RemoteTraceback(tb) return exc -class _WorkItem(object): +class _WorkItem: __slots__ = ["future", "fn", "args", "kwargs"] @@ -264,7 +250,7 @@ def __init__(self, future, fn, args, kwargs): self.kwargs = kwargs -class _ResultItem(object): +class _ResultItem: def __init__(self, work_id, exception=None, result=None): self.work_id = work_id @@ -272,7 +258,7 @@ def __init__(self, work_id, exception=None, result=None): self.result = result -class _CallItem(object): +class _CallItem: def __init__(self, work_id, fn, args, kwargs): self.work_id = work_id @@ -288,8 +274,9 @@ def __call__(self): return self.fn(*self.args, **self.kwargs) def __repr__(self): - return "CallItem({}, {}, {}, {})".format( - self.work_id, self.fn, self.args, self.kwargs) + return ( + f"CallItem({self.work_id}, {self.fn}, {self.args}, {self.kwargs})" + ) class _SafeQueue(Queue): @@ -299,7 +286,7 @@ def __init__(self, max_size=0, ctx=None, pending_work_items=None, self.thread_wakeup = thread_wakeup self.pending_work_items = pending_work_items self.running_work_items = running_work_items - super(_SafeQueue, self).__init__(max_size, reducers=reducers, ctx=ctx) + super().__init__(max_size, reducers=reducers, ctx=ctx) def _on_queue_feeder_error(self, e, obj): if isinstance(obj, _CallItem): @@ -313,8 +300,7 @@ def _on_queue_feeder_error(self, e, obj): "Could not pickle the task to send it to the workers.") tb = traceback.format_exception( type(e), e, getattr(e, "__traceback__", None)) - raised_error = set_cause(raised_error, - _RemoteTraceback(''.join(tb))) + raised_error.__cause__ = _RemoteTraceback(''.join(tb)) work_item = self.pending_work_items.pop(obj.work_id, None) self.running_work_items.remove(obj.work_id) # work_item can be None if another process terminated. In this @@ -325,15 +311,12 @@ def _on_queue_feeder_error(self, e, obj): del work_item self.thread_wakeup.wakeup() else: - super(_SafeQueue, self)._on_queue_feeder_error(e, obj) + super()._on_queue_feeder_error(e, obj) def _get_chunks(chunksize, *iterables): """Iterates over zip()ed iterables in chunks. 
""" - if sys.version_info < (3, 3): - it = itertools.izip(*iterables) - else: - it = zip(*iterables) + it = zip(*iterables) while True: chunk = tuple(itertools.islice(it, chunksize)) if not chunk: @@ -389,7 +372,7 @@ def _process_worker(call_queue, result_queue, initializer, initargs, try: initializer(*initargs) except BaseException: - _base.LOGGER.critical('Exception in initializer:', exc_info=True) + LOGGER.critical('Exception in initializer:', exc_info=True) # The parent will notice that the process stopped and # mark the pool broken return @@ -401,15 +384,14 @@ def _process_worker(call_queue, result_queue, initializer, initargs, _last_memory_leak_check = None pid = os.getpid() - mp.util.debug('Worker started with timeout=%s' % timeout) + mp.util.debug(f'Worker started with timeout={timeout}') while True: try: call_item = call_queue.get(block=True, timeout=timeout) if call_item is None: mp.util.info("Shutting down worker on sentinel") except queue.Empty: - mp.util.info("Shutting down worker after timeout %0.3fs" - % timeout) + mp.util.info(f"Shutting down worker after timeout {timeout:0.3f}s") if processes_management_lock.acquire(block=False): processes_management_lock.release() call_item = None @@ -477,7 +459,7 @@ def _process_worker(call_queue, result_queue, initializer, initargs, else: # if psutil is not installed, trigger gc.collect events # regularly to limit potential memory leaks due to reference cycles - if ((_last_memory_leak_check is None) or + if (_last_memory_leak_check is None or (time() - _last_memory_leak_check > _MEMORY_LEAK_CHECK_DELAY)): gc.collect() @@ -514,8 +496,12 @@ def __init__(self, executor): def weakref_cb(_, thread_wakeup=self.thread_wakeup, shutdown_lock=self.shutdown_lock): - mp.util.debug('Executor collected: triggering callback for' - ' QueueManager wakeup') + if mp is not None: + # At this point, the multiprocessing module can already be + # garbage collected. We only log debug info when still + # possible. + mp.util.debug('Executor collected: triggering callback for' + ' QueueManager wakeup') with shutdown_lock: thread_wakeup.wakeup() @@ -548,9 +534,7 @@ def weakref_cb(_, # of new processes or shut down self.processes_management_lock = executor._processes_management_lock - super(_ExecutorManagerThread, self).__init__( - name="ExecutorManagerThread" - ) + super().__init__(name="ExecutorManagerThread") if sys.version_info < (3, 9): self.daemon = True @@ -627,7 +611,7 @@ def wait_result_broken_or_wakeup(self): "A task has failed to un-serialize. Please ensure that" " the arguments of the function are all picklable." ) - set_cause(bpe, result_item) + bpe.__cause__ = result_item else: is_broken = False except BaseException as e: @@ -638,7 +622,7 @@ def wait_result_broken_or_wakeup(self): ) tb = traceback.format_exception( type(e), e, getattr(e, "__traceback__", None)) - set_cause(bpe, _RemoteTraceback(''.join(tb))) + bpe.__cause__ = _RemoteTraceback(''.join(tb)) elif wakeup_reader in ready: # This is simply a wake-up event that might either trigger putting @@ -652,8 +636,9 @@ def wait_result_broken_or_wakeup(self): # In Windows, introspecting terminated workers exitcodes seems # unstable, therefore they are not appended in the exception # message. - exit_codes = "\nThe exit codes of the workers are {}".format( - get_exitcodes_terminated_worker(self.processes) + exit_codes = ( + "\nThe exit codes of the workers are " + f"{get_exitcodes_terminated_worker(self.processes)}" ) mp.util.debug('A worker unexpectedly terminated. 
Workers that '
                          'might have caused the breakage: '
@@ -665,7 +650,7 @@ def wait_result_broken_or_wakeup(self):
                 "terminated. This could be caused by a segmentation fault "
                 "while calling the function or by an excessive memory usage "
                 "causing the Operating System to kill the worker.\n"
-                "{}".format(exit_codes)
+                f"{exit_codes}"
             )

         self.thread_wakeup.clear()
@@ -739,7 +724,7 @@ def terminate_broken(self, bpe):
         self.executor_flags.flag_as_broken(bpe)

         # Mark pending tasks as failed.
-        for work_id, work_item in self.pending_work_items.items():
+        for work_item in self.pending_work_items.values():
             work_item.future.set_exception(bpe)
             # Delete references to object. See issue16284
             del work_item
@@ -775,10 +760,9 @@ def kill_workers(self, reason=''):
         # nested parallelism.
         while self.processes:
             _, p = self.processes.popitem()
-            mp.util.debug("terminate process {}, reason: {}"
-                          .format(p.name, reason))
+            mp.util.debug(f"terminate process {p.name}, reason: {reason}")
             try:
-                recursive_terminate(p)
+                kill_process_tree(p)
             except ProcessLookupError:  # pragma: no cover
                 pass

@@ -801,7 +785,7 @@ def shutdown_workers(self):
         n_sentinels_sent = 0
         while (n_sentinels_sent < n_children_to_stop
                 and self.get_n_children_alive() > 0):
-            for i in range(n_children_to_stop - n_sentinels_sent):
+            for _ in range(n_children_to_stop - n_sentinels_sent):
                 try:
                     self.call_queue.put_nowait(None)
                     n_sentinels_sent += 1
@@ -835,7 +819,7 @@ def join_executor_internals(self):
             p.join()

         mp.util.debug("executor management thread clean shutdown of worker "
-                      "processes: {}".format(list(self.processes)))
+                      f"processes: {list(self.processes)}")

     def get_n_children_alive(self):
         # This is an upper bound on the number of children alive.
@@ -849,9 +833,8 @@ def get_n_children_alive(self):
 def _check_system_limits():
     global _system_limits_checked, _system_limited
-    if _system_limits_checked:
-        if _system_limited:
-            raise NotImplementedError(_system_limited)
+    if _system_limits_checked and _system_limited:
+        raise NotImplementedError(_system_limited)
     _system_limits_checked = True
     try:
         nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
@@ -866,8 +849,10 @@ def _check_system_limits():
         # minimum number of semaphores available
         # according to POSIX
         return
-    _system_limited = ("system provides too few semaphores (%d available, "
-                       "256 necessary)" % nsems_max)
+    _system_limited = (
+        f"system provides too few semaphores ({nsems_max} available, "
+        "256 necessary)"
+    )
     raise NotImplementedError(_system_limited)


@@ -895,8 +880,8 @@ def _check_max_depth(context):
     if 0 < MAX_DEPTH and _CURRENT_DEPTH + 1 > MAX_DEPTH:
         raise LokyRecursionError(
             "Could not spawn extra nested processes at depth superior to "
-            "MAX_DEPTH={}. If this is intendend, you can change this limit "
-            "with the LOKY_MAX_DEPTH environment variable.".format(MAX_DEPTH))
+            f"MAX_DEPTH={MAX_DEPTH}. If this is intended, you can change "
+            "this limit with the LOKY_MAX_DEPTH environment variable.")


 class LokyRecursionError(RuntimeError):
@@ -933,7 +918,7 @@ class ShutdownExecutorError(RuntimeError):
     """


-class ProcessPoolExecutor(_base.Executor):
+class ProcessPoolExecutor(Executor):

     _at_exit = None

@@ -962,8 +947,7 @@ def __init__(self, max_workers=None, job_reducers=None,
             initargs: A tuple of arguments to pass to the initializer.
             env: A dict of environment variable to overwrite in the child
                 process. The environment variables are set before any module is
-                loaded. Note that this only works with the loky context and it
-                is unreliable under windows with Python < 3.6.
+                loaded. 
Note that this only works with the loky context.
         """
         _check_system_limits()

@@ -1048,17 +1032,6 @@ def _start_executor_manager_thread(self):
         if self._executor_manager_thread is None:
             mp.util.debug('_start_executor_manager_thread called')

-            # When the executor gets garbage collected, the weakref callback
-            # will wake up the queue management thread so that it can terminate
-            # if there is no pending work item.
-            def weakref_cb(
-                    _, thread_wakeup=self._executor_manager_thread_wakeup,
-                    shutdown_lock=self._shutdown_lock):
-                mp.util.debug('Executor collected: triggering callback for'
-                              ' QueueManager wakeup')
-                with self._shutdown_lock:
-                    thread_wakeup.wakeup()
-
             # Start the processes so that their sentinels are known.
             self._executor_manager_thread = _ExecutorManagerThread(self)
             self._executor_manager_thread.start()
@@ -1099,7 +1072,7 @@ def _adjust_process_count(self):
             p._worker_exit_lock = worker_exit_lock
             p.start()
             self._processes[p.pid] = p
-        mp.util.debug('Adjust process count : {}'.format(self._processes))
+        mp.util.debug(f'Adjust process count: {self._processes}')

     def _ensure_executor_running(self):
         """ensures all workers and management thread are running
@@ -1123,7 +1096,7 @@ def submit(self, fn, *args, **kwargs):
             raise RuntimeError('cannot schedule new futures after '
                                'interpreter shutdown')

-        f = _base.Future()
+        f = Future()
         w = _WorkItem(f, fn, args, kwargs)

         self._pending_work_items[self._queue_count] = w
@@ -1134,7 +1107,7 @@ def submit(self, fn, *args, **kwargs):
         self._ensure_executor_running()
         return f

-    submit.__doc__ = _base.Executor.submit.__doc__
+    submit.__doc__ = Executor.submit.__doc__

     def map(self, fn, *iterables, **kwargs):
         """Returns an iterator equivalent to map(fn, iter).
@@ -1163,14 +1136,14 @@ def map(self, fn, *iterables, **kwargs):
         if chunksize < 1:
             raise ValueError("chunksize must be >= 1.")

-        results = super(ProcessPoolExecutor, self).map(
+        results = super().map(
             partial(_process_chunk, fn),
             _get_chunks(chunksize, *iterables),
             timeout=timeout
         )
         return _chain_from_iterable_of_lists(results)

     def shutdown(self, wait=True, kill_workers=False):
-        mp.util.debug('shutting down executor %s' % self)
+        mp.util.debug(f'shutting down executor {self}')

         self._flags.flag_as_shutting_down(kill_workers)
         executor_manager_thread = self._executor_manager_thread
@@ -1192,4 +1165,4 @@ def shutdown(self, wait=True, kill_workers=False):
             self._result_queue = None
             self._processes_management_lock = None

-    shutdown.__doc__ = _base.Executor.shutdown.__doc__
+    shutdown.__doc__ = Executor.shutdown.__doc__
diff --git a/joblib/externals/loky/reusable_executor.py b/joblib/externals/loky/reusable_executor.py
index 9a8e73f37..6b183a0bf 100644
--- a/joblib/externals/loky/reusable_executor.py
+++ b/joblib/externals/loky/reusable_executor.py
@@ -14,9 +14,6 @@

 __all__ = ['get_reusable_executor']

-# Python 2 compat helper
-STRING_TYPE = type("")
-
 # Singleton executor and id management
 _executor_lock = threading.RLock()
 _next_executor_id = 0
@@ -79,7 +76,7 @@ def get_reusable_executor(max_workers=None, context=None, timeout=10,
     ``VAR`` are string literals to overwrite the environment variable ``ENV``
     in the child processes to value ``VAL``. The environment variables are set
     in the children before any module is loaded. This only works with the
-    ``loky`` context and it is unreliable on Windows with Python < 3.6.
+    ``loky`` context.
""" _executor, _ = _ReusablePoolExecutor.get_reusable_executor( max_workers=max_workers, context=context, timeout=timeout, @@ -95,7 +92,7 @@ def __init__(self, submit_resize_lock, max_workers=None, context=None, timeout=None, executor_id=0, job_reducers=None, result_reducers=None, initializer=None, initargs=(), env=None): - super(_ReusablePoolExecutor, self).__init__( + super().__init__( max_workers=max_workers, context=context, timeout=timeout, job_reducers=job_reducers, result_reducers=result_reducers, initializer=initializer, initargs=initargs, env=env) @@ -118,10 +115,10 @@ def get_reusable_executor(cls, max_workers=None, context=None, timeout=10, max_workers = cpu_count() elif max_workers <= 0: raise ValueError( - "max_workers must be greater than 0, got {}." - .format(max_workers)) + f"max_workers must be greater than 0, got {max_workers}." + ) - if isinstance(context, STRING_TYPE): + if isinstance(context, str): context = get_context(context) if context is not None and context.get_start_method() == "fork": raise ValueError( @@ -135,8 +132,9 @@ def get_reusable_executor(cls, max_workers=None, context=None, timeout=10, env=env) if executor is None: is_reused = False - mp.util.debug("Create a executor with max_workers={}." - .format(max_workers)) + mp.util.debug( + f"Create a executor with max_workers={max_workers}." + ) executor_id = _get_next_executor_id() _executor_kwargs = kwargs _executor = executor = cls( @@ -154,9 +152,10 @@ def get_reusable_executor(cls, max_workers=None, context=None, timeout=10, else: reason = "arguments have changed" mp.util.debug( - "Creating a new executor with max_workers={} as the " - "previous instance cannot be reused ({})." - .format(max_workers, reason)) + "Creating a new executor with max_workers= " + f"{max_workers} as the previous instance cannot be " + f"reused ({reason})." + ) executor.shutdown(wait=True, kill_workers=kill_workers) _executor = executor = _executor_kwargs = None # Recursive call to build a new instance @@ -164,8 +163,8 @@ def get_reusable_executor(cls, max_workers=None, context=None, timeout=10, **kwargs) else: mp.util.debug( - "Reusing existing executor with max_workers={}." - .format(executor._max_workers) + "Reusing existing executor with " + f"max_workers={executor._max_workers}." ) is_reused = True executor._resize(max_workers) @@ -174,8 +173,7 @@ def get_reusable_executor(cls, max_workers=None, context=None, timeout=10, def submit(self, fn, *args, **kwargs): with self._submit_resize_lock: - return super(_ReusablePoolExecutor, self).submit( - fn, *args, **kwargs) + return super().submit(fn, *args, **kwargs) def _resize(self, max_workers): with self._submit_resize_lock: @@ -208,25 +206,28 @@ def _resize(self, max_workers): self._adjust_process_count() processes = list(self._processes.values()) - while not all([p.is_alive() for p in processes]): + while not all(p.is_alive() for p in processes): time.sleep(1e-3) def _wait_job_completion(self): """Wait for the cache to be empty before resizing the pool.""" # Issue a warning to the user about the bad effect of this usage. 
- if len(self._pending_work_items) > 0: + if self._pending_work_items: warnings.warn("Trying to resize an executor with running jobs: " "waiting for jobs completion before resizing.", UserWarning) - mp.util.debug("Executor {} waiting for jobs completion before" - " resizing".format(self.executor_id)) + mp.util.debug( + f"Executor {self.executor_id} waiting for jobs completion " + "before resizing" + ) # Wait for the completion of the jobs - while len(self._pending_work_items) > 0: + while self._pending_work_items: time.sleep(1e-3) def _setup_queues(self, job_reducers, result_reducers): # As this executor can be resized, use a large queue size to avoid # underestimating capacity and introducing overhead queue_size = 2 * cpu_count() + EXTRA_QUEUED_CALLS - super(_ReusablePoolExecutor, self)._setup_queues( - job_reducers, result_reducers, queue_size=queue_size) + super()._setup_queues( + job_reducers, result_reducers, queue_size=queue_size + ) From 3ba5bde27a7507977cd030560641768d9c2801e8 Mon Sep 17 00:00:00 2001 From: Olivier Grisel Date: Wed, 23 Feb 2022 10:16:25 +0100 Subject: [PATCH 2/2] update changelog --- CHANGES.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index d38ea409a..c5c428a92 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,6 +12,10 @@ Development version the temporary memmap folder contents concurrently. https://github.com/joblib/joblib/pull/1263 +- Vendor loky 3.1.0 with several fixes to more robustly forcibly terminate + worker processes in case of a crash. + https://github.com/joblib/joblib/pull/1269 + Release 1.1.0 --------------
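
The termination changes vendored above collapse the old taskkill/SIGTERM fallback logic into a single pgrep-based recursion on POSIX. For experimenting with that approach outside of the vendored module, here is a minimal standalone sketch; it is POSIX-only, and the `_kill` helper is a simplified stand-in for the one defined elsewhere in the vendored `backend/utils.py` (not shown in these hunks):

    import os
    import signal
    import subprocess


    def _kill(pid, sig=signal.SIGKILL):
        # Simplified stand-in: send the signal and ignore the race where the
        # process already exited.
        try:
            os.kill(pid, sig)
        except ProcessLookupError:
            pass


    def posix_recursive_kill(pid):
        """Kill the descendants of `pid` before killing `pid` itself."""
        try:
            children = subprocess.check_output(
                ["pgrep", "-P", str(pid)], text=True
            )
        except subprocess.CalledProcessError as e:
            # pgrep exits with return code 1 when no child process matches.
            if e.returncode != 1:
                raise
            children = ""
        for child_pid in children.splitlines():
            posix_recursive_kill(int(child_pid))
        _kill(pid)

The `env` argument documented for `get_reusable_executor` can be exercised directly once the sync is applied. A small usage sketch follows; the environment variable and worker function are illustrative only, and it assumes the vendored copy is importable as `joblib.externals.loky`:

    from joblib.externals.loky import get_reusable_executor


    def _worker_env(i):
        # Variables passed through ``env=`` are set in the worker before any
        # module is imported there, so they are visible in os.environ.
        import os
        return i, os.environ.get("OMP_NUM_THREADS")


    if __name__ == "__main__":
        executor = get_reusable_executor(
            max_workers=2, env={"OMP_NUM_THREADS": "1"}
        )
        print(list(executor.map(_worker_env, range(4))))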