apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Use `selectinload` in trigger #40487

Open josephangbc opened 4 days ago


closes: #33647

As mentioned by @arunravimv in #33647, we have added this patch to our own Airflow deployment and have noticed improvements in triggerer performance.

Below are the MySQL `EXPLAIN ANALYZE` outputs for the two SQLAlchemy relationship loading strategies, `joinedload` and `selectinload`.
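Both strategies load the same data. What differs is the SQL they send: `joinedload` adds a LEFT OUTER JOIN to the parent query, while `selectinload` issues a second SELECT that fetches the related rows with `WHERE <foreign key> IN (<parent keys>)`, which is the `task_instance.trigger_id in (...)` condition visible in the `selectinload` plans. The sketch below reproduces the two query shapes by hand with Python's built-in `sqlite3` module rather than SQLAlchemy. The toy schema and the helpers `setup`, `load_joined` and `load_selectin` are hypothetical and far simpler than Airflow's models.

```python
import sqlite3

# Hypothetical toy schema: only the columns needed to show the query shapes.
# Airflow's real trigger / task_instance tables have many more columns.
SCHEMA = """
CREATE TABLE "trigger" (id INTEGER PRIMARY KEY, classpath TEXT);
CREATE TABLE task_instance (task_id TEXT, run_id TEXT, trigger_id INTEGER);
CREATE INDEX ti_trigger_id ON task_instance (trigger_id);
"""


def setup() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany('INSERT INTO "trigger" VALUES (?, ?)',
                     [(968, "a"), (969, "b"), (984, "c"), (1000, "d")])
    # Trigger 1000 deliberately has no task instance.
    conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?)",
                     [("t1", "r1", 968), ("t2", "r1", 969), ("t3", "r2", 984)])
    return conn


def placeholders(n: int) -> str:
    # "?,?,?" style bind markers for an IN (...) list.
    return ",".join("?" * n)


def load_joined(conn, ids):
    """joinedload-style: one query, parents LEFT OUTER JOINed to children."""
    sql = (
        'SELECT t.id, ti.task_id FROM "trigger" AS t '
        "LEFT OUTER JOIN task_instance AS ti ON ti.trigger_id = t.id "
        f"WHERE t.id IN ({placeholders(len(ids))})"
    )
    # Toy data has at most one task instance per trigger, so a dict suffices.
    return dict(conn.execute(sql, ids).fetchall())


def load_selectin(conn, ids):
    """selectinload-style: load parents, then children WHERE fk IN (parent ids)."""
    parent_sql = f'SELECT id FROM "trigger" WHERE id IN ({placeholders(len(ids))})'
    parents = [row[0] for row in conn.execute(parent_sql, ids)]
    result = {pid: None for pid in parents}
    if not parents:
        return result
    child_sql = (
        "SELECT trigger_id, task_id FROM task_instance "
        f"WHERE trigger_id IN ({placeholders(len(parents))})"
    )
    for trigger_id, task_id in conn.execute(child_sql, parents):
        result[trigger_id] = task_id
    return result


conn = setup()
print(load_joined(conn, [968, 969, 984, 1000]))
print(load_selectin(conn, [968, 969, 984, 1000]))
```

Both helpers return the same mapping, including `None` for a trigger with no task instance. The joined form costs one round trip; the select-IN form costs two, but its second query filters directly on the indexed foreign key.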

Triggerer Process

Using `joinedload` in the `bulk_fetch` method

```
-> Nested loop left join  (cost=101 rows=95) (actual time=0.22..0.359 rows=3 loops=1)
    -> Nested loop left join  (cost=67.8 rows=95) (actual time=0.21..0.348 rows=3 loops=1)
        -> Nested loop left join  (cost=34.6 rows=95) (actual time=0.204..0.338 rows=3 loops=1)
            -> Filter: (`trigger`.id in (969,968,984))  (cost=1.36 rows=3) (actual time=0.049..0.0565 rows=3 loops=1)
                -> Index range scan on trigger using PRIMARY over (id = 968) OR (id = 969) OR (id = 984)  (cost=1.36 rows=3) (actual time=0.048..0.0545 rows=3 loops=1)
            -> Nested loop inner join  (cost=35.9 rows=31.7) (actual time=0.0915..0.0932 rows=1 loops=3)
                -> Index lookup on task_instance_1 using ti_trigger_id (trigger_id=`trigger`.id)  (cost=8.97 rows=31.7) (actual time=0.0716..0.0731 rows=1 loops=3)
                -> Single-row index lookup on dag_run_1 using dag_run_dag_id_run_id_key (dag_id=task_instance_1.dag_id, run_id=task_instance_1.run_id)  (cost=0.251 rows=1) (actual time=0.0194..0.0195 rows=1 loops=3)
        -> Single-row index lookup on trigger_1 using PRIMARY (id=task_instance_1.trigger_id)  (cost=0.251 rows=1) (actual time=0.00301..0.00305 rows=1 loops=3)
    -> Single-row index lookup on job_1 using PRIMARY (id=trigger_1.triggerer_id)  (cost=0.251 rows=1) (actual time=0.00316..0.00321 rows=1 loops=3)
```
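The estimated 95 rows in this `joinedload` plan appear to come from the nested-loop fan-out. The `Filter` on `trigger`.id estimates 3 rows, and the inner join probed once per trigger (the index lookup on `task_instance_1` via `ti_trigger_id`) estimates 31.7 rows per loop. Reading the plan this way (an interpretation, not a figure MySQL prints), the optimizer multiplies the two, and the later single-row lookups (estimate 1 row each) leave the product unchanged. In practice each loop returned only 1 row, so 3 rows in total.

$$
\underbrace{3}_{\text{trigger rows}} \times \underbrace{31.7}_{\text{est. rows per loop}} = 95.1 \approx 95
$$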

Using `selectinload` in the `bulk_fetch` method

```
-> Nested loop inner join  (cost=5.26 rows=3) (actual time=0.0895..0.362 rows=3 loops=1)
    -> Nested loop left join  (cost=4.21 rows=3) (actual time=0.065..0.313 rows=3 loops=1)
        -> Nested loop left join  (cost=3.16 rows=3) (actual time=0.0521..0.298 rows=3 loops=1)
            -> Index range scan on task_instance using ti_trigger_id over (trigger_id = 968) OR (trigger_id = 969) OR (trigger_id = 984), with index condition: (task_instance.trigger_id in (969,968,984))  (cost=2.11 rows=3) (actual time=0.0399..0.273 rows=3 loops=1)
            -> Single-row index lookup on trigger_1 using PRIMARY (id=task_instance.trigger_id)  (cost=0.283 rows=1) (actual time=0.00755..0.00759 rows=1 loops=3)
        -> Single-row index lookup on job_1 using PRIMARY (id=trigger_1.triggerer_id)  (cost=0.283 rows=1) (actual time=0.00454..0.00458 rows=1 loops=3)
    -> Single-row index lookup on dag_run_1 using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id)  (cost=0.283 rows=1) (actual time=0.016..0.016 rows=1 loops=3)
```
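To compare the two `bulk_fetch` plans node by node, it helps to pull the figures out of each line. The hypothetical helper below parses one MySQL `EXPLAIN ANALYZE` tree line into its label, estimated cost and rows, and actual rows and loops. It assumes the annotation format shown in these outputs, and it treats the actual `rows` value as a per-loop figure, which is what the `rows=1 loops=3` children under `rows=3` parents suggest.

```python
import re
from dataclasses import dataclass
from typing import Optional

NUM = r"(\d+(?:\.\d+)?)"
# "(cost=101 rows=95)": optimizer estimates.
EST_RE = re.compile(rf"\(cost={NUM} rows={NUM}\)")
# "(actual time=0.22..0.359 rows=3 loops=1)": measured values; absent on
# nodes marked "(never executed)".
ACT_RE = re.compile(rf"\(actual time={NUM}\.\.{NUM} rows={NUM} loops=(\d+)\)")


@dataclass
class PlanNode:
    label: str
    est_cost: Optional[float]
    est_rows: Optional[float]
    actual_rows: Optional[float]  # assumed to be rows per loop
    loops: Optional[int]

    @property
    def actual_total_rows(self) -> Optional[float]:
        if self.actual_rows is None or self.loops is None:
            return None
        return self.actual_rows * self.loops


def parse_node(line: str) -> PlanNode:
    text = line.strip().removeprefix("-> ")
    # The label is separated from the first annotation by two spaces.
    label = text.split("  (", 1)[0]
    est = EST_RE.search(text)
    act = ACT_RE.search(text)
    return PlanNode(
        label=label,
        est_cost=float(est.group(1)) if est else None,
        est_rows=float(est.group(2)) if est else None,
        actual_rows=float(act.group(3)) if act else None,
        loops=int(act.group(4)) if act else None,
    )


# Root nodes of the two bulk_fetch plans, copied from the outputs above.
joined_root = parse_node(
    "-> Nested loop left join  (cost=101 rows=95) (actual time=0.22..0.359 rows=3 loops=1)")
selectin_root = parse_node(
    "-> Nested loop inner join  (cost=5.26 rows=3) (actual time=0.0895..0.362 rows=3 loops=1)")
print(round(joined_root.est_cost / selectin_root.est_cost, 1))  # 19.2
```

Both roots return 3 actual rows; the difference is in the optimizer's estimates (95 vs 3 rows, cost 101 vs 5.26), roughly a 19x gap in estimated cost.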

`triggerview/list` API

Using the `joined` loading strategy for the relationship between `TaskInstance` and `Trigger`

```
-> Sort: `trigger`.id DESC  (actual time=0.0215..0.0215 rows=0 loops=1)
    -> Stream results  (cost=8234 rows=79751) (actual time=0.0169..0.0169 rows=0 loops=1)
        -> Nested loop left join  (cost=8234 rows=79751) (actual time=0.016..0.016 rows=0 loops=1)
            -> Table scan on trigger  (cost=0.35 rows=1) (actual time=0.0152..0.0152 rows=0 loops=1)
            -> Nested loop inner join  (cost=33331 rows=79751) (never executed)
                -> Table scan on dag_run_1  (cost=5419 rows=51596) (never executed)
                -> Filter: (task_instance_1.trigger_id = `trigger`.id)  (cost=0.386 rows=1.55) (never executed)
                    -> Index lookup on task_instance_1 using ti_dag_run (dag_id=dag_run_1.dag_id, run_id=dag_run_1.run_id)  (cost=0.386 rows=1.55) (never executed)
```

Using the `selectin` loading strategy for the relationship between `TaskInstance` and `Trigger`

```
-> Nested loop inner join  (cost=8.41 rows=8) (actual time=0.0409..0.0409 rows=0 loops=1)
    -> Index range scan on task_instance using ti_trigger_id over (trigger_id = 109) OR (trigger_id = 110) OR (6 more), with index condition: (task_instance.trigger_id in (116,115,114,113,112,111,110,109))  (cost=5.61 rows=8) (actual time=0.0402..0.0402 rows=0 loops=1)
    -> Single-row index lookup on dag_run_1 using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id)  (cost=0.263 rows=1) (never executed)
```
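The `selectin` plan is cheap because its query filters `task_instance` on `trigger_id IN (...)`, which MySQL serves with an index range scan on `ti_trigger_id`. The sketch below illustrates the same access path in SQLite instead of MySQL, using a hypothetical minimal `task_instance` table and a hypothetical `plan_details` helper. SQLite's `EXPLAIN QUERY PLAN` wording differs from MySQL's `EXPLAIN ANALYZE` (and between SQLite versions), so the only thing worth relying on is whether the index name appears.

```python
import sqlite3


def plan_details(conn: sqlite3.Connection, sql: str, params=()) -> list:
    """Return the 'detail' column of SQLite's EXPLAIN QUERY PLAN output."""
    # Each plan row has four columns; the human-readable step is the last one.
    return [row[3] for row in conn.execute("EXPLAIN QUERY PLAN " + sql, params)]


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT, trigger_id INTEGER);
CREATE INDEX ti_trigger_id ON task_instance (trigger_id);
""")

trigger_ids = list(range(109, 117))  # the eight trigger ids in the plan above
marks = ",".join("?" * len(trigger_ids))

# selectin-style secondary query: IN filter on the indexed foreign key.
details = plan_details(
    conn, f"SELECT * FROM task_instance WHERE trigger_id IN ({marks})", trigger_ids)
# For contrast, the same IN filter on a column that has no index.
details_no_index = plan_details(
    conn, f"SELECT * FROM task_instance WHERE dag_id IN ({marks})",
    [str(i) for i in trigger_ids])

print(details)           # should mention ti_trigger_id
print(details_no_index)  # should not use ti_trigger_id
```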

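From the `EXPLAIN ANALYZE` results above, we can see that `selectinload` produces cheaper query plans for both the triggerer process and the `triggerview/list` API. In `bulk_fetch`, the estimated cost drops from 101 to 5.26: the `joinedload` plan starts from `trigger` and estimates about 95 rows once `task_instance_1` is joined in, while the `selectinload` plan reads `task_instance` directly through the `ti_trigger_id` index and estimates 3 rows. The actual execution time is about the same (0.359 ms vs 0.362 ms) for the three triggers in this sample. For the `triggerview/list` API, the `joined` plan has an estimated cost of 8234 and includes a full table scan of `dag_run_1` (estimated 51,596 rows), which was never executed only because the scan of `trigger` returned no rows in this run; the `selectin` plan instead uses an index range scan on `ti_trigger_id` at an estimated cost of 8.41.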