
WIP: add an example of self sustaining parallel compute #1485

Open · wants to merge 25 commits into base: main
Conversation

@fcharras (Contributor) commented Jul 27, 2023

I initiated the return_as project with this particular use case in mind: creating a compute model where the outputs of the tasks that are processed in parallel are used to feed new tasks to the compute, and so on until some stopping criterion is met. The final purpose was implementing a Bayesian grid search of hyper-parameters, where the cross-validated metrics for previous combinations of hyper-parameters are used to choose the next area of the parameter space to explore.

It occurred to me that this becomes possible with joblib if #1463 gets merged. So this PR builds on #1463 and adds an example that introduces the recipe for implementing such a compute model.

Please refer to the diff between this branch and the branch in #1463 to access the net diff that is actually added by this PR at fcharras/joblib@fea/generator_unordered...fcharras:joblib:example/self_sustaining_parallel

(NB: joblib CI pipelines are currently down; there seems to be a dependency issue during the setup steps, not related to this PR.)
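The feedback-loop compute model described above can be sketched with stdlib tools only. This is merely an illustration of the idea, using `concurrent.futures` as a stand-in for joblib (this PR's actual example relies on `joblib.Parallel` with the generator output mode of #1463); the `evaluate` function and the numeric stopping criterion are invented for the demo.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def evaluate(x):
    # stands in for one expensive parallel task, e.g. a cross-validation run
    return x * x

results = []
with ThreadPoolExecutor(max_workers=2) as executor:
    # seed the pool with a first batch of tasks
    pending = {executor.submit(evaluate, x) for x in range(4)}
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            output = future.result()
            results.append(output)
            if output < 100:  # stopping criterion not met: feed a new task
                pending.add(executor.submit(evaluate, output + 1))
```

Each completed task whose output is below the threshold spawns a follow-up task derived from its own output; the compute drains itself once every chain has crossed the threshold.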

@codecov codecov bot commented Jul 27, 2023

Codecov Report

Patch coverage: 100.00% and project coverage change: -0.30% ⚠️

Comparison is base (83f9169) 94.96% compared to head (5694f23) 94.66%.
Report is 7 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1485      +/-   ##
==========================================
- Coverage   94.96%   94.66%   -0.30%     
==========================================
  Files          45       45              
  Lines        7474     7523      +49     
==========================================
+ Hits         7098     7122      +24     
- Misses        376      401      +25     
Files Changed Coverage Δ
joblib/parallel.py 95.17% <100.00%> (-1.73%) ⬇️
joblib/test/test_parallel.py 96.11% <100.00%> (-0.12%) ⬇️

... and 4 files with indirect coverage changes


# default hence the use of the lock
with self._lock:
    batched_results = self._jobs.popleft()
@fcharras (Contributor, Author) commented Jul 27, 2023

I suspect that this lock was never needed, since deque's pop and append methods are thread-safe, but we haven't tried to remove it before. It must be removed to make the example work, since we need threads to be able to concurrently retrieve results and submit new tasks, to enable a feedback loop.
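As a side note, the thread-safety claim can be checked with a small stdlib-only stress test (all names here are made up for the demo): CPython guarantees that individual `deque.append` and `deque.popleft` calls are atomic, so a single producer and a single consumer can share a deque without a lock.

```python
from collections import deque
from threading import Thread

N = 10_000
jobs = deque()
out = []

def producer():
    for i in range(N):
        jobs.append(i)  # atomic in CPython, no lock needed

def consumer():
    received = 0
    while received < N:
        try:
            out.append(jobs.popleft())  # atomic in CPython as well
            received += 1
        except IndexError:
            pass  # deque momentarily empty, retry

threads = [Thread(target=producer), Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With one producer and one consumer, FIFO order is preserved and no item is lost or duplicated, even though no lock protects the shared deque.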

This example introduces a recipe for implementing a feedback loop where the
output generator of joblib.Parallel is used to fuel the input generator with
new tasks, based on the values that are returned by the tasks that have
already been completed, until some stopping criterion is met.
@fcharras (Contributor, Author) commented:

Note that in this example the stopping criterion must be implemented within the function that generates new tasks from results. The first time it is triggered by a task, it stops the generation of new tasks for the current callback thread and all future callback threads, so the remaining compute only consists of completing pending tasks and retrieving the outputs. This is probably enough to cover most use cases I can think of.

It could be extended to an implementation where callbacks can arbitrarily choose not to create a new task, independently from other callbacks, or to create several tasks, resulting in a flow of tasks of variable intensity during the compute. But implementing such agency requires either deeper changes to joblib.Parallel (at least adding one additional lock and decoupling the output retrieval and task submission steps) or hacky workarounds (submitting dummy tasks for the sole purpose of breaking the locks).
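To make the first point concrete, here is a minimal sketch of such a task-generating function. The `grow` function and the threshold are invented for the demo, and `make_task` stands in for `joblib.delayed(...)(...)` so the snippet stays self-contained:

```python
def grow(x):
    # stands in for the work done by one parallel task
    return 2 * x

def make_task(func, *args):
    # stand-in for joblib.delayed(func)(*args)
    return (func, args)

def task_from_output_fn(output):
    """Generate the next task from a completed task's output."""
    if output >= 1000:  # stopping criterion reached along this chain
        return None     # returning None stops generating new tasks
    return make_task(grow, output + 1)
```

Once any task's output crosses the threshold, the function returns None and no further task is generated for that chain or any later one, matching the behavior described above.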

_output_generator = self.parallel(input_generator)

# ... start retrieving outputs...
for output in _output_generator:
@fcharras (Contributor, Author) commented Jul 27, 2023

Iterating on the outputs must be done in the calling thread, to benefit from joblib.Parallel's safe and elegant termination logic and error reporting. From there, the new tasks can be sent to the input generator through the task queue.

Implementing it the other way around (iterating on the outputs in the input generator and sending them through the queue from the input generator to the calling thread) can work too, but it requires tedious error-management code and brings no particular gain I can think of.
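The recommended direction can be sketched single-threaded, without joblib: the loop that retrieves outputs is the only place that submits new tasks, and the input generator simply drains the queue. With joblib, the `for` loop would iterate `Parallel`'s output generator instead; the doubling task and the threshold are invented for the demo.

```python
from queue import SimpleQueue

task_queue = SimpleQueue()

def input_generator():
    # yields tasks until the None sentinel closes the generator
    while (task := task_queue.get()) is not None:
        yield task

task_queue.put(1)  # seed task

outputs = []
for x in input_generator():  # with joblib: for out in parallel(input_generator())
    out = 2 * x              # stands in for the parallel task's work
    outputs.append(out)
    if out < 10:
        task_queue.put(out + 1)  # feedback: derive a new task from the output
    else:
        task_queue.put(None)     # stopping criterion met: close the generator
```

Because the retrieval loop always enqueues either a new task or the sentinel before requesting the next item, the generator never blocks forever on an empty queue.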

@GaelVaroquaux (Member) left a comment

Very very cool.

My view is that we should provide a first-class citizen class in the joblib API to do the Self-Sustaining / Self-loop mechanism. @tomMoral: what do you think?

@@ -0,0 +1,205 @@
"""
===========================================================
Running a self-sustained flow of parallel tasks with joblib
@GaelVaroquaux (Member) commented:

I would not guess what the example is about based on this title. How about:

Suggested change
Running a self-sustained flow of parallel tasks with joblib
Adding dynamically new tasks to a parallel iteration

===========================================================

This example introduces a recipe for implementing a feedback loop where the
output generator of joblib.Parallel is used to fuel the input generator with
@GaelVaroquaux (Member) commented:

Suggested change
output generator of joblib.Parallel is used to fuel the input generator with
output generator of :class:`joblib.Parallel` is used to fuel the input generator with

from queue import SimpleQueue


class SelfSustainingParallelWork:
@GaelVaroquaux (Member) commented:

That's fairly non-trivial code. It makes me wonder if we do not want to add a fully-supported API (probably with a dedicated class) for this feature.

@fcharras (Contributor, Author) commented:

Do you have applications in mind for this feature that could weigh in favor of full support for it in joblib? Would applying it to a more complicated grid search actually be valuable compared to what GridSearchCV / RandomSearchCV already offer in scikit-learn?

from queue import SimpleQueue


class SelfSustainingParallelWork:
@GaelVaroquaux (Member) commented:

Should this be called

Suggested change
class SelfSustainingParallelWork:
class ParallelSelfLoop:

?

I'm trying to find a name that makes this functionality as explicit as possible (not sure that "SelfLoop" is best though, I'm really in search of ideas)

# NB: in our example, the number of tasks done can change between runs since
# it depends on concurrency between workers until one of them meets the
# stopping criterion first.
print(f"{callback.get_count()} tasks done.")
@GaelVaroquaux (Member) commented:

Should we implement verbosity in the class, to avoid having to add a callback in the example and thus make it simpler? (I always try to make examples as simple as possible.)

@fcharras (Contributor, Author) commented:

Agree that the usability of the class can be refined. Other points to note with this first draft:

  • the callback feature is actually not required since the user could directly add callbacks inside `task_from_output_fn`

  • if anything the callback should be moved inside the try/except block in __call__, to avoid a bug where the backend would deadlock if an exception is raised in the callback.

# ...create a new task based on the outputs...
try:
    new_task = task_from_output_fn(output)
except BaseException:
@GaelVaroquaux (Member) commented:

If we make this a first-class class of joblib, maybe we should support stopping the iteration via a "raise StopIteration" and thus capture it here. It is a somewhat canonical way of stopping an iteration (though I suspect that many do not know it)
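The suggestion could look something like this sketch (not the PR's code; `handle_output` and the queue-sentinel protocol are assumptions made for the demo):

```python
from queue import SimpleQueue

def handle_output(output, task_from_output_fn, task_queue):
    """Submit the next task; treat StopIteration as the stopping signal."""
    try:
        new_task = task_from_output_fn(output)
    except StopIteration:
        task_queue.put(None)  # sentinel: close the input generator
        return False
    task_queue.put(new_task)
    return True
```

Raising StopIteration from the user function is indeed a fairly canonical way to end an iteration, though it must be caught explicitly here since the function is not itself a generator.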

@fcharras (Contributor, Author) commented:

The stopping criterion is actually expected to be triggered by returning None rather than a joblib.delayed-like task in task_from_output_fn, so the current purpose of this try/except block is not to let the user stop the compute by raising an exception. Its purpose is to avoid a bug that deadlocks the backend, because the input iterator would not be closed if an exception is raised in __call__.

Two follow-up points from this:

  • maybe stopping the compute by returning None in task_from_output_fn is not so good design 💭
  • the whole loop should be wrapped in this try/except block to also catch KeyboardInterrupt.

As in a previous conversation, let's iterate to refine the API.
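The second follow-up point could look like this sketch (function and queue names are assumptions, not the PR's code): the whole retrieval loop is wrapped so that the input generator is always unblocked, even on `KeyboardInterrupt`.

```python
from queue import SimpleQueue

def consume_outputs(output_iterable, task_from_output_fn, task_queue):
    """Retrieve outputs and feed back new tasks; always close the queue."""
    try:
        for output in output_iterable:
            new_task = task_from_output_fn(output)
            # returning None signals the stopping criterion
            task_queue.put(new_task)
            if new_task is None:
                return
    except BaseException:
        # unblock the input generator so the backend can terminate,
        # then re-raise so the caller still sees the error
        task_queue.put(None)
        raise
```

Catching BaseException (rather than Exception) is what makes this robust to KeyboardInterrupt as well, at the cost of having to re-raise carefully.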


class SelfSustainingParallelWork:

    def __init__(self, parallel_n_jobs, parallel_batch_size=1):
@GaelVaroquaux (Member) commented:

Why not call the argument "n_jobs", rather than "parallel_n_jobs", to make it as easy as possible to move from Parallel to this object?
