apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Listener called twice with multiple schedulers in Airflow 2.7 #42580

Open Mint2702 opened 1 week ago

Mint2702 commented 1 week ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.1

What happened?

When running two schedulers in Airflow, I have encountered an issue where the listener that sends custom metrics through the StatsD exporter is triggered twice for the same event. This happens when a DAG run changes state, and it affects both the on_dag_run_failed and the on_dag_run_success hooks.

Key Observations:

  1. The duplication only occurs when two schedulers are running. With a single scheduler, the bug does not occur.
  2. If a task within a DAG fails while it is in the queued state, the bug does not occur, and the metric is sent correctly.
  3. When a task fails while it is in the running state, the bug occurs, and the metric is sent twice.
  4. The listener can be triggered twice in either of two ways:
     a) on the same scheduler, with 5-10 seconds between the two calls;
     b) on two separate schedulers, almost simultaneously.

What you think should happen instead?

The listener should be called only once per event, even when two schedulers are running. The metric should be sent only once per DAG run state change (success or failure).
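
Until the duplication is explained, one way a listener could enforce "once per event" on its own is to claim each (dag_id, run_id, state) transition in a store that both schedulers share, and send the metric only if the claim succeeds. This is a hypothetical workaround sketch, not an Airflow API, and it hides the duplicate calls rather than fixing their cause. `sqlite3` stands in for a shared database here; in a real deployment the table would have to live in a database reachable by both schedulers, otherwise case 4b is not covered. The table name `listener_event_claims` and the helper `claim_event` are invented for illustration.

```python
import sqlite3

# Hypothetical table: one row per (dag_id, run_id, state) transition.
# The PRIMARY KEY makes a second insert of the same event a no-op.
SCHEMA = """
CREATE TABLE IF NOT EXISTS listener_event_claims (
    dag_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    state  TEXT NOT NULL,
    PRIMARY KEY (dag_id, run_id, state)
)
"""


def claim_event(conn: sqlite3.Connection, dag_id: str, run_id: str, state: str) -> bool:
    """Return True only for the first caller that claims this event."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO listener_event_claims (dag_id, run_id, state) "
            "VALUES (?, ?, ?)",
            (dag_id, run_id, state),
        )
    # rowcount is 1 if a row was inserted, 0 if the event was already claimed
    return cur.rowcount == 1


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    # Two "schedulers" reporting the same failure: only the first claim wins.
    print(claim_event(conn, "etl", "manual__2024-01-01", "failed"))  # True
    print(claim_event(conn, "etl", "manual__2024-01-01", "failed"))  # False
```

Inside on_dag_run_failed, the listener would call `claim_event(conn, dag_run.dag_id, dag_run.run_id, "failed")` and send the metric only when it returns `True`; on_dag_run_success would do the same with `"success"`.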

How to reproduce

  1. Deploy Airflow 2.7.1 using the official Helm chart on Kubernetes, and configure two active schedulers.
  2. Add a listener that sends a custom metric to a StatsD exporter during DAG state changes (on_dag_run_failed, on_dag_run_success).
  3. Run DAGs with tasks that fail while in the running state, and monitor the metrics sent during DAG run state changes.
  4. Observe that the listener is sometimes called twice, resulting in duplicate metric entries.
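
To tell case 4a from case 4b when observing step 4, each listener call could be recorded together with the scheduler that made it (for example the pod hostname from `socket.gethostname()`), and the records grouped per event. The sketch below assumes such a log already exists; the `HookCall` record format and the `find_duplicates` helper are hypothetical and not part of Airflow.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class HookCall:
    """One recorded listener invocation (hypothetical log format)."""

    hook: str        # e.g. "on_dag_run_failed"
    dag_id: str
    run_id: str
    scheduler: str   # e.g. the scheduler pod's hostname
    at: datetime


def find_duplicates(calls: list[HookCall]) -> dict[tuple[str, str, str], tuple[str, float]]:
    """Return (kind, spread in seconds) for every (hook, dag_id, run_id) seen more than once.

    kind is "same-scheduler" (case 4a) or "cross-scheduler" (case 4b).
    """
    grouped: dict[tuple[str, str, str], list[HookCall]] = defaultdict(list)
    for call in calls:
        grouped[(call.hook, call.dag_id, call.run_id)].append(call)

    duplicates: dict[tuple[str, str, str], tuple[str, float]] = {}
    for key, group in grouped.items():
        if len(group) < 2:
            continue  # called exactly once, as expected
        kind = "same-scheduler" if len({c.scheduler for c in group}) == 1 else "cross-scheduler"
        times = [c.at for c in group]
        spread = (max(times) - min(times)).total_seconds()
        duplicates[key] = (kind, spread)
    return duplicates


if __name__ == "__main__":
    t0 = datetime(2024, 1, 1, 12, 0, 0)
    calls = [
        HookCall("on_dag_run_failed", "etl", "run_1", "scheduler-0", t0),
        HookCall("on_dag_run_failed", "etl", "run_1", "scheduler-0", t0.replace(second=7)),
        HookCall("on_dag_run_success", "etl", "run_2", "scheduler-0", t0.replace(minute=1)),
        HookCall("on_dag_run_success", "etl", "run_2", "scheduler-1", t0.replace(minute=1)),
        HookCall("on_dag_run_failed", "etl", "run_3", "scheduler-1", t0.replace(minute=2)),
    ]
    for key, (kind, spread) in find_duplicates(calls).items():
        print(key, kind, f"{spread:.0f}s apart")
```

A "same-scheduler" result with a spread of a few seconds would match case 4a; a "cross-scheduler" result with a spread near zero would match case 4b.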

Operating System

Ubuntu

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

Listener code:

from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun

import metrics  # the reporter's own module with send_dags_starting_delay (import path assumed)


@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str) -> None:
    """
    Called when a DAG run's state changes to SUCCESS.
    """
    metrics.send_dags_starting_delay(dag_run=dag_run)


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str) -> None:
    """
    Called when a DAG run's state changes to FAILED.
    """
    metrics.send_dags_starting_delay(dag_run=dag_run)

Metric sending code:

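from __future__ import annotations

from datetime import timedelta

from airflow.models import DagRun, TaskInstance
from airflow.stats import Stats


def send_dags_starting_delay(dag_run: DagRun) -> None:
    dag_id = dag_run.dag_id

    task_instances: list[TaskInstance] = dag_run.task_instances
    tasks_queued_duration = timedelta()
    for ti in task_instances:
        # Skip tasks that were never queued or never started (e.g. upstream_failed);
        # subtracting None would raise a TypeError.
        if ti.queued_dttm is None or ti.start_date is None:
            continue
        tasks_queued_duration += ti.start_date - ti.queued_dttm

    # One timing sample per call, so every duplicate listener call sends a duplicate metric.
    Stats.timing(f"dagrun.tasks.starting.delay.{dag_id}", tasks_queued_duration)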
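
The arithmetic in send_dags_starting_delay can be checked without a running Airflow by feeding the same loop plain stand-in objects. `FakeTI` and `total_queued_delay` are hypothetical names: `FakeTI` only mimics the `queued_dttm` and `start_date` attributes of `TaskInstance`, and skipping task instances that lack either timestamp is an assumption about how such tasks should be counted.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class FakeTI:
    """Hypothetical stand-in exposing only the two TaskInstance fields the metric reads."""

    queued_dttm: datetime | None
    start_date: datetime | None


def total_queued_delay(task_instances: list[FakeTI]) -> timedelta:
    """Sum of (start_date - queued_dttm) over tasks that were both queued and started."""
    total = timedelta()
    for ti in task_instances:
        if ti.queued_dttm is None or ti.start_date is None:
            continue  # never queued or never started: nothing to measure
        total += ti.start_date - ti.queued_dttm
    return total


if __name__ == "__main__":
    t0 = datetime(2024, 1, 1, 12, 0, 0)
    tis = [
        FakeTI(queued_dttm=t0, start_date=t0 + timedelta(seconds=3)),
        FakeTI(queued_dttm=t0, start_date=t0 + timedelta(seconds=5)),
        FakeTI(queued_dttm=None, start_date=None),  # e.g. an upstream_failed task
    ]
    print(total_queued_delay(tis))  # 0:00:08
```

Because each listener call emits one timing sample for the DAG run, a duplicated on_dag_run_failed or on_dag_run_success call shows up as a second sample for the same run in StatsD.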

boring-cyborg[bot] commented 1 week ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.

jscheffl commented 6 days ago

Airflow 2.7 is quite old and no longer maintained, so in general I'd suggest upgrading. There may have been numerous fixes and improvements since then.

Can you test against the most recent Airflow 2.10 version?