diff --git a/CHANGES.rst b/CHANGES.rst index a77ab0ada..927790a50 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,8 +1,12 @@ Latest changes ============== -In development --------------- +Release 1.3.1 -- 2023/06/29 +--------------------------- + +- Fix compatibility with Python 3.7 by vendoring loky 3.4.1, + which is compatible with this version. + https://github.com/joblib/joblib/pull/1472 Release 1.3.0 -- 2023/06/28 diff --git a/joblib/__init__.py b/joblib/__init__.py index 730400844..fe32c547c 100644 --- a/joblib/__init__.py +++ b/joblib/__init__.py @@ -106,7 +106,7 @@ # Dev branch marker is: 'X.Y.dev' or 'X.Y.devN' where N is an integer. # 'X.Y.dev0' is the canonical version of 'X.Y.dev' # -__version__ = '1.4.dev0' +__version__ = '1.3.1' import os diff --git a/joblib/_parallel_config.py.bck b/joblib/_parallel_config.py.bck new file mode 100644 index 000000000..01ad6a485 --- /dev/null +++ b/joblib/_parallel_config.py.bck @@ -0,0 +1,542 @@ + +import warnings +import threading + +from ._utils import _Sentinel +from ._multiprocessing_helpers import mp + +from ._parallel_backends import LokyBackend +from ._parallel_backends import ThreadingBackend +from ._parallel_backends import SequentialBackend +from ._parallel_backends import MultiprocessingBackend + +BACKENDS = { + 'threading': ThreadingBackend, + 'sequential': SequentialBackend, +} +# name of the backend used by default by Parallel outside of any context +# managed by ``parallel_config`` or ``parallel_backend``. 
+
+# threading is the only backend that is always available
+DEFAULT_BACKEND = 'threading'
+
+MAYBE_AVAILABLE_BACKENDS = {'multiprocessing', 'loky'}
+
+# if multiprocessing is available, so is loky, we set it as the default
+# backend
+if mp is not None:
+    BACKENDS['multiprocessing'] = MultiprocessingBackend
+    BACKENDS['loky'] = LokyBackend
+    DEFAULT_BACKEND = 'loky'
+
+
+DEFAULT_THREAD_BACKEND = 'threading'
+
+
+###############################################################################
+def register_parallel_backend(name, factory, make_default=False):
+    """Register a new Parallel backend factory.
+
+    The new backend can then be selected by passing its name as the backend
+    argument to the :class:`~Parallel` class. Moreover, the default backend can
+    be overwritten globally by setting make_default=True.
+
+    The factory can be any callable that takes no argument and return an
+    instance of ``ParallelBackendBase``.
+
+    Warning: this function is experimental and subject to change in a future
+    version of joblib.
+
+    Parameters
+    ----------
+    name : str
+        Key under which the factory is stored in ``BACKENDS``.
+    factory : callable
+        Callable returning a backend instance.
+    make_default : bool, default=False
+        If True, ``DEFAULT_BACKEND`` is rebound to ``name``.
+
+    .. versionadded:: 0.10
+    """
+    BACKENDS[name] = factory
+    if make_default:
+        global DEFAULT_BACKEND
+        DEFAULT_BACKEND = name
+
+
+def _register_dask():
+    """Register Dask Backend if called with parallel_config(backend="dask")
+
+    Raises
+    ------
+    ImportError
+        If ``._dask`` cannot be imported; the original error is chained.
+    """
+    try:
+        from ._dask import DaskDistributedBackend
+        register_parallel_backend('dask', DaskDistributedBackend)
+    except ImportError as e:
+        msg = ("To use the dask.distributed backend you must install both "
+               "the `dask` and distributed modules.\n\n"
+               "See https://dask.pydata.org/en/latest/install.html for more "
+               "information.")
+        raise ImportError(msg) from e
+
+
+# Backends that are registered lazily, the first time their name is requested.
+EXTERNAL_BACKENDS = {
+    'dask': _register_dask,
+}
+
+VALID_BACKEND_HINTS = ('processes', 'threads', None)
+VALID_BACKEND_CONSTRAINTS = ('sharedmem', None)
+
+
+# Thread local value that can be overridden by the ``parallel_config`` context
+# manager
+_backend = threading.local()
+
+# Sentinels for the default values of the Parallel constructor and
+# the parallel_config and parallel_backend context managers
+default_parallel_config = {
+    "backend": _Sentinel(default_value=None),
+    "n_jobs": _Sentinel(default_value=None),
+    "verbose": _Sentinel(default_value=0),
+    "temp_folder": _Sentinel(default_value=None),
+    "max_nbytes": _Sentinel(default_value="1M"),
+    "mmap_mode": _Sentinel(default_value="r"),
+    "prefer": _Sentinel(default_value=None),
+    "require": _Sentinel(default_value=None),
+    "_parent_config": None,
+}
+
+
+def _get_config_param(param, context_config, key):
+    """Return the value of a parallel config parameter
+
+    Explicitly setting it in Parallel has priority over setting in a
+    parallel_(config/backend) context manager.
+
+    Parameters
+    ----------
+    param : object
+        Value received by the caller. It is considered "not set" only when it
+        is the sentinel stored under ``key`` in ``default_parallel_config``.
+    context_config : dict or None
+        Configuration to look the key up in. Its ``'_parent_config'`` entry
+        links to the enclosing configuration, or is None at the root.
+    key : str
+        Name of the parameter to resolve.
+
+    Returns
+    -------
+    object
+        ``param`` if explicitly set, else the first value set along the chain
+        of parent configurations, else ``param.default_value``.
+    """
+    if param is not default_parallel_config[key]:
+        # param is explicitly set, return it
+        return param
+
+    if context_config is not None:
+        if (key in context_config
+                and context_config[key] is not default_parallel_config[key]):
+            # there's a context manager and the key is set, return it
+            return context_config[key]
+
+        # The key is not set at this level. Just return the value from the
+        # parent_config if it exists. This lookup lives under the same None
+        # guard as the one above so that a missing context falls through to
+        # the default value instead of raising a TypeError.
+        parent_config = context_config['_parent_config']
+        if parent_config is not None:
+            return _get_config_param(param, parent_config, key)
+
+    # Otherwise, we are in the default_parallel_config,
+    # return the default value
+    return param.default_value
+
+
+def get_active_backend(
+    prefer=default_parallel_config["prefer"],
+    require=default_parallel_config["require"],
+    verbose=default_parallel_config["verbose"],
+):
+    """Return the active default backend and its ``n_jobs`` setting
+
+    Returns
+    -------
+    tuple
+        ``(backend, n_jobs)`` where ``n_jobs`` is resolved from the
+        configuration returned alongside the backend.
+    """
+    backend, config = _get_active_backend(prefer, require, verbose)
+    n_jobs = _get_config_param(
+        default_parallel_config['n_jobs'], config, "n_jobs"
+    )
+    return backend, n_jobs
+
+
+def _get_active_backend(
+    prefer=default_parallel_config["prefer"],
+    require=default_parallel_config["require"],
+    verbose=default_parallel_config["verbose"],
+):
+    """Return the active default backend and the configuration it comes from
+
+    Returns
+    -------
+    tuple
+        ``(backend, config)``. When a thread-based backend is forced,
+        ``config`` is a new dict with ``n_jobs=1`` whose parent is the active
+        configuration.
+
+    Raises
+    ------
+    ValueError
+        If ``prefer`` or ``require`` is not a valid value, or if the two are
+        inconsistent with each other.
+    """
+
+    backend_config = getattr(_backend, "config", default_parallel_config)
+
+    backend = _get_config_param(
+        default_parallel_config['backend'], backend_config, "backend"
+    )
+    prefer = _get_config_param(prefer, backend_config, "prefer")
+    require = _get_config_param(require, backend_config, "require")
+    verbose = _get_config_param(verbose, backend_config, "verbose")
+
+    if prefer not in VALID_BACKEND_HINTS:
+        raise ValueError(
+            f"prefer={prefer} is not a valid backend hint, "
+            f"expected one of {VALID_BACKEND_HINTS}"
+        )
+    if require not in VALID_BACKEND_CONSTRAINTS:
+        raise ValueError(
+            f"require={require} is not a valid backend constraint, "
+            f"expected one of {VALID_BACKEND_CONSTRAINTS}"
+        )
+    if prefer == 'processes' and require == 'sharedmem':
+        raise ValueError(
+            "prefer == 'processes' and require == 'sharedmem'"
+            " are inconsistent settings"
+        )
+
+    explicit_backend = True
+    if backend is None:
+
+        # We are either outside of the scope of any parallel_(config/backend)
+        # context manager or the context manager did not set a backend.
+        # create the default backend instance now.
+        backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0)
+        explicit_backend = False
+
+    # Try to use the backend set by the user with the context manager.
+
+    nesting_level = backend.nesting_level
+    uses_threads = getattr(backend, 'uses_threads', False)
+    supports_sharedmem = getattr(backend, 'supports_sharedmem', False)
+    # Force to use thread-based backend if the provided backend does not
+    # match the shared memory constraint or if the backend is not explicitly
+    # given and threads are preferred.
+    force_threads = (require == 'sharedmem' and not supports_sharedmem)
+    force_threads |= (
+        not explicit_backend and prefer == 'threads' and not uses_threads
+    )
+    if force_threads:
+        # This backend does not match the shared memory constraint:
+        # fallback to the default thread-based backend.
+        sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND](
+            nesting_level=nesting_level
+        )
+        # Warn the user if we forced the backend to thread-based, while the
+        # user explicitly specified a non-thread-based backend.
+        if verbose >= 10 and explicit_backend:
+            print(
+                f"Using {sharedmem_backend.__class__.__name__} as "
+                f"joblib backend instead of {backend.__class__.__name__} "
+                "as the latter does not provide shared memory semantics."
+            )
+        return sharedmem_backend, {
+            'n_jobs': 1, '_parent_config': backend_config
+        }
+
+    return backend, backend_config
+
+
+class parallel_config:
+    """Set the default backend or configuration for :class:`~joblib.Parallel`.
+
+    This is an alternative to directly passing keyword arguments to the
+    :class:`~joblib.Parallel` class constructor. It is particularly useful when
+    calling into library code that uses joblib internally but does not expose
+    the various parallel configuration arguments in its own API.
+
+    Parameters
+    ----------
+    backend : str or ParallelBackendBase instance, default=None
+        If ``backend`` is a string it must match a previously registered
+        implementation using the :func:`~register_parallel_backend` function.
+
+        By default the following backends are available:
+
+        - 'loky': single-host, process-based parallelism (used by default),
+        - 'threading': single-host, thread-based parallelism,
+        - 'multiprocessing': legacy single-host, process-based parallelism.
+
+        'loky' is recommended to run functions that manipulate Python objects.
+        'threading' is a low-overhead alternative that is most efficient for
+        functions that release the Global Interpreter Lock: e.g. I/O-bound
+        code or CPU-bound code in a few calls to native code that explicitly
+        releases the GIL. Note that on some rare systems (such as pyodide),
+        multiprocessing and loky may not be available, in which case joblib
+        defaults to threading.
+
+        In addition, if the ``dask`` and ``distributed`` Python packages are
+        installed, it is possible to use the 'dask' backend for better
+        scheduling of nested parallel calls without over-subscription and
+        potentially distribute parallel calls over a networked cluster of
+        several hosts.
+
+        It is also possible to use the distributed 'ray' backend for
+        distributing the workload to a cluster of nodes. See more details
+        in the Examples section below.
+
+        Alternatively the backend can be passed directly as an instance.
+
+    n_jobs : int, default=None
+        The maximum number of concurrently running jobs, such as the number
+        of Python worker processes when ``backend="loky"`` or the size of the
+        thread-pool when ``backend="threading"``.
+        If -1 all CPUs are used. If 1 is given, no parallel computing code
+        is used at all, which is useful for debugging. For ``n_jobs`` below -1,
+        (n_cpus + 1 + n_jobs) are used. Thus for ``n_jobs=-2``, all
+        CPUs but one are used.
+        ``None`` is a marker for 'unset' that will be interpreted as
+        ``n_jobs=1`` in most backends.
+
+    verbose : int, default=0
+        The verbosity level: if non zero, progress messages are
+        printed. Above 50, the output is sent to stdout.
+        The frequency of the messages increases with the verbosity level.
+        If it is more than 10, all iterations are reported.
+
+    temp_folder : str, default=None
+        Folder to be used by the pool for memmapping large arrays
+        for sharing memory with worker processes. If None, this will try in
+        order:
+
+        - a folder pointed by the ``JOBLIB_TEMP_FOLDER`` environment
+          variable,
+        - ``/dev/shm`` if the folder exists and is writable: this is a
+          RAM disk filesystem available by default on modern Linux
+          distributions,
+        - the default system temporary folder that can be
+          overridden with ``TMP``, ``TMPDIR`` or ``TEMP`` environment
+          variables, typically ``/tmp`` under Unix operating systems.
+
+    max_nbytes : int, str, or None, optional, default='1M'
+        Threshold on the size of arrays passed to the workers that
+        triggers automated memory mapping in temp_folder. Can be an int
+        in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
+        Use None to disable memmapping of large arrays.
+
+    mmap_mode : {None, 'r+', 'r', 'w+', 'c'}, default='r'
+        Memmapping mode for numpy arrays passed to workers. None will
+        disable memmapping, other modes defined in the numpy.memmap doc:
+        https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
+        Also, see 'max_nbytes' parameter documentation for more details.
+
+    prefer : str in {'processes', 'threads'} or None, default=None
+        Soft hint to choose the default backend.
+        The default process-based backend is 'loky' and the default
+        thread-based backend is 'threading'. Ignored if the ``backend``
+        parameter is specified.
+
+    require : 'sharedmem' or None, default=None
+        Hard constraint to select the backend. If set to 'sharedmem',
+        the selected backend will be single-host and thread-based.
+
+    inner_max_num_threads : int, default=None
+        If not None, overwrites the limit set on the number of threads
+        usable in some third-party library threadpools like OpenBLAS,
+        MKL or OpenMP. This is only used with the ``loky`` backend.
+
+    backend_params : dict
+        Additional parameters to pass to the backend constructor when
+        backend is a string.
+
+    Notes
+    -----
+    Joblib tries to limit the oversubscription by limiting the number of
+    threads usable in some third-party library threadpools like OpenBLAS, MKL
+    or OpenMP. The default limit in each worker is set to
+    ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
+    overwritten with the ``inner_max_num_threads`` argument which will be used
+    to set this limit in the child processes.
+
+    .. versionadded:: 1.3
+
+    Examples
+    --------
+    >>> from operator import neg
+    >>> with parallel_config(backend='threading'):
+    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
+    ...
+    [-1, -2, -3, -4, -5]
+
+    To use the 'ray' joblib backend add the following lines:
+
+    >>> from ray.util.joblib import register_ray  # doctest: +SKIP
+    >>> register_ray()  # doctest: +SKIP
+    >>> with parallel_config(backend="ray"):  # doctest: +SKIP
+    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
+    [-1, -2, -3, -4, -5]
+
+    """
+    def __init__(
+        self,
+        backend=default_parallel_config["backend"],
+        *,
+        n_jobs=default_parallel_config["n_jobs"],
+        verbose=default_parallel_config["verbose"],
+        temp_folder=default_parallel_config["temp_folder"],
+        max_nbytes=default_parallel_config["max_nbytes"],
+        mmap_mode=default_parallel_config["mmap_mode"],
+        prefer=default_parallel_config["prefer"],
+        require=default_parallel_config["require"],
+        inner_max_num_threads=None,
+        **backend_params
+    ):
+        # Save the parallel info and set the active parallel config
+        old_parallel_config = getattr(
+            _backend, "config", default_parallel_config
+        )
+
+        self.parallel_config = {
+            "n_jobs": n_jobs,
+            "verbose": verbose,
+            "temp_folder": temp_folder,
+            "max_nbytes": max_nbytes,
+            "mmap_mode": mmap_mode,
+            "prefer": prefer,
+            "require": require,
+            "_parent_config": old_parallel_config,
+        }
+
+        # _check_backend reads self.parallel_config['_parent_config'], so it
+        # must run after the dict above has been built.
+        backend = self._check_backend(
+            backend, inner_max_num_threads, **backend_params
+        )
+
+        self.parallel_config["backend"] = backend
+        # The configuration becomes active as soon as the object is created,
+        # not only when entering the ``with`` block.
+        setattr(_backend, "config", self.parallel_config)
+
+    def _check_backend(self, backend, inner_max_num_threads, **backend_params):
+        """Validate ``backend`` and turn a backend name into an instance.
+
+        Returns the ``backend`` sentinel unchanged when no backend was given,
+        otherwise a backend instance whose ``nesting_level`` is set.
+
+        Raises
+        ------
+        ValueError
+            If extra parameters are given without a backend, or if the
+            backend name is unknown.
+        AssertionError
+            If ``inner_max_num_threads`` is given but the backend does not
+            support it.
+        """
+        if backend is default_parallel_config['backend']:
+            if inner_max_num_threads is not None or len(backend_params) > 0:
+                raise ValueError(
+                    "inner_max_num_threads and other constructor "
+                    "parameters backend_params are only supported "
+                    "when backend is not None."
+                )
+            return backend
+
+        if isinstance(backend, str):
+            # Handle non-registered or missing backends
+            if backend not in BACKENDS:
+                if backend in EXTERNAL_BACKENDS:
+                    register = EXTERNAL_BACKENDS[backend]
+                    register()
+                elif backend in MAYBE_AVAILABLE_BACKENDS:
+                    warnings.warn(
+                        f"joblib backend '{backend}' is not available on "
+                        f"your system, falling back to {DEFAULT_BACKEND}.",
+                        UserWarning,
+                        stacklevel=2
+                    )
+                    BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
+                else:
+                    raise ValueError(
+                        f"Invalid backend: {backend}, expected one of "
+                        f"{sorted(BACKENDS.keys())}"
+                    )
+
+            backend = BACKENDS[backend](**backend_params)
+
+        if inner_max_num_threads is not None:
+            msg = (
+                f"{backend.__class__.__name__} does not accept setting the "
+                "inner_max_num_threads argument."
+            )
+            # Raised explicitly rather than through an ``assert`` statement so
+            # that the check is not stripped when running with ``python -O``.
+            # The exception type is kept for backward compatibility.
+            if not backend.supports_inner_max_num_threads:
+                raise AssertionError(msg)
+            backend.inner_max_num_threads = inner_max_num_threads
+
+        # If the nesting_level of the backend is not set previously, use the
+        # nesting level from the previous active_backend to set it
+        if backend.nesting_level is None:
+            parent_backend = self.parallel_config['_parent_config']['backend']
+            if parent_backend is default_parallel_config['backend']:
+                nesting_level = 0
+            else:
+                nesting_level = parent_backend.nesting_level
+            backend.nesting_level = nesting_level
+
+        return backend
+
+    def __enter__(self):
+        """Return the configuration dict installed by the constructor."""
+        return self.parallel_config
+
+    def __exit__(self, type, value, traceback):
+        """Restore the parent configuration; exceptions are not suppressed."""
+        self.unregister()
+
+    def unregister(self):
+        """Make the parent configuration the active one for this thread."""
+        setattr(_backend, "config", self.parallel_config['_parent_config'])
+
+
+class parallel_backend(parallel_config):
+    """Change the default backend used by Parallel inside a with block.
+
+    It is advised to use the :class:`~joblib.parallel.parallel_config` context
+    manager instead, which allows more fine-grained control over the backend
+    configuration.
+
+    If ``backend`` is a string it must match a previously registered
+    implementation using the :func:`~register_parallel_backend` function.
+
+    By default the following backends are available:
+
+    - 'loky': single-host, process-based parallelism (used by default),
+    - 'threading': single-host, thread-based parallelism,
+    - 'multiprocessing': legacy single-host, process-based parallelism.
+
+    'loky' is recommended to run functions that manipulate Python objects.
+    'threading' is a low-overhead alternative that is most efficient for
+    functions that release the Global Interpreter Lock: e.g. I/O-bound code or
+    CPU-bound code in a few calls to native code that explicitly releases the
+    GIL. Note that on some rare systems (such as Pyodide),
+    multiprocessing and loky may not be available, in which case joblib
+    defaults to threading.
+
+    You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib
+    backend to distribute work across machines. This works well with
+    scikit-learn estimators with the ``n_jobs`` parameter, for example::
+
+    >>> import joblib  # doctest: +SKIP
+    >>> from sklearn.model_selection import GridSearchCV  # doctest: +SKIP
+    >>> from dask.distributed import Client, LocalCluster # doctest: +SKIP
+
+    >>> # create a local Dask cluster
+    >>> cluster = LocalCluster()  # doctest: +SKIP
+    >>> client = Client(cluster)  # doctest: +SKIP
+    >>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1)
+    ... # doctest: +SKIP
+    >>> with joblib.parallel_backend("dask", scatter=[X, y]):  # doctest: +SKIP
+    ...     grid_search.fit(X, y)
+
+    It is also possible to use the distributed 'ray' backend for distributing
+    the workload to a cluster of nodes. To use the 'ray' joblib backend add
+    the following lines::
+
+    >>> from ray.util.joblib import register_ray  # doctest: +SKIP
+    >>> register_ray()  # doctest: +SKIP
+    >>> with parallel_backend("ray"):  # doctest: +SKIP
+    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
+    [-1, -2, -3, -4, -5]
+
+    Alternatively the backend can be passed directly as an instance.
+
+    By default all available workers will be used (``n_jobs=-1``) unless the
+    caller passes an explicit value for the ``n_jobs`` parameter.
+
+    This is an alternative to passing a ``backend='backend_name'`` argument to
+    the :class:`~Parallel` class constructor. It is particularly useful when
+    calling into library code that uses joblib internally but does not expose
+    the backend argument in its own API.
+
+    >>> from operator import neg
+    >>> with parallel_backend('threading'):
+    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
+    ...
+    [-1, -2, -3, -4, -5]
+
+    Joblib also tries to limit the oversubscription by limiting the number of
+    threads usable in some third-party library threadpools like OpenBLAS, MKL
+    or OpenMP. The default limit in each worker is set to
+    ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
+    overwritten with the ``inner_max_num_threads`` argument which will be used
+    to set this limit in the child processes.
+
+    .. versionadded:: 0.10
+
+    See Also
+    --------
+    joblib.parallel.parallel_config : context manager to change the backend
+        configuration
+    """
+    def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
+                 **backend_params):
+
+        super().__init__(
+            backend=backend,
+            n_jobs=n_jobs,
+            inner_max_num_threads=inner_max_num_threads,
+            **backend_params
+        )
+
+        # Keep (backend, n_jobs) pairs for the previous and the new
+        # configuration; ``__enter__`` returns the new pair.
+        if self.parallel_config['_parent_config'] is None:
+            self.old_backend_and_jobs = None
+        else:
+            self.old_backend_and_jobs = (
+                self.parallel_config['_parent_config']["backend"],
+                self.parallel_config['_parent_config']["n_jobs"],
+            )
+        self.new_backend_and_jobs = (
+            self.parallel_config["backend"],
+            self.parallel_config["n_jobs"],
+        )
+
+    def __enter__(self):
+        """Return the ``(backend, n_jobs)`` pair made active by this object."""
+        return self.new_backend_and_jobs
diff --git a/joblib/externals/loky/__init__.py b/joblib/externals/loky/__init__.py
index e9df6869f..5886d2a62 100644
--- a/joblib/externals/loky/__init__.py
+++ b/joblib/externals/loky/__init__.py
@@ -41,4 +41,4 @@
 ]
 
 
-__version__ = "3.4.0"
+__version__ = "3.4.1"
diff --git a/joblib/externals/loky/backend/context.py b/joblib/externals/loky/backend/context.py
index 433d6f5d9..d0f590317 100644
--- a/joblib/externals/loky/backend/context.py
+++ b/joblib/externals/loky/backend/context.py
@@ -18,14 +18,20 @@
 import multiprocessing as mp
 from multiprocessing import get_context as mp_get_context
 from multiprocessing.context import BaseContext
-from concurrent.futures.process import _MAX_WINDOWS_WORKERS
+
 
 from .process import LokyProcess, LokyInitMainProcess
 
 # Apparently, on older Python versions, loky cannot work 61 workers on Windows
 # but instead 60: ¯\_(ツ)_/¯
-if sys.version_info < (3, 10):
-    _MAX_WINDOWS_WORKERS = _MAX_WINDOWS_WORKERS - 1
+if sys.version_info >= (3, 8):
+    from concurrent.futures.process import _MAX_WINDOWS_WORKERS
+
+    if sys.version_info < (3, 10):
+        _MAX_WINDOWS_WORKERS = _MAX_WINDOWS_WORKERS - 1
+else:
+    # compat for versions before 3.8 which do not define this.
+    _MAX_WINDOWS_WORKERS = 60
 
 START_METHODS = ["loky", "loky_init_main", "spawn"]
 if sys.platform != "win32":