No multiprocessing #1256


Merged
merged 30 commits on Feb 8, 2022
Changes from 19 commits
Commits
4b836c3
Add test for missing _multiprocessing module
hoodmane Jan 10, 2022
d515fd6
Run formatter, update comments
hoodmane Jan 10, 2022
9552ef2
Ignore missing_multiprocessing folder in pytest
hoodmane Jan 10, 2022
0b78306
Fix test to be independent of which directory pytest is run in
hoodmane Jan 10, 2022
a228f3f
Tidy up
hoodmane Jan 10, 2022
afe4bba
Apply formatter
hoodmane Jan 10, 2022
0bb2f0d
Update test according to rth's suggestion
hoodmane Jan 10, 2022
473ccac
Fix file name
hoodmane Jan 10, 2022
cfd27a0
Apply formatter
hoodmane Jan 10, 2022
045cfd2
Revert change to setup.cfg
hoodmane Jan 10, 2022
f50c2d3
ENH: make joblib robust to non-working multiprocessing
GaelVaroquaux Feb 3, 2022
8e838a1
Fix stupid error
GaelVaroquaux Feb 3, 2022
b56e6d5
Fix all test not to depend on multiprocessing
GaelVaroquaux Feb 3, 2022
7bd1771
Fix linting
GaelVaroquaux Feb 3, 2022
477949c
Changelog and documentation
GaelVaroquaux Feb 4, 2022
439c60c
Backend fallback if multiprocessing is not available
GaelVaroquaux Feb 5, 2022
a730ce8
Fix prior commit
GaelVaroquaux Feb 5, 2022
3cbf561
Fix tests
GaelVaroquaux Feb 5, 2022
cac6e90
Merge branch 'master' into no_multiprocessing
ogrisel Feb 7, 2022
fce2c7a
Update joblib/parallel.py
GaelVaroquaux Feb 7, 2022
6949cd8
Update doc/parallel.rst
GaelVaroquaux Feb 7, 2022
3496274
fix tests
GaelVaroquaux Feb 7, 2022
928909c
fix tests
GaelVaroquaux Feb 7, 2022
b94141b
Better tests
GaelVaroquaux Feb 7, 2022
9c2d03e
Fix tests
GaelVaroquaux Feb 7, 2022
0623569
more test fixing
GaelVaroquaux Feb 7, 2022
60629a5
More robust tests
GaelVaroquaux Feb 7, 2022
eb370e4
Tmp to find test called
GaelVaroquaux Feb 7, 2022
93d2bd1
more debugging
GaelVaroquaux Feb 7, 2022
b0b3663
This should fix things
GaelVaroquaux Feb 7, 2022
4 changes: 4 additions & 0 deletions CHANGES.rst
@@ -4,6 +4,10 @@ Latest changes
Development version
-------------------

- Make sure that joblib works even when multiprocessing is not available,
for instance with Pyodide
https://github.com/joblib/joblib/pull/1256

- Avoid unnecessary warnings when workers and main process delete
the temporary memmap folder contents concurrently.
https://github.com/joblib/joblib/pull/1263
7 changes: 7 additions & 0 deletions doc/parallel.rst
@@ -71,6 +71,13 @@ call to :class:`joblib.Parallel` but this is now considered a bad pattern
(when done in a library) as it does not make it possible to override that
choice with the ``parallel_backend`` context manager.


.. topic:: The loky backend may not always be available

Some rare systems do not support multiprocessing (for instance
Pyodide). In this case the loky backend is not available and the
default backend falls back to threading.

Besides builtin joblib backends, we can use
`Joblib Apache Spark Backend <https://github.com/joblib/joblib-spark>`_
to distribute joblib tasks on a Spark cluster.
3 changes: 1 addition & 2 deletions joblib/__init__.py
@@ -123,8 +123,7 @@
from .parallel import register_parallel_backend
from .parallel import parallel_backend
from .parallel import effective_n_jobs

from .externals.loky import wrap_non_picklable_objects
from ._cloudpickle_wrapper import wrap_non_picklable_objects


__all__ = ['Memory', 'MemorizedResult', 'PrintTime', 'Logger', 'hash', 'dump',
17 changes: 17 additions & 0 deletions joblib/_cloudpickle_wrapper.py
@@ -0,0 +1,17 @@
"""
Small shim of loky's cloudpickle_wrapper to avoid failure when
multiprocessing is not available.
"""


from ._multiprocessing_helpers import mp


def my_wrap_non_picklable_objects(obj, keep_wrapper=True):
return obj


if mp is None:
wrap_non_picklable_objects = my_wrap_non_picklable_objects
else:
from .externals.loky import wrap_non_picklable_objects # noqa
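
The shim above falls back to an identity wrapper whenever loky cannot be imported. A minimal stdlib sketch of the same pattern — the module name `some_optional_module` is hypothetical and intentionally does not exist, so the fallback branch is taken, just as on a multiprocessing-less build:

```python
# Optional-dependency shim: try the real wrapper, fall back to a no-op
# identity function with the same signature when the import fails.
try:
    from some_optional_module import wrap  # hypothetical stand-in for loky's wrapper
except ImportError:
    def wrap(obj, keep_wrapper=True):
        # Identity fallback: objects and callables pass through unchanged.
        return obj

wrapped = wrap(len)
```

Keeping the fallback's signature identical to the real wrapper's is what lets callers stay oblivious to which implementation they got.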
1 change: 1 addition & 0 deletions joblib/_multiprocessing_helpers.py
@@ -14,6 +14,7 @@
if mp:
try:
import multiprocessing as mp
import _multiprocessing # noqa
except ImportError:
mp = None
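
The hunk above guards both imports: some CPython builds ship `multiprocessing` but lack the `_multiprocessing` C extension, so probing for the extension explicitly is what makes the sentinel reliable. A standalone sketch of the same probe (the `backend` variable is illustrative, not joblib code):

```python
# Probe for working multiprocessing support; on single-threaded builds
# such as Pyodide the _multiprocessing C extension is missing and the
# import fails, leaving mp as None for callers to branch on.
try:
    import multiprocessing as mp
    import _multiprocessing  # noqa: F401 -- C extension, absent on Pyodide
except ImportError:
    mp = None

# Downstream code can then pick a safe default:
backend = "loky" if mp is not None else "threading"
```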

52 changes: 43 additions & 9 deletions joblib/parallel.py
@@ -27,7 +27,6 @@
ThreadingBackend, SequentialBackend,
LokyBackend)
from .externals.cloudpickle import dumps, loads
from .externals import loky

# Make sure that those two classes are part of the public joblib.parallel API
# so that 3rd party backend implementers can import them from here.
@@ -36,15 +35,28 @@


BACKENDS = {
'multiprocessing': MultiprocessingBackend,
'threading': ThreadingBackend,
'sequential': SequentialBackend,
'loky': LokyBackend,
}
# name of the backend used by default by Parallel outside of any context
# managed by ``parallel_backend``.
DEFAULT_BACKEND = 'loky'

# threading is the only backend that is always available everywhere
DEFAULT_BACKEND = 'threading'

DEFAULT_N_JOBS = 1

MAYBE_AVAILABLE_BACKENDS = {'multiprocessing', 'loky'}

# if multiprocessing is available, so is loky; in that case loky becomes
# the default backend
if mp is not None:
BACKENDS['multiprocessing'] = MultiprocessingBackend
from .externals import loky
BACKENDS['loky'] = LokyBackend
DEFAULT_BACKEND = 'loky'


DEFAULT_THREAD_BACKEND = 'threading'
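
The registration logic above can be sketched as a small function; `ThreadingBackend` and `LokyBackend` here are hypothetical stubs standing in for joblib's real backend classes:

```python
# Conditional backend registry: start from the backends that work
# everywhere, then add process-based ones only when multiprocessing
# is available, upgrading the default at the same time.
class ThreadingBackend:
    pass

class LokyBackend:
    pass

def build_backends(mp_available):
    backends = {'threading': ThreadingBackend, 'sequential': ThreadingBackend}
    default = 'threading'  # the only backend guaranteed on every platform
    if mp_available:
        # process-based backends only make sense with multiprocessing
        backends['loky'] = LokyBackend
        default = 'loky'
    return backends, default

backends, default = build_backends(mp_available=False)
```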

# Thread local value that can be overridden by the ``parallel_backend`` context
@@ -135,7 +147,9 @@ class parallel_backend(object):
'threading' is a low-overhead alternative that is most efficient for
functions that release the Global Interpreter Lock: e.g. I/O-bound code or
CPU-bound code in a few calls to native code that explicitly releases the
GIL.
GIL. Note that on some rare systems (such as Pyodide),
multiprocessing and loky may not be available, in which case joblib
defaults to threading.

In addition, if the `dask` and `distributed` Python packages are installed,
it is possible to use the 'dask' backend for better scheduling of nested
@@ -184,9 +198,20 @@ class parallel_backend(object):
def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
**backend_params):
if isinstance(backend, str):
if backend not in BACKENDS and backend in EXTERNAL_BACKENDS:
register = EXTERNAL_BACKENDS[backend]
register()
if backend not in BACKENDS:
if backend in EXTERNAL_BACKENDS:
register = EXTERNAL_BACKENDS[backend]
register()
elif backend in MAYBE_AVAILABLE_BACKENDS:
warnings.warn(
f"joblib backend '{backend}' is not available on "
f"your system, falling back to {DEFAULT_BACKEND}.",
UserWarning,
stacklevel=2)
BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
else:
raise ValueError("Invalid backend: %s, expected one of %r"
% (backend, sorted(BACKENDS.keys())))

backend = BACKENDS[backend](**backend_params)
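
The resolution order above (registered → external → maybe-available with a warning → error) can be sketched with plain dicts; all names here are illustrative stand-ins, and the snippet also shows `warnings.warn` called with the message first and the category second:

```python
import warnings

# Hypothetical registry mirroring the diff: only threading is present,
# while loky/multiprocessing might exist on other systems.
BACKENDS = {'threading': 'ThreadingBackend'}
MAYBE_AVAILABLE_BACKENDS = {'multiprocessing', 'loky'}
DEFAULT_BACKEND = 'threading'

def resolve_backend(name):
    if name not in BACKENDS:
        if name in MAYBE_AVAILABLE_BACKENDS:
            # Known backend that this system lacks: warn and fall back.
            warnings.warn(
                f"joblib backend '{name}' is not available on "
                f"your system, falling back to {DEFAULT_BACKEND}.",
                UserWarning,
                stacklevel=2)
            return BACKENDS[DEFAULT_BACKEND]
        # Unknown name: hard error listing the valid choices.
        raise ValueError(f"Invalid backend: {name}, expected one of "
                         f"{sorted(BACKENDS)}")
    return BACKENDS[name]

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    resolved = resolve_backend('loky')
```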

@@ -436,7 +461,9 @@ class Parallel(Logger):

- "loky" used by default, can induce some
communication and memory overhead when exchanging input and
output data with the worker Python processes.
output data with the worker Python processes. On some rare
systems (such as Pyodide), the loky backend may not be
available.
- "multiprocessing" previous process-based backend based on
`multiprocessing.Pool`. Less robust than `loky`.
- "threading" is a very low-overhead backend but it suffers
@@ -690,6 +717,13 @@ def __init__(self, n_jobs=None, backend=None, verbose=0, timeout=None,
# preload modules on the forkserver helper process.
self._backend_args['context'] = backend
backend = MultiprocessingBackend(nesting_level=nesting_level)
elif backend not in BACKENDS and backend in MAYBE_AVAILABLE_BACKENDS:
warnings.warn(
f"joblib backend '{backend}' is not available on "
f"your system, falling back to {DEFAULT_BACKEND}.",
UserWarning,
stacklevel=2)
BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
else:
try:
backend_factory = BACKENDS[backend]
27 changes: 27 additions & 0 deletions joblib/test/test_cloudpickle_wrapper.py
@@ -0,0 +1,27 @@
"""
Test that our implementation of wrap_non_picklable_objects mimics
properly the loky implementation.
"""

from .._cloudpickle_wrapper import wrap_non_picklable_objects
from .._cloudpickle_wrapper import my_wrap_non_picklable_objects


def a_function(x):
return x


class AClass(object):

def __call__(self, x):
return x


def test_wrap_non_picklable_objects():
# Mostly a smoke test: test that we can use callable in the same way
# with both our implementation of wrap_non_picklable_objects and the
# upstream one
for obj in (a_function, AClass()):
wrapped_obj = wrap_non_picklable_objects(obj)
my_wrapped_obj = my_wrap_non_picklable_objects(obj)
assert wrapped_obj(1) == my_wrapped_obj(1)
5 changes: 4 additions & 1 deletion joblib/test/test_memmapping.py
@@ -148,7 +148,8 @@ def reconstruct_array_or_memmap(x):
assert_array_equal(b3_reconstructed, b3)


@skipif(sys.platform != "win32",
@with_multiprocessing
@skipif(sys.platform != "win32",
reason="PermissionError only easily triggerable on Windows")
def test_resource_tracker_retries_when_permissionerror(tmpdir):
# Test resource_tracker retry mechanism when unlinking memmaps. See more
@@ -357,6 +358,7 @@ def test_pool_with_memmap_array_view(factory, tmpdir):


@with_numpy
@with_multiprocessing
@parametrize("backend", ["multiprocessing", "loky"])
def test_permission_error_windows_reference_cycle(backend):
# Non regression test for:
@@ -391,6 +393,7 @@ def test_permission_error_windows_reference_cycle(backend):


@with_numpy
@with_multiprocessing
@parametrize("backend", ["multiprocessing", "loky"])
def test_permission_error_windows_memmap_sent_to_parent(backend):
# Second non-regression test for:
32 changes: 32 additions & 0 deletions joblib/test/test_missing_multiprocessing.py
@@ -0,0 +1,32 @@
"""
Pyodide and other single-threaded Python builds will be missing the
_multiprocessing module. Test that joblib still works in this environment.
"""

import os
import subprocess
import sys


def test_missing_multiprocessing(tmp_path):
"""
Test that import joblib works even if _multiprocessing is missing.

pytest has already imported everything from joblib. The most reasonable way
to test importing joblib with modified environment is to invoke a separate
Python process. This also ensures that we don't break other tests by
importing a bad `_multiprocessing` module.
"""
(tmp_path / "_multiprocessing.py").write_text(
'raise ImportError("No _multiprocessing module!")'
)
env = dict(os.environ)
# For subprocess, use current sys.path with our custom version of
# multiprocessing inserted.
env["PYTHONPATH"] = os.pathsep.join([str(tmp_path)] + sys.path)
subprocess.check_call(
[sys.executable, "-c",
"import joblib, math; "
"joblib.Parallel(n_jobs=1)("
"joblib.delayed(math.sqrt)(i**2) for i in range(10))"
], env=env)
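
The same shadow-a-module-via-`PYTHONPATH` technique can be sketched with stdlib pieces only; `shadowed_mod` is a hypothetical module name used purely for illustration:

```python
import os
import subprocess
import sys
import tempfile

# Write a stub module, prepend its directory to PYTHONPATH, and check
# that a child interpreter imports the stub instead of anything else.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "shadowed_mod.py"), "w") as f:
        f.write('VALUE = "from shadow"\n')
    env = dict(os.environ)
    # os.pathsep keeps this portable (":" on POSIX, ";" on Windows).
    env["PYTHONPATH"] = os.pathsep.join([tmp] + sys.path)
    out = subprocess.check_output(
        [sys.executable, "-c",
         "import shadowed_mod; print(shadowed_mod.VALUE)"],
        env=env)
```

Running the probe in a subprocess is essential: the parent interpreter has already imported its own modules, so only a fresh process sees the modified search path.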
5 changes: 4 additions & 1 deletion joblib/test/test_module.py
@@ -1,14 +1,15 @@
import sys
import joblib
import pytest
from joblib.testing import check_subprocess_call
from joblib.test.common import with_multiprocessing


def test_version():
assert hasattr(joblib, '__version__'), (
"There is no __version__ attribute on the joblib module")


@with_multiprocessing
def test_no_start_method_side_effect_on_import():
# check that importing joblib does not implicitly set the global
# start_method for multiprocessing.
@@ -22,6 +23,7 @@ def test_no_start_method_side_effect_on_import():
check_subprocess_call([sys.executable, '-c', code])


@with_multiprocessing
def test_no_semaphore_tracker_on_import():
# check that importing joblib does not implicitly spawn a resource tracker
# or a semaphore tracker
@@ -38,6 +40,7 @@ def test_no_semaphore_tracker_on_import():
check_subprocess_call([sys.executable, '-c', code])


@with_multiprocessing
def test_no_resource_tracker_on_import():
code = """if True:
import joblib
55 changes: 42 additions & 13 deletions joblib/test/test_parallel.py
@@ -24,14 +24,17 @@
import joblib
from joblib import parallel
from joblib import dump, load
from joblib.externals.loky import get_reusable_executor

from joblib._multiprocessing_helpers import mp

from joblib.test.common import np, with_numpy
from joblib.test.common import with_multiprocessing
from joblib.testing import (parametrize, raises, check_subprocess_call,
skipif, SkipTest, warns)

from joblib.externals.loky.process_executor import TerminatedWorkerError
if mp is not None:
# loky is not available if multiprocessing is not available
from joblib.externals.loky import get_reusable_executor

from queue import Queue

@@ -69,7 +72,10 @@
ALL_VALID_BACKENDS = [None] + sorted(BACKENDS.keys())
# Add instances of backend classes deriving from ParallelBackendBase
ALL_VALID_BACKENDS += [BACKENDS[backend_str]() for backend_str in BACKENDS]
PROCESS_BACKENDS = ['multiprocessing', 'loky']
if mp is None:
PROCESS_BACKENDS = []
else:
PROCESS_BACKENDS = ['multiprocessing', 'loky']
PARALLEL_BACKENDS = PROCESS_BACKENDS + ['threading']

if hasattr(mp, 'get_context'):
@@ -569,8 +575,14 @@ def effective_n_jobs(self, n_jobs=1):


def test_invalid_backend():
with raises(ValueError):
with raises(ValueError) as excinfo:
Parallel(backend='unit-testing')
assert "Invalid backend:" in str(excinfo.value)

with raises(ValueError) as excinfo:
with parallel_backend('unit-testing'):
pass
assert "Invalid backend:" in str(excinfo.value)


@parametrize('backend', ALL_VALID_BACKENDS)
@@ -601,6 +613,13 @@ def test_overwrite_default_backend():
assert _active_backend_type() == DefaultBackend


@skipif(mp is not None, reason="Only without multiprocessing")
def test_backend_no_multiprocessing():
with warns(UserWarning,
match="joblib backend '.*' is not available on.*"):
Parallel(backend='loky')(delayed(square)(i) for i in range(3))


def check_backend_context_manager(backend_name):
with parallel_backend(backend_name, n_jobs=3):
active_backend, active_n_jobs = parallel.get_active_backend()
@@ -1208,7 +1227,10 @@ def test_memmapping_leaks(backend, tmpdir):
raise AssertionError('temporary directory of Parallel was not removed')


@parametrize('backend', [None, 'loky', 'threading'])
@parametrize('backend',
([None, 'threading'] if mp is None
else [None, 'loky', 'threading'])
)
def test_lambda_expression(backend):
# cloudpickle is used to pickle delayed callables
results = Parallel(n_jobs=2, backend=backend)(
@@ -1238,6 +1260,7 @@ def test_backend_batch_statistics_reset(backend):
p._backend._DEFAULT_SMOOTHED_BATCH_DURATION)


@with_multiprocessing
def test_backend_hinting_and_constraints():
for n_jobs in [1, 2, -1]:
assert type(Parallel(n_jobs=n_jobs)._backend) == LokyBackend
@@ -1438,7 +1461,8 @@ def _recursive_parallel(nesting_limit=None):
return Parallel()(delayed(_recursive_parallel)() for i in range(2))


@parametrize('backend', ['loky', 'threading'])
@parametrize('backend',
(['threading'] if mp is None else ['loky', 'threading']))
def test_thread_bomb_mitigation(backend):
# Test that recursive parallelism raises a recursion rather than
# saturating the operating system resources by creating a unbounded number
@@ -1447,13 +1471,18 @@ def test_thread_bomb_mitigation(backend):
with raises(BaseException) as excinfo:
_recursive_parallel()
exc = excinfo.value
if backend == "loky" and isinstance(exc, TerminatedWorkerError):
# The recursion exception can itself cause an error when pickling it to
# be send back to the parent process. In this case the worker crashes
# but the original traceback is still printed on stderr. This could be
# improved but does not seem simple to do and this is is not critical
# for users (as long as there is no process or thread bomb happening).
pytest.xfail("Loky worker crash when serializing RecursionError")
if backend == "loky":
# Local import because loky may not be importable for lack of
# multiprocessing
from joblib.externals.loky.process_executor import TerminatedWorkerError # noqa
if isinstance(exc, TerminatedWorkerError):
# The recursion exception can itself cause an error when
# pickling it to be sent back to the parent process. In this
# case the worker crashes but the original traceback is still
# printed on stderr. This could be improved but does not seem
# simple to do and is not critical for users (as long
# as there is no process or thread bomb happening).
pytest.xfail("Loky worker crash when serializing RecursionError")
else:
assert isinstance(exc, RecursionError)
