apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Clearing a task flow function executed earlier with task changed to mapped task crashes scheduler #31431

Closed. tirkarthi closed this issue 1 year ago.

tirkarthi commented 1 year ago

Apache Airflow version

main (development)

What happened

Clearing a TaskFlow task that ran earlier as an unmapped task, after the task has been changed to a mapped task, crashes the scheduler. It seems the stored TaskMap row references the task instance through a foreign key that includes map_index, so that row needs to be cleared before the task is executed again.
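The failure can be reproduced outside Airflow with Python's built-in sqlite3 module. This is a minimal sketch under assumptions: the two tables are hypothetical, trimmed-down versions of task_instance and task_map that keep only the key columns named in the DETAIL line of the log, and make_db and renumber_to_mapped are illustrative helpers, not Airflow code. It shows that the UPDATE from the log is rejected while the TaskMap row still points at map_index -1, and that deleting that row first, as suggested above, lets the renumbering go through.

```python
import sqlite3

# Hypothetical, trimmed-down tables; only the key columns from the DETAIL line are kept.
SCHEMA = """
CREATE TABLE task_instance (
    dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,
    PRIMARY KEY (dag_id, task_id, run_id, map_index)
);
CREATE TABLE task_map (
    dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, length INTEGER,
    PRIMARY KEY (dag_id, task_id, run_id, map_index),
    -- No ON UPDATE action, so the default NO ACTION behaviour applies.
    CONSTRAINT task_map_task_instance_fkey
        FOREIGN KEY (dag_id, task_id, run_id, map_index)
        REFERENCES task_instance (dag_id, task_id, run_id, map_index)
);
"""

KEY = ("bash_simple", "get_command", "manual__2023-05-18T13:54:01.345016+00:00")
WHERE_UNMAPPED = "dag_id = ? AND task_id = ? AND run_id = ? AND map_index = -1"


def make_db() -> sqlite3.Connection:
    """State after the first, unmapped run: one TI at map_index -1 and its TaskMap row."""
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
    conn.executescript(SCHEMA)
    conn.execute("INSERT INTO task_instance VALUES (?, ?, ?, -1)", KEY)
    conn.execute("INSERT INTO task_map VALUES (?, ?, ?, -1, 1)", KEY)
    conn.commit()
    return conn


def renumber_to_mapped(conn: sqlite3.Connection, clear_task_map_first: bool) -> bool:
    """Run the UPDATE seen in the log; optionally delete the stale TaskMap row first."""
    try:
        if clear_task_map_first:
            conn.execute(f"DELETE FROM task_map WHERE {WHERE_UNMAPPED}", KEY)
        conn.execute(f"UPDATE task_instance SET map_index = 0 WHERE {WHERE_UNMAPPED}", KEY)
        conn.commit()
        return True
    except sqlite3.IntegrityError:  # "FOREIGN KEY constraint failed"
        conn.rollback()
        return False


print(renumber_to_mapped(make_db(), clear_task_map_first=False))  # False
print(renumber_to_mapped(make_db(), clear_task_map_first=True))   # True
```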

airflow scheduler       
/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/cli/cli_config.py:1001 DeprecationWarning: The namespace option in [kubernetes] has been moved to the namespace option in [kubernetes_executor] - the old setting has been used, but please update your config.
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py:196 DeprecationWarning: The '[celery] task_adoption_timeout' config option is deprecated. Please update your config to use '[scheduler] task_queued_timeout' instead.
/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py:201 DeprecationWarning: The worker_pods_pending_timeout option in [kubernetes] has been moved to the worker_pods_pending_timeout option in [kubernetes_executor] - the old setting has been used, but please update your config.
/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py:206 DeprecationWarning: The '[kubernetes_executor] worker_pods_pending_timeout' config option is deprecated. Please update your config to use '[scheduler] task_queued_timeout' instead.
[2023-05-19T23:41:07.907+0530] {executor_loader.py:114} INFO - Loaded executor: SequentialExecutor
[2023-05-19 23:41:07 +0530] [15527] [INFO] Starting gunicorn 20.1.0
[2023-05-19 23:41:07 +0530] [15527] [INFO] Listening at: http://[::]:8793 (15527)
[2023-05-19 23:41:07 +0530] [15527] [INFO] Using worker: sync
[2023-05-19 23:41:07 +0530] [15528] [INFO] Booting worker with pid: 15528
[2023-05-19T23:41:07.952+0530] {scheduler_job_runner.py:789} INFO - Starting the scheduler
[2023-05-19T23:41:07.952+0530] {scheduler_job_runner.py:796} INFO - Processing each file at most -1 times
[2023-05-19T23:41:07.954+0530] {scheduler_job_runner.py:1542} INFO - Resetting orphaned tasks for active dag runs
[2023-05-19 23:41:07 +0530] [15529] [INFO] Booting worker with pid: 15529
[2023-05-19T23:41:08.567+0530] {scheduler_job_runner.py:853} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 836, in _execute
    self._run_scheduler_loop()
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 970, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 1052, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/retries.py", line 90, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/tenacity/__init__.py", line 382, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/tenacity/__init__.py", line 349, in iter
    return fut.result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/retries.py", line 99, in wrapped_function
    return func(*args, **kwargs)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 1347, in _schedule_all_dag_runs
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 2811, in __iter__
    return self._iter().__iter__()
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 2818, in _iter
    result = self.session.execute(
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1669, in execute
    conn = self._connection_for_bind(bind, close_with_result=True)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1519, in _connection_for_bind
    return self._transaction._connection_for_bind(
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 721, in _connection_for_bind
    self._assert_active()
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 601, in _assert_active
    raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.ForeignKeyViolation) update or delete on table "task_instance" violates foreign key constraint "task_map_task_instance_fkey" on table "task_map"
DETAIL:  Key (dag_id, task_id, run_id, map_index)=(bash_simple, get_command, manual__2023-05-18T13:54:01.345016+00:00, -1) is still referenced from table "task_map".

[SQL: UPDATE task_instance SET map_index=%(map_index)s, updated_at=%(updated_at)s WHERE task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.task_id = %(task_instance_task_id)s AND task_instance.run_id = %(task_instance_run_id)s AND task_instance.map_index = %(task_instance_map_index)s]
[parameters: {'map_index': 0, 'updated_at': datetime.datetime(2023, 5, 19, 18, 11, 8, 90512, tzinfo=Timezone('UTC')), 'task_instance_dag_id': 'bash_simple', 'task_instance_task_id': 'get_command', 'task_instance_run_id': 'manual__2023-05-18T13:54:01.345016+00:00', 'task_instance_map_index': -1}]
(Background on this error at: http://sqlalche.me/e/14/gkpj) (Background on this error at: http://sqlalche.me/e/14/7s2a)
[2023-05-19T23:41:08.572+0530] {scheduler_job_runner.py:865} INFO - Exited execute loop
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/.env/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 51, in command
    return func(*args, **kwargs)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/cli.py", line 112, in wrapper
    return f(*args, **kwargs)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 77, in scheduler
    _run_scheduler_job(job_runner, skip_serve_logs=args.skip_serve_logs)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 42, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/job.py", line 284, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/job.py", line 313, in execute_job
    ret = execute_callable()
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 836, in _execute
    self._run_scheduler_loop()
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 970, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 1052, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/retries.py", line 90, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/tenacity/__init__.py", line 382, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/tenacity/__init__.py", line 349, in iter
    return fut.result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/retries.py", line 99, in wrapped_function
    return func(*args, **kwargs)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 1347, in _schedule_all_dag_runs
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 2811, in __iter__
    return self._iter().__iter__()
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 2818, in _iter
    result = self.session.execute(
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1669, in execute
    conn = self._connection_for_bind(bind, close_with_result=True)
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1519, in _connection_for_bind
    return self._transaction._connection_for_bind(
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 721, in _connection_for_bind
    self._assert_active()
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 601, in _assert_active
    raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.ForeignKeyViolation) update or delete on table "task_instance" violates foreign key constraint "task_map_task_instance_fkey" on table "task_map"
DETAIL:  Key (dag_id, task_id, run_id, map_index)=(bash_simple, get_command, manual__2023-05-18T13:54:01.345016+00:00, -1) is still referenced from table "task_map".

[SQL: UPDATE task_instance SET map_index=%(map_index)s, updated_at=%(updated_at)s WHERE task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.task_id = %(task_instance_task_id)s AND task_instance.run_id = %(task_instance_run_id)s AND task_instance.map_index = %(task_instance_map_index)s]
[parameters: {'map_index': 0, 'updated_at': datetime.datetime(2023, 5, 19, 18, 11, 8, 90512, tzinfo=Timezone('UTC')), 'task_instance_dag_id': 'bash_simple', 'task_instance_task_id': 'get_command', 'task_instance_run_id': 'manual__2023-05-18T13:54:01.345016+00:00', 'task_instance_map_index': -1}]
(Background on this error at: http://sqlalche.me/e/14/gkpj) (Background on this error at: http://sqlalche.me/e/14/7s2a)

What you think should happen instead

No response

How to reproduce

  1. Create the DAG below with command = get_command(1, 1), trigger a DAG run, and wait for it to complete.
  2. Change that line to command = get_command.partial(arg1=[1]).expand(arg2=[1, 2, 3, 4]) so that the task becomes mapped.
  3. Clear the get_command task instance from the earlier run. The scheduler crashes.
import datetime
import time

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="bash_simple",
    start_date=datetime.datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    @task
    def get_command(arg1, arg2):
        for i in range(10):
            time.sleep(1)
            print(i)

        return ["echo hello"]

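    # Step 1: run the DAG once with this unmapped call.
    command = get_command(1, 1)
    # Step 2: switch to this mapped call, then clear the earlier get_command task instance.
    # command = get_command.partial(arg1=[1]).expand(arg2=[1, 2, 3, 4])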

    t1 = BashOperator.partial(task_id="bash").expand(bash_command=command)

if __name__ == "__main__":
    dag.test()
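Judging by the SQL in the traceback (UPDATE task_instance SET map_index=0 ... WHERE ... map_index=-1), switching get_command to the mapped call in step 2 makes the scheduler reuse the existing unmapped task instance as map_index 0 rather than inserting a fresh row, presumably creating new rows for the remaining indexes. The plain-Python model below is a hypothetical simplification of that renumbering (TIKey and expand_unmapped are illustrative names, not Airflow APIs); it only shows why the primary key of an already-referenced row changes.

```python
from typing import NamedTuple


class TIKey(NamedTuple):
    """Key columns of task_instance, as listed in the error DETAIL."""

    dag_id: str
    task_id: str
    run_id: str
    map_index: int


def expand_unmapped(existing: list[TIKey], length: int) -> tuple[list[TIKey], list[TIKey]]:
    """Hypothetical model: turn each unmapped TI (map_index -1) into `length` mapped TIs.

    Returns (renumbered, created). The old row is renumbered to map_index 0,
    mirroring the UPDATE in the log; indexes 1..length-1 become new rows.
    """
    renumbered: list[TIKey] = []
    created: list[TIKey] = []
    for ti in existing:
        if ti.map_index != -1:
            continue  # already mapped, nothing to renumber
        renumbered.append(ti._replace(map_index=0))
        created.extend(ti._replace(map_index=i) for i in range(1, length))
    return renumbered, created


old = [TIKey("bash_simple", "get_command", "manual__2023-05-18T13:54:01.345016+00:00", -1)]
renumbered, created = expand_unmapped(old, length=4)  # four values in expand(arg2=[1, 2, 3, 4])
print([ti.map_index for ti in renumbered])  # [0]
print([ti.map_index for ti in created])     # [1, 2, 3]
```

In this model, any row in another table that still points at the old key (map_index -1), such as the stored TaskMap row, is left dangling by the renumbering, which is exactly what the foreign key refuses.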

Operating System

Ubuntu

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

ephraimbuddy commented 1 year ago

The problem is that we are not cascading on update. Will work on it.
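Cascading on update means the database rewrites the referencing task_map row whenever the task_instance key it points to changes, instead of rejecting the change. A minimal sketch of that behaviour, using SQLite and a hypothetical, trimmed-down schema; the reported deployment used PostgreSQL (psycopg2 in the traceback), which supports the same ON UPDATE CASCADE clause. This illustrates the idea only and is not the actual Airflow model change or migration.

```python
import sqlite3

# Hypothetical, trimmed-down tables; only the key columns from the error DETAIL are kept.
SCHEMA = """
CREATE TABLE task_instance (
    dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,
    PRIMARY KEY (dag_id, task_id, run_id, map_index)
);
CREATE TABLE task_map (
    dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, length INTEGER,
    PRIMARY KEY (dag_id, task_id, run_id, map_index),
    FOREIGN KEY (dag_id, task_id, run_id, map_index)
        REFERENCES task_instance (dag_id, task_id, run_id, map_index)
        ON UPDATE CASCADE  -- child key follows the parent key when it changes
);
"""

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.executescript(SCHEMA)

key = ("bash_simple", "get_command", "manual__2023-05-18T13:54:01.345016+00:00")
conn.execute("INSERT INTO task_instance VALUES (?, ?, ?, -1)", key)
conn.execute("INSERT INTO task_map VALUES (?, ?, ?, -1, 1)", key)

# The same renumbering the scheduler attempted; it now succeeds and the
# task_map row is moved to map_index 0 along with its parent.
conn.execute(
    "UPDATE task_instance SET map_index = 0 "
    "WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = -1",
    key,
)
conn.commit()

print(conn.execute("SELECT map_index FROM task_map").fetchall())  # [(0,)]
```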