apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.12k stars 14.3k forks source link

Pool Support for Task Groups in Apache Airflow #35689

Open rodolfo-nobrega opened 12 months ago

rodolfo-nobrega commented 12 months ago

Description

I would like to propose a new feature for Apache Airflow: the ability to assign a pool to an entire task group, enabling better control over the execution of tasks within the group. This feature would be particularly useful for ensuring that tasks within a group are executed sequentially before another group starts, especially when multiple groups of tasks are designed to run in parallel.

I propose adding the functionality to assign a pool directly to a task group, which would allow all tasks within the group to be managed by the pool's slot limitations. This feature would provide a much simpler and more efficient way to ensure that tasks within a group are executed in sequence before moving on to another group, especially in cases where parallel execution of multiple groups is required.

This enhancement would greatly improve the usability and flexibility of task groups in Airflow, especially for complex workflows where task sequencing and parallel execution control are crucial.

Thank you for considering this feature request.

Use case/motivation

In my current workflow, I have multiple task groups structured to run in parallel, each containing a sequence of tasks that should be executed in a specific order. The task groups are similar in structure and purpose, resembling the following format:

Task Group 1: a >> b >> c Task Group 2: d >> e >> f Task Group 3: g >> h >> i The desired behavior is to have two task groups running in parallel at any given time, with each group executing its tasks in the defined sequence. However, I'm encountering a challenge with the current Airflow features:

When using Concurrency: Airflow schedules the first task of each group (a, d, g) to run simultaneously, followed by the second tasks of each group (b, e, h), and so on. This approach does not maintain the sequential integrity of each group.

When using a Pool: Assigning all tasks in a group to a specific pool still results in the same behavior as above, where Airflow schedules one task from each group in parallel, rather than executing all tasks in a single group sequentially before starting the next group.

Related issues

Current Workarounds and Limitations:

SubDAGs: One solution is to use SubDagOperator to encapsulate each task group into a SubDAG. While this approach ensures sequential execution within each group, it introduces performance and complexity issues, as SubDAGs are known to be less efficient and more complex to manage.

Are you willing to submit a PR?

Code of Conduct

boring-cyborg[bot] commented 12 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

Taragolis commented 11 months ago

have multiple task groups structured to run in parallel, each containing a sequence of tasks that should be executed in a specific order.

When using Concurrency: Airflow schedules the first task of each group (a, d, g) to run simultaneously, followed by the second tasks of each group (b, e, h), and so on. This approach does not maintain the sequential integrity of each group.

I can't get what you want to active, but actually work as it expected and designed due to parallel and distributive manier. If you need to run TaskGroup tasks only after previous one completed then you need to run one after each other, e.g.:

Task Group 1 >> Task Group 2 >> Task Group 3


I want to add information, that this feature (as well as many from area:TaskGroup) unlikely would be implemented with current of implementation and design of TaskGroup, right now it is just an namespace / nice UI feature.

There is no information about task group after it created, because it is just a nice namespace with additional abilities during DAG creation, all other information not available during DagRun because it transform into a prefix(es) for task_id:

potiuk commented 8 months ago

One more comment here (result of discussion in Slack).

I think it would be indeed a very nice feature to add in Airflow, but likely you can achieve it by using some external DB (might be any DB - relational or not) whre you can implement transaction doing an atomic "Increase or Wait" functionality - similar to what semaphores are doing in proces synchronisaton, and using the new Setup/Teardown functionality in Airflow.

All you need is to have a call (might be an SQL QUERY)) that will atomically run "Increase or Wait" call (I.e. attempt to increase some value but when it reaches some threashold - wait and attmpt again - either in a loop or when you are notified that the counter/semaphore has been decreased).

For each of your group - you should write a custom Setup task doing "Increase or wait" kind of functionality, and Teardown task doing "decrease". That should achieve exactly what you want - each task group would continue only after Setup manages to successfully increase such external semaphore.

This is of course not super easy to implement and would be nice to have similar feature in Airflow, but it's entirely possible to implement something like that with - I think any remote transaction DB you would want to use, or even with object storage most likely.

bruno-uy commented 5 months ago

I'm trying to coordinate two groups of tasks in different pipelines and this feature would be really valuable.

For my use case it would be enough to have a pool in the TaskGroup level that works as the normal pool in a task.

Let me give an example of the problem with using that in a single task:

Pipeline A:
t1 >> TaskGroup(t2, t3) >> t4

Pipeline B:
t5 >> TaskGroup(t6, t7) >> t8

If I add the pool to t2, t3, t6 and t7 (let's say it's only 1 slot), the first one to start will take the spot. Let's say it's t2. If t6 is ready to run before t3, t6 will get the spot on the pool, but I want the whole task group to run when they acquire the pool.

Hope this example helps. Is this similar to what you had in mind @rodolfo-nobrega?

potiuk commented 5 months ago

Sure. not an easy one because Airflow schedules tasks individually - but if you would like to propose A PR on that @bruno-uy @rodolfo-nobrega - or more likely an "Airflow Improvement Proposal" - feel free.

bruno-uy commented 5 months ago

Sure. not an easy one because Airflow schedules tasks individually - but if you would like to propose A PR on that @bruno-uy @rodolfo-nobrega - or more likely an "Airflow Improvement Proposal" - feel free.

I will see if I can check that out in a couple of weeks. Pretty busy right now 😓