apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow-2.6.3 Mysql: Deadlock found when trying to get lock; try restarting transaction #35144

Closed liangriyu closed 12 months ago

liangriyu commented 1 year ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

2023-10-24 00:10:20,620 ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 835, in _execute
    self._run_scheduler_loop()
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 969, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 1051, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/utils/retries.py", line 78, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/tenacity/__init__.py", line 347, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/utils/retries.py", line 87, in wrapped_function
    return func(*args, **kwargs)
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 1355, in _schedule_all_dag_runs
    for dag_run in dag_runs:
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2901, in __iter__
    result = self._iter()
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2916, in _iter
    result = self.session.execute(
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1716, in execute
    conn = self._connection_for_bind(bind)
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1555, in _connection_for_bind
    return self._transaction._connection_for_bind(
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 724, in _connection_for_bind
    self._assert_active()
  File "/usr/local/miniconda3/envs/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 604, in _assert_active
    raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (mysql.connector.errors.InternalError) 1213 (40001): Deadlock found when trying to get lock; try restarting transaction
[SQL: UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s]
[parameters: ({'last_scheduling_decision': datetime.datetime(2023, 10, 23, 16, 10, 20, 153406), 'updated_at': datetime.datetime(2023, 10, 23, 16, 10, 20, 389092), 'dag_run_id': 154}, {'last_scheduling_decision': datetime.datetime(2023, 10, 23, 16, 10, 19, 835645), 'updated_at': datetime.datetime(2023, 10, 23, 16, 10, 20, 389109), 'dag_run_id': 157}, {'last_scheduling_decision': datetime.datetime(2023, 10, 23, 16, 10, 20, 81685), 'updated_at': datetime.datetime(2023, 10, 23, 16, 10, 20, 389115), 'dag_run_id': 158}, {'last_scheduling_decision': datetime.datetime(2023, 10, 23, 16, 10, 20, 116207), 'updated_at': datetime.datetime(2023, 10, 23, 16, 10, 20, 389119), 'dag_run_id': 159})]
(Background on this error at: https://sqlalche.me/e/14/2j85) (Background on this error at: https://sqlalche.me/e/14/7s2a)
2023-10-24 00:10:21,697 INFO - Sending Signals.SIGTERM to group 31151. PIDs of all processes in the group: [32813, 32842, 31151]
2023-10-24 00:10:21,698 INFO - Sending the signal Signals.SIGTERM to group 31151
2023-10-24 00:10:21,991 INFO - Process psutil.Process(pid=32813, status='terminated', started='00:10:20') (32813) terminated with exit code None
2023-10-24 00:10:22,084 INFO - Process psutil.Process(pid=32842, status='terminated', started='00:10:20') (32842) terminated with exit code None
2023-10-24 00:10:22,136 INFO - Process psutil.Process(pid=31151, status='terminated', exitcode=0, started='00:09:41') (31151) terminated with exit code 0
2023-10-24 00:10:22,138 INFO - Exited execute loop
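
For context on the traceback above: the PendingRollbackError is a follow-on error. The original failure was MySQL error 1213 (a deadlock) raised during flush, and SQLAlchemy requires Session.rollback() before that session can run further statements. Airflow's retry helper (run_with_db_retries, visible in the traceback) wraps this scheduling step; purely as an illustration, and not Airflow's actual implementation, a rollback-and-retry pattern for MySQL's "try restarting transaction" hint could look like the sketch below (the connection string and helper name are hypothetical).

# Illustrative sketch only, not Airflow's code: retry a unit of work when MySQL
# reports error 1213 ("Deadlock found when trying to get lock").
from sqlalchemy import create_engine, text
from sqlalchemy.exc import DBAPIError
from sqlalchemy.orm import sessionmaker

# Hypothetical connection string; adjust driver and credentials for your deployment.
engine = create_engine("mysql+mysqlconnector://airflow:airflow@localhost/airflow")
Session = sessionmaker(bind=engine)


def run_with_deadlock_retry(work, max_attempts=3):
    """Run work(session) in a fresh transaction, retrying if the database deadlocks."""
    for attempt in range(1, max_attempts + 1):
        session = Session()
        try:
            result = work(session)
            session.commit()
            return result
        except DBAPIError as err:
            session.rollback()  # required before the session can issue new statements
            deadlock = getattr(err.orig, "errno", None) == 1213
            if deadlock and attempt < max_attempts:
                continue  # "try restarting transaction", as MySQL suggests
            raise
        finally:
            session.close()


# Example usage (would actually hit the database, so it is left commented out):
# run_with_deadlock_retry(lambda s: s.execute(text("SELECT 1")).scalar())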

What you think should happen instead

No response

How to reproduce

When multiple DAG tasks fail simultaneously

Operating System

CentOS 7

Versions of Apache Airflow Providers

apache-airflow 2.6.3
apache-airflow-providers-celery 3.2.1
apache-airflow-providers-common-sql 1.5.2
apache-airflow-providers-datadog 3.3.1
apache-airflow-providers-ftp 3.4.2
apache-airflow-providers-http 4.4.2
apache-airflow-providers-imap 3.2.2
apache-airflow-providers-mysql 5.1.1
apache-airflow-providers-redis 3.2.1
apache-airflow-providers-sqlite 3.4.2

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

Taragolis commented 1 year ago

How to reproduce

When multiple DAG tasks fail simultaneously

I've been unable to reproduce it with this description and a simple DAG:

import pendulum

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

for ix in range(1, 4):
    with DAG(
        f"issue_35144_dag_{ix}",
        start_date=pendulum.datetime(2023, 6, 1, tz="UTC"),
        schedule="@daily",
        catchup=True,
        max_active_runs=16,
        tags=["issue", "35144", f"no: {ix}"]
    ):
        @task
        def div(x):
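            # each mapped task instance raises ZeroDivisionError, so several task instances fail at the same time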
            return x / 0

        div.expand(x=list(range(3))) >> EmptyOperator(task_id="empty", trigger_rule="all_done")

In my case (main branch, MySQL 8.0, ARM) it works without any deadlocks


Could you provide more details about when it happens? A reproducible DAG example would also be nice, and which version of the MySQL database do you use?

langfu54 commented 1 year ago

I had exactly the same problem: once the number of task instances increased, the scheduler shut down.

Taragolis commented 1 year ago

once the number of task instances increased

You mean an increased number of simultaneous Task Instances? If so, what numbers of TIs are we talking about: 10, 100, 1000?

Any chance of getting a reproducible case? Without one it will be difficult to understand what exactly is causing these deadlocks.

Taragolis commented 1 year ago

Sorry, I couldn't resist.


X: Mom, can we have a solution for the deadlock issues in Airflow on MySQL?
Mom: We have a solution at home.

Solution at home ![image](https://github.com/apache/airflow/assets/3998685/422969e9-f861-4ae3-bf2d-c04d41c0ab94)
langfu54 commented 1 year ago

once the number of task instances increased

You mean an increased number of simultaneous Task Instances? If so, what numbers of TIs are we talking about: 10, 100, 1000?

Less than 100.

Any chance of getting a reproducible case? Without one it will be difficult to understand what exactly is causing these deadlocks.

I just installed airflow==2.6.3 and, with fewer than 100 tasks started, the scheduler shut down with "Deadlock found when trying to get lock; try restarting transaction....".

The locks in the DB were as follows:

DELETE FROM rendered_task_instance_fields WHERE rendered_task_instance_fields.dag_id = 'ods_orderdbtotal_hourly' AND rendered_task_instance_fields.task_id = 'orderdbtotal_order_detail_promotion_tmp' AND ((rendered_task_instance_fields.dag_id, rendered_task_instance_fields.task_id, rendered_task_instance_fields.run_id) NOT IN (SELECT anon_1.dag_id, anon_1.task_id, anon_1.run_id FROM (SELECT DISTINCT rendered_task_instance_fields.dag_id AS dag_id, rendered_task_instance_fields.task_id AS task_id, rendered_task_instance_fields.run_id AS run_id, dag_run.execution_date AS execution_date FROM rendered_task_instance_fields INNER JOIN dag_run ON rendered_task_instance_fields.dag_id = dag_run.dag_id AND rendered_task_instance_fields.run_id = dag_run.run_id WHERE rendered_task_instance_fields.dag_id = 'ods_orderdbtotal_hourly' AND rendered_task_instance_fields.task_id = 'orderdbtotal_order_detail_promot

UPDATE dag_run SET last_scheduling_decision='2023-10-26 10:22:41.409798', updated_at='2023-10-26 10:22:41.599107' WHERE dag_run.id = 253

Taragolis commented 1 year ago

@langfu54 In your case you could try to set [core] max_num_rendered_ti_fields_per_task to 0. Setting this value to 0 means the worker/scheduler won't try to clean up the rendered_task_instance_fields table with that query.
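
A minimal sketch of what that looks like in airflow.cfg, using the value suggested above (the same option can also be set through Airflow's standard environment-variable form, AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0):

[core]
# Per the suggestion above: 0 stops the scheduler/worker from running the
# rendered_task_instance_fields cleanup DELETE that appeared in the deadlock.
max_num_rendered_ti_fields_per_task = 0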

This PR could also potentially make things better: https://github.com/apache/airflow/pull/33527 (it should be part of Airflow 2.8), but there is no guarantee.

langfu54 commented 1 year ago

Appreciate your help. It looks like Airflow works correctly now.

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] commented 12 months ago

This issue has been closed because it has not received response from the issue author.