apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Issues with backfilling with dynamic task mapping over task group #34012

Open mrn-aglic opened 1 year ago

mrn-aglic commented 1 year ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

This issue happens in Airflow 2.6.1 with Postgres 13.

I have enabled the DAG, and the scheduled run executed successfully. However, backfilling causes a variety of issues. Here is the initial command I ran: airflow dags backfill stale_data_test --start-date 2023-07-01 --end-date 2023-08-01 -B

Sometimes I get the following:

[2023-09-01T12:51:02.784+0000] {dagbag.py:541} INFO - Filling up the DagBag from /usr/local/airflow/dags/sem/stale_data_test.py
[2023-09-01T12:51:02.785+0000] {dagbag.py:541} INFO - Filling up the DagBag from /usr/local/airflow/dags/sem/stale_data_test.py
[2023-09-01T12:51:02.787+0000] {local_executor.py:135} ERROR - Failed to execute task No map_index passed to mapped task.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/executors/local_executor.py", line 131, in _execute_work_in_fork
    args.func(args)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 51, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/cli.py", line 112, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/task_command.py", line 405, in task_run
    ti, _ = _get_ti(task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, pool=args.pool)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/task_command.py", line 166, in _get_ti
    raise RuntimeError("No map_index passed to mapped task")
RuntimeError: No map_index passed to mapped task

After re-running the command, the backfill finished successfully. However, when running with a different start and end date, I got the issue again.

When running the backfill with the command airflow dags backfill stale_data_test --start-date 2023-05-01 --end-date 2023-06-01 -B, I got this error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 912, in _execute
    self._execute_dagruns(
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 799, in _execute_dagruns
    processed_dag_run_dates = self._process_backfill_task_instances(
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 695, in _process_backfill_task_instances
    run.update_state(session=session)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 572, in update_state
    info = self.task_instance_scheduling_decisions(session)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 719, in task_instance_scheduling_decisions
    schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
  File "/usr/local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 821, in _get_ready_tis
    new_tis = _expand_mapped_task_if_needed(schedulable)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 799, in _expand_mapped_task_if_needed
    expanded_tis, _ = ti.task.expand_mapped_task(self.run_id, session=session)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/abstractoperator.py", line 471, in expand_mapped_task
    session.flush()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1035, in _emit_update_statements
    raise orm_exc.StaleDataError(
sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/astro/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.10/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 51, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/cli.py", line 112, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 139, in dag_backfill
    _run_dag_backfill(dags, args)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 92, in _run_dag_backfill
    dag.run(
  File "/usr/local/lib/python3.10/site-packages/airflow/models/dag.py", line 2490, in run
    run_job(job=job, execute_callable=job_runner._execute)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/job.py", line 284, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/job.py", line 313, in execute_job
    ret = execute_callable()
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 941, in _execute
    session.commit()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 830, in commit
    self._assert_active(prepared_ok=True)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 604, in _assert_active
    raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched. (Background on this error at: https://sqlalche.me/e/14/7s2a)

This second issue might be related to this.
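
For context, the StaleDataError above is SQLAlchemy's way of reporting that a flush emitted an UPDATE that matched no rows, i.e. the task_instance row was deleted (or its key changed) underneath the ORM by someone else. Here is a minimal standalone sketch that produces the same message; this is plain SQLAlchemy 1.4, not Airflow code, and the TaskInstance model in it is a made-up stand-in for the real table:

import sqlalchemy as sa
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class TaskInstance(Base):
    __tablename__ = "task_instance"
    id = sa.Column(sa.Integer, primary_key=True)
    state = sa.Column(sa.String)

engine = sa.create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(TaskInstance(id=1, state="scheduled"))
    session.commit()

    # Load the row into the session's identity map and mark it dirty.
    ti = session.get(TaskInstance, 1)
    ti.state = "queued"

    # Delete the row behind the ORM's back (in Airflow this would be a
    # concurrent session touching the same task_instance rows).
    session.connection().execute(
        sa.delete(TaskInstance).where(TaskInstance.id == 1)
    )

    # The pending UPDATE now matches 0 rows, so this raises
    # StaleDataError: "UPDATE statement on table 'task_instance'
    # expected to update 1 row(s); 0 were matched."
    session.flush()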

I have also tried limiting max_active_runs to 8, but then I got a deadlock (which I couldn't reproduce at the time of writing). This is probably related to the issue here.

What you think should happen instead

The backfill should finish successfully without any issues.

How to reproduce

To reproduce the issue, I made a simple DAG:

import pendulum
from airflow import DAG
from airflow.decorators import task_group
from airflow.models import DagRun
from airflow.operators.python import PythonOperator
from airflow.utils.types import DagRunType

from dags.include.helpers.dag_helpers import merge_with_default_args  # project-specific helper, not part of Airflow

_QUERY_INTERVAL_START_OFFSET = 14
_QUERY_INTERVAL_END_OFFSET = 2

def _get_start_end_dates(dag_run: DagRun, data_interval_end: pendulum.DateTime):
    if dag_run.run_type in [DagRunType.BACKFILL_JOB, DagRunType.MANUAL]:
        start_date = data_interval_end.subtract(days=_QUERY_INTERVAL_END_OFFSET).date()
        end_date = data_interval_end.subtract(days=_QUERY_INTERVAL_END_OFFSET).date()

        return [
            {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat(),
            }
        ]

    return [
        {
            "start_date": data_interval_end.subtract(days=i).date().isoformat(),
            "end_date": data_interval_end.subtract(days=i).date().isoformat(),
        }
        for i in range(_QUERY_INTERVAL_END_OFFSET, _QUERY_INTERVAL_START_OFFSET + 1)
    ]

def _get_insert_run_data(
    dag_run: DagRun,
    data_interval_end: pendulum.DateTime,
):
    current_date = data_interval_end.date().isoformat()
    return [
        {"current_date": current_date, **dates}
        for dates in _get_start_end_dates(dag_run, data_interval_end)
    ]

def _print(start_date: str, end_date: str, current_date: str):
    print(f"start_date: {start_date}")
    print(f"end_date: {end_date}")
    print(f"current_date: {current_date}")

with DAG(
    dag_id="stale_data_test",
    catchup=False,
    start_date=pendulum.datetime(2023, 6, 7),
    template_searchpath=[
        "/usr/local/airflow/dags/include",
        "/usr/local/airflow/dags/sem",
    ],
    default_args=merge_with_default_args(),
    schedule="0 6 * * *",  # At 06:00 UTC every day
    # max_active_runs=8,
    # max_active_tasks=8,
):
    get_run_data = PythonOperator(
        task_id="get_run_data",
        python_callable=_get_insert_run_data,
    )

    @task_group(group_id="insert_new_daily_data")
    def insert_new_daily_data(start_date: str, end_date: str, current_date: str):
        cleanup = PythonOperator(
            task_id="cleanup",
            python_callable=_print,
            op_kwargs={
                "start_date": start_date,
                "end_date": end_date,
                "current_date": current_date,
            },
        )

        insert = PythonOperator(
            task_id="insert",
            python_callable=_print,
            op_kwargs={
                "start_date": start_date,
                "end_date": end_date,
                "current_date": current_date,
            },
        )

        cleanup >> insert

    insert_new_daily_data.expand_kwargs(kwargs=get_run_data.output)

Then I shelled into the Docker container and ran the following commands (waiting for each to finish, or interrupting it in the case of specific errors):

1. airflow dags backfill stale_data_test --start-date 2023-07-01 --end-date 2023-08-01 -B
2. airflow dags backfill stale_data_test --start-date 2023-07-01 --end-date 2023-08-01 -B
3. airflow dags backfill stale_data_test --start-date 2023-06-01 --end-date 2023-07-01 -B
4. airflow dags backfill stale_data_test --start-date 2023-05-01 --end-date 2023-06-01 -B

Try more date ranges, or re-run with --reset-dagruns -y.
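
If it helps, here is a hypothetical helper for hammering the backfill command in a loop; it is my own sketch, not part of the repro above, and it assumes it runs inside the container where the airflow CLI and the stale_data_test DAG are available:

import subprocess

DATE_RANGES = [
    ("2023-07-01", "2023-08-01"),
    ("2023-07-01", "2023-08-01"),
    ("2023-06-01", "2023-07-01"),
    ("2023-05-01", "2023-06-01"),
]

for start, end in DATE_RANGES:
    # Equivalent to: airflow dags backfill stale_data_test
    #   --start-date <start> --end-date <end> --reset-dagruns -y -B
    proc = subprocess.run(
        [
            "airflow", "dags", "backfill", "stale_data_test",
            "--start-date", start, "--end-date", end,
            "--reset-dagruns", "-y", "-B",
        ],
        capture_output=True,
        text=True,
    )
    print(f"{start}..{end} -> exit code {proc.returncode}")
    # The failures are intermittent, so grep each run's output for the
    # two errors reported above.
    if "StaleDataError" in proc.stderr or "No map_index" in proc.stderr:
        print(proc.stderr[-2000:])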

Operating System

OS X (Linux in Docker)

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

Docker: Engine 24.0.2, Compose v2.19.1

Docker desktop: Version 4.21.1 (114176)

Astro CLI Version: 1.17.1

Anything else

These problems occur regularly.

Are you willing to submit PR?

Code of Conduct

ephraimbuddy commented 1 year ago

What I see is a deadlock:

[2023-09-14T10:41:55.788+0000] {backfill_job_runner.py:686} WARNING - Deadlock discovered for ti_status.to_run=dict_values([<TaskInstance: stale_data_test.insert_new_daily_data.insert backfill__2023-07-30T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: stale_data_test.insert_new_daily_data.insert backfill__2023-07-22T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: stale_data_test.insert_new_daily_data.insert backfill__2023-07-19T06:00:00+00:00 map_index=0 [scheduled]>])
[2023-09-14T10:41:55.874+0000] {backfill_job_runner.py:416} INFO - [backfill progress] | finished run 0 of 31 | tasks waiting: 0 | succeeded: 16 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 3 | not ready: 3
[2023-09-14T10:41:55.889+0000] {local_executor.py:402} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 33, in <module>
    sys.exit(load_entry_point('apache-airflow', 'console_scripts', 'airflow')())
  File "/opt/airflow/airflow/__main__.py", line 59, in main
    args.func(args)
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 114, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/cli/commands/dag_command.py", line 153, in dag_backfill
    _run_dag_backfill(dags, args)
  File "/opt/airflow/airflow/cli/commands/dag_command.py", line 105, in _run_dag_backfill
    dag.run(
  File "/opt/airflow/airflow/models/dag.py", line 2671, in run
    run_job(job=job, execute_callable=job_runner._execute)
  File "/opt/airflow/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/jobs/job.py", line 305, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow/jobs/job.py", line 334, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/jobs/backfill_job_runner.py", line 949, in _execute
    raise BackfillUnfinished(err, ti_status)
airflow.exceptions.BackfillUnfinished: BackfillJob is deadlocked.
These tasks have succeeded:
DAG ID           Task ID       Run ID                                 Try number
---------------  ------------  -----------------------------------  ------------
stale_data_test  get_run_data  backfill__2023-07-16T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-17T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-18T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-19T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-20T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-21T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-22T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-23T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-24T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-25T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-26T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-27T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-28T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-29T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-30T06:00:00+00:00             1
stale_data_test  get_run_data  backfill__2023-07-31T06:00:00+00:00             1

These tasks are running:
DAG ID    Task ID    Run ID    Try number
--------  ---------  --------  ------------

These tasks have failed:
DAG ID    Task ID    Run ID    Try number
--------  ---------  --------  ------------

These tasks are skipped:
DAG ID    Task ID    Run ID    Try number
--------  ---------  --------  ------------

These tasks are deadlocked:
DAG ID           Task ID                       Run ID                                 Map Index    Try number
---------------  ----------------------------  -----------------------------------  -----------  ------------
stale_data_test  insert_new_daily_data.insert  backfill__2023-07-19T06:00:00+00:00            1             0
stale_data_test  insert_new_daily_data.insert  backfill__2023-07-22T06:00:00+00:00            1             0
stale_data_test  insert_new_daily_data.insert  backfill__2023-07-30T06:00:00+00:00            1             0
ephraimbuddy commented 1 year ago

Updated the DAG since the above has some issues:

import pendulum
from airflow import DAG
from airflow.decorators import task_group
from airflow.models import DagRun
from airflow.operators.python import PythonOperator
from airflow.utils.types import DagRunType

_QUERY_INTERVAL_START_OFFSET = 14
_QUERY_INTERVAL_END_OFFSET = 2

def _get_start_end_dates(dag_run: DagRun, data_interval_end: pendulum.DateTime):
    if dag_run.run_type in [DagRunType.BACKFILL_JOB, DagRunType.MANUAL]:
        start_date = data_interval_end.subtract(days=_QUERY_INTERVAL_END_OFFSET).date()
        end_date = data_interval_end.subtract(days=_QUERY_INTERVAL_END_OFFSET).date()

        return [
            {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat(),
            }
        ]

    return [
        {
            "start_date": data_interval_end.subtract(days=i).date().isoformat(),
            "end_date": data_interval_end.subtract(days=i).date().isoformat(),
        }
        for i in range(_QUERY_INTERVAL_END_OFFSET, _QUERY_INTERVAL_START_OFFSET + 1)
    ]

def _get_insert_run_data(
    dag_run: DagRun,
    data_interval_end: pendulum.DateTime,
):
    current_date = data_interval_end.date().isoformat()
    return [
        {"current_date": current_date, **dates}
        for dates in _get_start_end_dates(dag_run, data_interval_end)
    ]

def _print(start_date: str, end_date: str, current_date: str):
    print(f"start_date: {start_date}")
    print(f"end_date: {end_date}")
    print(f"current_date: {current_date}")

with DAG(
    dag_id="stale_data_test",
    catchup=False,
    start_date=pendulum.datetime(2023, 6, 7),
    schedule="0 6 * * *",
):
    get_run_data = PythonOperator(
        task_id="get_run_data",
        python_callable=_get_insert_run_data,
    )

    @task_group(group_id="insert_new_daily_data")
    def insert_new_daily_data(start_date: str, end_date: str, current_date: str):
        cleanup = PythonOperator(
            task_id="cleanup",
            python_callable=_print,
            op_kwargs={
                "start_date": start_date,
                "end_date": end_date,
                "current_date": current_date,
            },
        )

        insert = PythonOperator(
            task_id="insert",
            python_callable=_print,
            op_kwargs={
                "start_date": start_date,
                "end_date": end_date,
                "current_date": current_date,
            },
        )

        cleanup >> insert

    insert_new_daily_data.expand_kwargs(kwargs=get_run_data.output)
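
For a quick standalone check of this version (my suggestion, assuming an Airflow 2.6+ environment with the file on the DAGs path), airflow dags test stale_data_test 2023-07-01 runs it as a manual-style run, which takes the single-interval branch in _get_start_end_dates.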