apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Triggerer process die with DB Deadlock #23639

Closed by humit0 2 years ago

humit0 commented 2 years ago

Apache Airflow version

2.2.5

What happened

When many deferrable operators (e.g. TimeDeltaSensorAsync) are created, the triggerer component dies because of a DB deadlock.

[2022-05-11 02:45:08,420] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5397) starting
[2022-05-11 02:45:08,421] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5398) starting
[2022-05-11 02:45:09,459] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5400) starting
[2022-05-11 02:45:09,461] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5399) starting
[2022-05-11 02:45:10,503] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5401) starting
[2022-05-11 02:45:10,504] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5402) starting
[2022-05-11 02:45:11,113] {triggerer_job.py:108} ERROR - Exception when executing TriggererJob._run_trigger_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 254, in query
    _mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line 106, in _execute
    self._run_trigger_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line 127, in _run_trigger_loop
    Trigger.clean_unused()
  File "/usr/local/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/models/trigger.py", line 91, in clean_unused
    session.query(TaskInstance).filter(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 4063, in update
    update_op.exec_()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
    self._do_exec()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
    self._execute_stmt(update_stmt)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
    self.result = self.query._execute_crud(stmt, self.mapper)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
    return conn.execute(stmt, self._params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 254, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE task_instance SET trigger_id=%s WHERE task_instance.state != %s AND task_instance.trigger_id IS NOT NULL]
[parameters: (None, <TaskInstanceState.DEFERRED: 'deferred'>)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
[2022-05-11 02:45:11,118] {triggerer_job.py:111} INFO - Waiting for triggers to clean up
[2022-05-11 02:45:11,592] {triggerer_job.py:117} INFO - Exited trigger loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 254, in query
    _mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/usr/local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/cli/commands/triggerer_command.py", line 56, in triggerer
    job.run()
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 246, in run
    self._execute()
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line 106, in _execute
    self._run_trigger_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line 127, in _run_trigger_loop
    Trigger.clean_unused()
  File "/usr/local/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/models/trigger.py", line 91, in clean_unused
    session.query(TaskInstance).filter(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 4063, in update
    update_op.exec_()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
    self._do_exec()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
    self._execute_stmt(update_stmt)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
    self.result = self.query._execute_crud(stmt, self.mapper)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
    return conn.execute(stmt, self._params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 254, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE task_instance SET trigger_id=%s WHERE task_instance.state != %s AND task_instance.trigger_id IS NOT NULL]
[parameters: (None, <TaskInstanceState.DEFERRED: 'deferred'>)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)

What you think should happen instead

The triggerer process should not raise a deadlock error.
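
The MySQL error in the traceback above (1213) itself suggests restarting the transaction. The following is a minimal sketch of that idea, not Airflow's actual fix: `FakeOperationalError`, `is_deadlock`, `run_with_deadlock_retry`, and `flaky_cleanup` are hypothetical names used only for illustration, and the sketch assumes the driver exception carries the error number as its first argument, as the `(1213, 'Deadlock found ...')` tuple in the log indicates.

```python
import time

MYSQL_DEADLOCK_ERRNO = 1213  # error code shown in the traceback above


class FakeOperationalError(Exception):
    """Hypothetical stand-in for a DB driver error whose args[0] is the errno."""


def is_deadlock(exc):
    """Return True if the exception's first argument is the MySQL deadlock code."""
    return bool(exc.args) and exc.args[0] == MYSQL_DEADLOCK_ERRNO


def run_with_deadlock_retry(operation, max_attempts=3, base_delay=0.0):
    """Call operation(); on a deadlock error, wait and retry, otherwise re-raise."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as exc:
            # Give up on non-deadlock errors or once the attempts are used up.
            if not is_deadlock(exc) or attempt == max_attempts:
                raise
            time.sleep(base_delay * attempt)  # linear backoff between attempts


# Usage: a simulated cleanup that deadlocks twice, then succeeds.
calls = {"count": 0}


def flaky_cleanup():
    calls["count"] += 1
    if calls["count"] < 3:
        raise FakeOperationalError(1213, "Deadlock found when trying to get lock")
    return "cleaned"


result = run_with_deadlock_retry(flaky_cleanup)
```

In a real deployment the operation would also need to roll back the failed transaction before retrying; that step is omitted here because it depends on how the session is managed.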

How to reproduce

Create the "test_timedelta" DAG below and run it.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.time_delta import TimeDeltaSensorAsync

default_args = {
    "owner": "user",
    "start_date": datetime(2021, 2, 8),
    "retries": 2,
    "retry_delay": timedelta(minutes=20),
    "depends_on_past": False,
}

with DAG(
    dag_id="test_timedelta",
    default_args=default_args,
    schedule_interval="10 11 * * *",
    max_active_runs=1,
    max_active_tasks=2,
    catchup=False,
) as dag:
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")
    # Create 800 deferrable sensors, each deferring for 3 days, to load the triggerer.
    for idx in range(800):
        tx = TimeDeltaSensorAsync(
            task_id=f"sleep_{idx}",
            delta=timedelta(days=3),
        )
        start >> tx >> end

Operating System

uname_result(system='Linux', node='d2845d6331fd', release='5.10.104-linuxkit', version='#1 SMP Thu Mar 17 17:08:06 UTC 2022', machine='x86_64', processor='')

Versions of Apache Airflow Providers

apache-airflow-providers-apache-druid | 2.3.3
apache-airflow-providers-apache-hive | 2.3.2
apache-airflow-providers-apache-spark | 2.1.3
apache-airflow-providers-celery | 2.1.3
apache-airflow-providers-ftp | 2.1.2
apache-airflow-providers-http | 2.1.2
apache-airflow-providers-imap | 2.2.3
apache-airflow-providers-jdbc | 2.1.3
apache-airflow-providers-mysql | 2.2.3
apache-airflow-providers-postgres | 4.1.0
apache-airflow-providers-redis | 2.0.4
apache-airflow-providers-sqlite | 2.1.3
apache-airflow-providers-ssh | 2.4.3

Deployment

Other Docker-based deployment

Deployment details

webserver: 1 instance
scheduler: 1 instance
worker: 1 instance (Celery)
triggerer: 1 instance
redis: 1 instance
Database: 1 instance (mysql)

Anything else

webserver: 172.19.0.9
scheduler: 172.19.0.7
triggerer: 172.19.0.5
worker: 172.19.0.8

MySQL (SHOW ENGINE INNODB STATUS;)

------------------------
LATEST DETECTED DEADLOCK
------------------------
2022-05-11 07:47:49 139953955817216
*** (1) TRANSACTION:
TRANSACTION 544772, ACTIVE 0 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 7 lock struct(s), heap size 1128, 2 row lock(s)
MySQL thread id 20, OS thread handle 139953861383936, query id 228318 172.19.0.5 airflow_user updating
UPDATE task_instance SET trigger_id=NULL WHERE task_instance.state != 'deferred' AND task_instance.trigger_id IS NOT NULL

*** (1) HOLDS THE LOCK(S):
RECORD LOCKS space id 125 page no 231 n bits 264 index ti_state of table `airflow_db`.`task_instance` trx id 544772 lock_mode X locks rec but not gap
Record lock, heap no 180 PHYSICAL RECORD: n_fields 4; compact format; info bits 0
 0: len 6; hex 717565756564; asc queued;;
 1: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
 2: len 9; hex 736c6565705f323436; asc sleep_246;;
 3: len 30; hex 7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc scheduled__2022-05-09T11:10:00; (total 36 bytes);

*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 125 page no 47 n bits 128 index PRIMARY of table `airflow_db`.`task_instance` trx id 544772 lock_mode X locks rec but not gap waiting
Record lock, heap no 55 PHYSICAL RECORD: n_fields 28; compact format; info bits 0
 0: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
 1: len 9; hex 736c6565705f323436; asc sleep_246;;
 2: len 30; hex 7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc scheduled__2022-05-09T11:10:00; (total 36 bytes);
 3: len 6; hex 000000085001; asc     P ;;
 4: len 7; hex 01000001411e2f; asc     A /;;
 5: len 7; hex 627b6a250b612d; asc b{j% a-;;
 6: SQL NULL;
 7: SQL NULL;
 8: len 7; hex 72756e6e696e67; asc running;;
 9: len 4; hex 80000001; asc     ;;
 10: len 12; hex 643238343564363333316664; asc d2845d6331fd;;
 11: len 4; hex 726f6f74; asc root;;
 12: len 4; hex 8000245e; asc   $^;;
 13: len 12; hex 64656661756c745f706f6f6c; asc default_pool;;
 14: len 7; hex 64656661756c74; asc default;;
 15: len 4; hex 80000002; asc     ;;
 16: len 20; hex 54696d6544656c746153656e736f724173796e63; asc TimeDeltaSensorAsync;;
 17: len 7; hex 627b6a240472e0; asc b{j$ r ;;
 18: SQL NULL;
 19: len 4; hex 80000002; asc     ;;
 20: len 5; hex 80057d942e; asc   } .;;
 21: len 4; hex 80000001; asc     ;;
 22: len 4; hex 800021c7; asc   ! ;;
 23: len 30; hex 36353061663737642d363762372d343166382d383439342d636637333061; asc 650af77d-67b7-41f8-8494-cf730a; (total 36 bytes);
 24: SQL NULL;
 25: SQL NULL;
 26: SQL NULL;
 27: len 2; hex 0400; asc   ;;

*** (2) TRANSACTION:
TRANSACTION 544769, ACTIVE 0 sec updating or deleting
mysql tables in use 1, locked 1
LOCK WAIT 7 lock struct(s), heap size 1128, 4 row lock(s), undo log entries 2
MySQL thread id 12010, OS thread handle 139953323235072, query id 228319 172.19.0.8 airflow_user updating
UPDATE task_instance SET start_date='2022-05-11 07:47:49.745773', state='running', try_number=1, hostname='d2845d6331fd', job_id=9310 WHERE task_instance.task_id = 'sleep_246' AND task_instance.dag_id = 'test_timedelta' AND task_instance.run_id = 'scheduled__2022-05-09T11:10:00+00:00'

*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 125 page no 47 n bits 120 index PRIMARY of table `airflow_db`.`task_instance` trx id 544769 lock_mode X locks rec but not gap
Record lock, heap no 55 PHYSICAL RECORD: n_fields 28; compact format; info bits 0
 0: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
 1: len 9; hex 736c6565705f323436; asc sleep_246;;
 2: len 30; hex 7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc scheduled__2022-05-09T11:10:00; (total 36 bytes);
 3: len 6; hex 000000085001; asc     P ;;
 4: len 7; hex 01000001411e2f; asc     A /;;
 5: len 7; hex 627b6a250b612d; asc b{j% a-;;
 6: SQL NULL;
 7: SQL NULL;
 8: len 7; hex 72756e6e696e67; asc running;;
 9: len 4; hex 80000001; asc     ;;
 10: len 12; hex 643238343564363333316664; asc d2845d6331fd;;
 11: len 4; hex 726f6f74; asc root;;
 12: len 4; hex 8000245e; asc   $^;;
 13: len 12; hex 64656661756c745f706f6f6c; asc default_pool;;
 14: len 7; hex 64656661756c74; asc default;;
 15: len 4; hex 80000002; asc     ;;
 16: len 20; hex 54696d6544656c746153656e736f724173796e63; asc TimeDeltaSensorAsync;;
 17: len 7; hex 627b6a240472e0; asc b{j$ r ;;
 18: SQL NULL;
 19: len 4; hex 80000002; asc     ;;
 20: len 5; hex 80057d942e; asc   } .;;
 21: len 4; hex 80000001; asc     ;;
 22: len 4; hex 800021c7; asc   ! ;;
 23: len 30; hex 36353061663737642d363762372d343166382d383439342d636637333061; asc 650af77d-67b7-41f8-8494-cf730a; (total 36 bytes);
 24: SQL NULL;
 25: SQL NULL;
 26: SQL NULL;
 27: len 2; hex 0400; asc   ;;

*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 125 page no 231 n bits 264 index ti_state of table `airflow_db`.`task_instance` trx id 544769 lock_mode X locks rec but not gap waiting
Record lock, heap no 180 PHYSICAL RECORD: n_fields 4; compact format; info bits 0
 0: len 6; hex 717565756564; asc queued;;
 1: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
 2: len 9; hex 736c6565705f323436; asc sleep_246;;
 3: len 30; hex 7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc scheduled__2022-05-09T11:10:00; (total 36 bytes);

*** WE ROLL BACK TRANSACTION (1)
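
The status output above shows a classic lock-order inversion: transaction 544772 (from the triggerer at 172.19.0.5) holds a record lock on the ti_state index and waits for the PRIMARY record, while transaction 544769 (from the worker at 172.19.0.8) holds the PRIMARY record and waits for the same ti_state entry. As a rough illustration, the sketch below encodes those four facts and follows the "waits for the holder of" edges to find the cycle. The function name `find_deadlock_cycle` and the lock labels are hypothetical; this is not how InnoDB implements detection.

```python
# Lock facts copied from the SHOW ENGINE INNODB STATUS output above.
holds = {
    "trx 544772 (triggerer)": "ti_state index record",
    "trx 544769 (worker)": "PRIMARY index record",
}
waits_for = {
    "trx 544772 (triggerer)": "PRIMARY index record",
    "trx 544769 (worker)": "ti_state index record",
}


def find_deadlock_cycle(holds, waits_for):
    """Follow wait edges between transactions; return a cycle if one exists."""
    owner = {lock: trx for trx, lock in holds.items()}
    for start in waits_for:
        path = [start]
        current = start
        while True:
            wanted = waits_for.get(current)
            blocker = owner.get(wanted)
            if blocker is None:
                break  # nobody holds the wanted lock, so there is no wait edge
            if blocker == start:
                return path + [start]  # back at the starting transaction
            if blocker in path:
                break  # a cycle exists but does not include the start
            path.append(blocker)
            current = blocker
    return None


cycle = find_deadlock_cycle(holds, waits_for)
```

Each transaction waits on the lock the other holds, so neither can proceed and InnoDB rolls one of them back, here transaction (1), the triggerer's cleanup UPDATE.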

Airflow env

AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://airflow_user:airflow_pass@mysql/airflow_db
AIRFLOW__CORE__DEFAULT_TIMEZONE=KST
AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=KST
AIRFLOW_HOME=/home/deploy/airflow
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=30
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__WEBSERVER__SECRET_KEY=aoiuwernholo
AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False
AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow_user:airflow_pass@mysql/airflow_db


potiuk commented 2 years ago

@humit0 @andrewdanks @socar-humprey - we have a 2.3.3rc3 (see #24863) that is supposed to handle the trigger deadlocks. Could you test this version to see whether the deadlocks have been fixed with it? It would be great to get confirmation.