apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.19k stars 14.32k forks source link

`dag.test` unable to run after encountering an issue due to a DB error #41119

Closed Dev-iL closed 3 months ago

Dev-iL commented 3 months ago

Apache Airflow version

2.9.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

An exception raised while evaluating dag.test makes it impossible to run dag.test again w/o first reinitializing the DB. The error encountered in any subsequent runs is:

sqlalchemy.exc.NoReferencedTableError: Foreign key associated with column 'dag_run_note.user_id' could not find table 'ab_user' with which to generate a foreign key to target column 'id'

The workaround to enable running the DAG again is to do:

airflow db reset -y && airflow db migrate

See logs of two consecutive runs in the details below.

What you think should happen instead?

The DAG/task should be evaluated again (i.e. same behavior as the 1st run).

How to reproduce

Save the below DAG definition as a script, then run it twice in a row:

import pendulum
from airflow import DAG
from airflow.decorators import task

with DAG(
    "bug_example",
    start_date=pendulum.datetime(2020, 1, 1, tz="US/Eastern"),
    schedule="@daily",
    catchup=False,
) as dag:

    @task.python(
        run_as_user="somename",
    )
    def some_task() -> None:
        raise ValueError("This is a bug")

    some_task()

if __name__ == "__main__":

    dag.test(
        execution_date=pendulum.datetime(2024, 7, 25),
    )

Operating System

Rocky Linux 9.3 (Blue Onyx)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.26.0
apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-common-sql==1.14.2
apache-airflow-providers-fab==1.2.1
apache-airflow-providers-ftp==3.10.0
apache-airflow-providers-http==4.12.0
apache-airflow-providers-imap==3.6.1
apache-airflow-providers-smtp==1.7.1
apache-airflow-providers-sqlite==3.8.1

Deployment

Virtualenv installation

Deployment details

  1. Created a clean venv (python 3.9.18)
  2. Installed apache-airflow[amazon] (to silence import errors in other dags)

Anything else?

This issue observed in 2.9.0 as well.

Log of the 1st run ```none [2024-07-30T08:34:17.487-0400] {dag.py:4199} INFO - dagrun id: bug_example[2024-07-30T08:42:08.842-0400] {dag.py:4199} INFO - dagrun id: bug_example [2024-07-30T08:42:08.866-0400] {dag.py:4215} INFO - created dagrun [2024-07-30T08:42:08.894-0400] {dag.py:4161} INFO - [DAG TEST] starting task_id=some_task map_index=-1 [2024-07-30T08:42:08.895-0400] {dag.py:4164} INFO - [DAG TEST] running task [2024-07-30 08:42:09,620] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='bug_example' AIRFLOW_CTX_TASK_ID='some_task' AIRFLOW_CTX_EXECUTION_DATE='2024-07-25T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-07-25T00:00:00+00:00' [2024-07-30T08:42:09.620-0400] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='bug_example' AIRFLOW_CTX_TASK_ID='some_task' AIRFLOW_CTX_EXECUTION_DATE='2024-07-25T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-07-25T00:00:00+00:00' [2024-07-30T08:42:09.622-0400] {taskinstance.py:430} INFO - ::endgroup:: [2024-07-30T08:42:09.622-0400] {taskinstance.py:441} INFO - ::group::Post task execution logs [2024-07-30 08:42:09,622] {taskinstance.py:2905} ERROR - Task failed with exception Traceback (most recent call last): File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable return execute_callable(context=context, **execute_callable_kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper return func(self, *args, **kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/decorators/base.py", line 265, in execute return_value = super().execute(context) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper return func(self, *args, **kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/operators/python.py", line 235, in execute return_value = self.execute_callable() File "/.venv-bug/lib64/python3.9/site-packages/airflow/operators/python.py", line 252, in execute_callable return self.python_callable(*self.op_args, **self.op_kwargs) File "/airflow/dags/minimal_example.py", line 16, in some_task raise ValueError("This is a bug") ValueError: This is a bug [2024-07-30T08:42:09.622-0400] {taskinstance.py:2905} ERROR - Task failed with exception Traceback (most recent call last): File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable return execute_callable(context=context, **execute_callable_kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper return func(self, *args, **kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/decorators/base.py", line 265, in execute return_value = super().execute(context) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper return func(self, *args, **kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/operators/python.py", line 235, in execute return_value = self.execute_callable() File "/.venv-bug/lib64/python3.9/site-packages/airflow/operators/python.py", line 252, in execute_callable return self.python_callable(*self.op_args, **self.op_kwargs) File "/airflow/dags/minimal_example.py", line 16, in some_task raise ValueError("This is a bug") ValueError: This is a bug [2024-07-30T08:42:09.628-0400] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=bug_example, task_id=some_task, run_id=manual__2024-07-25T00:00:00+00:00, execution_date=20240725T000000, start_date=, end_date=20240730T124209 [2024-07-30T08:42:09.646-0400] {dag.py:2925} ERROR - Task failed; ti= Traceback (most recent call last): File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/dag.py", line 2923, in test _run_task(ti=ti, inline_trigger=not triggerer_running, session=session) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/dag.py", line 4165, in _run_task ti._run_raw_task(session=session, raise_on_defer=inline_trigger) File "/.venv-bug/lib64/python3.9/site-packages/airflow/utils/session.py", line 76, in wrapper return func(*args, **kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 2479, in _run_raw_task self._execute_task_with_callbacks(context, test_mode, session=session) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 2676, in _execute_task_with_callbacks result = self._execute_task(context, task_orig) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 2701, in _execute_task return _execute_task(self, context, task_orig) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable return execute_callable(context=context, **execute_callable_kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper return func(self, *args, **kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/decorators/base.py", line 265, in execute return_value = super().execute(context) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper return func(self, *args, **kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/operators/python.py", line 235, in execute return_value = self.execute_callable() File "/.venv-bug/lib64/python3.9/site-packages/airflow/operators/python.py", line 252, in execute_callable return self.python_callable(*self.op_args, **self.op_kwargs) File "/airflow/dags/minimal_example.py", line 16, in some_task raise ValueError("This is a bug") ValueError: This is a bug [2024-07-30T08:42:09.652-0400] {dagrun.py:819} ERROR - Marking run failed [2024-07-30T08:42:09.652-0400] {dagrun.py:901} INFO - DagRun Finished: dag_id=bug_example, execution_date=2024-07-25 00:00:00+00:00, run_id=manual__2024-07-25T00:00:00+00:00, run_start_date=2024-07-25 00:00:00+00:00, run_end_date=2024-07-30 12:42:09.652401+00:00, run_duration=477729.652401, state=failed, external_trigger=False, run_type=manual, data_interval_start=2024-07-23 04:00:00+00:00, data_interval_end=2024-07-24 04:00:00+00:00, dag_hash=None ```
Log of the 2nd run ```none [2024-07-30T08:43:36.170-0400] {dag.py:4199} INFO - dagrun id: bug_example Traceback (most recent call last): File "/airflow/dags/minimal_example.py", line 22, in dag.test( File "/.venv-bug/lib64/python3.9/site-packages/airflow/utils/session.py", line 79, in wrapper return func(*args, session=session, **kwargs) File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/dag.py", line 2890, in test dr: DagRun = _get_or_create_dagrun( File "/.venv-bug/lib64/python3.9/site-packages/airflow/models/dag.py", line 4205, in _get_or_create_dagrun session.commit() File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/orm/session.py", line 1454, in commit self._transaction.commit(_to_root=self.future) File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/orm/session.py", line 832, in commit self._prepare_impl() File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl self.session.flush() File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/orm/session.py", line 3449, in flush self._flush(objects) File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/orm/session.py", line 3589, in _flush transaction.rollback(_capture_exception=True) File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__ compat.raise_( File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_ raise exception File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush flush_context.execute() File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute rec.execute(self) File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 667, in execute util.preloaded.orm_persistence.delete_obj( File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 330, in delete_obj table_to_mapper = base_mapper._sorted_tables File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 1184, in __get__ obj.__dict__[self.__name__] = result = self.fget(obj) File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/orm/mapper.py", line 3386, in _sorted_tables sorted_ = sql_util.sort_tables( File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/sql/ddl.py", line 1217, in sort_tables for (t, fkcs) in sort_tables_and_constraints( File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/sql/ddl.py", line 1289, in sort_tables_and_constraints filtered = filter_fn(fkc) File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/sql/ddl.py", line 1207, in _skip_fn if skip_fn(fk): File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/orm/mapper.py", line 3369, in skip dep = table_to_mapper.get(fk.column.table) File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 1113, in __get__ obj.__dict__[self.__name__] = result = self.fget(obj) File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/sql/schema.py", line 2532, in column return self._resolve_column() File "/.venv-bug/lib64/python3.9/site-packages/sqlalchemy/sql/schema.py", line 2543, in _resolve_column raise exc.NoReferencedTableError( sqlalchemy.exc.NoReferencedTableError: Foreign key associated with column 'dag_run_note.user_id' could not find table 'ab_user' with which to generate a foreign key to target column 'id' ```

Are you willing to submit PR?

Code of Conduct

Dev-iL commented 3 months ago

I guess it's a duplicate of https://github.com/apache/airflow/issues/34109, but also observed on 2.9.3.