spotify / luigi

Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.
Apache License 2.0
17.82k stars 2.39k forks

Add "Tags" to Tasks and Workers that specify the kinds of Tasks that Workers want to take on #3190

Open noah-gil opened 2 years ago

noah-gil commented 2 years ago

Similar to how you can define tags in .gitlab-ci.yml, it would be helpful if Workers had a set of tags that specify the kinds of Tasks they are willing to take on. Tasks would carry tags of the same kind when they require a Worker with a certain capability in order to execute. A Task would only run on a Worker if the Task's set of tags is a subset of the Worker's tags.

For example, suppose I have a cloud cluster of Luigi Workers consisting of normal VMs and VMs with attached GPUs. I have devised a workflow in which some Tasks can use a GPU for hardware acceleration and other Tasks cannot. I want to assign the tag gpu to the GPU Tasks and Workers, and the tag cpu to all other Tasks and Workers. When a Worker asks the Central Scheduler for work, it would include its set of tags in the request, and the scheduler would only assign it Tasks whose tags are a subset of that set. This would let me use the hardware efficiently. As it stands, I have no influence on how Tasks are divided among Workers, so GPU Tasks may land on VMs without a GPU and the workflow may not execute as efficiently as I would like.
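To make the proposed rule concrete, here is a minimal sketch of the subset check in plain Python. It does not use Luigi and is not an existing Luigi API: `PendingTask` and `eligible_tasks` are hypothetical names invented for illustration, and the task names are made up.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PendingTask:
    """Hypothetical stand-in for a task waiting in the scheduler."""
    name: str
    tags: frozenset = frozenset()  # capabilities the task requires


def eligible_tasks(worker_tags, pending):
    """Return the tasks whose tags are a subset of the worker's tags."""
    worker_tags = frozenset(worker_tags)
    return [task for task in pending if task.tags <= worker_tags]


pending = [
    PendingTask("TrainModel", frozenset({"gpu"})),
    PendingTask("CleanData", frozenset({"cpu"})),
    PendingTask("Notify"),  # no tags: any worker may take it
]

gpu_only = [t.name for t in eligible_tasks({"gpu"}, pending)]
cpu_only = [t.name for t in eligible_tasks({"cpu"}, pending)]
gpu_and_cpu = [t.name for t in eligible_tasks({"gpu", "cpu"}, pending)]
```

Under this rule a Worker tagged only gpu would not pick up Tasks tagged cpu, so a GPU VM that should also run ordinary Tasks would need both tags. Untagged Tasks match every Worker because the empty set is a subset of any set.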

I've looked through the documentation, and I do not see evidence of this kind of feature existing. The closest I can see is resources, but those seem to act more like global semaphores.

lallea commented 2 years ago

This functionality is not present. For those that need it, the closest approximation is as follows:

1. Split your pipeline into different legs, where each leg has homogeneous environment requirements. Use luigi.task.externalize at the boundaries, so that a leg depends on the tasks of another leg without running them itself.
2. Schedule each leg on a machine that has the right resources, e.g. with a cron job, or use k8s node affinity to steer jobs to the right node.
3. The luigid scheduler will recognise that the external tasks match their original tasks and will let downstream workers know when they can run.
4. If you get unwanted pauses due to scheduling delays, use the keep_alive and max_keep_alive_idle_duration config options.