apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Allow multiple pools for one task #13975

Open ginevragaudioso opened 3 years ago

ginevragaudioso commented 3 years ago

Hello!

Description of feature:

I think it would be helpful to allow multiple pools for one task. Currently, the `pool` argument on any class inheriting from `BaseOperator` is a string, so only one pool can be specified. I believe it would be useful to allow `pool` to be a list of strings instead. This would mean that a task has to wait for a slot to be available in every one of the pools it declares, rather than in a single pool, and that while running it takes up a slot in every one of those pools.

Use case:

I have some tasks that require multiple resources. I cannot split them into separate tasks that each require one resource, because a task needs the two (or more) resources at once to complete its work. I also have tasks that require only one of the resources, so I can't just create a single pool covering both. Example:

- Task 1 requires resources A and B
- Task 2 requires resource A
- Task 3 requires resource B

Resource A can only have 4 connections; resource B can only have 16. Task 1 would need to be in both pool A and pool B, which is not possible today since only one pool can be specified.

What would I want to happen?

Allow multiple pools to be specified at task creation. I looked into the Airflow source code, and the single-pool assumption appears to go all the way down to the SQL layer, so I cannot simply fork Airflow and add this feature: the change is not small, and I don't understand Airflow well enough to make it myself.
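To pin down the requested semantics, here is a minimal, framework-free sketch (illustrative names only, not Airflow API): a task may start only when *every* pool it declares has a free slot, and on starting it occupies one slot in each, all-or-nothing.

```python
class Pool:
    """Counting-semaphore stand-in for an Airflow pool."""
    def __init__(self, name, slots):
        self.name = name
        self.slots = slots  # total capacity
        self.used = 0       # slots currently occupied

def try_schedule(task_pools):
    """Start a task only if ALL its pools have a free slot;
    on success, occupy one slot in each (all-or-nothing)."""
    if all(p.used < p.slots for p in task_pools):
        for p in task_pools:
            p.used += 1
        return True
    return False

def release(task_pools):
    """Give back one slot in each pool when the task finishes."""
    for p in task_pools:
        p.used -= 1

# The use case above: A allows 4 connections, B allows 16.
pool_a = Pool("resource_a", slots=4)
pool_b = Pool("resource_b", slots=16)

assert try_schedule([pool_a, pool_b])  # task 1 takes a slot in A and B
assert try_schedule([pool_a])          # task 2 takes a slot in A only
assert try_schedule([pool_b])          # task 3 takes a slot in B only
assert pool_a.used == 2 and pool_b.used == 2
```

The all-or-nothing check is the important part: a multi-pool task must not hold a slot in one pool while waiting on another, or it would starve single-pool tasks.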

boring-cyborg[bot] commented 3 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

anushkrishnav commented 3 years ago

I would like to work on it with more guidance

ginevragaudioso commented 3 years ago

Thanks @anushkrishnav! I can't provide much guidance on the implementation side; as I said before, I haven't looked deep enough into the Airflow source code to give meaningful advice. All I know is that the one-pool assumption seems to go all the way down to SQL, so this may require a table model change, but I could be wrong. On the feature-request side, I can clarify the desired outcome. The goal is to be able to write something like this (`BashOperator` is only an example; the change would need to go into `BaseOperator` so that all operators inherit it):

```python
my_task = BashOperator(
    # some kwargs here, unrelated to pool(s)
    pools=[RESOURCE_A_POOL, RESOURCE_B_POOL],
    pools_slots=[1, 1],  # could be the default, but this kwarg also needs to change
)
```

Currently, a `pools` argument that takes a list of pools does not exist; we only have a `pool` argument that takes a single pool. With the feature proposed in this issue and the call proposed in this comment, `my_task` could only be scheduled when there is a slot available in all declared pools, and `my_task` would take up one slot in each of them. I think it's fine to leave resource ordering (probably needed to avoid deadlocks) to the user, and Airflow could simply "book" the pools in the order they are given.
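On the deadlock point: an alternative to user-controlled ordering is for the scheduler to always book pools in a globally consistent (e.g. sorted) order, which rules out circular waits by construction. A hedged pure-Python sketch of that idea (not Airflow code; names are illustrative):

```python
import threading

# Hypothetical pool registry: pool name -> semaphore with that many slots.
POOLS = {
    "resource_a_pool": threading.BoundedSemaphore(4),
    "resource_b_pool": threading.BoundedSemaphore(16),
}

def acquire_pools(pool_names):
    """Acquire one slot per pool in a globally consistent (sorted) order,
    so two tasks declaring overlapping pools can never deadlock."""
    acquired = []
    for name in sorted(pool_names):
        sem = POOLS[name]
        if not sem.acquire(blocking=False):
            # All-or-nothing: roll back anything already acquired.
            for got in acquired:
                got.release()
            return False
        acquired.append(sem)
    return True

def release_pools(pool_names):
    """Return the slots when the task finishes."""
    for name in pool_names:
        POOLS[name].release()

# Booking order is normalized regardless of how the task lists its pools.
assert acquire_pools(["resource_b_pool", "resource_a_pool"])
release_pools(["resource_a_pool", "resource_b_pool"])
```

With sorted acquisition, two tasks declaring `[A, B]` and `[B, A]` both try A first, so neither can hold B while waiting on A.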

Please let me know if you need any more guidance from me, and thanks for replying to this issue!

zachary-povey commented 3 years ago

There is another fundamental issue with pools in Airflow that this would help solve: if you use pools to limit concurrency across different DAGs, you lose the ability to prioritise one task above another within the same DAG, because the operator's `priority_weight` argument expresses priority within a pool, not within a DAG.

If we could add tasks to more than one pool, each task could be in one pool for its DAG and another pool for limiting concurrency with a task in a separate DAG that shares a resource.
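A toy illustration of the pool-scoped priority behaviour described above (the scheduler is simplified to a sort here; real Airflow scheduling is more involved): when candidates are ranked per pool, a cross-DAG pool mixes tasks from different DAGs, so a DAG-local ordering cannot be expressed with `priority_weight` alone.

```python
# Runnable candidates, all sharing one cross-DAG concurrency pool.
tasks = [
    {"id": "dag1.extract", "pool": "shared_db", "priority_weight": 1},
    {"id": "dag2.load",    "pool": "shared_db", "priority_weight": 5},
    {"id": "dag1.report",  "pool": "shared_db", "priority_weight": 2},
]

def pick_order(tasks, pool):
    """Rank candidates in one pool by descending priority_weight,
    which is how priority is scoped today: per pool, not per DAG."""
    return [t["id"] for t in sorted(
        (t for t in tasks if t["pool"] == pool),
        key=lambda t: -t["priority_weight"],
    )]

# dag2.load outranks both dag1 tasks: the weight competes pool-wide,
# so dag1 cannot order its own tasks independently of dag2.
order = pick_order(tasks, "shared_db")
```

With multiple pools per task, each task could additionally sit in a DAG-local pool whose ranking is unaffected by other DAGs.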

c-thiel commented 3 years ago

To motivate this a little further, the following use case would also be solved by this feature:

When we use the KubernetesPodOperator, we launch pods in namespaces. These namespaces have resource limits, but Airflow is currently unaware of them, so once we hit the limits, Airflow just keeps scheduling tasks that fail immediately. We would therefore want to put each task in two pools: one representing the memory limit and one representing the CPU limit. This would be an essential feature for larger Kubernetes deployments.
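To make the namespace-limit use case concrete, here is a hedged sketch (illustrative names, not KubernetesPodOperator API): the namespace's CPU and memory quotas are modelled as two pools, and a pod is admitted only when both have capacity, consuming slots proportional to its resource request.

```python
# Namespace quotas modelled as two pools; a pod consumes multiple
# slots per pool, proportional to its resource request.
NAMESPACE_POOLS = {
    "ns_cpu_millicores": {"slots": 4000, "used": 0},  # e.g. 4-CPU quota
    "ns_memory_mib":     {"slots": 8192, "used": 0},  # e.g. 8 GiB quota
}

def admit_pod(requests):
    """Admit a pod only if every pool can cover its request
    (all-or-nothing, so no pool is partially reserved)."""
    if all(NAMESPACE_POOLS[p]["used"] + n <= NAMESPACE_POOLS[p]["slots"]
           for p, n in requests.items()):
        for p, n in requests.items():
            NAMESPACE_POOLS[p]["used"] += n
        return True
    return False

pod = {"ns_cpu_millicores": 500, "ns_memory_mib": 1024}
assert admit_pod(pod)  # fits both quotas, so the pod may launch
```

This also illustrates why the existing `pool_slots` argument would need a per-pool counterpart (`pools_slots` in the proposal above): a pod's CPU and memory footprints are independent quantities.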

jackemuk commented 1 year ago

Instead of pooling by task, what about adding pooling by connection? I have a number of DAGs that require connections to multiple databases and use `PostgresHook` and `OdbcHook`, so pooling at the task level isn't a solution.

potiuk commented 1 year ago

This calls for an Airflow Improvement Proposal - pools are an essential part of Airflow scheduling, and adding new pool capabilities (including connections), especially in a multi-scheduler scenario, is complex and has a number of consequences that should be discussed.

It would be great if someone wrote such a proposal, describing the implementation, and raised it as a discussion on the devlist: https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals - then someone would have to implement it.

This is how such a big feature would normally be added to Airflow.

notatallshaw-gts commented 1 year ago

> Instead of pooling by task, what about adding pooling by connection? I have a number of DAGs that require connections to multiple databases and use `PostgresHook` and `OdbcHook`, so pooling at the task level isn't a solution.

This seems like a completely different feature that's being requested that doesn't solve the same set of problems as the original request.

> This calls for an Airflow Improvement Proposal - pools are an essential part of Airflow scheduling, and adding new pool capabilities (including connections), especially in a multi-scheduler scenario, is complex and has a number of consequences that should be discussed.
>
> It would be great if someone wrote such a proposal, describing the implementation, and raised it as a discussion on the devlist: https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals - then someone would have to implement it.
>
> This is how such a big feature would normally be added to Airflow.

I would consider drafting an AIP, but I don't have the technical knowledge of Airflow's architecture to propose or PoC an implementation. How much detail is expected in an AIP?

potiuk commented 1 year ago

> I would consider drafting an AIP, but I don't have the technical knowledge of Airflow's architecture to propose or PoC an implementation. How much detail is expected in an AIP?

Rather detailed - look at the other (completed) AIPs; they explain the expected level of detail much better than I could here.

Just to set expectations - this is how things work in open source: things get implemented when someone implements them. If you want something implemented, you either do it yourself or find someone who takes an interest and implements it. This project is developed in the community and run under Apache Software Foundation rules, where anyone can contribute.