dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Worker plugin cannot be registered on a worker unless its entire package source is uploaded to the server #8695

Open dbalabka opened 1 week ago

dbalabka commented 1 week ago

Describe the issue: Let's say I've created a plugin under my_package.dask.plugins.MyWorkerPlugin.
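
For illustration, a minimal plugin of this shape might look like the following (the class body here is a sketch, not the actual code from my project):

from distributed import WorkerPlugin

class MyWorkerPlugin(WorkerPlugin):
    def setup(self, worker):
        # runs once on every worker when the plugin is registered
        worker.my_plugin_ready = True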

The plugin registration:

from my_package.dask.plugins import MyWorkerPlugin
client.register_plugin(MyWorkerPlugin())  # client is an existing distributed.Client

will lead to:

  File "/opt/conda/lib/python3.10/site-packages/distributed/core.py", line 970, in _handle_comm
  File "/opt/conda/lib/python3.10/site-packages/distributed/scheduler.py", line 7893, in register_worker_plugin
  File "/opt/conda/lib/python3.10/site-packages/distributed/scheduler.py", line 6488, in broadcast
  File "/opt/conda/lib/python3.10/site-packages/distributed/utils.py", line 251, in All
  File "/opt/conda/lib/python3.10/site-packages/distributed/scheduler.py", line 6466, in send_message
  File "/opt/conda/lib/python3.10/site-packages/distributed/core.py", line 1181, in send_recv
Exception: ModuleNotFoundError("No module named 'my_package'")

It would be useful if Dask provided a more meaningful error message here or uploaded the package automatically. Alternatively, the serialisation mechanism should allow the registered plugin to be used without the whole package being installed on the workers.

Environment:

fjetter commented 1 week ago

You should be able to tell cloudpickle to pickle your module by value, which forces Dask to upload it:

https://github.com/cloudpipe/cloudpickle?tab=readme-ov-file#overriding-pickles-serialization-mechanism-for-importable-constructs
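
For the module from your traceback that would look roughly like this (illustrative; the exact import path depends on your project layout):

import cloudpickle
from my_package.dask import plugins

# Ship the module's source along with pickled objects instead of importing it on workers
cloudpickle.register_pickle_by_value(plugins)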

dbalabka commented 1 week ago

@fjetter, thanks for the quick reply. I will try it and get back to you with feedback.

dbalabka commented 1 week ago

@fjetter, thanks, it works!

import cloudpickle

from my_project.dask_plugins import MyModule
from my_project import dask_plugins

# Pickle the whole module by value so the workers do not need it installed
cloudpickle.register_pickle_by_value(dask_plugins)
client.register_plugin(MyModule())

A few comments regarding this issue:

  1. Should this be documented on the Dask Distributed plugins page? https://distributed.dask.org/en/latest/plugins.html The only mention is here: https://github.com/dask/distributed/blob/d8dc8ad2172ff34113e4bc47d57ae55401cd6705/docs/source/protocol.rst#cloudpickle-for-functions-and-some-data
  2. The solution is a bit verbose. Should this be the responsibility of the client.register_plugin(...) method, i.e. could it be encapsulated inside register_plugin? See the sketch after this list for what I have in mind.
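
Something along these lines (register_plugin_by_value is a hypothetical helper, not an existing Dask API):

import inspect
import cloudpickle

def register_plugin_by_value(client, plugin):
    # Hypothetical convenience wrapper: pickle the plugin's defining module
    # by value so the workers do not need the package installed.
    module = inspect.getmodule(type(plugin))
    cloudpickle.register_pickle_by_value(module)
    client.register_plugin(plugin)
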
fjetter commented 1 week ago

This is not a shortcoming of the plugin system. You are faced with the exact same problem if you submit functions as ordinary tasks, so this is a property of the serialization system. I'm not sure what a better place to document this would be. I wouldn't mind if we just cross-referenced this protocol section in the worker plugin docs (for reference, you linked to the code; the documentation is hosted at https://distributed.dask.org/en/stable/protocol.html, and note that dask/dask and dask/distributed have different docs domains). Are you interested in contributing a PR for the documentation?

Regarding encapsulating this in register_plugin: that isn't easily possible. It's not just that this isn't a plugin-exclusive problem; we simply do not know, and cannot infer on your machine, which modules are missing on the workers. Serialization by value is expensive and should only be used as a last resort, so we cannot make it the default. Likewise, we cannot perform a cluster-wide environment check for all packages (and submodules, and functions, ...) on every call.
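
(For completeness: the closest existing tool is the coarse, manual version check below. It compares installed package versions across the cluster but cannot detect a user module that is simply missing on the workers.)

# Compares package versions on the client, scheduler, and workers;
# with check=True it raises on mismatches, but it knows nothing about my_package itself.
client.get_versions(check=True)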

dbalabka commented 1 week ago

@fjetter, another possible solution might be to provide a more convenient way to deliver a project's source package to the Dask nodes. The developer would then upload the whole project source before using any plugins/functions/classes on the worker nodes. Here is a ticket that describes the idea: https://github.com/dask/distributed/issues/8698
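
The closest built-in option I'm aware of today is Client.upload_file, which ships a single file or archive to the workers (the path below is just an example):

# Upload a zipped copy of the project so workers can import it.
# upload_file accepts .py, .egg, and .zip files.
client.upload_file("dist/my_package.zip")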

dbalabka commented 1 week ago

@fjetter, I will contribute the documentation adjustments.