apache/airflow

Simultaneous tasks completion causing missing dataset triggered dag runs #35870

Open boushphong opened 11 months ago

boushphong commented 11 months ago

Apache Airflow version

2.7.3

What happened

When multiple Airflow tasks finish at about the same time, and those tasks are also responsible for triggering another DAG via a Dataset, some dataset-triggered DAG runs go missing.

For example: for a DAG that has 2 tasks triggering another DAG via a Dataset, there should be 2 dataset-triggered DAG runs for the triggered DAG. From my observation, if the 2 tasks finish at about the same time, triggered runs go missing, so only 1 DAG run may be triggered instead of 2.

What you think should happen instead

The number of dataset-triggered DAG runs should equal the number of tasks (that update the dataset) finishing at the same time.

How to reproduce

Code to reproduce:

import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.datasets import Dataset

# Producer DAG whose two tasks update Dataset("test") at *different* times
# (sleep 3 vs. sleep 5): both dataset-triggered runs are created, as expected.
with DAG(
        "dataset_triggered_runs",
        start_date=datetime.datetime(2022, 1, 1),
        schedule="0 0 * * *",
        catchup=False,
) as dataset_triggered_runs:
    bash_sleep = BashOperator(
        task_id="bash_sleep", bash_command="sleep 3", outlets=[Dataset("test")]
    )

    bash_sleep_2 = BashOperator(
        task_id="bash_sleep_2", bash_command="sleep 5", outlets=[Dataset("test")]
    )

# Producer DAG whose two tasks update Dataset("test") at about the *same* time
# (both just echo): one of the two dataset-triggered runs goes missing.
with DAG(
        "missing_dataset_triggered_runs",
        start_date=datetime.datetime(2022, 1, 1),
        schedule="0 0 * * *",
        catchup=False,
) as missing_dataset_triggered_runs:
    echo1 = BashOperator(
        task_id="bash_sleep", bash_command="echo 1", outlets=[Dataset("test")]
    )

    echo2 = BashOperator(
        task_id="bash_sleep_2", bash_command="echo 2", outlets=[Dataset("test")]
    )

# Consumer DAG triggered via Dataset("test").
with DAG(
        "trigger_v1",
        start_date=datetime.datetime(2022, 1, 1),
        schedule=[Dataset("test")],
) as dag2:
    dataset_trigger_task = EmptyOperator(task_id="empty_task_2")

The dataset_triggered_runs DAG has 2 tasks (that trigger the dataset run) finishing at different times, and there are 2 dataset-triggered DAG runs, which is expected.

[2023-11-27T03:19:22.897+0700] {dagrun.py:653} INFO - Marking run <DagRun trigger_v1 @ 2023-11-26 20:19:21.217750+00:00: dataset_triggered__2023-11-26T20:19:21.217750+00:00, state:running, queued_at: 2023-11-26 20:19:21.809036+00:00. externally triggered: False> successful
[2023-11-27T03:19:22.897+0700] {dagrun.py:704} INFO - DagRun Finished: dag_id=trigger_v1, execution_date=2023-11-26 20:19:21.217750+00:00, run_id=dataset_triggered__2023-11-26T20:19:21.217750+00:00, run_start_date=2023-11-26 20:19:21.851541+00:00, run_end_date=2023-11-26 20:19:22.897392+00:00, run_duration=1.045851, state=success, external_trigger=False, run_type=dataset_triggered, data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26 00:00:00+00:00, dag_hash=f4426a33d21238a4f41898744b2ce017
[2023-11-27T03:19:23.490+0700] {dagrun.py:653} INFO - Marking run <DagRun dataset_triggered_runs @ 2023-11-26 20:19:17.627907+00:00: manual__2023-11-26T20:19:17.627907+00:00, state:running, queued_at: 2023-11-26 20:19:17.637778+00:00. externally triggered: True> successful
[2023-11-27T03:19:23.490+0700] {dagrun.py:704} INFO - DagRun Finished: dag_id=dataset_triggered_runs, execution_date=2023-11-26 20:19:17.627907+00:00, run_id=manual__2023-11-26T20:19:17.627907+00:00, run_start_date=2023-11-26 20:19:17.888672+00:00, run_end_date=2023-11-26 20:19:23.490855+00:00, run_duration=5.602183, state=success, external_trigger=True, run_type=manual, data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26 00:00:00+00:00, dag_hash=d60293c69d62b7a5aeb3cf3d884b9693
[2023-11-27T03:19:23.500+0700] {scheduler_job_runner.py:685} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='dataset_triggered_runs', task_id='bash_sleep_2', run_id='manual__2023-11-26T20:19:17.627907+00:00', try_number=1, map_index=-1)
[2023-11-27T03:19:23.503+0700] {scheduler_job_runner.py:722} INFO - TaskInstance Finished: dag_id=dataset_triggered_runs, task_id=bash_sleep_2, run_id=manual__2023-11-26T20:19:17.627907+00:00, map_index=-1, run_start_date=2023-11-26 20:19:18.027568+00:00, run_end_date=2023-11-26 20:19:23.206466+00:00, run_duration=5.178898, state=success, executor_state=success, try_number=1, max_tries=0, job_id=189, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2023-11-26 20:19:17.914731+00:00, queued_by_job_id=1, pid=23848
[2023-11-27T03:19:23.678+0700] {dagrun.py:653} INFO - Marking run <DagRun trigger_v1 @ 2023-11-26 20:19:23.214909+00:00: dataset_triggered__2023-11-26T20:19:23.214909+00:00, state:running, queued_at: 2023-11-26 20:19:23.471484+00:00. externally triggered: False> successful
[2023-11-27T03:19:23.678+0700] {dagrun.py:704} INFO - DagRun Finished: dag_id=trigger_v1, execution_date=2023-11-26 20:19:23.214909+00:00, run_id=dataset_triggered__2023-11-26T20:19:23.214909+00:00, run_start_date=2023-11-26 20:19:23.479981+00:00, run_end_date=2023-11-26 20:19:23.678956+00:00, run_duration=0.198975, state=success, external_trigger=False, run_type=dataset_triggered, data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26 00:00:00+00:00, dag_hash=f4426a33d21238a4f41898744b2ce017

However, the missing_dataset_triggered_runs DAG has 2 tasks (that trigger the dataset run) finishing at about the same time, and there is only 1 dataset-triggered DAG run, which is unexpected. This is very likely a bug.

[2023-11-27T03:21:42.147+0700] {scheduler_job_runner.py:722} INFO - TaskInstance Finished: dag_id=missing_dataset_triggered_runs, task_id=bash_sleep_2, run_id=scheduled__2023-11-25T00:00:00+00:00, map_index=-1, run_start_date=2023-11-26 20:21:41.184677+00:00, run_end_date=2023-11-26 20:21:41.356996+00:00, run_duration=0.172319, state=success, executor_state=success, try_number=1, max_tries=0, job_id=190, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2023-11-26 20:21:41.048606+00:00, queued_by_job_id=1, pid=24351
[2023-11-27T03:21:42.147+0700] {scheduler_job_runner.py:722} INFO - TaskInstance Finished: dag_id=missing_dataset_triggered_runs, task_id=bash_sleep, run_id=scheduled__2023-11-25T00:00:00+00:00, map_index=-1, run_start_date=2023-11-26 20:21:41.184319+00:00, run_end_date=2023-11-26 20:21:41.345462+00:00, run_duration=0.161143, state=success, executor_state=success, try_number=1, max_tries=0, job_id=191, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2023-11-26 20:21:41.048606+00:00, queued_by_job_id=1, pid=24350
[2023-11-27T03:21:43.175+0700] {dagrun.py:653} INFO - Marking run <DagRun trigger_v1 @ 2023-11-26 20:21:41.356742+00:00: dataset_triggered__2023-11-26T20:21:41.356742+00:00, state:running, queued_at: 2023-11-26 20:21:42.118410+00:00. externally triggered: False> successful
[2023-11-27T03:21:43.175+0700] {dagrun.py:704} INFO - DagRun Finished: dag_id=trigger_v1, execution_date=2023-11-26 20:21:41.356742+00:00, run_id=dataset_triggered__2023-11-26T20:21:41.356742+00:00, run_start_date=2023-11-26 20:21:42.126641+00:00, run_end_date=2023-11-26 20:21:43.175274+00:00, run_duration=1.048633, state=success, external_trigger=False, run_type=dataset_triggered, data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26 00:00:00+00:00, dag_hash=f4426a33d21238a4f41898744b2ce017

Operating System

Docker

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

nathadfield commented 11 months ago

@boushphong Thanks for logging this; I have been able to replicate it. I see that you indicated you'd be willing to submit a PR, so shall I assign you to this issue?

boushphong commented 11 months ago

Sure thing. Let me work on this.

boushphong commented 11 months ago

@nathadfield Just submitted a pull request. It's a DRAFT PR for now. https://github.com/apache/airflow/pull/36032/files#diff-4253adbb36bfb93cb75ab00c7d509518134e5bf1ad16473b64a2a6d8fa456c92L208-L214

I went with the idea of removing the primary key from the dataset_dag_run_queue table, so that when we insert a new record into the table, as in:

stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset.id).on_conflict_do_nothing()

we won't face any conflict. Currently, if a DAG has multiple tasks updating the same Dataset, we insert 2 records that collide due to the primary key constraint, and on_conflict_do_nothing() silently drops the second one.
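
To make the collision concrete, here is a minimal, self-contained SQLAlchemy sketch (not Airflow code; it only assumes a composite primary key on (dataset_id, target_dag_id), mirroring the dataset_dag_run_queue shape) showing on_conflict_do_nothing() dropping the second of two near-simultaneous queue inserts:

# Standalone sketch of the conflict described above -- NOT Airflow code.
# Assumes a composite (dataset_id, target_dag_id) primary key.
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select
from sqlalchemy.dialects.sqlite import insert

metadata = MetaData()
queue = Table(
    "dataset_dag_run_queue",
    metadata,
    Column("dataset_id", Integer, primary_key=True),
    Column("target_dag_id", String(250), primary_key=True),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    # Two tasks update the same Dataset at about the same time, so the
    # scheduler tries to enqueue the same (dataset, dag) pair twice.
    for _ in range(2):
        conn.execute(
            insert(queue)
            .values(dataset_id=1, target_dag_id="trigger_v1")
            .on_conflict_do_nothing()
        )
    # Only one row survives; the second event leaves no trace, hence only
    # one dataset-triggered run downstream.
    print(conn.execute(select(queue)).all())  # [(1, 'trigger_v1')]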

Just outlining my idea before committing more time to this solution. WDYT? By the way, if I make changes to the model, do I have to modify the migrations package, and if so, where should I look? Cheers!
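
(Partially answering my own question: Airflow schema changes ship as Alembic scripts under airflow/migrations/versions/, so I imagine a migration along these hypothetical lines -- the revision ids and constraint name below are placeholders, not the real Airflow values:)

# Hypothetical Alembic migration sketch -- placeholder ids/names throughout.
from alembic import op

revision = "0000000000aa"       # placeholder revision id
down_revision = "0000000000bb"  # placeholder parent revision id

def upgrade():
    # Drop the composite primary key so duplicate (dataset_id, target_dag_id)
    # queue entries can coexist. The constraint name is illustrative only.
    op.drop_constraint(
        "datasetdagrunqueue_pkey", "dataset_dag_run_queue", type_="primary"
    )

def downgrade():
    # Restore the composite primary key.
    op.create_primary_key(
        "datasetdagrunqueue_pkey",
        "dataset_dag_run_queue",
        ["dataset_id", "target_dag_id"],
    )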

nathadfield commented 11 months ago

@boushphong I'm probably not the best person to comment on this as I don't really know much about this aspect of Airflow. Perhaps tag some of the people who have also worked in this area on the PR?