apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

exceptions.DagRunNotFound: DagRun for example_bash_operator with run_id or execution_date of #23679

Closed pingzh closed 2 years ago

pingzh commented 2 years ago

Apache Airflow version

main (development)

What happened

Trying to run the airflow tasks run command locally and force StandardTaskRunner to use _start_by_exec instead of _start_by_fork:

airflow tasks run example_bash_operator also_run_this scheduled__2022-05-08T00:00:00+00:00  --job-id 237 --local --subdir /Users/ping_zhang/airlab/repos/airflow/airflow/example_dags/example_bash_operator.py -f -i
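
For context, one possible way to force the exec path for local debugging is sketched below. This is an assumption on my part (the report does not say exactly how the fork path was disabled); it relies on StandardTaskRunner falling back to _start_by_exec when forking is unavailable.

# Sketch only: force StandardTaskRunner onto the exec path by pretending
# fork is unavailable. CAN_FORK is the module-level flag the runner checks.
from airflow.task.task_runner import standard_task_runner

standard_task_runner.CAN_FORK = False  # runner now uses _start_by_exec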

However, it always errors out:

See the screenshot: https://user-images.githubusercontent.com/8662365/168164336-a75bfac8-cb59-43a9-b9f3-0c345c5da79f.png

[2022-05-12 12:08:32,893] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this Traceback (most recent call last):
[2022-05-12 12:08:32,893] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this   File "/Users/ping_zhang/miniforge3/envs/apache-***/bin/***", line 33, in <module>
[2022-05-12 12:08:32,893] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this     sys.exit(load_entry_point('apache-***', 'console_scripts', '***')())
[2022-05-12 12:08:32,893] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this   File "/Users/ping_zhang/airlab/repos/***/***/__main__.py", line 38, in main
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this     args.func(args)
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this   File "/Users/ping_zhang/airlab/repos/***/***/cli/cli_parser.py", line 51, in command
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this     return func(*args, **kwargs)
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this   File "/Users/ping_zhang/airlab/repos/***/***/utils/cli.py", line 99, in wrapper
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this     return f(*args, **kwargs)
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this   File "/Users/ping_zhang/airlab/repos/***/***/cli/commands/task_command.py", line 369, in task_run
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this     ti, _ = _get_ti(task, args.execution_date_or_run_id, args.map_index, pool=args.pool)
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this   File "/Users/ping_zhang/airlab/repos/***/***/utils/session.py", line 71, in wrapper
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this     return func(*args, session=session, **kwargs)
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this   File "/Users/ping_zhang/airlab/repos/***/***/cli/commands/task_command.py", line 152, in _get_ti
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this     dag_run, dr_created = _get_dag_run(
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this   File "/Users/ping_zhang/airlab/repos/***/***/cli/commands/task_command.py", line 112, in _get_dag_run
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this     raise DagRunNotFound(
[2022-05-12 12:08:32,894] {base_task_runner.py:109} INFO - Job 265: Subtask also_run_this ***.exceptions.DagRunNotFound: DagRun for example_bash_operator with run_id or execution_date of 'scheduled__2022-05-08T00:00:00+00:00' not found
[2022-05-12 12:08:33,014] {local_task_job.py:163} INFO - Task exited with return code 1
[2022-05-12 12:08:33,048] {local_task_job.py:265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2022-05-12 12:11:30,742] {taskinstance.py:1120} INFO - Dependencies not met for <TaskInstance: example_bash_operator.also_run_this scheduled__2022-05-08T00:00:00+00:00 [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
[2022-05-12 12:11:30,743] {local_task_job.py:102} INFO - Task is not able to be run

I have checked that the dag_run does exist in my DB:

[screenshot: query result showing the dag_run row present in the metadata DB]
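
For anyone reproducing this, a quick way to double-check the row from Python (a sketch, assuming a working connection to the metadata DB) is:

from airflow.models import DagRun
from airflow.utils.session import create_session

with create_session() as session:
    dr = (
        session.query(DagRun)
        .filter(
            DagRun.dag_id == "example_bash_operator",
            DagRun.run_id == "scheduled__2022-05-08T00:00:00+00:00",
        )
        .one_or_none()
    )
    print(dr)  # prints the DagRun row if it exists, otherwise None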

What you think should happen instead

No response

How to reproduce

Pull the latest main branch with this commit: 7277122ae62305de19ceef33607f09cf030a3cd4

Run the airflow scheduler, webserver, and worker locally with the CeleryExecutor (the standard CLI entry points are listed below).
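
For completeness, the corresponding commands are the standard Airflow CLI entry points (exact flags omitted here):

airflow scheduler
airflow webserver
airflow celery worker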

Operating System

Apple M1 Max, version: 12.2

Versions of Apache Airflow Providers

NA

Deployment

Other

Deployment details

on my local mac with latest main branch, latest commit: 7277122ae62305de19ceef33607f09cf030a3cd4

Anything else

Python version:

Python 3.9.7

Are you willing to submit PR?

Code of Conduct

pingzh commented 2 years ago

The root cause should be https://github.com/apache/airflow/blob/749e53def43055225a2e5d09596af7821d91b4ac/airflow/cli/commands/task_command.py#L106

It is very strange: in the airflow tasks run --local process, the dag_run can be loaded correctly, but in the airflow tasks run --raw process, it returns None.

[screenshot: the dag_run lookup result in the --local process vs. the --raw process]
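
For reference, the lookup that fails boils down to a query of this shape (a simplified sketch, not the exact code in task_command.py):

from airflow.models import DagRun
from airflow.utils.session import provide_session

@provide_session
def find_dag_run(dag_id, run_id, session=None):
    # roughly what the CLI does when it is given an explicit run_id
    return (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.run_id == run_id)
        .one_or_none()
    )

In the --raw subprocess this returns None, even though the same lookup succeeds in the --local process.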

pingzh commented 2 years ago

The session was not configured correctly. This is why it always complains about DagRunNotFound:

[screenshot: the session configured with the wrong sql_alchemy_conn]
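
A quick way to see which connection string a given process actually picked up (a sketch, assuming the main branch at the time, where the option lives in the [database] section):

from airflow import settings
from airflow.configuration import conf

print(conf.get("database", "sql_alchemy_conn"))  # what the config resolver returns
print(settings.SQL_ALCHEMY_CONN)                 # what the ORM was actually configured with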

pingzh commented 2 years ago

Looks like the bug was introduced in this PR: https://github.com/apache/airflow/pull/22284.

My airflow.cfg does not have a [database] section, but it has sql_alchemy_conn under [core]. So when the StandardTaskRunner creates the tmp cfg file, sql_alchemy_conn ends up in both [database] and [core]: the value in [database] is the sqlite default, while the value in [core] is my intended one (mysql). sql_alchemy_conn is then read from [database], which is why it resolves to sqlite instead of mysql.
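
To make the precedence problem concrete, here is a minimal standalone illustration using plain configparser (the file contents below are hypothetical; the real tmp cfg is generated by StandardTaskRunner):

import configparser

tmp_cfg = """
[core]
sql_alchemy_conn = mysql://airflow:***@localhost/airflow

[database]
sql_alchemy_conn = sqlite:////tmp/airflow.db
"""

parser = configparser.ConfigParser()
parser.read_string(tmp_cfg)

# The option is now resolved from the [database] section first, so the
# generated sqlite default wins over the intended [core] value:
print(parser.get("database", "sql_alchemy_conn"))  # sqlite:////tmp/airflow.db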

potiuk commented 2 years ago

AAARGH.

potiuk commented 2 years ago

Good diagnosis @pingzh. Confirmed!

pingzh commented 2 years ago

Thanks @potiuk for quickly getting this fixed 👍