apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Flaky `test_xcom_map_error_fails_task` test #33178

Closed potiuk closed 1 year ago

potiuk commented 1 year ago

Body

This flaky test has appeared recently in our jobs, and it seems to be a real problem with our code: after a few attempts at fixing it, it still appears in our builds:

tests/models/test_xcom_arg_map.py:174: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/models/taskinstance.py:1840: in run
    self._run_raw_task(
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/models/taskinstance.py:1494: in _run_raw_task
    session.commit()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1454: in commit
    self._transaction.commit(_to_root=self.future)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:832: in commit
    self._prepare_impl()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:811: in _prepare_impl
    self.session.flush()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3449: in flush
    self._flush(objects)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3589: in _flush
    transaction.rollback(_capture_exception=True)
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__
    compat.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3549: in _flush
    flush_context.execute()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:456: in execute
    rec.execute(self)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:630: in execute
    util.preloaded.orm_persistence.save_obj(
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:237: in save_obj
    _emit_update_statements(
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:1001: in _emit_update_statements
    c = connection._execute_20(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1710: in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py:334: in _execute_on_connection
    return connection._execute_clauseelement(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1577: in _execute_clauseelement
    ret = self._execute_context(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1953: in _execute_context
    self._handle_dbapi_exception(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception
    util.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1910: in _execute_context
    self.dialect.do_execute(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:736: in do_execute
    cursor.execute(statement, parameters)
/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:174: in execute
    self._discard()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <MySQLdb.cursors.Cursor object at 0x7f52bc978a60>

    def _discard(self):
        self.description = None
        self.description_flags = None
        # Django uses some member after __exit__.
        # So we keep rowcount and lastrowid here. They are cleared in Cursor._query().
        # self.rowcount = 0
        # self.lastrowid = None
        self._rows = None
        self.rownumber = None

        if self._result:
            self._result.discard()
            self._result = None

        con = self.connection
        if con is None:
            return
>       while con.next_result() == 0:  # -1 means no more data.
E       sqlalchemy.exc.ProgrammingError: (MySQLdb.ProgrammingError) (2014, "Commands out of sync; you can't run this command now")
E       [SQL: UPDATE task_instance SET pid=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
E       [parameters: (90, datetime.datetime(2023, 8, 7, 14, 44, 7, 580365), 'test_dag', 'pull', 'test', 0)]
E       (Background on this error at: https://sqlalche.me/e/14/f405)

Example failures:


potiuk commented 1 year ago

cc: @uranusjr

potiuk commented 1 year ago

Another incarnation of the same problem (SQLite produces a different error):

https://github.com/apache/airflow/actions/runs/5775967682/job/15654656922

________________________ test_xcom_map_error_fails_task ________________________

self = <sqlalchemy.future.engine.Connection object at 0x7f5a70b28c70>
dialect = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x7f5ad1451db0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext'>>
statement = <sqlalchemy.dialects.sqlite.base.SQLiteCompiler object at 0x7f5ad06c7970>
parameters = []
execution_options = immutabledict({'_sa_orm_load_options': default_load_options(_legacy_uniquing=True), '_result_disable_adapt_to_context': True, 'future_result': True})
args = (<sqlalchemy.dialects.sqlite.base.SQLiteCompiler object at 0x7f5ad06c7970>, [], <sqlalchemy.sql.selectable.Select obje...0720 run_id)s', 'test', type_=String(length=250)), BindParameter('%(140026414543824 map_index)s', 0, type_=Integer())])
kw = {'cache_hit': symbol('CACHE_HIT')}
branched = <sqlalchemy.future.engine.Connection object at 0x7f5a70b28c70>
yp = None
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f5a70b2add0>

    def _execute_context(
        self,
        dialect,
        constructor,
        statement,
        parameters,
        execution_options,
        *args,
        **kw
    ):
        """Create an :class:`.ExecutionContext` and execute, returning
        a :class:`_engine.CursorResult`."""

        branched = self
        if self.__branch_from:
            # if this is a "branched" connection, do everything in terms
            # of the "root" connection, *except* for .close(), which is
            # the only feature that branching provides
            self = self.__branch_from

        if execution_options:
            yp = execution_options.get("yield_per", None)
            if yp:
                execution_options = execution_options.union(
                    {"stream_results": True, "max_row_buffer": yp}
                )

        try:
            conn = self._dbapi_connection
            if conn is None:
                conn = self._revalidate_connection()

>           context = constructor(
                dialect, self, conn, execution_options, *args, **kw
            )

/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1810: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1020: in _init_compiled
    self.cursor = self.create_cursor()
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1391: in create_cursor
    return self.create_default_cursor()
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1394: in create_default_cursor
    return self._dbapi_connection.cursor()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f5a70b2add0>
args = (), kwargs = {}

    def cursor(self, *args, **kwargs):
        """Return a new DBAPI cursor for the underlying connection.

        This method is a proxy for the ``connection.cursor()`` DBAPI
        method.

        """
>       return self.dbapi_connection.cursor(*args, **kwargs)
E       sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 140026872592128 and this is thread id 140028343548800.

/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py:1133: ProgrammingError

The above exception was the direct cause of the following exception:

    @contextlib.contextmanager
    def create_session() -> Generator[settings.SASession, None, None]:
        """Contextmanager that will create and teardown a session."""
        Session = getattr(settings, "Session", None)
        if Session is None:
            raise RuntimeError("Session must be set before!")
        session = Session()
        try:
>           yield session

airflow/utils/session.py:36: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
airflow/utils/session.py:77: in wrapper
    return func(*args, session=session, **kwargs)
airflow/models/taskinstance.py:1840: in run
    self._run_raw_task(
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/models/taskinstance.py:1488: in _run_raw_task
    self.refresh_from_db(session=session)
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/models/taskinstance.py:866: in refresh_from_db
    ti = qry.one_or_none()
/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2850: in one_or_none
    return self._iter().one_or_none()
/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2916: in _iter
    result = self.session.execute(
/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py:1717: in execute
    result = conn._execute_20(statement, params or {}, execution_options)
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1710: in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:334: in _execute_on_connection
    return connection._execute_clauseelement(
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1577: in _execute_clauseelement
    ret = self._execute_context(
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1816: in _execute_context
    self._handle_dbapi_exception(
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception
    util.raise_(
/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1810: in _execute_context
    context = constructor(
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1020: in _init_compiled
    self.cursor = self.create_cursor()
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1391: in create_cursor
    return self.create_default_cursor()
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1394: in create_default_cursor
    return self._dbapi_connection.cursor()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f5a70b2add0>
args = (), kwargs = {}

    def cursor(self, *args, **kwargs):
        """Return a new DBAPI cursor for the underlying connection.

        This method is a proxy for the ``connection.cursor()`` DBAPI
        method.

        """
>       return self.dbapi_connection.cursor(*args, **kwargs)
E       sqlalchemy.exc.ProgrammingError: (sqlite3.ProgrammingError) SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 140026872592128 and this is thread id 140028343548800.
E       [SQL: SELECT task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.map_index AS task_instance_map_index, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.try_number AS task_instance_try_number, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.custom_operator_name AS task_instance_custom_operator_name, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.updated_at AS task_instance_updated_at, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs 
E       FROM task_instance 
E       WHERE task_instance.dag_id = ? AND task_instance.task_id = ? AND task_instance.run_id = ? AND task_instance.map_index = ?]
E       (Background on this error at: https://sqlalche.me/e/14/f405)

/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py:1133: ProgrammingError
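
The SQLite error above can be reproduced outside Airflow. The following is only a minimal sketch using the standard-library sqlite3 module (not Airflow or SQLAlchemy code), assuming the default check_same_thread=True: a connection created in one thread refuses to hand out a cursor in another. The function and variable names are made up for illustration.

```python
import sqlite3
import threading

# Connection created in the main thread; check_same_thread defaults to True.
conn = sqlite3.connect(":memory:")

errors = []


def use_connection_from_other_thread():
    # Any use of the connection from a different thread is rejected.
    try:
        conn.cursor()
    except sqlite3.ProgrammingError as exc:
        errors.append(exc)


worker = threading.Thread(target=use_connection_from_other_thread)
worker.start()
worker.join()
```

If the same thing happens in the test suite, it would suggest that a connection checked out in one thread is being reused by a session running in another thread.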

potiuk commented 1 year ago

Possibly related case:

https://github.com/apache/airflow/actions/runs/5778361625/job/15659550497#step:5:8291

This is test_upstream_in_mapped_group_triggers_only_relevant, but it likely has a similar root cause (SQL commands being out of sync or issued in the wrong sequence).

_____________ test_upstream_in_mapped_group_triggers_only_relevant _____________

self = Engine(***postgres/airflow)
fn = <bound method Pool.connect of <sqlalchemy.pool.impl.QueuePool object at 0x7fc52cc83700>>
connection = None

    def _wrap_pool_connect(self, fn, connection):
        dialect = self.dialect
        try:
>           return fn()

/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3371: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:327: in connect
    return _ConnectionFairy._checkout(self)
/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:995: in _checkout
    del fairy
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__
    compat.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:928: in _checkout
    result = pool._dialect.do_ping(fairy.dbapi_connection)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0x7fc5e0105370>
dbapi_connection = <connection object at 0x7fc5df537180; dsn: 'user=postgres *** dbname=airflow host=postgres', closed: 1>

    def do_ping(self, dbapi_connection):
        cursor = None
        before_autocommit = dbapi_connection.autocommit
        try:
            if not before_autocommit:
                dbapi_connection.autocommit = True
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute(self._dialect_specific_select_one)
            finally:
                cursor.close()
                if not before_autocommit and not dbapi_connection.closed:
>                   dbapi_connection.autocommit = before_autocommit
E                   psycopg2.ProgrammingError: set_session cannot be used inside a transaction

/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py:877: ProgrammingError

The above exception was the direct cause of the following exception:

dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 0x7fc525ff6ee0>
session = <sqlalchemy.orm.session.Session object at 0x7fc5c0e661c0>

    def test_upstream_in_mapped_group_triggers_only_relevant(dag_maker, session):
        from airflow.decorators import task, task_group

        with dag_maker(session=session):

            @task
            def t(x):
                return x

            @task_group
            def tg(x):
                t1 = t.override(task_id="t1")(x=x)
                return t.override(task_id="t2")(x=t1)

            t2 = tg.expand(x=[1, 2, 3])
            t.override(task_id="t3")(x=t2)

        dr: DagRun = dag_maker.create_dagrun()

        def _one_scheduling_decision_iteration() -> dict[tuple[str, int], TaskInstance]:
            decision = dr.task_instance_scheduling_decisions(session=session)
            return {(ti.task_id, ti.map_index): ti for ti in decision.schedulable_tis}

        # Initial decision.
        tis = _one_scheduling_decision_iteration()
        assert sorted(tis) == [("tg.t1", 0), ("tg.t1", 1), ("tg.t1", 2)]

        # After running the first t1, the first t2 becomes immediately available.
        tis["tg.t1", 0].run()
        tis = _one_scheduling_decision_iteration()
        assert sorted(tis) == [("tg.t1", 1), ("tg.t1", 2), ("tg.t2", 0)]

        # Similarly for the subsequent t2 instances.
        tis["tg.t1", 2].run()
        tis = _one_scheduling_decision_iteration()
        assert sorted(tis) == [("tg.t1", 1), ("tg.t2", 0), ("tg.t2", 2)]

        # But running t2 partially does not make t3 available.
        tis["tg.t1", 1].run()
        tis["tg.t2", 0].run()
        tis["tg.t2", 2].run()
        tis = _one_scheduling_decision_iteration()
        assert sorted(tis) == [("tg.t2", 1)]

        # Only after all t2 instances are run does t3 become available.
>       tis["tg.t2", 1].run()

tests/ti_deps/deps/test_trigger_rule_dep.py:1158: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
airflow/utils/session.py:77: in wrapper
    return func(*args, session=session, **kwargs)
airflow/models/taskinstance.py:1840: in run
    self._run_raw_task(
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/models/taskinstance.py:1514: in _run_raw_task
    context = self.get_template_context(ignore_param_exceptions=False)
airflow/models/taskinstance.py:2204: in get_template_context
    "prev_data_interval_start_success": get_prev_data_interval_start_success(),
airflow/models/taskinstance.py:2077: in get_prev_data_interval_start_success
    data_interval = _get_previous_dagrun_data_interval_success()
airflow/models/taskinstance.py:2071: in _get_previous_dagrun_data_interval_success
    dagrun = _get_previous_dagrun_success()
airflow/models/taskinstance.py:2068: in _get_previous_dagrun_success
    return self.get_previous_dagrun(state=DagRunState.SUCCESS, session=session)
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/models/taskinstance.py:1024: in get_previous_dagrun
    last_dagrun = dr.get_previous_dagrun(session=session, state=state)
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/models/dagrun.py:533: in get_previous_dagrun
    return session.scalar(select(DagRun).where(*filters).order_by(DagRun.execution_date.desc()))
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1747: in scalar
    return self.execute(
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1716: in execute
    conn = self._connection_for_bind(bind)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1555: in _connection_for_bind
    return self._transaction._connection_for_bind(
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:750: in _connection_for_bind
    conn = bind.connect()
/usr/local/lib/python3.8/site-packages/sqlalchemy/future/engine.py:406: in connect
    return super(Engine, self).connect()
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3325: in connect
    return self._connection_cls(self, close_with_result=close_with_result)
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:96: in __init__
    else engine.raw_connection()
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3404: in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3374: in _wrap_pool_connect
    Connection._handle_dbapi_exception_noconnection(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:2208: in _handle_dbapi_exception_noconnection
    util.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3371: in _wrap_pool_connect
    return fn()
/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:327: in connect
    return _ConnectionFairy._checkout(self)
/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:995: in _checkout
    del fairy
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__
    compat.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:928: in _checkout
    result = pool._dialect.do_ping(fairy.dbapi_connection)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0x7fc5e0105370>
dbapi_connection = <connection object at 0x7fc5df537180; dsn: 'user=postgres *** dbname=airflow host=postgres', closed: 1>

    def do_ping(self, dbapi_connection):
        cursor = None
        before_autocommit = dbapi_connection.autocommit
        try:
            if not before_autocommit:
                dbapi_connection.autocommit = True
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute(self._dialect_specific_select_one)
            finally:
                cursor.close()
                if not before_autocommit and not dbapi_connection.closed:
>                   dbapi_connection.autocommit = before_autocommit
E                   sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) set_session cannot be used inside a transaction
E                   (Background on this error at: https://sqlalche.me/e/14/f405)

/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py:877: ProgrammingError

uranusjr commented 1 year ago

This is what test_mode is “designed” (read: an ugly hack) to work around. The simplest solution (I think) is to change ti.run() to ti.run(test_mode=True).

Alternatively, I think ti.run(session=session) could work as well, since the issue seems to be about session synchronisation.

potiuk commented 1 year ago

I think I already applied run(session=) in all those tests, thinking that it might help, @uranusjr, but to no avail. This still happens even after I did that (or maybe there are other places I missed?).

potiuk commented 1 year ago

See all my attempts from last weekend:

I am really reaching out to you because I have already exhausted everything I could come up with and am looking for help.

potiuk commented 1 year ago

BTW, that's the first time I have heard about test_mode, but I have a feeling that this is a 'real' problem we have.

uranusjr commented 1 year ago

I looked up the error messages, and it seems the errors generally happen when there are multiple concurrently running sessions; the recommended fix is generally to close one of them. But since this happens in a test and, more importantly, is flaky, I think the error indicates that another session is open somewhere else and is breaking the test. In this case it’d probably be very difficult to pinpoint the offending test since there’s no log on it.

The test_mode flag essentially tells various parts of the code to not commit database changes. This (I believe) is a hack introduced a long time ago to avoid polluting the database during tests, since a lot of the older parts of Airflow really handle database access quite poorly and there’s not a good way to clean up all of them.

It’s obviously not a very good fix, but considering the situation I wonder if we have a better choice. I’ll see if there’s a good way to toggle that flag more easily (than to pass it manually everywhere).
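
To make the idea concrete, here is a rough sketch of what a "do not commit in test mode" flag amounts to. This is not Airflow's implementation; it uses the standard-library sqlite3 module, and the record_state function and task_state table are hypothetical names invented for the example.

```python
import sqlite3


def record_state(conn, task_id, state, test_mode=False):
    """Write a task state; skip the commit when test_mode is set."""
    conn.execute(
        "INSERT INTO task_state (task_id, state) VALUES (?, ?)",
        (task_id, state),
    )
    if not test_mode:
        conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_state (task_id TEXT, state TEXT)")
conn.commit()

# In test mode the write stays uncommitted, so a rollback discards it.
record_state(conn, "pull", "running", test_mode=True)
conn.rollback()
rows_after_test_mode = conn.execute("SELECT COUNT(*) FROM task_state").fetchone()[0]

# Without test mode the write is committed and survives the rollback.
record_state(conn, "pull", "running")
conn.rollback()
rows_after_commit = conn.execute("SELECT COUNT(*) FROM task_state").fetchone()[0]
```

The flag keeps the database clean only as long as every code path honours it, which is part of why it reads as a hack rather than a fix.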

potiuk commented 1 year ago

I have several hypotheses around that.

1) run() should not be used in tests

My first hypothesis was that run() is being used wrongly (that you always need to pass session=session when you call dag_run.run() or ti.run()). But I looked closer, and the @provide_session decorator already closes the session when it exits, so that should not be the case (unless, of course, those run methods have a bug where the session is not really passed down but is re-created instead).

So I now think that changing every call to run(session=session) is not a solution.
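
For readers not familiar with the pattern, the behaviour described here can be sketched roughly as below. This is a simplified stand-in, not the code in airflow/utils/session.py: FakeSession and provide_session_sketch are hypothetical names, and the real decorator does more than this.

```python
import functools


class FakeSession:
    """Stand-in for a SQLAlchemy session; only tracks whether it was closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def provide_session_sketch(func):
    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is not None:
            # The caller owns the session: pass it through and leave it open.
            return func(*args, session=session, **kwargs)
        # No session supplied: create one and close it when the call exits.
        own_session = FakeSession()
        try:
            return func(*args, session=own_session, **kwargs)
        finally:
            own_session.close()

    return wrapper


@provide_session_sketch
def run(session=None):
    return session


created = run()                # session created and closed by the wrapper
outer = FakeSession()
passed = run(session=outer)    # caller's session is reused and left open
```

Under this pattern a call without session= should not leak a session by itself, which is why passing session=session everywhere is unlikely to change the outcome.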

2) It's possible that other tests are leaking open sessions as you explained

However, I find it pretty strange that these test failures occur only in the "mapped" versions of tests. There is, I believe, an easy way to verify this hypothesis. I just created PR #33190 with "full tests needed", where I added an autouse=True fixture that closes all open SQLAlchemy sessions before AND after every test function. This should handle all cases, even if pytest runs tests in different threads: the fixture for one test is guaranteed to finish before the next test is even attempted, so even if there are other threads around, this should handle it.

I believe this is even something that we can generally leave "for good" in our conftest.py. I don't think it will slow down our test suite that much (maybe tens of seconds rather than minutes), but it should give us perfect isolation in case there are in fact any sessions leaked from other tests. Let's see if it changes anything.

However, if we can exclude that (let's see whether the flaky tests still appear with "close_all_sessions"), then whatever is left must be the reason.
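
The "close everything before and after each test" idea can be sketched without pytest or SQLAlchemy. In the real suite this would be an autouse pytest fixture calling SQLAlchemy's close_all_sessions(); the TrackedSession registry and isolated_test context manager below are hypothetical stand-ins so the example runs with the standard library only.

```python
import contextlib


class TrackedSession:
    """Hypothetical session that registers itself so leaks can be found."""

    open_sessions = set()

    def __init__(self):
        self.closed = False
        TrackedSession.open_sessions.add(self)

    def close(self):
        self.closed = True
        TrackedSession.open_sessions.discard(self)


def close_all_tracked_sessions():
    # Copy to a list because close() mutates the registry.
    for session in list(TrackedSession.open_sessions):
        session.close()


@contextlib.contextmanager
def isolated_test():
    """Close every open session before and after the wrapped test body."""
    close_all_tracked_sessions()
    try:
        yield
    finally:
        close_all_tracked_sessions()


leaked = TrackedSession()            # left open by an earlier "test"
with isolated_test():
    closed_before_test = leaked.closed
    inside = TrackedSession()        # opened and never closed by this "test"
closed_after_test = inside.closed
```

If the flaky failures persist with this kind of isolation in place, leaked sessions from other tests are ruled out and the second session must be created during the failing test itself.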

3) My 3rd hypothesis is that there is a bug somewhere deep in the mapped code that causes it.

The flaky tests are all mapping related. I have run probably a few hundred (if not thousands) of test jobs recently in my quest to squash the flaky tests, and those errors occur only in tests related to mapped task instances. Not even once have I seen them in other tests, and I have seen about 30-40 such failures already.

To me it seems likely that somewhere deep down the stack there is one forgotten place where we do not pass the session but create a new one, and the two are used together. We just need to find that place. But let's first run the "close_all_sessions" change quite a few times and see whether it still happens there.
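
To illustrate why such a "forgotten place" matters, here is a small sketch with the standard-library sqlite3 module. It does not reproduce the driver errors above; it only shows, under simplified assumptions, that a helper opening its own connection works against a different view of the database than a helper that receives the caller's connection. The ti table and helper names are made up for the example.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "sketch.db")


def count_rows(conn):
    return conn.execute("SELECT COUNT(*) FROM ti").fetchone()[0]


def helper_with_own_connection():
    """The 'forgotten place': opens a second connection instead of reusing one."""
    other = sqlite3.connect(db_path)
    try:
        return count_rows(other)
    finally:
        other.close()


def helper_with_passed_connection(conn):
    """Reuses the caller's connection, so it sees the caller's pending work."""
    return count_rows(conn)


caller = sqlite3.connect(db_path)
caller.execute("CREATE TABLE ti (task_id TEXT)")
caller.commit()
caller.execute("INSERT INTO ti VALUES ('pull')")  # not committed yet

seen_by_own = helper_with_own_connection()        # misses the pending row
seen_by_passed = helper_with_passed_connection(caller)
caller.close()
```

With two sessions active at once, the outcome depends on which one touches the database first, which would fit failures that show up only intermittently.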

potiuk commented 1 year ago

BTW, yes. I absolutely agree that test_mode is a very wrong idea. I think we have very few usages of it (if any) and I will attempt to remove it next.

potiuk commented 1 year ago

Unfortunately no dice.

https://github.com/apache/airflow/actions/runs/5794267587/job/15703726213?pr=33190#step:6:9138

_________________________ test_xcom_map_raise_to_skip __________________________

self = <sqlalchemy.future.engine.Connection object at 0x7f123306cac0>
dialect = <sqlalchemy.dialects.mysql.mysqldb.MySQLDialect_mysqldb object at 0x7f128bc4d340>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.mysql.mysqldb.MySQLExecutionContext_mysqldb'>>
statement = 'UPDATE task_instance SET pid=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s'
parameters = (90, datetime.datetime(2023, 8, 8, 8, 12, 24, 176037), 'test_dag', 'push', 'test', -1)
execution_options = immutabledict({'autocommit': True, 'compiled_cache': {(<sqlalchemy.dialects.mysql.mysqldb.MySQLDialect_mysqldb object ...rigger_id'), False, False), <sqlalchemy.dialects.mysql.mysqldb.MySQLCompiler_mysqldb object at 0x7f12542e6f70>, 548]}})
args = (<sqlalchemy.dialects.mysql.mysqldb.MySQLCompiler_mysqldb object at 0x7f128b002fd0>, [{'pid': 90, 'task_instance_dag_i..., type_=String(length=250, collation='utf8mb3_bin')), BindParameter('task_instance_map_index', None, type_=Integer())])
kw = {'cache_hit': symbol('CACHE_HIT')}
branched = <sqlalchemy.future.engine.Connection object at 0x7f123306cac0>
yp = None
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f123306cc70>
context = <sqlalchemy.dialects.mysql.mysqldb.MySQLExecutionContext_mysqldb object at 0x7f123306cfa0>
cursor = <MySQLdb.cursors.Cursor object at 0x7f123306c2b0>, evt_handled = False

    def _execute_context(
        self,
        dialect,
        constructor,
        statement,
        parameters,
        execution_options,
        *args,
        **kw
    ):
        """Create an :class:`.ExecutionContext` and execute, returning
        a :class:`_engine.CursorResult`."""

        branched = self
        if self.__branch_from:
            # if this is a "branched" connection, do everything in terms
            # of the "root" connection, *except* for .close(), which is
            # the only feature that branching provides
            self = self.__branch_from

        if execution_options:
            yp = execution_options.get("yield_per", None)
            if yp:
                execution_options = execution_options.union(
                    {"stream_results": True, "max_row_buffer": yp}
                )

        try:
            conn = self._dbapi_connection
            if conn is None:
                conn = self._revalidate_connection()

            context = constructor(
                dialect, self, conn, execution_options, *args, **kw
            )
        except (exc.PendingRollbackError, exc.ResourceClosedError):
            raise
        except BaseException as e:
            self._handle_dbapi_exception(
                e, util.text_type(statement), parameters, None, None
            )

        if (
            self._transaction
            and not self._transaction.is_active
            or (
                self._nested_transaction
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception
    util.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1910: in _execute_context
    self.dialect.do_execute(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:736: in do_execute
    cursor.execute(statement, parameters)
/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:174: in execute
    self._discard()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <MySQLdb.cursors.Cursor object at 0x7f123306c2b0>

    def _discard(self):
        self.description = None
        self.description_flags = None
        # Django uses some member after __exit__.
        # So we keep rowcount and lastrowid here. They are cleared in Cursor._query().
        # self.rowcount = 0
        # self.lastrowid = None
        self._rows = None
        self.rownumber = None

        if self._result:
            self._result.discard()
            self._result = None

        con = self.connection
        if con is None:
            return
>       while con.next_result() == 0:  # -1 means no more data.
E       sqlalchemy.exc.ProgrammingError: (MySQLdb.ProgrammingError) (2014, "Commands out of sync; you can't run this command now")
E       [SQL: UPDATE task_instance SET pid=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
E       [parameters: (90, datetime.datetime(2023, 8, 8, 8, 12, 24, 176037), 'test_dag', 'push', 'test', -1)]
E       (Background on this error at: https://sqlalche.me/e/14/f405)

To me it really looks like there is a bug somewhere in the mapping code that we need to track down.

potiuk commented 1 year ago

Not yet closed ...

potiuk commented 1 year ago

@uranusjr -> let's see, maybe this one will show a case where we have two sessions active at the same time: https://github.com/apache/airflow/pull/33259

potiuk commented 1 year ago

I think my deduction and observations were quite right @uranusjr.

There is indeed a session management problem, and it affects only mapped operators.

The out-of-sync errors are almost certainly caused by this code in airflow/models/mapped_operator.py, and in particular by not passing the session down to the render_template_fields method:

        # Ideally we'd like to pass in session as an argument to this function,
        # but we can't easily change this function signature since operators
        # could override this. We can't use @provide_session since it closes and
        # expunges everything, which we don't want to do when we are so "deep"
        # in the weeds here. We don't close this session for the same reason.
        session = settings.Session()

        mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
        unmapped_task = self.unmap(mapped_kwargs)
        context_update_for_unmapped(context, unmapped_task)

        # Since the operators that extend `BaseOperator` are not subclasses of
        # `MappedOperator`, we need to call `_do_render_template_fields` from
        # the unmapped task in order to call the operator method when we override
        # it to customize the parsing of nested fields.
        unmapped_task._do_render_template_fields(
            parent=unmapped_task,
            template_fields=self.template_fields,

By adding logs for opening and destroying sessions I caught one case where it happened, and the only extra sessions created were the ones in render_template_fields - which is what drew my attention to this ^^

https://github.com/apache/airflow/actions/runs/5826237682/job/15799895643?pr=33259#step:6:9927

I think we should find a better solution than this hack - it is going to cause us more trouble than it's worth IMHO. I am happy to brainstorm on that or see whether I can come up with a proposal.

But yes - it looks like our tests are actually detecting a real problem with our implementation.
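
The kind of session-lifecycle logging described above can be approximated roughly as follows. This is only a minimal sketch, not the code from the linked PR: `TrackedSession`, `caller_chain`, the log wording, and the two stand-in functions are hypothetical names, and a plain class stands in for a real SQLAlchemy session.

```python
import inspect
import logging

log = logging.getLogger("session_tracker")


def caller_chain(depth: int = 3, skip: int = 2) -> str:
    """Return the names of the nearest callers, innermost first, joined by ':'."""
    # context=0 avoids reading source files; we only need function names.
    frames = inspect.stack(context=0)[skip : skip + depth]
    return ":".join(frame.function for frame in frames)


class TrackedSession:
    """Hypothetical stand-in for a database session that logs its lifecycle."""

    open_sessions = 0  # sessions created but not yet closed

    def __init__(self) -> None:
        # skip=2 drops caller_chain() and __init__ from the recorded chain.
        self.origin = caller_chain()
        TrackedSession.open_sessions += 1
        log.info("CREATE SESSION %s (open: %d)", self.origin, TrackedSession.open_sessions)

    def close(self) -> None:
        TrackedSession.open_sessions -= 1
        log.info("DESTROY SESSION %s (open: %d)", self.origin, TrackedSession.open_sessions)


def render_template_fields() -> TrackedSession:
    # Mirrors the pattern under discussion: a session is created and never closed.
    return TrackedSession()


def run_raw_task() -> None:
    session = TrackedSession()
    try:
        render_template_fields()
    finally:
        session.close()
```

With logging enabled, a CREATE line without a matching DESTROY line points at the call site that leaves a session open; here that would be the stand-in `render_template_fields`.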

uranusjr commented 1 year ago

That makes sense, it’s indeed a dangling session object. In a real deployment this code is always only run in a worker process, so the session is collected when the worker terminates and does not cause a leak. Tests are, however, obviously different. Maybe we can use some monkey patching to deal with this.

uranusjr commented 1 year ago

I opened #33307 as a possible solution. How can this be validated?

potiuk commented 1 year ago

It happened frequently enough that just running the PR with the "full tests needed" label a few times should give us the answer.

potiuk commented 1 year ago

Though, I think it will not fix it. I think the problem is not that we are leaving the dangling session between tests - the problem is that we are running two run_tasks one after the other in the same test - so we are likely keeping the dangling session from the first run while the second is running. That would basically mean that we will have to close the session in between them.

uranusjr commented 1 year ago

The monkey-patching fixture in the PR makes all tasks use the same session so I think it has a chance (since there’s nothing to conflict with)
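
The idea of making every caller share one session can be sketched like this. It is an illustration under stated assumptions, not the fixture from the PR: `FakeSession`, `settings_stub`, and `shared_session` are hypothetical names, and `unittest.mock` is used so the sketch runs without pytest.

```python
from contextlib import contextmanager
from types import SimpleNamespace
from unittest import mock


class FakeSession:
    """Hypothetical stand-in for a database session."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


# Hypothetical stand-in for a settings module that exposes a session factory.
settings_stub = SimpleNamespace(Session=FakeSession)


def render_template_fields() -> FakeSession:
    # Code under test: asks the factory for a session and never closes it.
    return settings_stub.Session()


@contextmanager
def shared_session():
    """Patch the factory so that every caller receives the same session object."""
    session = FakeSession()
    with mock.patch.object(settings_stub, "Session", return_value=session):
        try:
            yield session
        finally:
            # The one shared session is closed when the test finishes.
            session.close()
```

In a pytest suite the same body could be wrapped in a fixture. Because only one session object exists while the patch is active, there is no second session for it to conflict with.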

potiuk commented 1 year ago

Hmm... But we will still have at least two sessions open at the same time that might potentially be used in parallel.

I was thinking - looking at the logging I added:

[2023-08-10 21:43:01,362] {session.py:39} INFO - CREATE SESSION wrapper:_execute_task_with_callbacks:_run_raw_task:
[2023-08-10T21:43:01.362+0000] {session.py:39} INFO - CREATE SESSION wrapper:_execute_task_with_callbacks:_run_raw_task:
[2023-08-10 21:43:01,364] {session.py:51} INFO - DESTROY_SESSION wrapper:_execute_task_with_callbacks:_run_raw_task:
[2023-08-10 21:43:01,365] {session.py:39} INFO - CREATE SESSION wrapper:render_template_fields:render_templates:
[2023-08-10 21:43:01,365] {session.py:51} INFO - DESTROY_SESSION wrapper:render_template_fields:render_templates:
[2023-08-10 21:43:01,365] {session.py:39} INFO - CREATE SESSION wrapper:_execute_task_with_callbacks:_run_raw_task:
[2023-08-10T21:43:01.364+0000] {session.py:51} INFO - DESTROY_SESSION wrapper:_execute_task_with_callbacks:_run_raw_task:
[2023-08-10T21:43:01.365+0000] {session.py:39} INFO - CREATE SESSION wrapper:render_template_fields:render_templates:
[2023-08-10T21:43:01.365+0000] {session.py:51} INFO - DESTROY_SESSION wrapper:render_template_fields:render_templates:
[2023-08-10T21:43:01.365+0000] {session.py:39} INFO - CREATE SESSION wrapper:_execute_task_with_callbacks:_run_raw_task:
[2023-08-10 21:43:01,369] {session.py:51} INFO - DESTROY_SESSION wrapper:_execute_task_with_callbacks:_run_raw_task:
[2023-08-10 21:43:01,369] {session.py:39} INFO - CREATE SESSION wrapper:_execute_task_with_callbacks:_run_raw_task:
[2023-08-10T21:43:01.369+0000] {session.py:51} INFO - DESTROY_SESSION wrapper:_execute_task_with_callbacks:_run_raw_task:
[2023-08-10T21:43:01.369+0000] {session.py:39} INFO - CREATE SESSION wrapper:_execute_task_with_callbacks:_run_raw_task:
[2023-08-10 21:43:01,372] {session.py:51} INFO - DESTROY_SESSION wrapper:_execute_task_with_callbacks:_run_raw_task:

That seems like unnecessary resource usage. Opening a new session is generally expensive (both client and server side). Yes, we have pgbouncer etc., but this means that every mapped task needs a minimum of 2 sessions to run where 1 would be enough - rendering templated fields could easily be done using the _execute_task_with_callbacks one.

Since rendering mapped tasks is ALWAYS done in the task, maybe a better solution would be to store the session in a singleton inside _run_raw_task and use that stored session in mapped_operator.py. That sounds like a much more reasonable use of the session we already have, and I don't see an obvious problem with such an approach.

It would save quite a lot of resources.
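
A rough sketch of that idea, assuming a context variable is used to hold the "current" session: all names here (`FakeSession`, `set_current_task_session`, `get_current_task_session`, and the two stand-in functions) are hypothetical and only illustrate the shape of the approach, not an actual implementation.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional


class FakeSession:
    """Hypothetical stand-in for a database session."""

    created = 0  # how many sessions have been opened in total

    def __init__(self) -> None:
        FakeSession.created += 1
        self.closed = False

    def close(self) -> None:
        self.closed = True


# Holds the session of the task that is currently running, if any.
_current_task_session: ContextVar[Optional[FakeSession]] = ContextVar(
    "current_task_session", default=None
)


@contextmanager
def set_current_task_session(session: FakeSession) -> Iterator[None]:
    """Expose `session` to nested code for the duration of the block."""
    token = _current_task_session.set(session)
    try:
        yield
    finally:
        _current_task_session.reset(token)


def get_current_task_session() -> FakeSession:
    session = _current_task_session.get()
    if session is None:
        raise RuntimeError("no task session is active")
    return session


def render_template_fields() -> FakeSession:
    # Reuses the task's session instead of opening a second one.
    return get_current_task_session()


def run_raw_task() -> FakeSession:
    session = FakeSession()
    try:
        with set_current_task_session(session):
            return render_template_fields()
    finally:
        session.close()
```

With this shape a task run opens exactly one session. The cost is that code calling the rendering function outside a running task, such as a unit test, has to wrap the call in the context manager itself.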

potiuk commented 1 year ago

I might draft a PR for that really quickly - but maybe you can think of some drawbacks of such an approach? If we do it this way, it will not only solve the test issue but also reduce the overhead of running mapped tasks.

potiuk commented 1 year ago

Created #33309 -> likely some tests will fail and will need the context manager added in tests, but I think that solution is better resource-wise.

potiuk commented 1 year ago

BTW. From the tests (and after a bit of thinking) it's clear that it's not only in run_task. The sessions were also leaking in www, and this is quite a bit worse.

potiuk commented 1 year ago

> BTW. From the tests (and after a bit of thinking) it's clear that it's not only in run_task. The sessions were also leaking in www, and this is quite a bit worse.

OR maybe not... The www test was something else so maybe that was jumping to conclusions.

potiuk commented 1 year ago

All right - I got my change green https://github.com/apache/airflow/pull/33309 and I think it has a chance to address both - test flakiness and resource usage @uranusjr