No simple way to pass initializer for process #381

Open
cjw296 opened this issue Jul 28, 2016 · 12 comments · May be fixed by #1525

Comments

@cjw296

cjw296 commented Jul 28, 2016

multiprocessing.Pool has a handy initializer parameter to pass a callable for setting up per-process resources (database connections, loggers, etc.), but joblib doesn't expose a way to pass this.
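
For reference, here is that multiprocessing.Pool feature in a minimal, runnable form (per-process logging setup stands in for a real resource):

import logging
import multiprocessing

def init_worker(level):
    # Runs once in each worker process, before it picks up any tasks.
    logging.basicConfig(level=level)

def work(i):
    logging.getLogger(__name__).info('task %s', i)
    return i * i

if __name__ == '__main__':
    with multiprocessing.Pool(processes=2, initializer=init_worker,
                              initargs=(logging.INFO,)) as pool:
        print(pool.map(work, range(4)))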

I see that in 0.10 I can pass a custom multiprocessing context, which I hope I can use to achieve this, but per-process setup is something many users will likely need, so it would be good if there were an easier way.

(I'd suggest an initializer parameter to Parallel that's picked up by MultiprocessingBackend)
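
Something like this, as a hypothetical sketch (initializer and initargs do not exist on Parallel today; the names just mirror multiprocessing.Pool):

from joblib import Parallel, delayed

def init_worker():
    print('per-process setup')  # e.g. open a database connection

# Hypothetical API: 'initializer' is the proposed parameter,
# not part of joblib's actual signature.
results = Parallel(n_jobs=4, initializer=init_worker)(
    delayed(abs)(i) for i in range(-4, 4))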

@cjw296
Author

cjw296 commented Jul 28, 2016

...and sadly, context is only available on Python 3.4+, and the project I'm working on is Python 2.7 :-(

@cjw296
Author

cjw296 commented Jul 28, 2016

Hmm, actually, I'm not even sure passing a context will let you do per-process initialisation...

@astromancer

astromancer commented Dec 7, 2017

Custom initializers for processes are a must-have feature!

@gdlmx

gdlmx commented Apr 8, 2019

For anyone searching for a dirty workaround, I wrote a simple function that injects an additional initializer into a Parallel instance running the Loky backend.

def with_initializer(self, f_init):
    # Make sure the backend has started its worker executor
    # (entering the Parallel context creates it).
    hasattr(self._backend, '_workers') or self.__enter__()
    origin_init = self._backend._workers._initializer

    def new_init():
        origin_init()
        f_init()

    # Chain onto any existing initializer, otherwise install ours directly.
    self._backend._workers._initializer = new_init if callable(origin_init) else f_init
    return self

Example usage:

import matlab
from joblib import Parallel, delayed

x = matlab.double([[0.0]]) # this object can only be loaded after importing matlab

def f(i):
    print(i, x)

def _init_matlab():
    import matlab

with Parallel(4) as para:
    for _ in with_initializer(para, _init_matlab)(delayed(f)(i) for i in range(10)):
        pass

Data objects from some complex libraries, such as matlab, can only be loaded after importing the corresponding Python module. An initializer seems to be the only way to guarantee that a third-party module is loaded before the child processes try to unpickle those global data objects.

@cwindolf

cwindolf commented Apr 8, 2019

@shwina

shwina commented Nov 14, 2023

Hi -- is this possible today? If not, would a PR implementing the following suggestion be welcome?

(I'd suggest an initializer parameter to Parallel that's picked up by MultiprocessingBackend)

Alternatively, I imagine it's possible to expose the parameter in joblib.parallel_config?
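
Something along these lines, as a hypothetical sketch (parallel_config exists in joblib >= 1.3, but an initializer parameter for it does not at the time of writing):

from joblib import Parallel, delayed, parallel_config

def init_worker():
    print('per-process setup')

# Hypothetical: 'initializer' is the proposed parameter, not an existing one.
with parallel_config(backend='loky', initializer=init_worker):
    Parallel(n_jobs=2)(delayed(print)(i) for i in range(4))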

@jakirkham

What do you think @ogrisel ? 🙂

@astromancer

astromancer commented Nov 15, 2023

I've been using this recipe, adapted from the comment above:

Code

from joblib._parallel_backends import MultiprocessingBackend, SequentialBackend


def noop(*_, **__):
    """Do nothing."""


def initialized(self, initializer=noop, args=(), **kws):
    """
    Custom per-process initialization for `joblib.Parallel`.

    Parameters
    ----------
    initializer : callable, optional
        Your process initializer, by default noop, which does nothing.
    args : tuple, optional
        Parameters for your initializer, by default ()

    Returns
    -------
    joblib.Parallel
    """
    if isinstance(self._backend, SequentialBackend):
        return self

    if isinstance(self._backend, MultiprocessingBackend):
        self._backend_args.update(initializer=initializer, initargs=args)
        return self

    if not hasattr(self._backend, '_workers'):
        self.__enter__()

    workers = self._backend._workers
    original_init = workers._initializer

    def new_init():
        if callable(original_init):
            original_init()

        initializer(*args, **kws)

    workers._initializer = new_init

    return self

Usage

import contextlib as ctx
import multiprocessing as mp
from joblib.parallel import Parallel, delayed


class ContextStack(ctx.ExitStack):
    """Manage nested contexts."""

    def __init__(self, contexts=()):
        super().__init__()
        self.contexts = list(contexts)

    def __enter__(self):
        # Enter every managed context; return the first non-None result.
        return next(filter(None, map(self.enter_context, self.contexts)), None)

    def add(self, context):
        # assert isinstance(context, ctx.AbstractContextManager)
        self.contexts.append(context)


# ---------------------------------------------------------------------------- #
# main
memory_lock = mp.Lock()


def set_lock(lock):
    # Initialize each process with a global variable lock.
    print('process setup')
    global memory_lock
    memory_lock = lock


def work(*args, **kws):
    print('doing work:', args, kws)


def get_workload():
    yield from range(10)


njobs = 10
worker = delayed(work)
context = ContextStack()
executor = Parallel(njobs, verbose=10)
context.add(initialized(executor, set_lock, (memory_lock, )))
with context as compute:
    compute(worker(data) for data in get_workload())
Output

[Parallel(n_jobs=10)]: Using backend LokyBackend with 10 concurrent workers.
process setup
doing work: (0,) {}
doing work: (1,) {}
doing work: (2,) {}
[Parallel(n_jobs=10)]: Done   3 out of  10 | elapsed:    2.6s remaining:    6.0s
doing work: (3,) {}
doing work: (4,) {}
[Parallel(n_jobs=10)]: Done   5 out of  10 | elapsed:    2.6s remaining:    2.6s
doing work: (5,) {}
doing work: (6,) {}
[Parallel(n_jobs=10)]: Done   7 out of  10 | elapsed:    2.6s remaining:    1.1s
doing work: (7,) {}
process setup
doing work: (8,) {}
doing work: (9,) {}
[Parallel(n_jobs=10)]: Done  10 out of  10 | elapsed:    2.6s finished
process setup
process setup
process setup
process setup
process setup
process setup
process setup
process setup

@shwina

shwina commented Nov 15, 2023

Thank you @astromancer, that's slick!

I'm hoping to find a solution for when I don't have access to the Parallel() instance. This is the case when, say, you're not using joblib directly, but rather using a library that itself uses joblib.

I suppose monkeypatching Parallel is a possibility, but as others have said, this would be great to have as part of the "official" joblib API (parallel_config).
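
For what it's worth, a rough sketch of such a monkeypatch, reusing the with_initializer trick from earlier in this thread (it pokes at loky internals and assumes the loky backend, so treat it as an illustration only):

import joblib

def my_init():
    print('per-process setup')  # whatever setup the workers need

_orig_call = joblib.Parallel.__call__

def _patched_call(self, iterable):
    # Same trick as with_initializer above: make sure the loky executor
    # exists, then chain our initializer onto it (joblib internals).
    if not hasattr(self._backend, '_workers'):
        self.__enter__()
    original = self._backend._workers._initializer

    def chained():
        if callable(original):
            original()
        my_init()

    self._backend._workers._initializer = chained
    return _orig_call(self, iterable)

joblib.Parallel.__call__ = _patched_call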

@ogrisel
Contributor

ogrisel commented Nov 15, 2023

If someone wants to work on a PR with at least support for the most common backends (e.g. loky, multiprocessing and maybe dask), I might find the time to review it.

@shwina

shwina commented Nov 15, 2023

Thanks! I'll work on that.

@ogrisel
Contributor

ogrisel commented Nov 15, 2023

Note, however, that unlike the multiprocessing backend, the loky and dask backends can reuse workers across consecutive calls to different Parallel instances. So the worker initialization semantics and the documentation should take that into account.
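
In practice that means an initializer may run once per worker lifetime rather than once per Parallel call, so a defensive pattern is to make it idempotent (a sketch, not official joblib guidance):

_setup_done = False

def init_worker():
    # Safe under worker reuse: the expensive setup runs at most once per
    # process, even if the initializer is invoked more than once.
    global _setup_done
    if _setup_done:
        return
    import matlab  # or open connections, configure logging, etc.
    _setup_done = True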
