dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Package upload plugin #8698

Open dbalabka opened 1 week ago

dbalabka commented 1 week ago

Dask's remote task execution is straightforward when the function does not depend on any external package. However, the most common use case relies on installed libraries or project packages. There are two types of dependencies:

  1. pip/poetry installed packages
  2. dependencies located in the project source/package

To deliver both dependencies to workers, we do the following:

  1. We create a custom Dask image that contains all required extra pip packages. The primary assumption is that dependencies change rarely, so project-specific Dask images can stay untouched for a while and we rarely rebuild them. To simplify the process, we use automation that extracts all required packages with poetry export -f requirements.txt --output requirements.txt and builds a Docker image remotely using the Kubernetes driver. The PipInstall plugin is another way to do it, but it can slow cluster startup to the order of minutes. In our case, cluster startup takes less than a minute after image warmup on Kubernetes nodes.
  2. The project source is more dynamic and must be uploaded each time we spin up a cluster. We use the existing client.upload_file() function, which relies on the UploadFile plugin. To clarify, we keep the cluster running only during Python script execution and tear it down when the script finishes.
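For anyone who wants to try the PipInstall route from step 1, here is a minimal sketch of turning the poetry export output into a plain list of requirement strings. The function name parse_requirements is hypothetical and not part of Dask. It assumes the usual export layout: one requirement per logical line, optional backslash continuations, and --hash options.

```python
def parse_requirements(text: str) -> list[str]:
    """Return requirement specs from requirements.txt content.

    Hypothetical helper (not part of Dask): hashes, comments and pip
    option lines are dropped, environment markers are kept.
    """
    # Join backslash-continued lines into one logical line.
    joined = text.replace("\\\n", " ")
    specs = []
    for raw in joined.splitlines():
        # Strip trailing comments, then surrounding whitespace.
        line = raw.split(" #", 1)[0].strip()
        # Skip blanks, full-line comments and option lines such as --extra-index-url.
        if not line or line.startswith("#") or line.startswith("-"):
            continue
        # Drop per-requirement options such as --hash=sha256:...
        specs.append(line.split(" --", 1)[0].strip())
    return specs
```

The resulting list could then be passed to the existing plugin, for example client.register_plugin(PipInstall(packages=specs)), with the startup cost mentioned above.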

While we successfully solved the delivery of extra dependencies to remote worker nodes, this requires a deep understanding of Dask cluster deployment and extra helper functions that do not come with Dask out of the box. I propose improving the developer experience in this direction. I would focus on delivering local source to worker nodes first. To be more specific:

  1. Creating a new function upload_package(module: ModuleType) as a complement to the existing upload_file(path).
  2. Automating egg file creation inside the new upload_package() function.
  3. Making it possible to update existing venv packages, such as Dask-specific modules, on remote worker nodes, which should simplify debugging. While investigating #11160, I already showed that this is possible (please see https://github.com/dask/dask/issues/11160#issuecomment-2158877551)
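To illustrate item 2, here is a rough sketch of the packaging step using only the standard library. It is not the prototype's code: the name build_package_zip is hypothetical, and it builds a zip archive rather than an egg, since client.upload_file() accepts .py, .egg and .zip files. It also assumes a regular package or single-file module, not a namespace package, and always archives the whole top-level package.

```python
import importlib
import tempfile
import zipfile
from pathlib import Path
from types import ModuleType
from typing import Optional


def build_package_zip(module: ModuleType, out_dir: Optional[str] = None) -> Path:
    """Pack the top-level package of `module` into an importable zip.

    Hypothetical helper, not part of Dask.
    """
    # Package the top-level package so the archive is importable on its own.
    top_name = module.__name__.split(".")[0]
    top = importlib.import_module(top_name)
    if getattr(top, "__file__", None) is None:
        raise ValueError(f"{top_name} has no source file to package")

    source = Path(top.__file__).resolve()
    target = Path(out_dir or tempfile.mkdtemp()) / f"{top_name}.zip"
    with zipfile.ZipFile(target, "w", zipfile.ZIP_DEFLATED) as archive:
        if source.name == "__init__.py":
            # Regular package: store every .py file under "<package>/...".
            pkg_dir = source.parent
            for path in sorted(pkg_dir.rglob("*.py")):
                archive.write(path, path.relative_to(pkg_dir.parent).as_posix())
        else:
            # Single-file module: store it at the archive root.
            archive.write(source, source.name)
    return target
```

With something like this in place, upload_package(module) could be little more than client.upload_file(str(build_package_zip(module))). Replacing modules of an already installed package, as in item 3, needs extra handling that this sketch does not cover.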

We already have a working prototype of the Worker/Scheduler plugin that does everything described above. If there is demand for such a plugin, we look forward to contributing our source. Any comments and suggestions are very welcome 🤗

Here are some usage examples:

Uploading the project source to all workers:

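# Assumption: the operator-based KubeCluster from dask-kubernetes
from dask_kubernetes.operator import KubeCluster

import my_project_source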

cluster = KubeCluster()  
client = cluster.get_client()

# Upload the entire project source to all worker nodes in a very convenient way
client.register_plugin(UploadModule(my_project_source))

# It will be even more convenient with a new client function
client.upload_package(my_project_source)

We can replace part of the Dask source on all worker nodes for debugging purposes:

from dask.dataframe import backends

client.upload_package(backends)

Here is an example of an adjusted function: https://github.com/dask/dask/issues/11160#issuecomment-2158877551