dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Dynamically imported functions by path cannot be executed with `ClientExecutor`. #8607

Open: tobiasraabe opened this issue 6 months ago.

Describe the issue: The functions I want to execute with the `ClientExecutor` are imported dynamically from the module's file path.

Even if I register the dynamically imported module with `cloudpickle.register_pickle_by_value`, deserialization on the worker fails.
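Registration is state kept by cloudpickle alone. The following minimal sketch uses a hypothetical in-memory module, `hypothetical_registered_mod`, as a stand-in for a module loaded from a path. It shows that the standard-library pickler produces the same by-reference payload before and after `cloudpickle.register_pickle_by_value`:

```python
import pickle
import sys
import types

import cloudpickle

# Hypothetical stand-in for a module loaded from a file path.
mod = types.ModuleType("hypothetical_registered_mod")
exec('def func():\n    return "SUCCESS"\n', mod.__dict__)
sys.modules[mod.__name__] = mod

before = pickle.dumps(mod.func)  # stdlib: stores module name + qualified name
cloudpickle.register_pickle_by_value(mod)
after = pickle.dumps(mod.func)

# The registry is a set of module names maintained by cloudpickle ...
print(mod.__name__ in cloudpickle.list_registry_pickle_by_value())  # True
# ... and the standard-library pickler never consults it.
print(before == after)  # True
```

A serializer that tries `pickle` first would therefore still send a reference to the module rather than the function's code.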

Minimal Complete Verifiable Example:

First, the main file, `main.py`, imports the module dynamically and then submits the imported function to the executor. For comparison, I also tested loky's executor (commented out below), which did not raise an error.

```python
# Content of main.py
import importlib.util
import sys
from pathlib import Path
from types import ModuleType

import cloudpickle
from distributed import Client, LocalCluster
from loky import get_reusable_executor


def import_path(path: Path) -> ModuleType:
    """Adapted from https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly."""
    # Note: path.name keeps the suffix, so the module is named "functions.py".
    module_name = path.name

    spec = importlib.util.spec_from_file_location(module_name, str(path))

    if spec is None or spec.loader is None:
        raise ImportError(f"Can't find module {module_name!r} at location {path}.")

    mod = importlib.util.module_from_spec(spec)

    sys.modules[module_name] = mod

    spec.loader.exec_module(mod)
    return mod


if __name__ == "__main__":
    # Import the module.
    module = import_path(Path("functions.py").resolve())

    # Register the module so cloudpickle pickles it by value.
    cloudpickle.register_pickle_by_value(module)

    # The loky executor works:
    # with get_reusable_executor(max_workers=1) as executor:
    #     future = executor.submit(module.func)

    client = Client(LocalCluster(n_workers=1))
    with client.get_executor() as executor:
        future = executor.submit(module.func)

    print(future.result())
```

Second, the module `functions.py` holds the dynamically imported function:

```python
# Content of functions.py
def func():
    return "SUCCESS"
```

Running the code yields:

```console
❯ python main.py
2024-04-03 00:07:17,237 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/protocol/core.py", line 175, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/protocol/core.py", line 172, in _decode_default
    return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 96, in loads
    return pickle.loads(x)
           ^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'functions.py'; 'functions' is not a package
2024-04-03 00:07:17,345 - tornado.application - ERROR - Exception in callback functools.partial(>, exception=ModuleNotFoundError("No module named 'functions.py'; 'functions' is not a package")>)
Traceback (most recent call last):
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/tornado/ioloop.py", line 750, in _run_callback
    ret = callback()
          ^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
    future.result()
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/worker.py", line 206, in wrapper
    return await method(self, *args, **kwargs) # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/worker.py", line 1302, in handle_scheduler
    await self.handle_stream(comm)
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/core.py", line 1025, in handle_stream
    msgs = await comm.read()
           ^^^^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/comm/tcp.py", line 247, in read
    msg = await from_frames(
          ^^^^^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/comm/utils.py", line 78, in from_frames
    res = _from_frames()
          ^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(
           ^^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/protocol/core.py", line 175, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/protocol/core.py", line 172, in _decode_default
    return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/tobia/micromamba/envs/pytask-parallel/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 96, in loads
    return pickle.loads(x)
           ^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'functions.py'; 'functions' is not a package
```
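The message mentions `'functions.py'` because `import_path` registers the module under `path.name`, and the standard-library pickler stores a function by reference (module name plus qualified name). A minimal stdlib-only sketch of that mechanism follows. It assumes the worker resolves the reference by importing the module name. The temporary directory and the `reproduce` helper are illustrative assumptions, and removing the entry from `sys.modules` stands in for a worker process that never ran `import_path`:

```python
import importlib.util
import pickle
import sys
import tempfile
from pathlib import Path
from types import ModuleType


def import_path(path: Path) -> ModuleType:
    """Same approach as import_path() in main.py: the module name is path.name."""
    module_name = path.name  # "functions.py", i.e. a dotted name
    spec = importlib.util.spec_from_file_location(module_name, str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"Can't find module {module_name!r} at location {path}.")
    mod = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = mod
    spec.loader.exec_module(mod)
    return mod


def reproduce() -> str:
    """Return the error raised when a by-reference pickle of func is loaded."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "functions.py"
        path.write_text('def func():\n    return "SUCCESS"\n')
        # As when main.py is run from the directory that contains functions.py.
        sys.path.insert(0, tmp)
        try:
            module = import_path(path)
            # The standard library stores only a reference: ("functions.py", "func").
            payload = pickle.dumps(module.func)
            # Simulate a worker that never ran import_path().
            del sys.modules["functions.py"]
            try:
                pickle.loads(payload)
            except ModuleNotFoundError as exc:
                return str(exc)
            return "loaded without error"
        finally:
            # Undo the changes to the import system.
            sys.path.remove(tmp)
            sys.modules.pop("functions", None)
            sys.modules.pop("functions.py", None)


print(reproduce())
```

Running it should print the same `ModuleNotFoundError` message as the traceback. The import system tries to import `functions` as a package and then look up a submodule `py` inside it.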

Anything else we need to know?:

Everything works if I change the serializer on this line from `pickle` to `cloudpickle`:

https://github.com/dask/distributed/blob/5647d06333f100f798ce2704da98634b85983f71/distributed/protocol/pickle.py#L63

Perhaps the logic could be adjusted so that objects from a module listed in `cloudpickle.list_registry_pickle_by_value()` are pickled by value with cloudpickle, since registering the module signals that this is what the user intends.
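A minimal sketch of that idea follows, written as a standalone helper rather than a patch to `distributed/protocol/pickle.py`. The name `dumps_respecting_registry` and the in-memory module `hypothetical_functions` are hypothetical. The helper looks up the object's module with `inspect.getmodule`, sends registered modules to `cloudpickle.dumps`, and otherwise keeps the standard-library path with a cloudpickle fallback:

```python
import inspect
import pickle
import sys
import types

import cloudpickle


def dumps_respecting_registry(obj, protocol: int = pickle.HIGHEST_PROTOCOL) -> bytes:
    """Hypothetical helper illustrating the proposal; not distributed's actual code."""
    module = inspect.getmodule(obj)
    module_name = getattr(module, "__name__", None)
    # Exact-name match only; a fuller version might also check parent packages.
    if module_name in cloudpickle.list_registry_pickle_by_value():
        # The user explicitly asked for this module to be pickled by value.
        return cloudpickle.dumps(obj, protocol=protocol)
    try:
        # Unchanged fast path: the standard library pickles functions by reference.
        return pickle.dumps(obj, protocol=protocol)
    except Exception:
        return cloudpickle.dumps(obj, protocol=protocol)


if __name__ == "__main__":
    # Hypothetical stand-in for a module imported via import_path().
    mod = types.ModuleType("hypothetical_functions")
    exec('def func():\n    return "SUCCESS"\n', mod.__dict__)
    sys.modules[mod.__name__] = mod
    cloudpickle.register_pickle_by_value(mod)

    payload = dumps_respecting_registry(mod.func)
    del sys.modules[mod.__name__]  # simulate a worker without the module
    print(pickle.loads(payload)())  # SUCCESS
```

Keeping `pickle.dumps` as the default means that, apart from the existing fallback, only explicitly registered modules pay cloudpickle's overhead. The payload can still be read with plain `pickle.loads` as long as cloudpickle is installed on the receiving side.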

This issue also touches on https://github.com/dask/distributed/issues/7841. If the user in that issue ran pytest with `--import-mode importlib`, the same error appeared, since pytest uses a modified version of `import_path`.

Environment: