apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.16k stars 14.04k forks

Deferred / deferrable tasks are not taken into account by max_active_tasks #40528

Open raphaelauv opened 2 months ago

raphaelauv commented 2 months ago

Apache Airflow version

2.9.2

What happened?

If you set max_active_tasks, it does not limit the number of concurrent deferred tasks.

It is possible to restrict the number of concurrent deferred tasks with a pool, but I want to limit the number of tasks run by a DAG while keeping the possibility for every task to use a specific pool (a task can only use one pool in Airflow).

What you think should happen instead?

max_active_tasks should take deferred tasks into account, or we should create a new boolean setting, max_active_tasks_include_deferred, like what was done for pools.

How to reproduce

from datetime import datetime, timedelta, timezone

from airflow import DAG
from airflow.sensors.time_sensor import TimeSensorAsync
from pendulum import today

# Expected: at most 10 tasks active for this DAG.
# Observed: deferred sensors are not counted toward max_active_tasks.
with DAG(
        dag_id='example',
        schedule_interval='0 0 * * *',
        max_active_tasks=10,
        start_date=today("UTC").add(days=-1)):
    for i in range(100):
        # Each sensor defers until 10 minutes after the DAG file is parsed.
        TimeSensorAsync(task_id=f"a_{i}", target_time=(datetime.now(tz=timezone.utc) + timedelta(minutes=10)).time())


gopidesupavan commented 1 month ago

@raphaelauv @kaxil @RNHTTR I would like to look into this. Could you please assign it to me?

RNHTTR commented 1 month ago

Go for it!

gopidesupavan commented 4 weeks ago

Thank you @RNHTTR for assigning this to me. I have been looking into it, and my understanding of the ask is: limit the total number of deferrable tasks per DAG? Thanks @raphaelauv for raising this.

As I understand triggers and the deferred state, when a task defers, a new row is created in the trigger table. Once the task state is set to DEFERRED, the triggerer job picks up these deferred tasks from the trigger table and executes them in an asyncio event loop.
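To make that description concrete, here is a minimal sketch in plain Python, not Airflow code. It assumes a dictionary standing in for the trigger table; `run_trigger` and `triggerer_loop` are hypothetical names. It only illustrates how many waiting triggers can share one asyncio event loop.

```python
import asyncio


async def run_trigger(task_id: str, delay: float) -> str:
    """Hypothetical trigger: wait without blocking, then name the task to resume."""
    await asyncio.sleep(delay)
    return task_id


async def triggerer_loop(trigger_rows: dict[str, float]) -> list[str]:
    """Run every stored trigger concurrently in a single event loop."""
    pending = [run_trigger(task_id, delay) for task_id, delay in trigger_rows.items()]
    # gather() returns results in submission order once all triggers have fired.
    return await asyncio.gather(*pending)


# Stand-in for rows in the trigger table: task_id -> seconds to wait (assumed values).
trigger_rows = {f"a_{i}": 0.01 for i in range(100)}
resumed = asyncio.run(triggerer_loop(trigger_rows))
print(len(resumed))
```

All 100 waits overlap in the same loop, which is why deferred tasks are cheap to hold and why nothing in this path checks a per-DAG limit.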

The max_active_tasks check is done by the scheduler job, so there is no interaction between the scheduler job and the triggerer job. To limit the maximum number of deferrable tasks per DAG, I would suggest the approach below.

Create a new state, something like DEFERRED_QUEUE. When capacity is available (<= max deferrable tasks), move the task to DEFERRED, and the triggerer job runner should only consider tasks in the DEFERRED state to run.
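A rough sketch of that promotion step, in plain Python rather than Airflow internals. `DEFERRED_QUEUE` is the hypothetical state from the proposal, and `promote_queued` and `max_deferred` are illustrative names, not existing Airflow APIs.

```python
from enum import Enum


class State(str, Enum):
    DEFERRED_QUEUE = "deferred_queue"  # hypothetical state from the proposal
    DEFERRED = "deferred"


def promote_queued(states: dict[str, State], max_deferred: int) -> dict[str, State]:
    """Move tasks from DEFERRED_QUEUE to DEFERRED while capacity remains."""
    result = dict(states)
    # Free capacity is the limit minus the tasks that are already deferred.
    free = max_deferred - sum(s is State.DEFERRED for s in result.values())
    for task_id, state in result.items():
        if free <= 0:
            break
        if state is State.DEFERRED_QUEUE:
            result[task_id] = State.DEFERRED
            free -= 1
    return result


states = {f"a_{i}": State.DEFERRED_QUEUE for i in range(5)}
states["a_0"] = State.DEFERRED  # one task is already deferred
after = promote_queued(states, max_deferred=3)
deferred = [task_id for task_id, s in after.items() if s is State.DEFERRED]
print(deferred)
```

With a limit of 3 and one task already deferred, only two queued tasks are promoted; the rest stay in the queue state until capacity frees up.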

Please suggest your thoughts :)

RNHTTR commented 3 weeks ago

max_active_tasks already exists to limit the number of tasks in the running or queued state for a given DAG. The proposal here is to include tasks that are in the deferred state in this calculation.

Per @raphaelauv's suggestion, I think there should be a new parameter, max_active_tasks_include_deferred, which will be False by default. When this parameter is set to True, tasks in the deferred state should count toward the calculation for max_active_tasks.
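The counting rule could look roughly like this. It is a plain-Python sketch under the assumption that task states are simple strings; `can_schedule_more` and `include_deferred` are hypothetical names, and this is not the scheduler's actual code.

```python
from collections import Counter


def can_schedule_more(task_states: list[str], max_active_tasks: int,
                      include_deferred: bool = False) -> bool:
    """Return True if the DAG is still below its active-task limit."""
    active_states = {"running", "queued"}
    if include_deferred:
        # Proposed behaviour: deferred tasks also occupy a slot.
        active_states.add("deferred")
    counts = Counter(task_states)
    active = sum(counts[state] for state in active_states)
    return active < max_active_tasks


# Ten sensors have deferred and ninety are still waiting to be scheduled.
states = ["deferred"] * 10 + ["scheduled"] * 90
print(can_schedule_more(states, max_active_tasks=10))
print(can_schedule_more(states, max_active_tasks=10, include_deferred=True))
```

With the flag off, the ten deferred sensors count as zero active tasks, so more keep getting scheduled; with the flag on, the limit of 10 is already reached.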

gopidesupavan commented 3 weeks ago

Sure, thank you for the clarification :) I will come up with a PR.