apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Tasks fail and do not log due to backend DB (dead?)lock #16982

Closed easontm closed 2 years ago

easontm commented 3 years ago

Apache Airflow version: 2.1.1

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.18

Environment:

What happened:

In an unpredictable fashion, some tasks are unable to start. They do not retry and they do not write to the shared log directory, but if I run kubectl logs <worker pod> while it sits in Error state afterward, I can see:

[2021-07-12 23:30:21,713] {dagbag.py:496} INFO - Filling up the DagBag from /usr/local/airflow/dags/foo/bar.py
Running <TaskInstance: foo_bar.my_task 2021-07-12T22:30:00+00:00 [queued]> on host <WORKER POD>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 259, in query
    _mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.7/dist-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/usr/local/lib/python3.7/dist-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 238, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
    run_job.run()
  File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/local_task_job.py", line 96, in _execute
    pool=self.pool,
  File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1023, in check_and_change_state_before_execution
    self.refresh_from_db(session=session, lock_for_update=True)
  File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 623, in refresh_from_db
    ti = qry.with_for_update().first()

<SQLALCHEMY TRACE OMITTED FOR BREVITY>

  File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 259, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.execution_date = %s
 LIMIT %s FOR UPDATE]
[parameters: ('foobar', 'my_task', datetime.datetime(2021, 7, 12, 22, 30), 1)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
Full length log (collapsed details block): identical to the traceback above, plus the SQLAlchemy ORM/engine frames that were omitted for brevity; it ends with the same OperationalError 1205 ('Lock wait timeout exceeded; try restarting transaction') and the same SELECT ... FOR UPDATE statement and parameters.

Afterward, the task is marked as Failed. The issue is transient, and tasks can be manually rerun to try again.

What you expected to happen:

If the row lock cannot be obtained, the task should exit more gracefully and be rescheduled instead of failing outright.
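
For illustration only (this is not Airflow's implementation), a minimal sketch of the kind of handling being asked for: retry the locking call when the backend reports MySQL error 1205 instead of hard-failing the task.

```python
import time

import sqlalchemy.exc

MYSQL_LOCK_WAIT_TIMEOUT = 1205  # the error code seen in the trace above


def run_with_lock_retry(fn, attempts=3, delay=5):
    """Retry a DB operation a few times when a lock wait timeout occurs."""
    for attempt in range(attempts):
        try:
            return fn()
        except sqlalchemy.exc.OperationalError as err:
            code = err.orig.args[0] if err.orig and err.orig.args else None
            if code != MYSQL_LOCK_WAIT_TIMEOUT or attempt == attempts - 1:
                raise
            time.sleep(delay)  # back off and let the competing transaction finish
```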

How to reproduce it:

You can trigger the non-graceful task failure by manually locking the row and then trying to run the task; it should work for any task (a scripted version of steps 2-4 appears after the list).

  1. Connect to the MySQL instance backing Airflow
  2. SET autocommit = OFF;
  3. START TRANSACTION;
  4. Lock the row
    SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
    FROM task_instance
    WHERE task_instance.dag_id = 'foobar'
    AND task_instance.task_id = 'my_task'
    AND task_instance.execution_date = '2021-07-12 00:00:00.000000'
    LIMIT 1 FOR UPDATE;
  5. Try to run the task via the UI.
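
A scripted version of steps 2-4, as a hedged sketch using the MySQLdb driver already shown in the traceback; the connection parameters and identifiers are placeholders for your own environment.

```python
import MySQLdb  # the mysqlclient driver from the traceback above

# Placeholder connection parameters; point these at the Airflow metadata DB.
conn = MySQLdb.connect(host="airflow-db", user="airflow", passwd="***", db="airflow")
conn.autocommit(False)  # step 2: SET autocommit = OFF
cur = conn.cursor()
cur.execute("START TRANSACTION")  # step 3
cur.execute(  # step 4: take the row lock the worker will also request
    "SELECT try_number FROM task_instance"
    " WHERE dag_id = %s AND task_id = %s AND execution_date = %s"
    " LIMIT 1 FOR UPDATE",
    ("foobar", "my_task", "2021-07-12 00:00:00.000000"),
)
# Step 5: leave the transaction open (no COMMIT or ROLLBACK) and trigger the task
# from the UI; its own SELECT ... FOR UPDATE will hit the 1205 lock wait timeout.
```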

Anything else we need to know:

Ideally the deadlock would never occur and the task would execute normally; however, the deadlocks are seemingly random and I cannot replicate them. I hypothesized that the scheduler was somehow spinning up two worker pods at the same time, but if that were the case I would see two dead workers in Error state when running kubectl get pods. The deadlock occurs on fewer than 1% of tasks, but when it does, it consistently fails the task without retry.

brandondtb commented 3 years ago

@easontm Do you have sql_engine_collation_for_ids configured correctly for your database?

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html?highlight=configuration%20reference#sql-engine-collation-for-ids
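
If it helps anyone checking the same thing, a quick hedged way to see what a running Airflow resolves for that option (on the 2.1 line the option lives in the [core] section; newer versions may place it elsewhere, see the linked reference):

```python
from airflow.configuration import conf

# Prints the collation Airflow applies to id columns; None/empty means the default.
print(conf.get("core", "sql_engine_collation_for_ids", fallback=None))
```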

BTW, I agree that, regardless of what is causing this lock timeout, the scheduler should gracefully retry the task.

easontm commented 3 years ago

Sorry for the late reply: yes. Also, as an update, I'm no longer experiencing deadlocks on 2.1.4. However, I haven't verified whether the task exits more gracefully if a deadlock does occur.

easontm commented 2 years ago

I'm now (2.2.2) getting MySQL deadlocks from the worker itself, which fails the task.

File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 259, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash
FROM task_instance INNER JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s
 LIMIT %s FOR UPDATE]
[parameters: ('FOO', 'BAR', 'scheduled__2021-11-30T17:01:00+00:00', 1)]

SamWheating commented 2 years ago

Hi @easontm - how many schedulers were you running when this error occurred?

We briefly tried running 6 schedulers in order to reduce scheduler latency (there's probably a better way to achieve this) and noticed a large uptick in the volume of tasks failing in the same way (task failed with no logs, logs show a MySQL lock timeout on SELECT task_instance.try_number ... FOR UPDATE).

When we reduced the number of schedulers, these failures subsided, leading me to believe it's some sort of weird concurrency issue or race condition related to running multiple schedulers.

easontm commented 2 years ago

@SamWheating just checked my config file history -- we were actually running just one.

SamWheating commented 2 years ago

Interesting! Thanks for confirming.

Anyways, we've been seeing this issue as well, with the following setup:

I'll continue investigating and let y'all know if we find anything useful.

SIvaCoHan commented 2 years ago

I am hitting the same issue and am currently investigating it.

potiuk commented 2 years ago

I have a big request for anyone seeing any deadlock issues: we are investigating some deadlocks, and any extra information that can be provided might be helpful.

Could anyone who has a similar issue make a GitHub gist with the logs of their deadlocks and post the links here? Ideally it should describe the situation (when it happened, what was going on) and also, if possible, contain server-side logs that describe the lock, not only the client side.

Usually on the server side we have much more information, particularly what the deadlock was with. The problem with deadlocks is that they are usually caused by the "other query", which is not visible in the client logs, so having a server log where we can see both queries/transaction details might help us to investigate it.
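
For anyone gathering that server-side information, here is a minimal sketch (assuming direct access to the MySQL server; connection parameters are placeholders) that dumps InnoDB's latest-detected-deadlock section. Enabling the innodb_print_all_deadlocks server variable additionally writes every deadlock to the MySQL error log.

```python
import MySQLdb  # same driver as in the traces above

conn = MySQLdb.connect(host="airflow-db", user="root", passwd="***", db="airflow")
cur = conn.cursor()
cur.execute("SHOW ENGINE INNODB STATUS")  # needs the PROCESS privilege
_engine, _name, status = cur.fetchone()
if isinstance(status, bytes):
    status = status.decode("utf-8", errors="replace")
# The status text contains a "LATEST DETECTED DEADLOCK" section showing both
# transactions involved -- exactly the server-side detail asked for above.
start = status.find("LATEST DETECTED DEADLOCK")
print(status[start:] if start != -1 else "no deadlock recorded since last restart")
```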

We would really appreciate it!

potiuk commented 2 years ago

@SamWheating @SIvaCoHan @easontm ^^

Pretty please :)

tulanowski commented 2 years ago

I'm experiencing the same problem. In the worker logs from last night I can see a few dozen errors caused by deadlocks. In the scheduler log I can see only one deadlock error, after which the scheduler was restarted. Here is a gist containing the logs: https://gist.github.com/tulanowski/fcc8358bad3c8e5d15678639ec015d8b

Airflow version 2.2.0; MySQL 5.7.33 running on AWS RDS; Celery executor with 6 workers; 1 scheduler replica.

Hope it helps!

easontm commented 2 years ago

Fortunately I haven't seen this behavior in some time. If it returns I'll gather what info I can.

potiuk commented 2 years ago

> I'm experiencing the same problem. In the worker logs from last night I can see a few dozen errors caused by deadlocks. In the scheduler log I can see only one deadlock error, after which the scheduler was restarted. Here is a gist containing the logs: https://gist.github.com/tulanowski/fcc8358bad3c8e5d15678639ec015d8b

Ah, I missed it. Let me take a look.

potiuk commented 2 years ago

> I'm experiencing the same problem. In the worker logs from last night I can see a few dozen errors caused by deadlocks. In the scheduler log I can see only one deadlock error, after which the scheduler was restarted. Here is a gist containing the logs: https://gist.github.com/tulanowski/fcc8358bad3c8e5d15678639ec015d8b
>
> Airflow version 2.2.0; MySQL 5.7.33 running on AWS RDS; Celery executor with 6 workers; 1 scheduler replica.
>
> Hope it helps!

If I am not mistaken, it did :).

The (very likely IMHO) fix is in #21362.

Question @tulanowski -> would it be possible for you to patch your Airflow instance with the fix (it's really simple) and see if it helps? It just passed all the tests in the latest fixup, so it should be safe to copy.

Also, just to confirm: do you happen to have some tasks that throw AirflowRescheduleException occasionally?

Because IMHO that is when it could happen: when you are unlucky and a race condition occurs.
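
For context, this is the kind of task being asked about: a hedged, purely illustrative callable that defers itself by raising AirflowRescheduleException (reschedule-mode sensors trigger the same code path under the hood).

```python
from datetime import timedelta

from airflow.exceptions import AirflowRescheduleException
from airflow.utils import timezone


def wait_for_upstream_data(**context):
    data_ready = False  # hypothetical readiness check; replace with a real condition
    if not data_ready:
        # Put the task into up_for_reschedule and try again in five minutes.
        raise AirflowRescheduleException(timezone.utcnow() + timedelta(minutes=5))
```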

tulanowski commented 2 years ago

> Question @tulanowski -> would it be possible for you to patch your Airflow instance with the fix (it's really simple) and see if it helps? It just passed all the tests in the latest fixup, so it should be safe to copy.

Yes, tomorrow I'll try to patch our staging instance and see if it helps. I'll let you know the result the day after.

> Also, just to confirm: do you happen to have some tasks that throw AirflowRescheduleException occasionally?

Yes, actually quite often: we have a big number of sensors using "reschedule" mode.
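
For reference, a minimal hedged sketch of the kind of reschedule-mode sensor we run (DAG id, callable, and intervals are illustrative):

```python
import pendulum
from airflow import DAG
from airflow.sensors.python import PythonSensor


def _partition_exists() -> bool:
    return False  # placeholder check; returning False keeps the sensor waiting


with DAG(
    dag_id="example_reschedule_sensor",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    wait_for_partition = PythonSensor(
        task_id="wait_for_partition",
        python_callable=_partition_exists,
        mode="reschedule",    # frees the worker slot between pokes
        poke_interval=300,    # re-poke every 5 minutes
        timeout=6 * 60 * 60,  # give up after 6 hours
    )
```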

potiuk commented 2 years ago

Cool! Looking forward to it!

patrickbrady-xaxis commented 2 years ago

@potiuk to add some additional color, my team and I just patched what seemed to be an issue very similar to the OP's (deadlocks during xcom table updates) on our own DB by adding a separate index to our xcom table on dag_id, task_id, and execution_date. A similar index had been present in 2.1.2 but was removed when we migrated to 2.1.4.

Our specific implementation runs thousands of the same dag_ids simultaneously with different configurations, so with only the primary key on dag_id, task_id, key, and execution_date, every xcom update query was only able to narrow as far as dag_id + task_id, leaving thousands of rows to scan for a matching execution_date. All of our tasks update xcom with status codes, and many of the tasks have similar run times across different dag runs, leading to large numbers of concurrent requests with execution_date as the only distinguishing factor.
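
A hedged sketch of the kind of index addition described above (the index name and connection string are placeholders, and this is not an official Airflow migration):

```python
import sqlalchemy as sa

# Placeholder DSN for the Airflow metadata database.
engine = sa.create_engine("mysql+mysqldb://airflow:***@airflow-db/airflow")

with engine.begin() as conn:
    # Composite index matching the xcom lookup predicates described above.
    conn.execute(sa.text(
        "CREATE INDEX idx_xcom_dag_task_date "
        "ON xcom (dag_id, task_id, execution_date)"
    ))
```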

Hope that might be helpful to anyone with a similar setup.

potiuk commented 2 years ago

@patrickbrady-xaxis: it's an interesting one and possibly worth adding back. Maybe you can create an issue for that and describe it, as this seems like a valid case.

Just for future reference: when you move to 2.2 you will need to change it to "run_id" instead of "execution_date".

tulanowski commented 2 years ago

Hi @potiuk! I was finally able to test the patch. Unfortunately I still see a big number of deadlocks every night. Now I see errors only in the workers, no deadlocks in the scheduler. I created a new gist containing the new logs and the contents of the taskinstance.py file I used after applying the patch: https://gist.github.com/tulanowski/62219e040f356a6a15c7fee522530037

potiuk commented 2 years ago

Thanks. Really helpful! I will look at this then. I believe the contention is still with TaskReschedule, but now it looks like the contention is with another query (possibly one that is not guarded by the same row id). It seems that the contention is with a SELECT statement this time, which might require another deadlock prevention mechanism.

The TaskReschedule relation with the task_instance index seems to have further reach than expected (and a bit unforeseen in the initial design of the locks).

potiuk commented 2 years ago

@tulanowski -> did you maybe have a chance to upgrade to a more recent version of Airflow (we are at 2.3.2 now)? Are those problems/deadlocks still happening?

tulanowski commented 2 years ago

Hi @potiuk! This week we upgraded the staging instance to version 2.3.2. Unfortunately, deadlock errors are still frequent.

potiuk commented 2 years ago

Echh...

potiuk commented 2 years ago

OK. I finally got into it. The reason, as I found out, is actually very simple.

The patch was supposed to work, BUT your MySQL version does not support the row-level locking feature it relies on, @tulanowski. You are using MySQL 5.7, and it simply does not have SELECT ... FOR UPDATE SKIP LOCKED.

MySQL 8 is the first version that actually supports the SELECT ... FOR UPDATE SKIP LOCKED functionality, and the problem is that this is what is needed to get the mini-scheduler and multiple schedulers to work. It might also be the cause of deadlocks when, for example, you run backfill in parallel with Airflow.

So the solution for you is to upgrade to MySQL 8.
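
For reference, a hedged illustration (not Airflow's actual scheduler code) of the row-locking pattern in question; the SKIP LOCKED clause requires MySQL 8.0+ and simply is not available on 5.7:

```python
import sqlalchemy as sa

engine = sa.create_engine("mysql+mysqldb://airflow:***@airflow-db/airflow")  # placeholder DSN

with engine.begin() as conn:
    # Lock one task_instance row, skipping rows that another transaction already
    # holds instead of blocking on them (blocking is where the 1205/1213 errors come from).
    row = conn.execute(
        sa.text(
            "SELECT task_id, state FROM task_instance"
            " WHERE dag_id = :dag_id AND task_id = :task_id"
            " LIMIT 1 FOR UPDATE SKIP LOCKED"
        ),
        {"dag_id": "FOO", "task_id": "BAR"},
    ).first()
```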