
Dynamically imported functions by path cannot be executed with ClientExecutor. #8607

Open
tobiasraabe opened this issue Apr 2, 2024 · 0 comments


Describe the issue: The functions that I want to execute with the ClientExecutor are dynamically imported via the module's path.

Even if I register the dynamically imported module with cloudpickle.register_pickle_by_value, the deserialization fails.

Minimal Complete Verifiable Example:

First, the main file contains the code to import the module dynamically and then submit the imported function to the executor. For comparison, the reusable executor from loky was also tested and did not raise an error.

# Content of main.py
import importlib.util
import sys
from pathlib import Path
from types import ModuleType

import cloudpickle
from distributed import Client, LocalCluster
from loky import get_reusable_executor


def import_path(path: Path) -> ModuleType:
    """Adapted from https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly."""
    module_name = path.name

    spec = importlib.util.spec_from_file_location(module_name, str(path))

    if spec is None:
        raise ImportError(f"Can't find module {module_name!r} at location {path}.")

    mod = importlib.util.module_from_spec(spec)

    sys.modules[module_name] = mod

    spec.loader.exec_module(mod)
    return mod


if __name__ == "__main__":
    # Import the module.
    module = import_path(Path("functions.py").resolve())

    # Register the module for pickling.
    cloudpickle.register_pickle_by_value(module)

    # with get_reusable_executor(max_workers=1) as executor:
    #     future = executor.submit(module.func)

    client = Client(LocalCluster(n_workers=1))
    with client.get_executor() as executor:
        future = executor.submit(module.func)

    print(future.result())

Second, a module functions.py that holds the dynamically imported function.

def func():
    return "SUCCESS"

Running the code yields

python main.py 
2024-04-03 00:07:17,237 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/protocol/core.py", line 175, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/protocol/core.py", line 172, in _decode_default
    return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 96, in loads
    return pickle.loads(x)
           ^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'functions.py'; 'functions' is not a package
2024-04-03 00:07:17,345 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f006efefa10>>, <Task finished name='Task-4' coro=<Worker.handle_scheduler() done, defined at /home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/worker.py:203> exception=ModuleNotFoundError("No module named 'functions.py'; 'functions' is not a package")>)
Traceback (most recent call last):
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/tornado/ioloop.py", line 750, in _run_callback
    ret = callback()
          ^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
    future.result()
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/worker.py", line 206, in wrapper
    return await method(self, *args, **kwargs)  # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/worker.py", line 1302, in handle_scheduler
    await self.handle_stream(comm)
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/core.py", line 1025, in handle_stream
    msgs = await comm.read()
           ^^^^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/comm/tcp.py", line 247, in read
    msg = await from_frames(
          ^^^^^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/comm/utils.py", line 78, in from_frames
    res = _from_frames()
          ^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(
           ^^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/protocol/core.py", line 175, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/protocol/core.py", line 172, in _decode_default
    return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 96, in loads
    return pickle.loads(x)
           ^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'functions.py'; 'functions' is not a package
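The root cause is that plain pickle serializes functions by reference (module name plus qualified name), so the worker must be able to import the module under exactly that name. A minimal stdlib-only sketch of this mechanism (no distributed involved; the temporary file location and the module name "functions" are illustrative):

```python
import importlib.util
import pickle
import sys
import tempfile
from pathlib import Path

# Write a throwaway functions.py and import it by path, as in main.py above.
src = Path(tempfile.mkdtemp()) / "functions.py"
src.write_text('def func():\n    return "SUCCESS"\n')

spec = importlib.util.spec_from_file_location("functions", src)
mod = importlib.util.module_from_spec(spec)
sys.modules["functions"] = mod
spec.loader.exec_module(mod)

# Plain pickle stores only a reference: ("functions", "func").
payload = pickle.dumps(mod.func)

# A worker is a fresh process that never ran import_path(), so the module
# is neither in its sys.modules nor importable from its sys.path.
del sys.modules["functions"]

try:
    pickle.loads(payload)
    error = None
except ModuleNotFoundError as exc:
    error = exc

print(error)
```

Deserialization fails with a ModuleNotFoundError, which is the same failure mode the worker reports above.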

Anything else we need to know?:

Everything works if I change the serialization in this line of distributed/protocol/pickle.py from pickle to cloudpickle.

result = pickle.dumps(x, **dump_kwargs)

Maybe the logic can be adjusted so that if an object's module shows up in cloudpickle.list_registry_pickle_by_value(), the object is serialized with cloudpickle, since the user explicitly registered the module to be pickled by value.
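A sketch of what that dispatch could look like. The registry lookup is stubbed with a plain set of module names so the example stays self-contained; in distributed it would consult cloudpickle.list_registry_pickle_by_value(), and the name dumps_with_registry is made up for illustration:

```python
import pickle


def dumps_with_registry(obj, by_value_module_names, cloudpickle_dumps):
    """Serialize obj, honoring a pickle-by-value module registry.

    by_value_module_names stands in for the names of the modules returned
    by cloudpickle.list_registry_pickle_by_value(); cloudpickle_dumps
    stands in for cloudpickle.dumps.
    """
    module = getattr(obj, "__module__", None)
    if module in by_value_module_names:
        # The user registered this module to be pickled by value, so embed
        # the function's code in the payload instead of a reference.
        return cloudpickle_dumps(obj)
    # Default path: pickle by reference, as distributed does today.
    return pickle.dumps(obj)


# Exercise both branches with a sentinel in place of cloudpickle.dumps.
marker = b"BY-VALUE"
registered = dumps_with_registry(len, {"builtins"}, lambda o: marker)
unregistered = dumps_with_registry(len, set(), lambda o: marker)
```

With `builtins` in the registry the sentinel serializer is used; without it, plain pickle round-trips `len` by reference as before.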

The issue also touches on #7841: if the user in that issue had used --import-mode=importlib as the import mode for pytest, the same error would have appeared. pytest essentially uses import_path under the hood, with some adjustments.

Environment:

  • Dask version: 2024.3.1
  • Python version: 3.11.8
  • Operating System: WSL
  • Install method (conda, pip, source): conda