Commit

No multiprocessing (#1256)
Co-authored-by: Hood Chatham <roberthoodchatham@gmail.com>
Co-authored-by: Olivier Grisel <olivier.grisel@ensta.org>
3 people committed Feb 8, 2022
1 parent 978bd90 commit a64b808
Showing 11 changed files with 198 additions and 33 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
@@ -4,6 +4,10 @@ Latest changes
Development version
-------------------

- Make sure that joblib works even when multiprocessing is not available,
for instance with Pyodide
https://github.com/joblib/joblib/pull/1256

- Avoid unnecessary warnings when workers and main process delete
the temporary memmap folder contents concurrently.
https://github.com/joblib/joblib/pull/1263
7 changes: 7 additions & 0 deletions doc/parallel.rst
@@ -71,6 +71,13 @@ call to :class:`joblib.Parallel` but this is now considered a bad pattern
(when done in a library) as it does not make it possible to override that
choice with the ``parallel_backend`` context manager.


.. topic:: The loky backend may not always be available

Some rare systems do not support multiprocessing (for instance
Pyodide). In this case the loky backend is not available and the
default backend falls back to threading.
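For code that must stay portable across such platforms, one option (a minimal sketch, not part of this changeset) is to request the always-available threading backend explicitly:

    from joblib import Parallel, delayed

    # The threading backend works everywhere, including Pyodide,
    # because it does not need the _multiprocessing extension module.
    squares = Parallel(n_jobs=2, backend="threading")(
        delayed(lambda x: x * x)(i) for i in range(8))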

Besides builtin joblib backends, we can use
`Joblib Apache Spark Backend <https://github.com/joblib/joblib-spark>`_
to distribute joblib tasks on a Spark cluster.
3 changes: 1 addition & 2 deletions joblib/__init__.py
@@ -123,8 +123,7 @@
from .parallel import register_parallel_backend
from .parallel import parallel_backend
from .parallel import effective_n_jobs

from .externals.loky import wrap_non_picklable_objects
from ._cloudpickle_wrapper import wrap_non_picklable_objects


__all__ = ['Memory', 'MemorizedResult', 'PrintTime', 'Logger', 'hash', 'dump',
17 changes: 17 additions & 0 deletions joblib/_cloudpickle_wrapper.py
@@ -0,0 +1,17 @@
"""
Small shim of loky's cloudpickle_wrapper to avoid failure when
multiprocessing is not available.
"""


from ._multiprocessing_helpers import mp


def my_wrap_non_picklable_objects(obj, keep_wrapper=True):
return obj


if mp is None:
wrap_non_picklable_objects = my_wrap_non_picklable_objects
else:
from .externals.loky import wrap_non_picklable_objects # noqa
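The shim keeps `joblib.wrap_non_picklable_objects` importable everywhere: with loky available it wraps callables for cloudpickle-based serialization, and without multiprocessing it degrades to an identity decorator. A usage sketch (illustrative, not part of the diff):

    from joblib import wrap_non_picklable_objects

    @wrap_non_picklable_objects
    def double(x):
        return 2 * x

    # The wrapped object stays callable with the same semantics,
    # whichever implementation was selected at import time.
    assert double(21) == 42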
1 change: 1 addition & 0 deletions joblib/_multiprocessing_helpers.py
@@ -14,6 +14,7 @@
if mp:
try:
import multiprocessing as mp
import _multiprocessing # noqa
except ImportError:
mp = None
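The added `import _multiprocessing` is the real availability probe: Pyodide ships a pure-Python `multiprocessing` package that imports successfully, but the `_multiprocessing` C extension it relies on is missing. Downstream code can then branch on the resulting flag, for instance (sketch only):

    from joblib._multiprocessing_helpers import mp

    # mp is None when process-based parallelism cannot work at all.
    backend = "threading" if mp is None else "loky"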

55 changes: 46 additions & 9 deletions joblib/parallel.py
@@ -27,7 +27,6 @@
ThreadingBackend, SequentialBackend,
LokyBackend)
from .externals.cloudpickle import dumps, loads
from .externals import loky

# Make sure that those two classes are part of the public joblib.parallel API
# so that 3rd party backend implementers can import them from here.
@@ -36,15 +35,28 @@


BACKENDS = {
'multiprocessing': MultiprocessingBackend,
'threading': ThreadingBackend,
'sequential': SequentialBackend,
'loky': LokyBackend,
}
# name of the backend used by default by Parallel outside of any context
# managed by ``parallel_backend``.
DEFAULT_BACKEND = 'loky'

# threading is the only backend that is always available everywhere
DEFAULT_BACKEND = 'threading'

DEFAULT_N_JOBS = 1

MAYBE_AVAILABLE_BACKENDS = {'multiprocessing', 'loky'}

# If multiprocessing is available, loky is too; register both and make
# loky the default backend.
if mp is not None:
BACKENDS['multiprocessing'] = MultiprocessingBackend
from .externals import loky
BACKENDS['loky'] = LokyBackend
DEFAULT_BACKEND = 'loky'


DEFAULT_THREAD_BACKEND = 'threading'

# Thread local value that can be overridden by the ``parallel_backend`` context
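The effect of the conditional registration above can be inspected at runtime; a small sketch (illustrative, relying on the module-level names defined in this hunk):

    import joblib.parallel as jp

    # 'threading' and 'sequential' are always registered; 'loky' and
    # 'multiprocessing' appear only when _multiprocessing is importable.
    print(sorted(jp.BACKENDS))
    print(jp.DEFAULT_BACKEND)  # 'loky' normally, 'threading' on Pyodide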
@@ -135,7 +147,9 @@ class parallel_backend(object):
'threading' is a low-overhead alternative that is most efficient for
functions that release the Global Interpreter Lock: e.g. I/O-bound code or
CPU-bound code in a few calls to native code that explicitly releases the
GIL.
GIL. Note that on some rare systems (such as Pyodide),
multiprocessing and loky may not be available, in which case joblib
defaults to threading.
In addition, if the `dask` and `distributed` Python packages are installed,
it is possible to use the 'dask' backend for better scheduling of nested
@@ -184,9 +198,20 @@ class parallel_backend(object):
def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
**backend_params):
if isinstance(backend, str):
if backend not in BACKENDS and backend in EXTERNAL_BACKENDS:
register = EXTERNAL_BACKENDS[backend]
register()
if backend not in BACKENDS:
if backend in EXTERNAL_BACKENDS:
register = EXTERNAL_BACKENDS[backend]
register()
elif backend in MAYBE_AVAILABLE_BACKENDS:
warnings.warn(
f"joblib backend '{backend}' is not available on "
f"your system, falling back to {DEFAULT_BACKEND}.",
UserWarning,
stacklevel=2)
BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
else:
raise ValueError("Invalid backend: %s, expected one of %r"
% (backend, sorted(BACKENDS.keys())))

backend = BACKENDS[backend](**backend_params)
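With this change, requesting one of the maybe-available backends on an unsupported system emits a UserWarning and substitutes the default backend instead of crashing; a sketch of the observable behavior (illustrative only):

    import warnings
    from joblib import Parallel, delayed, parallel_backend

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        with parallel_backend("loky"):
            Parallel(n_jobs=2)(delayed(abs)(-i) for i in range(4))
    # On Pyodide-like systems, caught contains the fallback UserWarning;
    # where multiprocessing exists, loky is used and no fallback warning
    # is emitted.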

@@ -436,7 +461,9 @@ class Parallel(Logger):
- "loky" used by default, can induce some
communication and memory overhead when exchanging input and
output data with the worker Python processes.
output data with the worker Python processes. On some rare
systems (such as Pyodide), the loky backend may not be
available.
- "multiprocessing" previous process-based backend based on
`multiprocessing.Pool`. Less robust than `loky`.
- "threading" is a very low-overhead backend but it suffers
@@ -690,6 +717,16 @@ def __init__(self, n_jobs=None, backend=None, verbose=0, timeout=None,
# preload modules on the forkserver helper process.
self._backend_args['context'] = backend
backend = MultiprocessingBackend(nesting_level=nesting_level)

elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
warnings.warn(
f"joblib backend '{backend}' is not available on "
f"your system, falling back to {DEFAULT_BACKEND}.",
UserWarning,
stacklevel=2)
BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
backend = BACKENDS[DEFAULT_BACKEND](nesting_level=nesting_level)

else:
try:
backend_factory = BACKENDS[backend]
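The same fallback path applies when the backend name is passed to `Parallel` directly rather than through `parallel_backend` (sketch only):

    from joblib import Parallel, delayed

    # Warns and runs on the default backend where multiprocessing is
    # unavailable; uses multiprocessing.Pool workers otherwise.
    Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(len)("x" * i) for i in range(4))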
27 changes: 27 additions & 0 deletions joblib/test/test_cloudpickle_wrapper.py
@@ -0,0 +1,27 @@
"""
Test that our implementation of wrap_non_picklable_objects mimics
properly the loky implementation.
"""

from .._cloudpickle_wrapper import wrap_non_picklable_objects
from .._cloudpickle_wrapper import my_wrap_non_picklable_objects


def a_function(x):
return x


class AClass(object):

def __call__(self, x):
return x


def test_wrap_non_picklable_objects():
# Mostly a smoke test: test that we can use callable in the same way
# with both our implementation of wrap_non_picklable_objects and the
# upstream one
for obj in (a_function, AClass()):
wrapped_obj = wrap_non_picklable_objects(obj)
my_wrapped_obj = my_wrap_non_picklable_objects(obj)
assert wrapped_obj(1) == my_wrapped_obj(1)
5 changes: 4 additions & 1 deletion joblib/test/test_memmapping.py
@@ -148,7 +148,8 @@ def reconstruct_array_or_memmap(x):
assert_array_equal(b3_reconstructed, b3)


@skipif(sys.platform != "win32",
@with_multiprocessing
@skipif(sys.platform != "win32",
reason="PermissionError only easily triggerable on Windows")
def test_resource_tracker_retries_when_permissionerror(tmpdir):
# Test resource_tracker retry mechanism when unlinking memmaps. See more
@@ -357,6 +358,7 @@ def test_pool_with_memmap_array_view(factory, tmpdir):


@with_numpy
@with_multiprocessing
@parametrize("backend", ["multiprocessing", "loky"])
def test_permission_error_windows_reference_cycle(backend):
# Non regression test for:
@@ -391,6 +393,7 @@ def test_permission_error_windows_reference_cycle(backend):


@with_numpy
@with_multiprocessing
@parametrize("backend", ["multiprocessing", "loky"])
def test_permission_error_windows_memmap_sent_to_parent(backend):
# Second non-regression test for:
32 changes: 32 additions & 0 deletions joblib/test/test_missing_multiprocessing.py
@@ -0,0 +1,32 @@
"""
Pyodide and other single-threaded Python builds will be missing the
_multiprocessing module. Test that joblib still works in this environment.
"""

import os
import subprocess
import sys


def test_missing_multiprocessing(tmp_path):
"""
Test that import joblib works even if _multiprocessing is missing.
pytest has already imported everything from joblib. The most reasonable way
to test importing joblib with a modified environment is to invoke a separate
Python process. This also ensures that we don't break other tests by
importing a bad `_multiprocessing` module.
"""
(tmp_path / "_multiprocessing.py").write_text(
'raise ImportError("No _multiprocessing module!")'
)
env = dict(os.environ)
# For the subprocess, use the current sys.path with our stub
# _multiprocessing module prepended.
env["PYTHONPATH"] = os.pathsep.join([str(tmp_path)] + sys.path)
subprocess.check_call(
[sys.executable, "-c",
"import joblib, math; "
"joblib.Parallel(n_jobs=1)("
"joblib.delayed(math.sqrt)(i**2) for i in range(10))"
], env=env)
5 changes: 4 additions & 1 deletion joblib/test/test_module.py
@@ -1,14 +1,15 @@
import sys
import joblib
import pytest
from joblib.testing import check_subprocess_call
from joblib.test.common import with_multiprocessing


def test_version():
assert hasattr(joblib, '__version__'), (
"There are no __version__ argument on the joblib module")


@with_multiprocessing
def test_no_start_method_side_effect_on_import():
# check that importing joblib does not implicitly set the global
# start_method for multiprocessing.
@@ -22,6 +23,7 @@ def test_no_start_method_side_effect_on_import():
check_subprocess_call([sys.executable, '-c', code])


@with_multiprocessing
def test_no_semaphore_tracker_on_import():
# check that importing joblib does not implicitly spawn a resource tracker
# or a semaphore tracker
@@ -38,6 +40,7 @@ def test_no_semaphore_tracker_on_import():
check_subprocess_call([sys.executable, '-c', code])


@with_multiprocessing
def test_no_resource_tracker_on_import():
code = """if True:
import joblib
