Closed: hkc-8010 closed this issue 2 years ago.
I faced the same issue with airflow 2.3.0rc2. Had a basic DAG added:

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="map-reduce", start_date=datetime(2022, 4, 22)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 'a'])
    sum_it(added_values)

    added_values_correct = add_one.expand(x=[1, 2, 3])
    sum_it(added_values_correct)
```
Added scheduler logs in attachments. logs.txt
The deadlock issue is not the cause of the tasks being set to skipped -- the deadlock occurs when trying to.
@abhishekbhakat what error do you get with that? The log you included there doesn't show any error.
I experience exactly the same issue. Reducing the number of schedulers to one seems to have resolved the issue for my deployment, but now, tasks are scheduling a lot slower. I initially had 3 schedulers. Here are my new config variables:
[scheduler]
job_heartbeat_sec = 30
scheduler_heartbeat_sec = 5
num_runs = -1
scheduler_idle_sleep_time = 1
min_file_process_interval = 30
dag_dir_list_interval = 120
print_stats_interval = 240
pool_metrics_interval = 5
scheduler_health_check_threshold = 60
orphaned_tasks_check_interval = 300.0
scheduler_zombie_task_threshold = 300
catchup_by_default = True
max_tis_per_query = 512
use_row_level_locking = False
max_dagruns_to_create_per_loop = 100
max_dagruns_per_loop_to_schedule = 200
schedule_after_task_execution = True
parsing_processes = 2
file_parsing_sort_mode = modified_time
use_job_schedule = True
allow_trigger_in_future = False
dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
trigger_timeout_check_interval = 15
run_duration = 41460
It seems that after having set `use_row_level_locking` to `False`, the problem has disappeared on my side (with 3 schedulers). Maybe the doc should be updated, because:
The short version is that users of PostgreSQL 10+ or MySQL 8+ are all ready to go -- you can start running as many copies of the scheduler as you like -- there is no further set up or config options needed. If you are using a different database please read on.
Again, some more information on my last comments, because my scheduler crashed again when I cleared a lot of tasks. By the way, I am running behind pgbouncer.
Here is the failure log:
Failed to clear task instances: "(psycopg2.errors.DeadlockDetected) deadlock detected DETAIL:
Process 22854 waits for ShareLock on transaction 778696725; blocked by process 21808. Process 21808 waits for ShareLock on transaction 778696547; blocked by process 22854.
HINT: See server log for query details.
CONTEXT: while updating tuple (3743,4) in relation "task_instance" [SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.task_id = %(task_instance_task_id)s AND task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.run_id = %(task_instance_run_id)s] [
parameters: ({'state': None, 'task_instance_task_id': 'some_tasks_name', 'task_instance_dag_id': 'some_tasks_name', 'task_instance_run_id': 'scheduled__2022-05-14T22:00:00+00:00'}, {'state': None, 'task_instance_task_id': 'some_tasks_name', 'task_instance_dag_id': 'some_dag_id', 'task_instance_run_id': 'scheduled__2022-04-29T00:00:00+00:00'}, {'state': None, 'task_instance_task_id': 'some_dag_id', 'task_instance_dag_id': 'some_dag_id', 'task_instance_run_id': 'scheduled__2022-05-19T00:00:00+00:00'})] (Background on this error at: http://sqlalche.me/e/13/e3q8)"
@V0lantis it would be great to see a few more lines of the stacktrace to know where the exception is happening for you.
Also, any chance you can take a look at the database log to see which queries are associated with the two processes/transactions? From this we can only see one half of the problem.
What Airflow version are you running? And are there any plugins/custom code interacting with Airflow?
Thank you @tanelk for your quick reaction time! Here are some answers to your questions:
@V0lantis it would be great to see a few more lines of the stacktrace to know where the exception is happening for you.
You can find here (just_scheduler_logs.log) the full stack trace of one of my schedulers with the issue discussed above.
Also, any chance you can take a look at the database log to see which queries are associated with the two processes/transactions? From this we can only see one half of the problem.
I fortunately found it (I removed some logs because there were a lot of `2022-05-20 17:10:09 UTC:172.17.45.29(42506):airflow@airflow:[22919]:ERROR: duplicate key value violates unique constraint "variable_key_key"` entries), but with a Ctrl-F you can find the `transaction_id` which is referenced in the scheduler logs given above.
Here are the PostgreSQL logs:
I am running Airflow 2.2.4. There are some custom plugins (not many though), simply plugins which create new dynamic DAGs from a given template. We are communicating with Postgres (12.7, by the way) through pgbouncer.
Hope that helps in understanding what the issue is :+1:
Are these logs from where you have set `use_row_level_locking` to `False`?
The `use_row_level_locking` setting is used to avoid this sort of issue when running multiple schedulers - these logs indicate that two schedulers are scheduling the same DAG runs and end up getting deadlocked. With row locking enabled this situation should not happen, because each scheduler picks different DAG runs to look at.
The interesting situation is when `use_row_level_locking` is `True` and things still get deadlocked - that sounds like a bug on Airflow's part (or perhaps an interaction with some other code you are running).
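For context, the locking scheme being described works roughly like this: each scheduler claims its DAG runs with `SELECT ... FOR UPDATE SKIP LOCKED` before touching their task instances. A minimal SQLAlchemy sketch of the idea (simplified stand-in model and connection string, not Airflow's actual internals):

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DagRunRow(Base):
    """Simplified stand-in for Airflow's dag_run table."""

    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String, nullable=False)
    state = Column(String, nullable=False)


engine = create_engine("postgresql+psycopg2://airflow:airflow@localhost/airflow")

with Session(engine) as session:
    # Each scheduler claims a disjoint batch of runs: rows already locked by
    # another scheduler are skipped rather than waited on, so schedulers do
    # not block (or deadlock) each other on the same DAG run.
    runs = (
        session.query(DagRunRow)
        .filter(DagRunRow.state == "running")
        .with_for_update(skip_locked=True)
        .limit(10)
        .all()
    )
    # ... examine and schedule task instances for `runs` here ...
    session.commit()  # releases the locks
```

With `use_row_level_locking = False` that claim is never taken, so two schedulers can update the same `task_instance` rows concurrently and deadlock exactly as in the logs above.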
Are these logs from where you have set use_row_level_locking to False?
Indeed, it is! My bad then for having set this param, thinking that Postgres would allow it. Thanks for the help, and sorry if I wasted your time 🙏
It seems that after having set `use_row_level_locking` to `False`, the problem has disappeared on my side (with 3 schedulers). Maybe the doc should be updated, because:
I would advise against doing this while running multiple schedulers -- if you do then it is entirely possible that Airflow will not correctly respect configured concurrency limits for DAGs/Tasks/Pools. Edit: oh, or it will crash
Yep, that's effectively what my Airflow deployment did. I misinterpreted the documentation, sorry.
The deadlock issue is not the cause of the tasks being set to skipped -- the deadlock occurs when trying to.
@abhishekbhakat what error do you get with that? The log you included there doesn't show any error.
@ashb The log file provided has that error in around line no. 5810
Attaching the error message below:
[2022-04-29 12:25:18,883] {scheduler_job.py:753} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1685, in _execute_context
self.dialect.do_executemany(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 917, in do_executemany
self._psycopg2_extras().execute_batch(
File "/usr/local/lib/python3.9/site-packages/psycopg2/extras.py", line 1187, in execute_batch
cur.execute(b";".join(sqls))
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL: Process 29749 waits for ShareLock on transaction 62344; blocked by process 31075.
Process 31075 waits for ShareLock on transaction 62338; blocked by process 29749.
HINT: See server log for query details.
CONTEXT: while updating tuple (4,23) in relation "task_instance"
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute
self._run_scheduler_loop()
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 824, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 906, in _do_scheduling
callback_to_run = self._schedule_dag_run(dag_run, session)
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1146, in _schedule_dag_run
self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1176, in _verify_integrity_if_dag_changed
dag_run.verify_integrity(session=session)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 944, in verify_integrity
session.flush()
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3255, in flush
self._flush(objects)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3395, in _flush
transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3355, in _flush
flush_context.execute()
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 453, in execute
rec.execute(self)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 627, in execute
util.preloaded.orm_persistence.save_obj(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 234, in save_obj
_emit_update_statements(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 998, in _emit_update_statements
c = connection._execute_20(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1520, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 313, in _execute_on_connection
return connection._execute_clauseelement(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1389, in _execute_clauseelement
ret = self._execute_context(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1748, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1929, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1685, in _execute_context
self.dialect.do_executemany(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 917, in do_executemany
self._psycopg2_extras().execute_batch(
File "/usr/local/lib/python3.9/site-packages/psycopg2/extras.py", line 1187, in execute_batch
cur.execute(b";".join(sqls))
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL: Process 29749 waits for ShareLock on transaction 62344; blocked by process 31075.
Process 31075 waits for ShareLock on transaction 62338; blocked by process 29749.
HINT: See server log for query details.
CONTEXT: while updating tuple (4,23) in relation "task_instance"
[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.task_id = %(task_instance_task_id)s AND task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.run_id = %(task_instance_run_id)s AND task_instance.map_index = %(task_instance_map_index)s]
I have been noticing this error as well. PostgreSQL 14 with Airflow 2.3.1. My scheduler containers keep crashing (I run two of them on two different nodes).
DETAIL: Process 7063 waits for ShareLock on transaction 3165652; blocked by process 7243.
Process 7243 waits for ShareLock on transaction 3165651; blocked by process 7063.
HINT: See server log for query details.
CONTEXT: while updating tuple (208084,20) in relation "task_instance"
[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.task_id = %(task_instance_task_id)s AND task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.run_id = %(task_instance_run_id)s AND task_instance.map_index = %(task_instance_map_index)s]
I'm also receiving this error with a single `LocalExecutor` scheduler running on Airflow 2.3.2.
Has anyone found an effective workaround for this?
[2022-06-24 08:01:10,633] {scheduler_job.py:756} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1706, in _execute_context
cursor, statement, parameters, context
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL: Process 271343 waits for ShareLock on transaction 251928224; blocked by process 282010.
Process 282010 waits for ShareLock on transaction 251928207; blocked by process 271343.
HINT: See server log for query details.
CONTEXT: while updating tuple (474,18) in relation "task_instance"
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 739, in _execute
self._run_scheduler_loop()
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 827, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 909, in _do_scheduling
callback_to_run = self._schedule_dag_run(dag_run, session)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1161, in _schedule_dag_run
dag_run.schedule_tis(schedulable_tis, session)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 1042, in schedule_tis
.update({TI.state: State.SCHEDULED}, synchronize_session=False)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3196, in update
execution_options={"synchronize_session": synchronize_session},
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1670, in execute
result = conn._execute_20(statement, params or {}, execution_options)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1520, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 314, in _execute_on_connection
self, multiparams, params, execution_options
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1399, in _execute_clauseelement
cache_hit=cache_hit,
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1749, in _execute_context
e, statement, parameters, cursor, context
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1930, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1706, in _execute_context
cursor, statement, parameters, context
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL: Process 271343 waits for ShareLock on transaction 251928224; blocked by process 282010.
Process 282010 waits for ShareLock on transaction 251928207; blocked by process 271343.
HINT: See server log for query details.
CONTEXT: while updating tuple (474,18) in relation "task_instance"
[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND (task_instance.task_id, task_instance.map_index) IN ((%(param_1_1_1)s, %(param_1_1_2)s))]
I'm also seeing this issue, albeit with a slightly different query:
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL: Process 16440 waits for ShareLock on transaction 788648981; blocked by process 16481.
Process 16481 waits for ShareLock on transaction 788648979; blocked by process 16440.
HINT: See server log for query details.
CONTEXT: while deleting tuple (0,25) in relation "dag"
<snip stack same as posted by others>
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL: Process 16440 waits for ShareLock on transaction 788648981; blocked by process 16481.
Process 16481 waits for ShareLock on transaction 788648979; blocked by process 16440.
HINT: See server log for query details.
CONTEXT: while deleting tuple (0,25) in relation "dag"
[SQL: DELETE FROM dag WHERE dag.dag_id IN (%(dag_id_1_1)s) RETURNING dag.dag_id]
[parameters: {'dag_id_1_1': 'Pipeline.DAG_NAME_REDACTED'}]
(Background on this error at: http://sqlalche.me/e/14/e3q8)
It happens pretty much every time I delete a dag. Only way I've found around it is to browse the dag runs, delete all but one of them, then delete the dag.
I'm running multiple schedulers, and `use_row_locking = True`. Postgres 13.1.
Our version path was 2.2.4 -> 2.3.2 (we skipped 2.3.0 and 2.3.1 because of reasons). 2.2.4 is fine, 2.3.2 is not. Anecdotally, db load appears much higher in 2.3.2 - the box running postgres is pretty much flat out all the time, versus ~25% previously. I don't have hard numbers, because several things changed at once in our testing env (mea culpa), but I will at some point be upgrading another env, and I'll be watching closely.
N.B. I've tried shutting down all-but-one schedulers, and that also fixes it. 2 schedulers running: delete fails, as per above. 1 scheduler running, delete works.
Is row-locking broken somehow in 2.3.2?
Having this issue as well, we're only on 2.0.2.
Looking through some of the Airflow configs, I'm wondering if it could be related to the `file_parsing_sort_mode`? It seems like the default behavior could cause the schedulers to always start at the same place when parsing dag files, and I could see that causing a deadlock at scale. I'm wondering if flipping to `random_seeded_by_host` would solve it.
This option doesn't seem to be available in our version, does someone want to try it out for us 😃
Our version path was 2.2.4 -> 2.3.2 (we skipped 2.3.0 and 2.3.1 because of reasons). 2.2.4 is fine, 2.3.2 is not. Anecdotally, db load appears much higher in 2.3.2 - the box running postgres is pretty much flat out all the time, versus ~25% previously. I don't have hard numbers, because several things changed at once in our testing env (mea culpa), but I will at some point be upgrading another env, and I'll be watching closely.
Our DB load has increased a lot as well. I actually swapped from a Docker-based PostgreSQL (mounted volume) to installing it directly on a server instead to see if it would help. I also implemented pgbouncer and a bunch of other changes, so I do not have any useful data or information due to a lot of troubleshooting and changes.
Does anyone know if it comes from the Webserver/Schedulers or Workers? I know it's a difficult question, but maybe someone happens to have a log of queries before/after and could make a simple stat of what has changed?
We've made some progress in at least reducing the amount of deadlocks. We're running 2.0.2 on K8s and we've discovered the following:
We've been able to reduce deadlocks almost entirely simply by adding a startupProbe to the scheduler deployment in K8s and telling K8s to only roll out schedulers one at a time to avoid them starting at the same time. When they started at the same time all of them would deadlock and running tasks would get killed and rescheduled etc. Rolling out one at a time has almost entirely removed deadlocking, and the few times it does happen it's isolated to one scheduler where other schedulers can keep things moving
The fact it happens more frequently when starting schedulers at the same time makes me think it might be related to the `file_parsing_sort_mode` I mentioned above. Since the default behavior is `modified_time`, my theory is that all the schedulers are configured to "start" their scheduler loop at the same place, which would naturally increase the chance of deadlocking.
@ldacey @whitleykeith @argibbs @eitanme -> I spoke with some enlightened people :) (yeah talking about you @ashb and @bbovenzi ) -> and after the talk I have a hypothesis, that this is the new Grid view doing auto-refresh for a long running DAG.
There was a fix by @ashb https://github.com/apache/airflow/pull/24284 that is going to be released in 2.3.3 which significantly decreases the number of queries generated by the Grid view refresh. It's a huge improvement and might impact both the load on the DB and possibly the memory usage of the webserver - especially if there are almost continuously running dags and a number of people leave the browser open with "auto-refresh" on in the Grid View.
Is there a way some of you could test the hypothesis and see if there might be a correlation (it requires a bit of coordination on what your users do)?
(BTW. If that's it then Ash's fix is coming in 2.3.3).
Hah, spooky.
Without boring you with the details, I was running some-many-lots of DAGs from scratch today. I noticed that db load increased with the number of active dags and then experimented to see if it was the dags themselves, or just because I was watching them all.
Turning off auto-refresh seemed to massively improve db load, and I just came here to update the thread, only to find you've beaten me to it.
As an aside, what made me look at auto-refresh (@bbovenzi) was that the spinner is always spinning when auto-refresh is turned on. In the old view, the refresh dots would only animate every few seconds when it polled for an updated state. I don't know if the always-spinning thing means it's always refreshing, or if it's just meaningless.
But long story short, yes, auto-refresh smells fishy to me.
It's just always spinning but good point. I can fix that.
Is the DB load still bad with the latest on main? If needed I can look into other ways to improve the autorefresh performance.
I haven't tried with latest main, just 2.3.2; I tend to only run with the release builds, but I can try main if it'll help.
Another option, @argibbs, will be to apply @ashb's fix to your installation - it's not much, and it should be cleanly applicable on top of your code as a patch.
This is "safer" (fewer changes) and gives better "proof" that this was the problem (and the good thing is you could apply it to production, let it run, and "patch-fix-it" without waiting for 2.3.3).
Just to follow up here on the deadlock issue I saw which, I think, is unrelated to the load issues associated with the new GridView though I'm happy to hear those will be fixed in 2.3.3.
Since I was only running one scheduler, I tried setting `use_row_level_locking` to false and have not seen the `psycopg2.errors.DeadlockDetected: deadlock detected` error since. I also experienced the problem on both 2.2.4 and 2.3.2, and 2.2.4 didn't have the GridView.
Not sure if this helps and I'll try toggling the setting back once 2.3.3 is released to see if that has an impact, but wanted to give an update.
Also of note, I was experiencing this with only one `LocalExecutor` scheduler when many tasks were kicked off at the same time, so I don't think my issue has to do with multi-scheduler interaction. It's more related to overall task load.
I am also encountering this issue. I collected some details about both queries involved in the deadlock, hopefully this is helpful.
Deployment details:
In the deadlocks there is an UPDATE statement deadlocking with a SELECT ... FOR UPDATE.
Based on stack traces visible in the scheduler logs, the UPDATE originates from the main scheduler loop here: https://github.com/apache/airflow/blob/2.2.5/airflow/models/dagrun.py#L901-L910
Based on the database logs, the SELECT statement has the form:
SELECT task_instance.try_number AS task_instance_try_number, ...
FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
WHERE task_instance.dag_id = 'my_dag_id' AND task_instance.task_id = 'my_task_id' AND task_instance.run_id = 'sanitized_run_id_1'
LIMIT 1 FOR UPDATE
Searching the Airflow source code, the query that looks most similar to the SELECT from the database error is in `TaskInstance.refresh_from_db()`:
https://github.com/apache/airflow/blob/2.2.5/airflow/models/taskinstance.py#L714-L736
Example scheduler logs showing the origins of the UPDATE statement:
[2022-07-06 18:54:29,456] {{scheduler_job.py:753}} INFO - Exited execute loop
Traceback (most recent call last):
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL: Process 99711 waits for ShareLock on transaction 527390121; blocked by process 100627.
Process 100627 waits for ShareLock on transaction 527390039; blocked by process 99711.
HINT: See server log for query details.
CONTEXT: while updating tuple (48513,18) in relation "task_instance"
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/airflow/venv-py3/bin/airflow", line 8, in <module>
sys.exit(main())
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main
args.func(args)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
return f(*args, **kwargs)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
_run_scheduler_job(args=args)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
job.run()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 246, in run
self._execute()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 726, in _execute
self._run_scheduler_loop()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 807, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 890, in _do_scheduling
callback_to_run = self._schedule_dag_run(dag_run, session)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1147, in _schedule_dag_run
dag_run.schedule_tis(schedulable_tis, session)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/dagrun.py", line 909, in schedule_tis
.update({TI.state: State.SCHEDULED}, synchronize_session=False)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 4063, in update
update_op.exec_()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
self._do_exec()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
self._execute_stmt(update_stmt)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
return conn.execute(stmt, self._params)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
distilled_params,
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL: Process 99711 waits for ShareLock on transaction 527390121; blocked by process 100627.
Process 100627 waits for ShareLock on transaction 527390039; blocked by process 99711.
HINT: See server log for query details.
CONTEXT: while updating tuple (48513,18) in relation "task_instance"
[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN (%(task_id_1)s)]
[parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 'dag_id_1': 'sanitized_dag_id_1', 'run_id_1': 'sanitized_run_id_1', 'task_id_1': 'sanitized_task_id_1'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
Example Postgres logs showing a complete SELECT ... FOR UPDATE statement:
2022-07-06 18:54:25.816 UTC [100639] ERROR: deadlock detected
2022-07-06 18:54:25.816 UTC [100639] DETAIL: Process 100639 waits for ShareLock on transaction 527390039; blocked by process 99711.
Process 99711 waits for ShareLock on transaction 527390130; blocked by process 100639.
Process 100639: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_insta
Process 99711: UPDATE task_instance SET state='scheduled' WHERE task_instance.dag_id = 'sanitized_dag_id_2' AND task_instance.run_id = 'sanitized_run_id_2' AND task_instance.task_id IN ('sanitized_task_id_2', 'sanitized_task_id_3')
2022-07-06 18:54:25.816 UTC [100639] HINT: See server log for query details.
2022-07-06 18:54:25.816 UTC [100639] CONTEXT: while locking tuple (725,169) in relation "dag_run"
2022-07-06 18:54:25.816 UTC [100639] STATEMENT: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash
FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
WHERE task_instance.dag_id = 'sanitized_dag_id_2' AND task_instance.task_id = 'sanitized_task_id_3' AND task_instance.run_id = 'sanitized_run_id_2'
LIMIT 1 FOR UPDATE
Unfortunately we are not able to repro this on a test instance so I have not been able to try on newer Airflow versions, but based on the discussion on this thread it sounds like the issue is present until at least 2.3.2.
Very useful, thanks. I will take a look at it shortly.
We have figured out the origins of the SELECT ... FOR UPDATE and a mechanism for the deadlocks.
The short story is it originates from the `airflow run task` CLI command inside task pods.
The SELECT does indeed originate from `TaskInstance.refresh_from_db()` as suggested above. It is called as follows:
airflow/jobs/local_task_job.py:89 _execute
airflow/models/taskinstance.py:1184: check_and_change_state_before_execution
airflow/models/taskinstance.py:714 refresh_from_db(lock_for_update=True, session=session)
Line numbers both in the synopsis above and the stack trace below are for Airflow 2.2.5.
Stack traces including the SELECT statements can be found in failed pod logs, I have included one below:
Traceback (most recent call last):
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
psycopg2.errors.InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run
self._execute()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1184, in check_and_change_state_before_execution
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 734, in refresh_from_db
for attempt in run_with_db_retries(logger=self.log):
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/tenacity/__init__.py", line 390, in __iter__
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/tenacity/__init__.py", line 368, in iter
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/tenacity/__init__.py", line 186, in reraise
File "/opt/python3.7/lib64/python3.7/concurrent/futures/_base.py", line 428, in result
return self.__get_result()
File "/opt/python3.7/lib64/python3.7/concurrent/futures/_base.py", line 384, in __get_result
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 736, in refresh_from_db
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3203, in __getitem__
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
sqlalchemy.exc.InternalError: (psycopg2.errors.InFailedSqlTransaction) current transaction is aborted, commands ignored until end of transaction block
[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash
FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.run_id = %(run_id_1)s
LIMIT %(param_1)s FOR UPDATE]
[parameters: {'dag_id_1': 'sanitized_dag_name_1', 'task_id_1': 'sanitized_task_name_1', 'run_id_1': 'sanitized_run_id_1', 'param_1': 1}]
(Background on this error at: http://sqlalche.me/e/13/2j85)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
cursor.execute(statement, parameters)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/airflow/venv-py3/bin/airflow", line 8, in <module>
sys.exit(main())
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main
args.func(args)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
return f(*args, **kwargs)
_run_task_by_selected_method(args, dag, ti)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
_run_task_by_local_task_job(args, ti)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
run_job.run()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 246, in run
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/local_task_job.py", line 97, in _execute
external_executor_id=self.external_executor_id,
return func(*args, session=session, **kwargs)
self.refresh_from_db(session=session, lock_for_update=True)
return func(*args, **kwargs)
do = self.iter(retry_state=retry_state)
raise retry_exc.reraise()
raise self.last_attempt.result()
raise self._exception
ti: Optional[TaskInstance] = qry.with_for_update().first()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3429, in first
ret = list(self[0:1])
return list(res)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
return self._execute_and_instances(context)
result = conn.execute(querycontext.statement, self._params)
return meth(self, multiparams, params)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
distilled_params,
e, statement, parameters, cursor, context
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
raise exception
cursor, statement, parameters, context
cursor.execute(statement, parameters)
Regarding how the deadlock happens: as we know, we have two statements, UPDATE and SELECT ... FOR UPDATE, that cause a deadlock. As described previously, the UPDATE statement is here: https://github.com/apache/airflow/blob/2.2.5/airflow/models/dagrun.py#L903-L909. It needs to get a shared lock on matching rows of the `task_instance` and `dag_run` tables in order to execute the update. However, it fails to do that. Before executing the `schedule_tis` function, the scheduler job already holds the lock on the `dag_run` table:
https://github.com/apache/airflow/blob/2.2.5/airflow/jobs/scheduler_job.py#L884
https://github.com/apache/airflow/blob/2.2.5/airflow/models/dagrun.py#L287
So it means the UPDATE statement is failing to acquire the matching row lock of the `task_instance` table.
In the meantime, the SELECT ... FOR UPDATE statement (`select ... from task_instance join dag_run ... for update`) also needs a lock on the matching rows in both the task_instance and dag_run tables. It first acquires such a lock on the `task_instance` table, but then attempts to get the lock on the `dag_run` table, which was already acquired by the UPDATE statement. Therefore, a deadlock happens.
Regarding how this might be fixed, the suspicion is that `TaskInstance.refresh_from_db()` only needs to lock the rows in the task_instance table, not the dag_run table. This implies the deadlocks might be worked around by replacing `with_for_update()` with `with_for_update(of=TaskInstance)` here: https://github.com/apache/airflow/blob/2.2.5/airflow/models/taskinstance.py#L736, at least for PostgreSQL and Oracle.
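For illustration, the workaround would look roughly like this (a sketch against the 2.2.5 method linked above, not an actual patch; `of=` narrows the lock to the `task_instance` table on backends that support `FOR UPDATE OF`):

```python
qry = session.query(TaskInstance).filter(
    TaskInstance.dag_id == self.dag_id,
    TaskInstance.task_id == self.task_id,
    TaskInstance.run_id == self.run_id,
)
if lock_for_update:
    # Emits "... FOR UPDATE OF task_instance", so the dag_run rows pulled in
    # by the eager-loaded relationship are not locked.
    ti = qry.with_for_update(of=TaskInstance).first()
else:
    ti = qry.first()
```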
cc: @ashb - I did not have a look at this yet (will shortly), but maybe you can take a look and see if any of this rings a bell.
I'm able to reliably force a DeadlockDetected exception by creating a DAG with many (i.e. over a hundred) concurrent tasks, setting `max_active_tasks_per_dag` to 75, setting `parallelism` to 100, triggering the DAG, waiting a couple minutes, then deleting the DAG Run. Maybe this is further evidence for @dstaple's proposed workaround limiting the lock to the `task_instance` table, or at least a way to test whether that workaround resolves this particular deadlock.
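For reference, a minimal sketch of the kind of DAG used for this repro (hypothetical DAG and task names; `max_active_tasks_per_dag = 75` and `parallelism = 100` are set in airflow.cfg as described above, and the deadlock shows up after deleting the DAG Run while tasks are still running):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# ~150 independent long-running tasks, so many task_instance rows are being
# scheduled/updated at once when the DAG Run is deleted mid-flight.
with DAG(
    dag_id="deadlock_repro",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    for i in range(150):
        BashOperator(task_id=f"sleep_{i}", bash_command="sleep 120")
```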
Thanks for the REALLY detailed investigation @dstaple.
I finally had some time to take a look at this and I think your assessment was very correct.
However, the solution you proposed is not good, because I think we DO want to run "SELECT FOR UPDATE" on the DagRun table. The whole scheduling is based on the fact that the DagRun row gets locked and no changes are happening to the DagRun and any TaskInstances of that DagRun while the Scheduler processes those task instances. And since `local_task_run` potentially changes the state of the task instance it runs (that's why it locks it for update), if the whole DagRun is currently "being processed" by any of the schedulers, we should hold off running the task until the scheduler finishes processing this particular DagRun and releases the lock.
And in this case the "local_task_run" actually locks the DagRun table too (though I am not entirely sure why - this is the one thing that I do not understand completely, see below). So it does what it should, but with one very little caveat - it locks the TaskInstance and DagRun in REVERSE ORDER compared to what the Scheduler does. This is actually the root cause of ALL deadlocks (at least in Postgres; MySQL has its own fair share of other kinds of deadlocks) - inconsistent ordering. A deadlock appears when two threads want two (or more) resources and take locks on them in reverse order. This is actually the only reason for any kind of deadlock, and your investigation showed really nicely what's going on.
The solution to that is simple - since we are going to get the DagRun lock in a moment anyway in `refresh_from_db`, we should simply get the lock on the DagRun table FIRST. This should fix the problem, as we will then perform lock grabbing in the same sequence in the scheduler and task run -> first DagRun, then TaskInstance. This is what my proposed #25266 does.
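To make the ordering concrete, the idea is roughly the following (a simplified sketch of the approach, not the actual #25266 patch): before locking the `task_instance` row, `refresh_from_db` first takes the same DagRun lock the scheduler takes, so both code paths acquire locks in the order dag_run -> task_instance:

```python
if lock_for_update:
    # 1. Lock the parent DagRun row first, the same way the scheduler does.
    session.query(DagRun).filter(
        DagRun.dag_id == self.dag_id,
        DagRun.run_id == self.run_id,
    ).with_for_update().first()

    # 2. Only then lock the TaskInstance row; the ordering now matches the
    #    scheduler (dag_run -> task_instance), so no reverse-order deadlock.
    ti = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == self.dag_id,
            TaskInstance.task_id == self.task_id,
            TaskInstance.run_id == self.run_id,
        )
        .with_for_update()
        .first()
    )
```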
The only thing I do not know is WHY `TaskInstance.refresh_from_db` actually does the JOIN query:
SELECT FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id ... FOR UPDATE
The original query in the code looks like this:

```python
qry = session.query(TaskInstance).filter(
    TaskInstance.dag_id == self.dag_id,
    TaskInstance.task_id == self.task_id,
    TaskInstance.run_id == self.run_id,
    TaskInstance.map_index == self.map_index,
)

if lock_for_update:
    for attempt in run_with_db_retries(logger=self.log):
        with attempt:
            ti: Optional[TaskInstance] = qry.with_for_update().first()
```
And there is no obvious reason why the last line joins the dag_run table.
I hope someone else in this thread might shed some light on it. I have a suspicion that SQLAlchemy will add the join in case there is a ForeignKey with ON DELETE CASCADE on the dag_id (which we have) - but I could not find any reference or documentation that would point to such behaviour.
@RNHTTR - since you mentioned you can reproduce the issue - maybe you could apply my fix and see if it solves the problem (there is a bit of leap of faith with this change).
Not related to deadlocks, however: is it necessary to use a `FOR UPDATE` lock rather than `FOR NO KEY UPDATE` (doc)?
Not related to deadlocks, however: is it necessary to use a `FOR UPDATE` lock rather than `FOR NO KEY UPDATE` (doc)?
Can you do it cross-db (MySQL/Postgres/MsSQL?) and in sqlalchemy? @Taragolis ?
Also see the discussion I started today - https://lists.apache.org/thread/pgo1qxqsnzwg2do955961tkvxk76m8gw - we have enough trouble about "some" DB features not available in some databases. so we should strive for lowest-common-denominator I am afraid.
@potiuk Thanks a ton for the feedback!
Regarding why the dag_run table is locked when `with_for_update()` is called on the TaskInstance table, I believe this is due to `lazy='joined'` in the relationship between TaskInstance and DagRun:
https://github.com/apache/airflow/blob/2.2.5/airflow/models/taskinstance.py#L408
This behavior is described in the following SQLAlchemy issue: https://github.com/sqlalchemy/sqlalchemy/issues/4100
The behavior was not changed after the above issue was filed, but the following warning was added to the SQLAlchemy documentation:
Using with_for_update in the context of eager loading relationships is not officially supported or recommended by SQLAlchemy and may not work with certain queries on various database backends. When with_for_update is successfully used with a query that involves joinedload(), SQLAlchemy will attempt to emit SQL that locks all involved tables. https://docs.sqlalchemy.org/en/14/orm/loading_relationships.html
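A tiny standalone demonstration of that behaviour (hypothetical `Parent`/`Child` models standing in for `dag_run`/`task_instance`, not Airflow code): with a joined-eager relationship, `with_for_update()` is emitted over the whole join, so PostgreSQL locks matching rows in both tables:

```python
from sqlalchemy import Column, ForeignKey, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Parent(Base):  # plays the role of dag_run
    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)


class Child(Base):  # plays the role of task_instance
    __tablename__ = "child"
    id = Column(Integer, primary_key=True)
    parent_id = Column(ForeignKey("parent.id"), nullable=False)
    # lazy="joined" with an inner join mirrors the eager-loaded
    # TaskInstance -> DagRun relationship in Airflow 2.2.x.
    parent = relationship(Parent, lazy="joined", innerjoin=True)


engine = create_engine("postgresql+psycopg2://user:pass@localhost/db", echo=True)
with Session(engine) as session:
    # The eager JOIN is folded into the locking query, so the emitted SQL is
    # roughly: SELECT ... FROM child JOIN parent ON ... LIMIT 1 FOR UPDATE,
    # which locks the matching rows in BOTH child and parent.
    session.query(Child).filter(Child.id == 1).with_for_update().first()
```

Passing `with_for_update(of=Child)` in the same example narrows the lock to the `child` table only, on backends that support `FOR UPDATE OF`.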
Can you do it cross-db (MySQL/Postgres/MsSQL?) and in sqlalchemy? @Taragolis ?
I do not actually know whether it is supported by other DB engines and whether it has exactly the same behaviour.
Definitely it works with all modern PostgreSQL (9.3+) and sqlalchemy; basically we need to set `key_share=True`.
One of the differences is that with `FOR NO KEY UPDATE` we can insert/delete a new record that references (via FK) a locked value in another table; if we use `FOR UPDATE`, the transaction will block until the row is released, and this can also cause deadlocks in some cases.
Maybe `FOR UPDATE` is strictly required for HA in Airflow; unfortunately I have not checked this yet (that is why I ask). I recently had some performance issues with row-level locks on and tasks accessing `dag_run` and `ti` in templates on LocalExecutor.
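In SQLAlchemy terms that is the `key_share` flag on `with_for_update()`; on PostgreSQL it renders `FOR NO KEY UPDATE` instead of `FOR UPDATE`. A hypothetical fragment (assuming a `session` and the `TaskInstance` model are in scope, with placeholder filter values; how other backends handle the flag is exactly what would need checking):

```python
ti = (
    session.query(TaskInstance)
    .filter(
        TaskInstance.dag_id == "some_dag",
        TaskInstance.task_id == "some_task",
        TaskInstance.run_id == "some_run_id",
    )
    # key_share=True -> "... FOR NO KEY UPDATE" on PostgreSQL, which still
    # blocks concurrent updates of the row but allows inserting/deleting rows
    # that merely reference it via a foreign key.
    .with_for_update(key_share=True)
    .first()
)
```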
The behavior was not changed after the above issue was filed, but the following warning was added to the SQLAlchemy documentation:
Ahhhhh. That would indeed explain it. I think then that my solution is actually the right approach :)
I do not actually know whether it is supported by other DB engines and whether it has exactly the same behaviour.
@Taragolis would be worth checking. The DagRun lock `SELECT FOR UPDATE SKIP LOCKED` is very much the "Key" (pun intended) to make multiple schedulers work, and it also (as you can see) spilled a bit into the mini-scheduler and task run in the form of a plain `SELECT FOR UPDATE`. The `SELECT FOR UPDATE SKIP LOCKED` is precisely the mechanism that allows multiple schedulers to run in parallel with basically no serialization and no "accidental state overrides".
And we need to make sure that it works for MySQL 8 and Postgres, because this is our "baseline". We cannot rely on Postgres-only features, though we would love to - I started some threads in the past mostly along the lines of "we are fed up with MySQL, let's dump it". See for example this "Elephant in the Room" thread at the devlist https://lists.apache.org/thread/dp78j0ssyhx62008lbtblrc856nbmlfb . The answer so far and the wisdom of the crowd is "No, as much as we would like to, we cannot get rid of MySQL". And if you see the results of our Survey https://airflow.apache.org/blog/airflow-survey-2022/ - while Postgres is by far strongest (also because it is now the only supported DB for ALL managed Airflow services), there are still ~20% of people who use MySQL (or MariaDB, but we finally decided that we explicitly exclude MariaDB from supported databases and actively encourage people to migrate out if they use it).
So while I would love to start the discussion with "Can we use this Postgres feature?", when we think about product development the question is "Is this feature supported in both Postgres AND MySQL 8+?". If not - we won't even discuss it, because if we start actively using Postgres-only features to optimize stuff, we are going to impair our MySQL users and eventually we will implement things that only work for Postgres, and behaviours that differ between Postgres and MySQL - and we certainly do not want that. Something that looks good for you as a user (using Postgres only) might not be acceptable for the product (with both Postgres and MySQL being supported).
I looked (very briefly) if similar feature exists in MySQL, and it seems no, but I did not look too much. But If you think it is worth considering and if you think it's good to think of it, starting with deeper investigation and justifying both - benefits and cross-db-portability is something I would advise you to start with :).
I think your question is phrased a bit wrongly:
is it necessary use FOR UPDATE lock rather than FOR NO KEY UPDATE ?
It should rather be:
"I see that we can use that new feature NO KEY in Postgres and also equivalent in MySQL. It has those and those benefits and we can make it cross-db - doc here, doc here". Is this good enough reason to switch to it ?
it locks the TaskInstance and DagRun in REVERSE ORDER comparing to what Scheduler does.
@potiuk I re-read your explanation and agree completely: the essence of the specific deadlock I did the deep-dive on above is that `TaskInstance.refresh_from_db` and `DagRun.schedule_tis` both lock `task_instance` and `dag_run`, but in opposite orders, which causes a deadlock. I agree that if we change `TaskInstance.refresh_from_db` to guarantee the order, that should prevent deadlocks between those two statements.
Regarding @RNHTTR repro'ing a deadlock by deleting a running DAG run, it might be related, but it isn't exactly the same situation. At the minimum, the deadlock forced by @RNHTTR probably involves a DELETE statement rather than an UPDATE. Indeed, @argibbs reported deadlocks when deleting DAGs and they had DELETE statements for one of the queries (the other was not reported). It's possible that the "other query" in that case was the same SELECT ... FOR UPDATE from `TaskInstance.refresh_from_db` I reported above, but I don't want to get ahead of myself here. (Of course it's still worth checking if #25266 resolves the DELETE deadlocks in case we're lucky and they're the same issue.)
Regarding the `with_for_update(of=TaskInstance)` change I previously suggested, understanding you've rejected this as a potential solution, it's worth reporting that I internally forked Airflow 2.2.5 and added this change about 10 days ago, and it completely eliminated the deadlocks between `TaskInstance.refresh_from_db` and `DagRun.schedule_tis`, which were occurring every 2h or so and have not occurred since. At the minimum this confirms our understanding above.
I can check your alternative proposed fix https://github.com/apache/airflow/pull/25266 in a staging environment, but it may be a week or two before I can deploy more widely and conclusively report on whether or not it fixes the scheduler UPDATE deadlocking with the task instance SELECT ... FOR UPDATE above.
@dstaple - would be great if you check. I think we can merge it regardlless (it's super easy to revert) - so there is no problem with some later checking. I also was not sure if the DELETE issue is the same. It could be (and I have the scenario in my head):
DELETE DagRun with CASCADE on TI - first creates lock on the DagRun an only THEN an the TaskInstamce - very similarly to what Scheduler does.
And in this case the fix above should also help so @RNHTTR I'd appreciate checking it :)
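As an illustration of that scenario (a sketch only, assuming the `task_instance` -> `dag_run` foreign key, or the ORM relationship, cascades on delete as in recent Airflow schemas):

```python
# Sketch only: deleting a DagRun through the ORM. Under the cascade
# assumption above, the dag_run row lock is taken first and the matching
# task_instance rows follow -- the same order the scheduler uses.
from airflow.models import DagRun

def delete_dag_run(session, dag_id, run_id):
    dag_run = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.run_id == run_id)
        .one_or_none()
    )
    if dag_run is not None:
        session.delete(dag_run)  # cascade removes the run's task instances
        session.commit()
```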
I'm on it!
I have an update on the above.
We are running a fork of 2.2.5 with the fix from https://github.com/apache/airflow/pull/25312 cherry-picked and are no longer seeing the specific deadlock outlined above. We were seeing deadlocks every ~2h when running with vanilla Airflow 2.2.5, so this is a big improvement.
Other types of deadlocks are still possible, and we have observed a handful of these even with the fix to `TaskInstance.refresh_from_db()`. An example deadlock which is present in 2.2.5 and not resolved by https://github.com/apache/airflow/pull/25312 can be seen in this Postgres log:
2022-08-23 17:06:35.347 UTC [262090] ERROR: deadlock detected
2022-08-23 17:06:35.347 UTC [262090] DETAIL: Process 262090 waits for ShareLock on transaction 366414866; blocked by process 56942.
Process 56942 waits for ExclusiveLock on tuple (1466,144) of relation 165112 of database 165033; blocked by process 56420.
Process 56420 waits for ShareLock on transaction 366414616; blocked by process 262090.
Process 262090: UPDATE task_instance SET state='scheduled' WHERE task_instance.dag_id = 'dummy_dag_id' AND task_instance.run_id = 'scheduled__2022-08-19T20:00:00+00:00' AND task_instance.task_id IN ('dummy_task_1', 'dummy_task_2', 'dummy_task_N')
Process 56942: UPDATE dag_run SET last_scheduling_decision='2022-08-23T16:59:29.934171+00:00'::timestamptz WHERE dag_run.id = 132182
Process 56420: UPDATE dag_run SET last_scheduling_decision='2022-08-23T16:59:29.934171+00:00'::timestamptz WHERE dag_run.id = 132182
2022-08-23 17:06:35.347 UTC [262090] HINT: See server log for query details.
2022-08-23 17:06:35.347 UTC [262090] CONTEXT: while updating tuple (56209,27) in relation "task_instance"
2022-08-23 17:06:35.347 UTC [262090] STATEMENT: UPDATE task_instance SET state='scheduled' WHERE task_instance.dag_id = 'dummy_dag_id' AND task_instance.run_id = 'scheduled__2022-08-19T20:00:00+00:00' AND task_instance.task_id IN ('dummy_task_1', 'dummy_task_2', 'dummy_task_N')
I believe it is also possible to get deadlocks when manually (via the UI) updating the state of large numbers of tasks, deleting DAG runs while they have many running tasks, and other invasive methods, but I don't have logs to show for that at the moment.
I think it will be difficult to completely eliminate deadlocks as long as we have situations where multiple tables need to be locked simultaneously and we don't standardize the order in which such mutual locks are applied across the Airflow codebase. Brainstorming along these lines, one approach that could reduce the potential for deadlocks would be to favor database access via methods like `TaskInstance.refresh_from_db()` and disfavor direct usage of the ORM elsewhere in the codebase. Then, if one is systematic about applying minimal locks and controlling the order in which locks are applied in the methods where the ORM is directly used, all the downstream code will benefit.
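As a sketch of that brainstormed direction (a hypothetical helper, not an existing Airflow API): a single context manager that every caller goes through, so the lock order is decided in exactly one place.

```python
# Hypothetical helper, not an existing Airflow API: one place that decides
# the lock order (dag_run first, then task_instance) for every caller.
from contextlib import contextmanager

from airflow.models import DagRun, TaskInstance

@contextmanager
def locked_run_and_ti(session, dag_id, run_id, task_id):
    dag_run = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.run_id == run_id)
        .with_for_update()
        .one()
    )
    ti = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.run_id == run_id,
            TaskInstance.task_id == task_id,
        )
        .with_for_update()
        .one()
    )
    # The caller mutates the rows and commits or rolls back; this helper
    # only standardizes which lock is taken first.
    yield dag_run, ti
```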
Regardless, I would like to stress that we were seeing deadlocks every ~2h and are now seeing them rarely and under different circumstances. So the fix to `TaskInstance.refresh_from_db()` made a huge difference.
Just to follow up here on the deadlock issue I saw, which I think is unrelated to the load issues associated with the new GridView (though I'm happy to hear those will be fixed in 2.3.3).
Since I was only running one scheduler, I tried setting `use_row_level_locking` to false and have not seen the `psycopg2.errors.DeadlockDetected: deadlock detected` error since. I also experienced the problem on both 2.2.4 and 2.3.2, and 2.2.4 didn't have the GridView. Not sure if this helps, and I'll try toggling the setting back once 2.3.3 is released to see if that has an impact, but wanted to give an update.
Also of note, I was experiencing this with only one `LocalExecutor` scheduler when many tasks were kicked off at the same time, so I don't think my issue has to do with multi-scheduler interaction. It's more related to overall task load.
Hi @eitanme, I am also using LocalExecutor and seeing deadlocks here and there. I am considering switching off `use_row_level_locking`, just wondering if your experience has changed since your last post? Thanks
Upgrading would be best, as this appears to have been fixed about a year ago in 2.3.4. Note that it's only safe to disable `use_row_level_locking` if you're only using a single scheduler.
Fwiw, I believe we have been able to run without issue using the `LocalExecutor` with `use_row_level_locking` set to `false`, though as @RNHTTR mentions it's likely better to upgrade and see how that goes (we are still running 2.3.2 at the moment, so maybe we should upgrade as well).
Apache Airflow version
2.2.5 (latest released)
What happened
Customer has a DAG that generates around 2500 tasks dynamically using a task group. While running the DAG, a subset of the tasks (~1000) runs successfully with no issue, ~1500 of the tasks get "skipped", and the DAG fails. The same DAG runs successfully in Airflow v2.1.3 with the same Airflow configuration.
While investigating the Airflow processes, we found that both schedulers were restarted with the below error during the DAG execution.
This issue seems to be related to #19957
What you think should happen instead
This issue was observed while running a huge number of concurrent tasks created dynamically by a DAG. Some of the tasks are getting skipped due to a restart of the scheduler with a Deadlock exception.
How to reproduce
DAG file:
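(The actual DAG file was attached to the original report and is not reproduced here. Purely as a hypothetical stand-in, a DAG that fans out ~2500 dynamically generated tasks inside a task group might look like the sketch below.)

```python
# Hypothetical stand-in only -- not the reporter's actual DAG file.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id="many_dynamic_tasks",
    start_date=datetime(2022, 4, 22),
    schedule_interval=None,
    catchup=False,
) as dag:
    with TaskGroup(group_id="bulk_tasks"):
        # Generate ~2500 tasks dynamically inside one task group.
        for i in range(2500):
            PythonOperator(
                task_id=f"task_{i}",
                python_callable=lambda i=i: print(f"task {i}"),
            )
```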
Operating System
kubernetes cluster running on GCP linux (amd64)
Versions of Apache Airflow Providers
pip freeze | grep apache-airflow-providers
apache-airflow-providers-amazon==1!3.2.0
apache-airflow-providers-cncf-kubernetes==1!3.0.0
apache-airflow-providers-elasticsearch==1!2.2.0
apache-airflow-providers-ftp==1!2.1.2
apache-airflow-providers-google==1!6.7.0
apache-airflow-providers-http==1!2.1.2
apache-airflow-providers-imap==1!2.2.3
apache-airflow-providers-microsoft-azure==1!3.7.2
apache-airflow-providers-mysql==1!2.2.3
apache-airflow-providers-postgres==1!4.1.0
apache-airflow-providers-redis==1!2.0.4
apache-airflow-providers-slack==1!4.2.3
apache-airflow-providers-snowflake==2.6.0
apache-airflow-providers-sqlite==1!2.1.3
apache-airflow-providers-ssh==1!2.4.3
Deployment
Astronomer
Deployment details
Airflow v2.2.5-2
Scheduler count: 2
Scheduler resources: 20AU (2 CPU and 7.5GB)
Executor used: Celery
Worker count: 2
Worker resources: 24AU (2.4 CPU and 9GB)
Termination grace period: 2 mins
Anything else
This issue happens in all the DAG runs. Some of the tasks are getting skipped, some are succeeding, and the scheduler fails with the Deadlock exception error.
Are you willing to submit PR?
Code of Conduct