
[GSoC] Parallelisation of AnalysisBase with multiprocessing and dask #4162

Open · wants to merge 267 commits into base: develop
Conversation

Contributor

@marinegor marinegor commented Jun 5, 2023

Fixes #4158
Also fixes # 4259 as a check in AnalysisBase.run() (no it doesn't; I'll do another PR that does that).

Related to #4158; it does not help f-i-x-i-n-g (kudos to the GitHub bot) the issue per se, but paves the way towards that.

Changes made in this Pull Request:

  • several methods added to analysis.base.AnalysisBase, implementing backend configuration, splitting of the frames for analysis, computation, and results aggregation
  • module analysis.backends introduces the BackendBase class, as well as the built-in backends BackendMultiprocessing, BackendSerial and BackendDask, which implement the apply method for computations with the respective backend
  • module analysis.results introduces the ResultsGroup class, which allows merging of multiple uniform Results objects from the same module, given appropriate aggregation functions (see the sketch below)
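
For illustration, a minimal sketch of the intended split-apply-combine aggregation (the ResultsGroup API is as described above; the attribute names and sample values are made up for this example):

import numpy as np
from MDAnalysis.analysis.results import Results, ResultsGroup

# each worker returns a uniform Results object for its slice of frames;
# the lookup maps each attribute to an aggregation function
group = ResultsGroup(lookup={'rmsd': ResultsGroup.ndarray_vstack,
                             'mass': ResultsGroup.float_mean})

part1 = Results(rmsd=np.array([[0, 1.0]]), mass=1.0)
part2 = Results(rmsd=np.array([[1, 1.2]]), mass=3.0)

merged = group.merge([part1, part2])  # rmsd arrays stacked, mass averaged to 2.0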

PR Checklist

  • Tests?
  • Docs?
  • CHANGELOG updated?
  • Issue raised/referenced?

📚 Documentation preview 📚: https://mdanalysis--4162.org.readthedocs.build/en/4162/

@github-actions

github-actions bot commented Jun 5, 2023

Linter Bot Results:

Hi @marinegor! Thanks for making this PR. We linted your code and found the following:

Some issues were found with the formatting of your code.

Code Location | Outcome
main package  | ⚠️ Possible failure
testsuite     | ⚠️ Possible failure

Please have a look at the darker-main-code and darker-test-code steps here for more details: https://github.com/MDAnalysis/mdanalysis/actions/runs/9196760083/job/25295601960


Please note: The black linter is purely informational; you can safely ignore these outcomes if there are no flake8 failures!

@codecov

codecov bot commented Jun 5, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 93.23%. Comparing base (5fa2e57) to head (d63a38b).
Report is 24 commits behind head on develop.

Additional details and impacted files
@@             Coverage Diff             @@
##           develop    #4162      +/-   ##
===========================================
- Coverage    93.38%   93.23%   -0.15%     
===========================================
  Files          171       12     -159     
  Lines        21744     1079   -20665     
  Branches      4014        0    -4014     
===========================================
- Hits         20305     1006   -19299     
+ Misses         952       73     -879     
+ Partials       487        0     -487     


@RMeli RMeli self-requested a review June 5, 2023 19:34
@RMeli
Member

RMeli commented Jun 5, 2023

Great to see things are moving @marinegor!

I'll have a proper look at this PR when back from holidays, but I just wanted to point out that GitHub is not very smart, so "Does not fix #4158" will actually automatically close #4158 when this is merged. "Related to #4158" or something similar (with no "fix" before the issue number) would avoid this problem.

Member

@orbeckst orbeckst left a comment

Good start. I left a few comments on the code.

package/MDAnalysis/analysis/base.py: 4 resolved review comments (outdated)
@yuxuanzhuang yuxuanzhuang self-requested a review June 6, 2023 07:15
@orbeckst
Member

orbeckst commented Jun 8, 2023

@yuxuanzhuang any initial comments so that @marinegor can move forward?

package/MDAnalysis/analysis/base.py: 7 resolved review comments (6 outdated)
Contributor Author

@marinegor marinegor left a comment

I'm done with my replies!

@orbeckst
Member

I put "review dask PR" in my calendar for early next week.

@orbeckst
Member

orbeckst commented May 3, 2024

I installed a Python 3.11 conda env (macOS) with this branch and I am trying out the example from the docs https://mdanalysis--4162.org.readthedocs.build/en/4162/documentation_pages/analysis_modules.html

import multiprocessing
import MDAnalysis as mda
from MDAnalysisTests.datafiles import PSF, DCD
from MDAnalysis.analysis.rms import RMSD
from MDAnalysis.analysis.align import AverageStructure

# initialize the universe
u = mda.Universe(PSF, DCD)

# calculate average structure for reference
avg = AverageStructure(mobile=u).run()
ref = avg.results.universe

# initialize RMSD run
rmsd = RMSD(u, ref, select='backbone')
rmsd.run(backend='multiprocessing', n_workers=multiprocessing.cpu_count())

I am running in a Jupyter Lab notebook. When I imported MDAnalysis, I got the warning:

/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm

The RMSD calculation succeeds and I can plot the rmsd.results.

However, I get the following messages:

When I ran the first time:

Exception ignored in: <module 'threading' from '/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/threading.py'>
Exception ignored in: <module 'threading' from '/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/threading.py'>
Exception ignored in: <module 'threading' from '/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/threading.py'>
Exception ignored in: <module 'threading' from '/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/threading.py'>
Exception ignored in atexit callback: <bound method TMonitor.exit of <TMonitor(Thread-1, stopped daemon 123145468305408)>>
Exception ignored in atexit callback: <function exit_cacert_ctx at 0x105828f40>
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/threading.py", line 1541, in _shutdown
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/certifi/core.py", line 10, in exit_cacert_ctx
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/tqdm/_monitor.py", line 44, in exit
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/threading.py", line 1541, in _shutdown
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/threading.py", line 1541, in _shutdown
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/threading.py", line 1541, in _shutdown
    self.join()
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/threading.py", line 1119, in join
    def exit_cacert_ctx() -> None:

  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/gsd/__init__.py", line 25, in <lambda>
    self._wait_for_tstate_lock()
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/threading.py", line 1139, in _wait_for_tstate_lock
    signal.signal(signal.SIGTERM, lambda n, f: sys.exit(1))
                                               ^^^^^^^^^^^
SystemExit: 1
    if lock.acquire(block, timeout):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/gsd/__init__.py", line 25, in <lambda>
    def _shutdown():
    def _shutdown():


  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/gsd/__init__.py", line 25, in <lambda>
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/gsd/__init__.py", line 25, in <lambda>
    def _shutdown():

    def _shutdown():
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/gsd/__init__.py", line 25, in <lambda>

  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/gsd/__init__.py", line 25, in <lambda>
    signal.signal(signal.SIGTERM, lambda n, f: sys.exit(1))
    signal.signal(signal.SIGTERM, lambda n, f: sys.exit(1))
                                               ^^^^^^^^^^^
                                               ^^^^^^^^^^^
SystemExit: 1
SystemExit: 1
    signal.signal(signal.SIGTERM, lambda n, f: sys.exit(1))
                                               ^^^^^^^^^^^
SystemExit: 1
                                               ^^^^^^^^^^^
SystemExit: 1

When I ran the same command a second time

rmsd.run(backend='multiprocessing', n_workers=multiprocessing.cpu_count())

no such messages appeared.

When I run with a different number of workers

rmsd.run(backend='multiprocessing', n_workers=2)

the following shorter messages appear:

Exception ignored in atexit callback: <function shutdown at 0x1049751c0>
Traceback (most recent call last):
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/logging/__init__.py", line 2181, in shutdown
    h = wr()
        ^^^^
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/gsd/__init__.py", line 25, in <lambda>
    signal.signal(signal.SIGTERM, lambda n, f: sys.exit(1))
                                               ^^^^^^^^^^^
SystemExit: 1

Run the same again:

Exception ignored in atexit callback: <function exit_cacert_ctx at 0x1149ccf40>
Traceback (most recent call last):
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/certifi/core.py", line 10, in exit_cacert_ctx
Exception ignored in atexit callback: <function exit_cacert_ctx at 0x11357cf40>
Traceback (most recent call last):
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/certifi/core.py", line 10, in exit_cacert_ctx
    def exit_cacert_ctx() -> None:
    def exit_cacert_ctx() -> None:

  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/gsd/__init__.py", line 25, in <lambda>

  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/gsd/__init__.py", line 25, in <lambda>
    signal.signal(signal.SIGTERM, lambda n, f: sys.exit(1))
                                               ^^^^^^^^^^^
SystemExit: 1
    signal.signal(signal.SIGTERM, lambda n, f: sys.exit(1))
                                               ^^^^^^^^^^^
SystemExit: 1

Run again

Exception ignored in atexit callback: <function exit_cacert_ctx at 0x10e658f40>
Traceback (most recent call last):
  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/certifi/core.py", line 10, in exit_cacert_ctx
    def exit_cacert_ctx() -> None:

  File "/Users/oliver/anaconda3/envs/mda311dask/lib/python3.11/site-packages/gsd/__init__.py", line 25, in <lambda>
    signal.signal(signal.SIGTERM, lambda n, f: sys.exit(1))
                                               ^^^^^^^^^^^
SystemExit: 1

I am fairly confused what this is about. Can someone else reproduce?

But I know that I don't want our users to see these kinds of messages.

Note that running in a script

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# https://github.com/MDAnalysis/mdanalysis/pull/4162

import multiprocessing
import MDAnalysis as mda
from MDAnalysisTests.datafiles import PSF, DCD
from MDAnalysis.analysis.rms import RMSD
from MDAnalysis.analysis.align import AverageStructure

if __name__ == "__main__":
    # must all be in __main__, otherwise multiprocessing loops indefinitely

    # initialize the universe
    u = mda.Universe(PSF, DCD)

    # calculate average structure for reference
    avg = AverageStructure(mobile=u).run()
    ref = avg.results.universe

    # initialize RMSD run
    rmsd = RMSD(u, ref, select='backbone')
    rmsd.run(backend='multiprocessing', n_workers=multiprocessing.cpu_count())

does not produce the above messages, so it may be specific to Jupyter Lab.

@yuxuanzhuang
Contributor

I am fairly confused what this is about. Can someone else reproduce?

I can reproduce these messages on my Mac (but not on Linux by default) in Jupyter. This issue is associated with macOS using spawn as the default process start method for multiprocessing (https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods). If I explicitly set multiprocessing.set_start_method('fork'), it runs fine. Similarly, if I explicitly set multiprocessing.set_start_method('spawn') on Linux, the messages appear.
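
For reference, a minimal sketch of the workaround described above (a diagnostic aid only, not a recommendation; 'fork' has its own caveats with threaded parent processes):

import multiprocessing

# force the 'fork' start method before running the parallel analysis in
# Jupyter on macOS, where 'spawn' has been the default since Python 3.8;
# force=True overrides a previously set start method
multiprocessing.set_start_method('fork', force=True)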

The part that doesn't play well with multiprocessing in the Jupyter Notebook environment is the ProgressBar (tqdm). We may need to find a way to display the progress bar successfully---probably not in this PR imho.

@orbeckst
Member

orbeckst commented May 4, 2024

Good detective work, @yuxuanzhuang !

@marinegor mentioned that the progress bar is not available in parallel analysis, so I didn't expect the bar to appear. (And if I try to get the progress bar with verbose=True, I get ValueError: Can not display progressbar with non-serial backend, which is fine.) However, getting these weird messages when we are just running the parallel analysis is not good. We will get a ton of questions from users who run their code in Jupyter and will then be worried about what's going on.

@orbeckst
Member

orbeckst commented May 4, 2024

On the note of progress bars: why do I see multiple progress bars when I run with the serial backend but accidentally set n_workers=3, as in the screenshot below?

[screenshot: multiple progress bars shown during a run with the serial backend and n_workers=3]

It's fine that I get the warning about n_workers, but I should still only see a single ProgressBar, @marinegor.

Member

@orbeckst orbeckst left a comment

I am test-driving the PR locally and checking what a user would do. Apart from a simple doc fix, so far I have found the two issues that I commented on, both inside Jupyter Lab (which is commonly used):

  1. Highly confusing messages that @yuxuanzhuang tracked down to how multiprocessing spawns processes on macOS (see #4162 (comment)). No error/warning messages should be issued unless the code that is being run fails.
  2. Multiple progress bars are shown with the serial backend when n_workers is set (see #4162 (comment)). There should be only one progress bar in serial.

If I find other things I will add them here.

often trivially parallelizable, meaning you can analyze all frames
independently, and then merge them in a single object. This approach is also
known as "split-apply-combine", and isn't new to MDAnalysis users, since it was
first introduced in [pmda](https://github.com/mdanalysis/pmda). Version 2.8.0 of
Member

fix link as inline reST link

Suggested change
-first introduced in [pmda](https://github.com/mdanalysis/pmda). Version 2.8.0 of
+first introduced in `PMDA <https://github.com/mdanalysis/pmda>`_. Version 2.8.0 of

Comment on lines 285 to 286
    @classmethod
    def _is_parallelizable(cls):
Member

Why is this a method? Can't we make it a managed attribute ... or just a class attribute?

I find it confusing when is_XXX has a call signature.

Furthermore, in transformations.TransformationBase we have the attribute parallelizable

self.parallelizable = kwargs.pop('parallelizable', True)
— I would prefer having a common API, i.e., turn _is_parallelizable into the attribute parallelizable. It should not be a class attribute of the base class, to avoid someone accidentally changing it in a derived class without having overridden it first (which would ultimately mess up the parent class); alternatively, make it a managed attribute.

Contributor Author

I see, it's a valid concern.

At first it was a @property @classmethod, but:

Class properties are deprecated in Python 3.11 and will not be supported in Python 3.13 // Pylance

So I changed it to be a @classmethod only. The (initial) reason why it's not an attribute is that there's no such thing as @classattribute, and I wanted parallelizable to be a read-only property of the class, not of its instance, since we don't always know the __init__ parameters needed to check if it's parallelizable.
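
For reference, the abandoned pattern looked roughly like this (a sketch; chaining @classmethod and @property worked in Python 3.9-3.10, was deprecated in 3.11, and is removed in 3.13):

class AnalysisBase:
    @classmethod
    @property
    def _is_parallelizable(cls):
        # read-only, class-level flag; relies on the deprecated construct
        return False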

As for now, I see two possible solutions to it (see also a gist):

  • have a __is_parallelizable attribute on all AnalysisBase subclasses, and a @property that returns it. The downside is that someone could change it to True and everything would break -- but I guess that's what you get when you change a double-underscore attribute that isn't supposed to be changed.
  • rename _is_parallelizable to something like get_parallelizability() and leave it as a @classmethod, hence maintaining the semantics you prefer, i.e. that a verb-containing method is callable.

Also, we wanted a default AnalysisBase attribute/property/method to make parallelization opt-in, so I can't see a way to set it in __init__.

Member

Let's just not mess around with double-underscore attributes. If we really wanted to, we could undo the dunder name mangling, but given that Python just does not allow us to really lock anything away, let's use convention; if users want to mess things up, they are allowed to. I suggest we use properties with simple class attributes in the following way:

class AnalysisBase:
    # class authors: override _analysis_algorithm_is_parallelizable 
    # in derived classes and only set to True if you have confirmed 
    # that your algorithm works reliably when parallelized with 
    # the split-apply-combine approach (see docs)
    _analysis_algorithm_is_parallelizable = False

    def __init__(self, traj, **kwargs):
        ...

    @property
    def parallelizable(self):
        """Read-only attribute that indicates if this analysis can be
        performed in parallel.

        .. warning:: 
           The author of the code has to determine if the algorithm
           can be run in parallel. If users change this setting in any 
           way then **wrong results** will likely be produced.
        """
        return self._analysis_algorithm_is_parallelizable

and then a parallelizable derived class can just use

class MyParallelAnalysis(AnalysisBase):
        _analysis_algorithm_is_parallelizable = True

Enterprising users will be able to change _analysis_algorithm_is_parallelizable but we tell them they shouldn't so that becomes their problem, not ours.

Using an undocumented single-underscore attribute with a long name should also make clear that this is not something users ought to touch.

At the end of the day, all Python APIs are just conventions, so I am ok with doing this by convention.

- add reST/sphinx markup for methods and classes and ensure that (most of them)
  resolve; add intersphinx mapping to dask docs
- added cross referencing between parallelization and backends docs
- restructured analysis landing page with additional numbered headings for
  general use and parallelization
- add citation for PMDA
- fixed links
- edited text for flow and readability
- added SeeAlsos (eg for User Guide)
- added notes/warnings
Comment on lines 270 to 272
    @classmethod
    def is_parallelizable(self):
        return True
Member

Propose to make this an attribute.

Contributor Author

See discussion above

results = dask.compute(computations,
                       scheduler="processes",
                       chunksize=1,
                       n_workers=self.n_workers)[0]
Contributor

Contributor Author

Fixed, thanks.

@@ -186,95 +186,99 @@ def correct_values_backbone_group(self):
         return [[0, 1, 0, 0, 0],
                 [49, 50, 4.6997, 1.9154, 2.7139]]

-    def test_rmsd(self, universe, correct_values):
+    def test_rmsd(self, universe, correct_values, client_RMSD):
Member

When the client_RMSD fixture is used for the first time, it's a bit mysterious where it comes from and what it does because it's not defined in the same file.

Add a comment explaining where client_RMSD is defined and what it does.
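
As an illustration, such a fixture could look roughly like this (a sketch with assumed names and location; the real definition lives elsewhere in the testsuite):

# conftest.py (assumed location)
import pytest

@pytest.fixture(params=[
    {'backend': 'serial'},
    {'backend': 'multiprocessing', 'n_workers': 2},
])
def client_RMSD(request):
    # run() keyword arguments for every backend that RMSD supports
    return request.param

The test would then presumably call rmsd.run(**client_RMSD) once per parametrized backend.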

def test_parallelizable_transformations():
    from MDAnalysis.transformations import NoJump
    u = mda.Universe(XTC)
    u.trajectory.add_transformations(NoJump(parallelizable=True))
Member

I don't understand what the test is doing.

It does not really make sense to set parallelizable=True for NoJump because it is inherently not parallelizable.

And even if parallelizable=True, why does the parallel execution below fail with a ValueError?

Contributor Author

@marinegor marinegor May 22, 2024

It does not really make sense to set parallelizable=True for NoJump because it is inherently not parallelizable.

I just wanted to have any transformation that would let me set this attribute. Should I pick another one? Though I have since changed it to parallelizable=False, so I think it's not necessary.

And even if parallelizable=True then why does the parallel execution below fail with ValueError?

You're right; it should be that a) NoJump(parallelizable=False), b) serial works, and c) parallel fails. I changed the code of AnalysisBase and the test.
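
A sketch of the corrected test logic described here (the data files, the analysis class, and the exact exception check are assumptions, not the PR's actual test):

import pytest
import MDAnalysis as mda
from MDAnalysis.analysis.rms import RMSD
from MDAnalysis.transformations import NoJump
from MDAnalysisTests.datafiles import TPR, XTC

def test_parallelizable_transformations():
    # (a) the trajectory carries a transformation marked non-parallelizable
    u = mda.Universe(TPR, XTC, transformations=[NoJump(parallelizable=False)])

    # (b) serial execution works
    RMSD(u, u, select='backbone').run(backend='serial')

    # (c) parallel execution fails
    with pytest.raises(ValueError):
        RMSD(u, u, select='backbone').run(backend='multiprocessing', n_workers=2)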

    ({'stop': 30}, np.arange(30)),
    ({'step': 10}, np.arange(0, 98, 10))
])
def test_start_stop_step_parallel(u, run_kwargs, frames, client_FrameAnalysis):
Member

Add a comment describing where client_FrameAnalysis is defined and what it does.

Global fixture definitions in pytest seem necessary but they make for very unreadable code — too much magic going on. Thus, comments are necessary.

@@ -332,15 +396,15 @@ def test_results_type(u):
     (20, 50, 2, 15),
     (20, 50, None, 30)
 ])
-def test_AnalysisFromFunction(u, start, stop, step, nframes):
+def test_AnalysisFromFunction(u, start, stop, step, nframes, client_AnalysisFromFunction):
Member

Add a comment about location of definition for client_AnalysisFromFunction and purpose.

Once there are a few of these comments, someone new to the tests will pick up the pattern.

- mark analysis docs as documenting MDAnalysis.analysis so that references resolve
  properly
- link fixes
Comment on lines 370 to 372
    @classmethod
    def _is_parallelizable(cls):
        return True
Member

@orbeckst orbeckst May 5, 2024

The current documentation https://mdanalysis--4162.org.readthedocs.build/en/4162/documentation_pages/analysis/parallelization.html#adding-parallelization-to-your-own-analysis-class states that the method to change is called is_parallelizable() (no leading underscore):

    @classmethod
    def is_parallelizable(self):
        return True

Docs and code need to be consistent.

(In any case, as said in other comments, I'd prefer an attribute parallelizable to be consistent with the transformations.)

Contributor Author

Fixed; also see discussion above.

@@ -2560,3 +2561,14 @@ def no_copy_shim():
     else:
         copy = False
     return copy


+def is_installed(modulename: str):
Member

If this is a new function, add an explicit test for it.

I think at the moment we get coverage for it because it is used in code, but that's not a test of the function itself.
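
For instance, a minimal direct test might look like this (a sketch; the import path is assumed from the diff context in lib/util.py):

from MDAnalysis.lib.util import is_installed

def test_is_installed():
    assert is_installed('sys')                       # importable module
    assert not is_installed('not_a_real_module_42')  # unknown module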

Contributor Author

Added a test.

@orbeckst
Member

orbeckst commented May 5, 2024

I edited the docs directly instead of making a bunch of comments.

In addition to my review, I added further comments. Please have a look.

My only major API question is how to mark an analysis as parallelizable. In the current PR we use a class method _is_parallelizable() (although the docs somewhere misstate it as is_parallelizable()). On the other hand, in transformations we set an attribute parallelizable. It would be better to have a common API for the two. I'd prefer that parallel analysis classes also use the attribute parallelizable instead of a class method. It should be possible to make it a managed attribute or protect it in other ways.

Contributor Author

@marinegor marinegor left a comment

Added comments regarding _is_parallelizable (and fixed documentation), fixed tests for is_installed.


"""
for check, msg in self._get_checks().items():
if not check:
raise ValueError(msg)
Contributor Author

Oh I see, so someone could use this function to know how they should configure their backend?

Yes, more or less -- or try to configure it a few times without explicitly catching an exception.

I think the usual way to do this would be with docs and tutorials showing how to configure the backend

I don't mind changing it, but I still think this way is slightly more flexible for a potential developer.
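
For context, a sketch of what such a checks mapping might look like (the dict-of-condition-to-message shape is taken from the quoted fragment; the specific check is hypothetical):

def _get_checks(self):
    # condition -> error message; the loop shown above raises ValueError
    # for the first condition that evaluates to False
    return {
        isinstance(self.n_workers, int) and self.n_workers > 0:
            f"n_workers must be a positive integer, got {self.n_workers}",
    }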





Development

Successfully merging this pull request may close these issues.

Introducing dask-based parallel backend for the AnalysisBase.run()
9 participants