apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Allow dynamically setting `pool_slots` for potentially heavy tasks. #35803

Open Digoya opened 11 months ago

Digoya commented 11 months ago

Description

Allow setting pool_slots for a task based on the result of another task.

A task may require a different amount of resources at runtime depending on its inputs. However, we can estimate a task's approximate resource requirements from its initial parameters. It would be nice to be able to set pool_slots dynamically based on the result of another task.


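from airflow.decorators import dag, task

@dag
def example_dag():

    @task
    def get_slots_required():
        # estimate_cost comes from user code; get_data stands in for
        # whatever loads the task's inputs.
        from lib import estimate_cost
        data = get_data()
        return 3 if estimate_cost(data) > 15 else 1

    @task
    def heavy_or_not_task():
        ...

    # Requested behaviour: take pool_slots from an upstream task's result.
    slots = get_slots_required()
    heavy_or_not_task.override(pool_slots=slots)()

dag_instance = example_dag()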

Use case/motivation

This feature expands the capabilities of the pool system in Airflow, allowing more fine-grained control over resources and making the system more stable and efficient.

Related issues

https://github.com/apache/airflow/issues/33657

Are you willing to submit a PR?

Code of Conduct

boring-cyborg[bot] commented 11 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.

potiuk commented 11 months ago

Just to set expectations here.

This is going to be super complex, and I am not sure we are ever going to do it. The scheduler decides which tasks to schedule with a SELECT query on the database that checks how many tasks are already in the pool. Dynamically setting the pool size while tasks are running could have a lot of undesirable side effects, because task execution and scheduling run completely independently of each other. Changing the pool size on the fly while tasks are already being scheduled for it could lead to tasks not being scheduled when they should be, over-subscribed pools, deadlocks, and so on.

As @uranusjr explained in https://github.com/apache/airflow/issues/33657, while theoretically possible, it could only be done if someone heavily changed the pool mechanism to allow such dynamic behaviour (basically, the decision whether to choose a task as a candidate for scheduling could no longer be based on the rather static calculation of pool capacity used there).

So, if you are counting on this being implemented, it is unlikely to happen any time soon unless someone takes on the task of re-implementing the pool mechanism.

Of course, if you would like to be that someone, @Digoya, feel free to do it. But if you do not have the skills/capacity/experience/will/understanding of Airflow, this one will have to wait for someone who has it all and picks it up.
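The accounting problem described above can be illustrated with a toy model. This is only a sketch under stated assumptions: `Pool`, `Task`, `open_slots`, and `try_schedule` are hypothetical names invented for illustration and are not Airflow's scheduler code. It only shows why an admission check made against a static slot count can be invalidated if a task's slot requirement changes after admission.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    name: str
    pool_slots: int  # slots this task claims while queued or running


@dataclass
class Pool:
    total_slots: int
    occupied: list = field(default_factory=list)  # tasks currently holding slots

    def open_slots(self) -> int:
        # Capacity is derived from the tasks' *current* pool_slots values,
        # loosely mimicking a query that sums slots over queued/running tasks.
        return self.total_slots - sum(t.pool_slots for t in self.occupied)

    def try_schedule(self, task: Task) -> bool:
        # Static check: admit the task only if it fits at this moment.
        if task.pool_slots <= self.open_slots():
            self.occupied.append(task)
            return True
        return False


pool = Pool(total_slots=4)
light = Task("light", pool_slots=1)
heavy = Task("heavy_or_not", pool_slots=1)  # admitted as a 1-slot task

assert pool.try_schedule(light)
assert pool.try_schedule(heavy)
print(pool.open_slots())  # 2

# The requirement changes after the task was already admitted.
heavy.pool_slots = 5
print(pool.open_slots())  # -2: the pool is now over-subscribed
```

In this toy model nothing re-validates earlier admissions, so any later task is refused until the over-subscribed work finishes, which is one of the side effects mentioned above.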

hussein-awala commented 11 months ago

I agree with @potiuk about the complexity of this feature.

However, when we merge https://github.com/apache/airflow/pull/35210, I will try to do the same thing for pool and pool_slots to make them dynamic based on the TI attributes (operator parameters + extra configurations like retries, execution time, ...)

jscheffl commented 11 months ago

@hussein-awala if you have your hands on the topic, would it be possible to do the same for the queue as well? There is also a feature request for this (and I would add myself to it): #35542

I am not deep enough into the scheduler to know whether the point of change is in the same area. But as I have the "same" demand, I'd offer to contribute to this as well (e.g. testing).

sid-habu commented 2 months ago

@hussein-awala With your merged PR, is it now possible to dynamically set the pool and pool_slots attributes of a task based on an XCom return value of an upstream task?

I am trying to customize a Sensor and have enabled pool_slots as a templated field. However, I am struggling to pass the Jinja-templated XCom value from an upstream task as the value of pool_slots for the customized sensor task.

sid-habu commented 2 months ago

It looks like it is not possible to set the pool and pool_slots attributes dynamically, because the values are fetched from the task instance before these fields are rendered.

Pooling is such a powerful feature, but in our multi-tenant platform we need to control the pool and slots dynamically.
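The ordering problem can be shown with a small toy model. This is a sketch only: `ToyTask`, `scheduler_reads_slots`, and `worker_renders` are hypothetical names, and Python's `string.Template` stands in for Jinja rendering. It is not Airflow code; it just illustrates that a value read at scheduling time cannot reflect a template that is rendered later, at execution time.

```python
from dataclasses import dataclass
from string import Template


@dataclass
class ToyTask:
    pool_slots: object  # an int, or a template string such as "$slots"


def scheduler_reads_slots(task: ToyTask):
    # Scheduling side: reads the attribute as stored, before any rendering.
    return task.pool_slots


def worker_renders(task: ToyTask, context: dict) -> None:
    # Execution side: templated fields are rendered only once the task starts.
    if isinstance(task.pool_slots, str):
        task.pool_slots = int(Template(task.pool_slots).substitute(context))


task = ToyTask(pool_slots="$slots")
seen_by_scheduler = scheduler_reads_slots(task)  # still the raw template
worker_renders(task, {"slots": 3})               # e.g. a value from an upstream result

print(repr(seen_by_scheduler))  # '$slots'
print(task.pool_slots)          # 3
```

By the time the rendered value exists, the scheduling decision has already been made with the unrendered one.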