
Dask in pyodide #9053

Merged: 14 commits, merged into dask:main on Jun 20, 2022

Conversation

@ian-r-rose (Collaborator) commented May 6, 2022

This is a small step towards #7764 and dask/distributed#6257. It's basically just defensively importing threading and multiprocessing and defaulting to the synchronous scheduler if those fail (see the sketch after the checklist below). So I think this is currently mostly useful for demos and training around the dask collections API. But it does work.

This is distinct from actually getting a distributed.Client working and talking to a remote cluster, which will require some actual networking work.

  • Tests added / passed
  • Passes pre-commit run --all-files
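
Roughly, the shape of the fallback described above looks like this (a sketch of the idea, not the exact diff in this PR):

try:
    # In Pyodide the top-level multiprocessing import works, but
    # multiprocessing.pool does not, because the _multiprocessing
    # extension module is missing.
    import multiprocessing.pool  # noqa: F401
    from dask.threaded import get as DEFAULT_GET
except ImportError:
    # Fall back to the single-threaded, synchronous scheduler.
    from dask.local import get_sync as DEFAULT_GET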

dask/bag/core.py: resolved review thread (outdated)
@ian-r-rose marked this pull request as draft on May 6, 2022 at 21:15
@jakirkham (Member)

cc @rth (who may find this of interest 🙂)

setup.py: resolved review thread (outdated)
dask/dataframe/io/hdf.py: resolved review thread (outdated)
@ian-r-rose (Collaborator, Author)

Thanks for taking a look @jakirkham !

@rth (Contributor) commented May 10, 2022

defaulting to the synchronous scheduler

Yes, pragmatically this seems to be a reasonable first step to get Dask running in Pyodide.

It's basically just defensively importing threading and multiprocessing and defaulting to the synchronous scheduler if those fail.

Note that currently importing threading and multiprocessing still works in Pyodide. Even part of the API works (e.g. using locks). What fails is actually starting a thread or process (or importing private modules). Or at least, this has been the idea in Pyodide so far: to maximize the number of packages that can be imported without error if they use e.g. multiprocessing somewhere. The reason you are able to catch import errors currently is that some of the deeper APIs do raise an ImportError (and maybe we are not fully consistent about what raises an exception and what doesn't).

In the Pyodide REPL:

>>> import threading
>>> import multiprocessing
>>> threading.current_thread()
<_MainThread(MainThread, started 3038148)>
>>> threading.Lock()
<unlocked _thread.lock object at 0xc3cc40>
>>> import concurrent.futures
>>> import multiprocessing.pool
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/lib/python3.10/multiprocessing/pool.py", line 30, in <module>
    from .connection import wait
  File "/lib/python3.10/multiprocessing/connection.py", line 21, in <module>
    import _multiprocessing
ModuleNotFoundError: No module named '_multiprocessing'

So there might be more reliable/standard ways of detecting that threading/multiprocessing is not available, and if you have feedback on how to improve the current situation, we would be interested in hearing it.

@ian-r-rose (Collaborator, Author)

Thanks for weighing in @rth!

ModuleNotFoundError: No module named '_multiprocessing'

This is exactly what I was seeing (multiprocessing.pool is imported in a number of places). The import error is, as you suggest, not necessarily the most reliable check. Do you have any ideas about what a more stable check might be? It would be a shame to have to try to start a dummy thread/process upon import.

@ian-r-rose (Collaborator, Author)

I suppose what I'm after (per the FAQs) is to check platform.system(). So I think we want the defensive imports regardless, but we can add an additional check for platform.system() == "Emscripten".

@jakirkham (Member)

Would that platform tag change if/when the ecosystem moves to WASM?

Seems like we would want this check in as few places as possible, so that updating or fixing it later stays simple.

dask/array/core.py: resolved review thread (outdated)
@rth (Contributor) commented May 12, 2022

Would that platform tag change if/when the ecosystem moves to WASM?

I opened https://discuss.python.org/t/expected-behavior-for-unsupported-stdlib-modules-in-the-browser/ to discuss it. It should be OK. As Christian mentioned, the best way to detect the platform is to use:

import sys

if sys.platform == 'emscripten':
    ...

This works in Pyodide now and will work in the future. But some Emscripten builds for Node.js do have threading enabled (https://pythondev.readthedocs.io/wasm.html#targets), and Pyodide might also enable it in the future, so that's also not very reliable.

So, to detect:

  • for multiprocessing, maybe catching the ImportError on _multiprocessing (or on multiprocessing.pool indirectly) would be OK. That's what was done in joblib/joblib#1256 ("No multiprocessing"), for instance (see the sketch below).
  • for threading, after a quick look I can't find anything threading-related that would raise an ImportError in Pyodide (_thread also imports fine). But if you say the current PR works for you and you are able to catch an ImportError, maybe that's fine.

Christian also proposed adding more specific feature-detection flags in the above-linked thread, but it would take a while for those to propagate to Pyodide, so if you want something working now, the current approach is probably the best choice.
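
A minimal version of those checks could look like this (a sketch of the approach described in the list above, not code from this PR or from joblib):

import sys


def multiprocessing_available() -> bool:
    # In Pyodide the top-level multiprocessing module imports fine; it is the
    # C extension behind it (and multiprocessing.pool) that is missing.
    try:
        import _multiprocessing  # noqa: F401
        import multiprocessing.pool  # noqa: F401
    except ImportError:
        return False
    return True


def threads_available() -> bool:
    # There is no reliable ImportError for threading in Pyodide, so fall back
    # to a platform check (some Emscripten builds do enable threads, though).
    return sys.platform != "emscripten"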

@ian-r-rose (Collaborator, Author)

Thanks for the detailed information @rth, and thanks for opening the upstream discussion.

This works in Pyodide now and will work in the future. But then some Emscripten builds for Node.js do have threading enabled https://pythondev.readthedocs.io/wasm.html#targets (and Pyodide might also enable it in the future) so that's also not very reliable.

I pushed a change to check sys.platform before doing any defensive imports (roughly along the lines of the sketch below). This feels more future-proof to me, and I think I'm okay with not trying to get threading working on a Node-based platform for now. But I'm happy to get pushback on that!

* for threading, after quickly looking I can't find anything threading related that would raise an ImportError in pyodide 

Yeah, I think I was imprecise before: the problem isn't with the stdlib threading, it's with dask's threaded module, which itself imports multiprocessing.pool. That's what I'm being defensive about here, but it's also what felt fragile about the import-based approach, since it would be pretty easy to refactor the imports in that module.
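
Concretely, the guard amounts to something like this (a sketch; the _EMSCRIPTEN name matches the snippet discussed further down):

import sys

_EMSCRIPTEN = sys.platform == "emscripten"

if _EMSCRIPTEN:
    from dask import local

    DEFAULT_GET = local.get_sync
else:
    from dask import threaded

    DEFAULT_GET = threaded.get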

@jrbourbeau (Member) left a comment

Thanks for working on this @ian-r-rose. I still need to take a more detailed look at the changes here, but from a cursory look I don't see any tests. I'm not familiar with pyodide -- is it straightforward to add tests that make sure things are working as expected?

@ian-r-rose (Collaborator, Author) commented May 31, 2022

I'm not familiar with pyodide -- is it straightforward to add tests that make sure things are working as expected?

It's not really straightforward as it's a completely different execution environment (and the defensive imports are around aspects of the standard library not being available). That said, I think I could monkeypatch sys.platform in a test to make sure that the basic behavior here works.

Edit: on reflection, since this logic only happens on startup, it's tough to mock. I'm not sure I see a good way to test this behavior; would love to hear ideas.
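
To illustrate the difficulty (a hypothetical test, not something from this PR): patching sys.platform inside a test body happens well after the platform branch has already run at import time, so it does not change the default that was picked.

import sys
from unittest import mock

import dask.bag as db  # the platform branch already ran during this import


def test_sync_default_on_emscripten():
    with mock.patch.object(sys, "platform", "emscripten"):
        b = db.from_sequence(range(3))
        # This assertion would fail: the default scheduler was chosen when
        # dask.bag was first imported, before the patch took effect.
        assert b.__dask_scheduler__.__module__ == "dask.local"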

@ian-r-rose (Collaborator, Author)

Okay, I've figured out a CI test for this behavior. It ain't pretty, but it works. Would love to get this in for 2022.6.0 @jrbourbeau

Comment on lines 88 to 95
if _EMSCRIPTEN:
    from dask import local

    DEFAULT_GET = local.get_sync
else:
    from dask import threaded

    DEFAULT_GET = threaded.get
@jakirkham (Member) commented Jun 9, 2022

It seems odd to have this here. Should we move it to dask.local instead?

Edit: This pattern repeats below

@ian-r-rose (Collaborator, Author)

I'm not sure what you mean. Are you objecting to repeating this clause for all the different collections? In principle, it might make sense to consolidate, though I've kept them collection-specific since Bag has a different default scheduler.

@jakirkham (Member)

It's just a bit worrying that Emscripten logic is leaking into other parts of the codebase like this.

Think we could do a bit better by containing this with some changes in those schedulers, so the logic here and elsewhere would look like:

Suggested change
-if _EMSCRIPTEN:
-    from dask import local
-    DEFAULT_GET = local.get_sync
-else:
-    from dask import threaded
-    DEFAULT_GET = threaded.get
+try:
+    from dask.threaded import get as DEFAULT_GET
+except ImportError:
+    from dask.local import get_sync as DEFAULT_GET

Maybe this could be handled in other ways, like having a centralized registry (a dict?) of the different get functions, and we could look up the appropriate one there (see the sketch below).
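
A sketch of that registry idea (hypothetical names; this was not an existing dask API at the time of the comment):

named_schedulers = {}

try:
    from dask.threaded import get as threaded_get

    named_schedulers["threads"] = threaded_get
except ImportError:
    pass

from dask.local import get_sync

named_schedulers["synchronous"] = get_sync

# A collection module can then pick its preferred scheduler with a fallback:
DEFAULT_GET = named_schedulers.get("threads", named_schedulers["synchronous"])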

@jakirkham (Member) left a comment

Added some suggestions below on handling the get logic

dask/threaded.py: resolved review thread
dask/multiprocessing.py: resolved review thread

@ian-r-rose (Collaborator, Author)

Thanks for the suggestion @jakirkham; this seems sensible to me. I had moved away from an ImportError-based approach, as it relies on an implementation detail of the Pyodide distribution. But as long as we are raising the error ourselves, it makes perfect sense to keep the logic contained within the scheduler implementations themselves (roughly as sketched below).
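
Concretely, the containment could look something like this (a sketch, not the exact code merged here), with dask/threaded.py raising the error itself and call sites only needing the try/except:

# in dask/threaded.py (sketch)
import sys

if sys.platform == "emscripten":
    raise ImportError("The threaded scheduler requires a platform with threads.")

from multiprocessing.pool import ThreadPool  # noqa: E402  (this is what breaks under Pyodide)


# at each call site (sketch)
try:
    from dask.threaded import get as DEFAULT_GET
except ImportError:
    from dask.local import get_sync as DEFAULT_GET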

@ian-r-rose (Collaborator, Author)

Hmm, @jakirkham your suggestion is actually significantly more difficult to test in CI, since unittest.mock patches are applied after import time, so relying on catching an ImportError from dask.threaded is tough. Right now, I'm not sure how to appropriately test it outside of setting some magic environment variable. Mocking sys.platform causes problems elsewhere (which is not entirely surprising).

@jrbourbeau (Member) left a comment

Thanks @ian-r-rose -- apologies for the delayed response

dask/base.py: resolved review thread
setup.py: resolved review thread (outdated)
@jakirkham (Member)

Hmm, @jakirkham your suggestion is actually significantly more difficult to test in CI, since unittest.mock patches are applied after import time, so relying on catching an ImportError from dask.threaded is tough. Right now, I'm not sure how to appropriately test it outside of setting some magic environment variable. Mocking sys.platform causes problems elsewhere (which is not entirely surprising).

Another option that came up in this comment ( #9053 (comment) ) was using a dict to collect the different backends. This is easier to mock, and .get can handle retrieval.

@ian-r-rose (Collaborator, Author)

Another option that came up in this comment ( #9053 (comment) ) was using a dict to collect the different backends. This is easier to mock, and .get can handle retrieval.

I tried that as well, and wasn't really happy with it either, in that the mock just felt like a tautology. I'll push a commit for comparison.

@ian-r-rose (Collaborator, Author)

cf. 78766ad

It's not a great test, but I suppose it could do

@jakirkham (Member)

Gotcha. This does feel cleaner.

Think the testing point merely highlights that it is hard to test this well outside of actually running with Pyodide.

@jakirkham (Member) left a comment

Had a couple of small follow-up suggestions based on the last change.

dask/dataframe/io/hdf.py: resolved review thread (outdated)
dask/dataframe/io/hdf.py: resolved review thread (outdated)
@jakirkham (Member) left a comment

LGTM

We should keep an eye on CI for a few days to see if any of the keyword renames in functions cause any issues (though probably not).

@ian-r-rose (Collaborator, Author)

Thanks for the review @jakirkham!

@jsignell mentioned this pull request on Jun 20, 2022
@jsignell (Member)

Merging this ahead of the release on Friday. Thanks for working on this, @ian-r-rose; it seems like an exciting first step!

@jsignell merged commit a62c008 into dask:main on Jun 20, 2022