apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

dag stuck in "running" status when running in backfill job #34448

Closed: charleschang0531 closed this issue 9 months ago

charleschang0531 commented 1 year ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

We are running Airflow 2.6.2 with the KubernetesExecutor, and we have two DAGs:

  1. a DAG that runs dbt (dag_test.py)
  2. a DAG that runs `airflow dags backfill` to rerun dag_test.py (dag_rerun.py)

I'm trying to simulate a situation in which the job still fails when running `airflow dags backfill`, and there is a problem I can't solve: when I try to backfill, the dag_test.py run stays in "running" forever while some tasks in dag_test.py are in the failed state.

  1. dag_test.py

```python
from datetime import datetime, timedelta
from airflow.configuration import conf
from airflow import DAG, AirflowException
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator
from textwrap import dedent
from airflow.decorators import task
from airflow.kubernetes.secret import Secret
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s
import os
import random

from common.airflow import create_dag_by_common
from common.kubernetes import (
    CONTAINER_RESOURCES,
    NODE_AFFINITY,
    get_config
)

def create_dag(dag_id: str) -> DAG:

    # Define DAG args
    default_args = {
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        description="basic dag",
        doc_md="",
        schedule_interval="06 * * * *",
        start_date=datetime(2023, 9, 16),
        catchup=True,
        max_active_runs=4,
        max_active_tasks=10
    )
    with dag:
        dag_start = DummyOperator(
            task_id="dag_start",
            dag=dag,
        )

        dag_end = DummyOperator(
            task_id="dag_end",
            dag=dag,
        )

        @task()
        def random_failed():
            number = random.randrange(1, 3)
            print(number)
            if number == 2 or number == 3:
                # DagRun.set_state = "failed"?
                raise AirflowException("Unexpected number 2 or 3")

        task_dbt_run = KubernetesPodOperator(
            task_id="task_dbt_run",
            image=f"{image_url}",
            secrets=[secret_volume],
            cmds=["sh", "-c", "dbt run -s path:models/test --target pixel"],
            env_vars={
                "Airflow_Job_StartTime": "{{ ts }}"
            },
            name="task_dbt_run",
            is_delete_operator_pod=True,
            image_pull_policy="Always",
            in_cluster=True,
            get_logs=True,
            do_xcom_push=False,
            affinity=NODE_AFFINITY,
            dag=dag
        )

        dag_start >> random_failed() >> task_dbt_run >> dag_end

    return dag

dag_id = "dag_test"

create_dag_by_common(
    dag_id=dag_id, create_dag_func=create_dag
)
```


2. dag_rerun.py
```python
from datetime import datetime, timedelta
from airflow.configuration import conf
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator
from textwrap import dedent
from airflow.decorators import task
from airflow.kubernetes.secret import Secret
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s
import os

from common.airflow import create_dag_by_common
from common.kubernetes import (
    get_config
)

def create_dag(dag_id: str) -> DAG:

    # Define DAG args
    default_args = {
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        description="basic dag",
        doc_md="",
        schedule_interval="45 * * * *",
        start_date=datetime(2023, 9, 14),
        catchup=False,
        max_active_runs=2,
        max_active_tasks=10
    )
    with dag:
        dag_start = DummyOperator(
            task_id="dag_start",
            dag=dag,
        )

        dag_end = DummyOperator(
            task_id="dag_end",
            dag=dag,
        )

        templated_command = dedent(
        """        
        airflow dags backfill dag_test -s 2023-09-15T08:00:00 -e 2023-09-16T15:00:00 --rerun-failed-tasks --continue-on-failures --disable-retry
        """
        )

        rerun_task = BashOperator(
            task_id="rerun_task",
            depends_on_past=False,
            bash_command=templated_command
        )      

        dag_start >> rerun_task >> dag_end

    return dag

dag_id = "dag_rerun"

create_dag_by_common(
    dag_id=dag_id, create_dag_func=create_dag
)
```

  3. Result image

What you think should happen instead

The backfill job should rerun the failed runs, and if a rerun still fails, we would like that run to simply be marked as failed so the backfill moves on to the next run.
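
In other words, what we expect the backfill to do automatically is roughly what the following sketch does by hand against the metadata database. This is only illustrative (the DAG id is the one from this report, and we do not actually run this); it just spells out the desired end state hinted at by the `DagRun.set_state = "failed"?` comment in dag_test.py:

```python
from airflow.models import DagRun
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState

# Illustrative only: find runs of dag_test that are stuck in "running"
# and mark them failed so the backfill can move on to the next run.
with create_session() as session:
    stuck_runs = (
        session.query(DagRun)
        .filter(DagRun.dag_id == "dag_test", DagRun.state == DagRunState.RUNNING)
        .all()
    )
    for run in stuck_runs:
        run.set_state(DagRunState.FAILED)
```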

How to reproduce

No response

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow 2.6.2
apache-airflow-providers-amazon 8.1.0
apache-airflow-providers-celery 3.2.0
apache-airflow-providers-cncf-kubernetes 7.0.0
apache-airflow-providers-common-sql 1.5.1
apache-airflow-providers-docker 3.7.0
apache-airflow-providers-elasticsearch 4.5.0
apache-airflow-providers-ftp 3.4.1
apache-airflow-providers-google 10.1.1
apache-airflow-providers-grpc 3.2.0
apache-airflow-providers-hashicorp 3.4.0
apache-airflow-providers-http 4.4.1
apache-airflow-providers-imap 3.2.1
apache-airflow-providers-microsoft-azure 6.1.1
apache-airflow-providers-mysql 5.1.0
apache-airflow-providers-odbc 3.3.0
apache-airflow-providers-postgres 5.5.0
apache-airflow-providers-redis 3.2.0
apache-airflow-providers-sendgrid 3.2.0
apache-airflow-providers-sftp 4.3.0
apache-airflow-providers-slack 7.3.0
apache-airflow-providers-snowflake 4.1.0
apache-airflow-providers-sqlite 3.4.1
apache-airflow-providers-ssh 3.7.0
google-cloud-orchestration-airflow 1.9.0
pytest-airflow 0.0.3

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

RNHTTR commented 1 year ago

I've been able to reproduce this. Interacting with a task via the database (e.g. clearing or marking the task as failed) during a backfill DAG run results in tasks no longer being scheduled.
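
Concretely, the interaction I mean is roughly the sketch below, run while a backfill of dag_test is in flight (names are taken from the report above; marking the task instance failed from the UI has the same effect):

```python
from airflow.models import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState

# Sketch: while `airflow dags backfill dag_test ...` is running, flip one of its
# task instances to "failed" directly in the metadata DB (the UI's "Mark Failed"
# is equivalent). Afterwards the backfill stops scheduling the remaining tasks
# and the DAG run stays in "running".
with create_session() as session:
    ti = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "dag_test",
            TaskInstance.task_id == "task_dbt_run",
        )
        .first()
    )
    if ti is not None:
        ti.set_state(TaskInstanceState.FAILED, session=session)
```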