apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Improve handling of Rendered Task Instance Fields #32342

Open · potiuk opened this issue 1 year ago

potiuk commented 1 year ago

Body

The current way Rendered Task Instance Fields are handled (deleting old records on every write, and storing even very big rendered fields directly in the table) introduces a number of problems.

These problems have led to workarounds such as #18616 and #32341.
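
For context, here is a minimal, self-contained sketch of the write-then-prune pattern described above. This models the behaviour rather than Airflow's actual implementation: the table layout is simplified, run_id ordering stands in for the run-date ordering Airflow uses, and the demo runs against SQLite.

# Simplified model of the RTIF pattern: every task run upserts its own row,
# then deletes rows belonging to older runs of the same task. The
# range-scanning DELETE is where concurrent runs contend.
from sqlalchemy import Column, Integer, String, Text, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class RTIF(Base):
    __tablename__ = "rendered_task_instance_fields"
    dag_id = Column(String(250), primary_key=True)
    task_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)
    map_index = Column(Integer, primary_key=True, default=-1)
    rendered_fields = Column(Text, nullable=False)

def write_rtif(session: Session, rtif: RTIF, num_to_keep: int = 30) -> None:
    session.merge(rtif)  # upsert this run's rendered fields
    # Keep only the newest num_to_keep runs of this task (Airflow reads this
    # cutoff from [core] max_num_rendered_ti_fields_per_task) and delete the
    # rest in the same transaction.
    newest = (
        select(RTIF.run_id)
        .where(RTIF.dag_id == rtif.dag_id, RTIF.task_id == rtif.task_id)
        .order_by(RTIF.run_id.desc())  # stand-in for run-date ordering
        .limit(num_to_keep)
        .scalar_subquery()
    )
    session.query(RTIF).filter(
        RTIF.dag_id == rtif.dag_id,
        RTIF.task_id == rtif.task_id,
        RTIF.run_id.not_in(newest),
    ).delete(synchronize_session=False)
    session.commit()

if __name__ == "__main__":
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        for i in range(40):
            write_rtif(session, RTIF(dag_id="d", task_id="t", map_index=-1,
                                     run_id=f"run_{i:03d}", rendered_fields="{}"))
        print(session.query(RTIF).count())  # -> 30

Every write both inserts and deletes over the same index range, so two simultaneous runs of the same task can take overlapping locks in different orders, which is the contention the workarounds above try to paper over.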

There are also other reasons why big rendered fields should be handled differently.

There was a discussion about deprecating this feature that was never concluded, but it contains some good ideas and explains why we can't simply deprecate it:

https://lists.apache.org/thread/jfzxq6xf8z5ss11hjl1yn41dd4mvk7cm

It would be great if someone picked up that discussion, led it to a conclusion, and implemented the outcome.


EnableServices commented 3 weeks ago

We regularly suffer from deadlocks on mapped tasks; they always fail on the same SQL UPDATE statement (note: this is on Airflow 2.9.1, but the problem was also present on 2.8.0):

[2024-06-07, 02:25:34 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
MySQLdb.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2479, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode, session=session)
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2637, in _execute_task_with_callbacks
    RenderedTaskInstanceFields.write(rtif)
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/airflow/utils/session.py", line 78, in wrapper
    with create_session() as session:
  File "/usr/lib/python3.11/contextlib.py", line 144, in __exit__
    next(self.gen)
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/airflow/utils/session.py", line 39, in create_session
    session.commit()
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 832, in commit
    self._prepare_impl()
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
    self.session.flush()
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
    c = connection._execute_20(
        ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/opt/pyenvs/airflow/lib/python3.11/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE rendered_task_instance_fields SET rendered_fields=%s WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s AND rendered_task_instance_fields.run_id = %s AND rendered_task_instance_fields.map_index = %s]
[parameters: ('{"templates_dict": null, "op_args": ["e2dc418f-7af8-4660-8be4-87b61c8fa57e"], "op_kwargs": {}}', 'epicor_salesorders', 'load_group.create_lines', 'scheduled__2024-06-07T02:23:49.136990+00:00', 3)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)

@potiuk you hint that the size of the update may have a bearing on whether the issue is triggered or not. Is there anything a DAG author can do to avoid triggering the deadlock, to your knowledge?

Thanks, Jon
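
Pending a proper fix, one thing a DAG author can control is how much data ends up in the rendered fields at all: template only a small reference into the task and load the large payload at runtime. A sketch under that assumption (the S3 hook usage is illustrative; the bucket name and key layout are hypothetical):

# Keep rendered_task_instance_fields rows small by templating only a short
# reference; the large payload is fetched inside the task at runtime and
# never lands in the RTIF table.
import json

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task
def create_lines(payload_key: str) -> None:
    # payload_key is the only value that appears in rendered op_kwargs
    payload = json.loads(
        S3Hook().read_key(key=payload_key, bucket_name="my-payload-bucket")
    )
    for line in payload["lines"]:
        ...  # process each order line

That keeps each UPDATE of rendered_fields tiny, although as the bound parameters in the traceback above show, these rows are already small, so size alone may not be the trigger here.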

potiuk commented 2 weeks ago

Generally speaking, until someone has the time and spends the effort to solve this one, one way to avoid it is to move to Postgres. These kinds of deadlocks and long lock waits happen ONLY on MySQL, and we have seen numerous examples of this. Put simply, MySQL tends to lock more rows than are strictly necessary for a query to succeed, and that causes these kinds of problems. That's our experience so far, at least.

So if someone experiencing this can move to Postgres and confirm that it solved the problem, that would be great input to our Airflow 3 discussions, where dropping MySQL support is being considered.
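
For anyone able to run that experiment, switching the metadata database is mostly configuration. A minimal sketch, assuming a reachable Postgres instance (host, credentials, and database name are placeholders):

# Point Airflow's metadata DB at Postgres, then apply the schema migrations.
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow:PASSWORD@pg.example.com:5432/airflow"
airflow db migrate

Note that this initializes a fresh schema; carrying existing run history over from MySQL is a separate export/import exercise.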

EnableServices commented 2 weeks ago

Thank you @potiuk

Personally, as attractive as Postgres is for various reasons (including this one), it is challenging to deploy Postgres in a way that fails over elegantly. With MySQL, replication configurations make failover and failback much simpler. For that reason alone, we would be very disappointed if there were moves to stop supporting MySQL in Airflow 3.

We run various applications that use MySQL as a backend, and our MySQL servers regularly average over 2,000 queries per second over periods of months. My initial feeling is that it should be possible to solve this problem without action as drastic as changing the database backend.

Note that Uber Engineering and Airbnb themselves are heavy users of MySQL (or RDS in AWS), the former having migrated away from Postgres for reasons of scale - perhaps a lesson there if Airflow is to retain its widest appeal.

I will attempt to uncover some more useful detail about this issue and report back.

Thanks again, Jon

potiuk commented 2 weeks ago

Note that Uber Engineering and Airbnb themselves are heavy users of MySQL (or RDS in AWS), the former having migrated away from Postgres for reasons of scale - perhaps a lesson there if Airflow is to retain its widest appeal.

I know of others, like GitLab, who successfully used Postgres at huge scale, serving millions of repositories. That also taught me a lesson. You should also take into account that it's not the sheer scale or number of queries per second that is the problem, but the fact that we depend heavily on locking, OLTP properties, and the short transactions we have, plus the number of database connections opened from multiple parallel workers.
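
To make that concrete, here is a minimal sketch of the lock-ordering deadlock that parallel workers can produce. It assumes a disposable MySQL database at the placeholder DSN below; the table is purely illustrative:

# Reproduces an InnoDB deadlock (error 1213, the same code as in the
# traceback above) by taking two row locks in opposite orders.
import threading

from sqlalchemy import create_engine, text

engine = create_engine("mysql+mysqldb://user:pass@localhost/deadlock_demo")

with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS t"))
    conn.execute(text("CREATE TABLE t (id INT PRIMARY KEY, v INT)"))
    conn.execute(text("INSERT INTO t VALUES (1, 0), (2, 0)"))

barrier = threading.Barrier(2)

def worker(first: int, second: int) -> None:
    with engine.begin() as conn:  # one transaction per worker
        conn.execute(text("UPDATE t SET v = v + 1 WHERE id = :i"), {"i": first})
        barrier.wait()  # both transactions now hold one row lock each
        # Each side now needs the row the other holds; InnoDB detects the
        # cycle and aborts one transaction with error 1213.
        conn.execute(text("UPDATE t SET v = v + 1 WHERE id = :i"), {"i": second})

a = threading.Thread(target=worker, args=(1, 2))
b = threading.Thread(target=worker, args=(2, 1))
a.start(); b.start(); a.join(); b.join()

Postgres can deadlock on a true lock cycle like this one too, but under REPEATABLE READ InnoDB also takes next-key (gap) locks on range scans, so on MySQL such cycles can form even when the statements touch disjoint rows, which matches the experience described above.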

The good thing (and the one that might actually make us keep supporting MySQL) is that in Airflow 3 we are moving away from direct DB access in all components, which should generally optimize and streamline the way transactions are run in Airflow. You can see the discussions we have just started, take part in our dev calls (weekly for now), and follow the devlist discussions: https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+3+Dev+call%3A+Meeting+Notes

In particular, the draft "Task Execution Interface" Airflow Improvement Proposal very directly targets the DB limitations we have: https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-72+Task+Execution+Interface+aka+Task+SDK. Any feedback, comments, or analysis related to it are most welcome, and we invite anyone from our community to participate.