apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

`DbApiHook.insert_rows` unnecessarily restarting connections #40609

Open · potiuk opened 4 months ago

potiuk commented 4 months ago

Discussed in https://github.com/apache/airflow/discussions/40608

Originally posted by **plutaniano** July 4, 2024

### Apache Airflow Provider(s)

common-sql

### Versions of Apache Airflow Providers

```
apache-airflow-providers-common-sql==1.14.0
apache-airflow-providers-mysql==5.6.1
apache-airflow-providers-postgres==5.11.1
```

### Apache Airflow version

2.9.2

### Operating System

MacOS Sonoma 14.5 (docker host)

### Deployment

Docker-Compose

### Deployment details

I'm using the official Airflow `docker-compose.yaml` + a MySQL database, details in the reproduction steps.

### What happened

The database connection is restarted multiple times during a single `DbApiHook.insert_rows` call.

### What you think should happen instead

`DbApiHook.insert_rows` should create and maintain a single db connection.

### How to reproduce

Create a temporary test project:

```bash
mkdir /tmp/airflow/
cd /tmp/airflow/
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.2/docker-compose.yaml'
```

Add the following MySQL service to the docker-compose file:

```yaml
mysql:
  image: mysql:latest
  environment:
    MYSQL_DATABASE: 'db'
    MYSQL_ROOT_PASSWORD: 'airflow'
  ports:
    - '3306:3306'
```

Run the docker compose:

```bash
docker compose up -d
```

Add the following connections to Airflow using `docker exec -it airflow-airflow-triggerer-1 bash`:

```console
airflow connections add postgres_default --conn-uri postgresql://airflow:airflow@postgres
airflow connections add mysql_default --conn-uri mysql://root:airflow@mysql/db
```

Then open a python shell and execute the following scripts:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

pg = PostgresHook()
pg.run("CREATE TABLE IF NOT EXISTS t (a int)")
pg.insert_rows(
    table="t",
    rows=[[i] for i in range(10_000)],
    target_fields="a",
)
```

And for MySQL:

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

mysql = MySqlHook()
mysql.run("CREATE TABLE IF NOT EXISTS t (a int)")
mysql.insert_rows(
    table="t",
    rows=[[i] for i in range(100)],
    target_fields="a",
)
```

Both scripts will open up multiple connections to the database while inserting, instead of maintaining just one. Postgres seems to recreate the connection every 1000 inserts; MySQL does it after every insert.

Postgres:

```console
>>> pg.insert_rows(
...     table="t",
...     rows=[[i] for i in range(10_000)],
...     target_fields="a",
... )
[2024-07-04T15:08:13.940+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:13.942+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:13.996+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:13.997+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.043+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.044+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.090+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.091+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.145+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.146+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.200+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.201+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.245+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.246+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.290+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.291+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.341+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.342+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.393+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.394+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.441+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.441+0000] {sql.py:611} INFO - Done loading. Loaded a total of 10000 rows into t
```

MySQL:

```console
>>> mysql.insert_rows(
...     table="t",
...     rows=[[i] for i in range(100)],
...     target_fields="a",
... )
[2024-07-04T15:08:54.551+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.554+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.555+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.556+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.557+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.557+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.558+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
...
[2024-07-04T15:08:54.616+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.617+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.617+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.618+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.619+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.620+0000] {sql.py:611} INFO - Done loading. Loaded a total of 100 rows into t
```

### Anything else

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
potiuk commented 4 months ago

@dabla -> can you please take a look? I am not sure, but I think this is a side effect of:

    @contextmanager
    def _create_autocommit_connection(self, autocommit: bool = False):
        """Context manager that closes the connection after use and detects if autocommit is supported."""
        with closing(self.get_conn()) as conn:
            if self.supports_autocommit:
                self.set_autocommit(conn, autocommit)
            yield conn
potiuk commented 4 months ago

Actually, it's a side effect of https://github.com/apache/airflow/pull/38528

potiuk commented 4 months ago

@dabla -> the root cause of the problem is that the connection is now created every time the "placeholder" property is accessed. Would you like to take a stab at fixing it?
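
For context, here is a minimal, self-contained sketch of that failure mode (names and the `postgres_default` ID are illustrative, not the actual provider code): a plain `@property` that reads the connection extra re-runs the connection lookup, and hence emits the INFO log line, on every access.

```python
import logging

log = logging.getLogger(__name__)

class SketchHook:
    """Illustrative stand-in for a DbApiHook subclass, pre-fix."""

    def get_connection(self, conn_id):
        # Stand-in for BaseHook.get_connection: in Airflow this fetches the
        # Connection from the secrets backend / metadata DB and emits the
        # "Using connection ID ..." INFO line on every call.
        log.info("Using connection ID '%s' for task execution.", conn_id)
        return {"placeholder": "%s"}  # pretend this is the connection extra

    @property
    def placeholder(self):
        # A plain @property re-runs this body on every access, so each access
        # repeats the (logged, potentially remote) connection lookup.
        return self.get_connection("postgres_default")["placeholder"]
```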

dabla commented 4 months ago

> @dabla -> the root cause of the problem is that the connection is now created every time the "placeholder" property is accessed. Would you like to take a stab at fixing it?

Will check it today

dabla commented 4 months ago

> @dabla -> the root cause of the problem is that the connection is now created every time the "placeholder" property is accessed. Would you like to take a stab at fixing it?

I could make this a `@cached_property`, but I don't know if that will fix the issue.
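
As a quick plain-Python illustration (nothing Airflow-specific; the class and names are made up): `functools.cached_property` runs the getter once and stores the result on the instance, so repeated accesses skip the lookup entirely.

```python
from functools import cached_property

class Demo:
    @cached_property
    def placeholder(self):
        print("expensive lookup")  # stands in for the secrets/DB round trip
        return "%s"

d = Demo()
d.placeholder  # prints "expensive lookup" once and caches the result
d.placeholder  # served from the instance __dict__; no lookup, no log line
```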

dabla commented 4 months ago

Created a PR for this issue that should fix it.

potiuk commented 4 months ago

> Created a PR for this issue that should fix it.

Yeah. I thought about it and a cached property is "good enough". To @plutaniano and @dabla -> this did not actually cause restarting of the connection that often (so it was not that bad). What it did was retrieve the Connection object from secrets and read its `placeholder` extra - which had the side effect of producing an `INFO` log every time the placeholder property was accessed - which in the case of MySQL was every single row, and for executemany-compatible drivers was every chunk.

It had the side effect of making everything slower by a) printing logs, and b) accessing the secrets manager / DB to read the connection extra.

Turning placeholder into a cached_property will keep one such extra lookup, but that's a small price to pay for the flexibility we get with a configurable placeholder this way, so I'd say it's good enough.

dabla commented 4 months ago

> > Created a PR for this issue that should fix it.
>
> Yeah. I thought about it and a cached property is "good enough". To @plutaniano and @dabla -> this did not actually cause restarting of the connection that often (so it was not that bad). What it did was retrieve the Connection object from secrets and read its `placeholder` extra - which had the side effect of producing an `INFO` log every time the placeholder property was accessed - which in the case of MySQL was every single row, and for executemany-compatible drivers was every chunk.
>
> It had the side effect of making everything slower by a) printing logs, and b) accessing the secrets manager / DB to read the connection extra.
>
> Turning placeholder into a cached_property will keep one such extra lookup, but that's a small price to pay for the flexibility we get with a configurable placeholder this way, so I'd say it's good enough.

No, the message you see indeed has nothing to do with database connections; each time it is just retrieving the connection details from Airflow, which allow you to create a database connection. But anyway, it will be a good improvement nonetheless.
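
The distinction, in code (both methods exist on the hooks, but running this requires an Airflow install with a configured `postgres_default` connection): `get_connection` returns the Airflow `Connection` metadata object resolved from the secrets backend or metadata DB, while `get_conn` opens an actual DBAPI connection to the database.

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

pg = PostgresHook()

# Airflow Connection *metadata* (host, login, extras) from secrets/DB.
# This is the call that logs "Using connection ID ... for task execution."
airflow_conn = pg.get_connection("postgres_default")
print(airflow_conn.extra_dejson)

# Actual DBAPI connection to the database itself.
db_conn = pg.get_conn()
```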

potiuk commented 4 months ago

> No, the message you see indeed has nothing to do with database connections; each time it is just retrieving the connection details from Airflow, which allow you to create a database connection. But anyway, it will be a good improvement nonetheless.

Correct. No new connection. But it is much slower now because every access a) prints a log line, and b) re-reads the connection from the secrets manager / DB.

So yeah - caching the property solves both problems: it speeds things up and makes everything far less costly.

potiuk commented 4 months ago

Ah, also logging might be expensive (money) as well :D - depends on whether you use a remote logging solution and whether it charges "per message".

dabla commented 4 months ago

> Ah, also logging might be expensive (money) as well :D - depends on whether you use a remote logging solution and whether it charges "per message".

Completely agree on that; it will cost in performance as well as in money (disk). Hopefully, in the future, AIP-59 will help us detect such regressions/side-effects ;)

potiuk commented 4 months ago

> Completely agree on that; it will cost in performance as well as in money (disk). Hopefully, in the future, AIP-59 will help us detect such regressions/side-effects ;)

Indeed ... Cases like this are very difficult to spot with regular unit testing / code reviews - this one was a side effect going three levels deep, plus it's not obvious that building the placeholder list, e.g. `[self.placeholder for _ in range(n)]`, actually invokes the `placeholder` property n times.
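
That subtlety is easy to verify in plain Python (the class and counter names are made up for the demo):

```python
class Hook:
    calls = 0

    @property
    def placeholder(self):
        Hook.calls += 1  # count property invocations
        return "%s"

h = Hook()
placeholders = [h.placeholder for _ in range(1000)]
print(Hook.calls)  # 1000 - the getter ran once per element

# By contrast, [h.placeholder] * 1000 evaluates the property a single time
# and replicates the resulting string.
```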

potiuk commented 4 months ago

cc: @bjankie1 :D ^^

plutaniano commented 4 months ago

Thanks a lot, guys. Really appreciate the attention put into this.

plutaniano commented 4 months ago

For anyone who has the same problem, this should work as a temporary fix while 2.9.3 is not out. Just import these hooks instead of the ones from airflow.providers.

from functools import cached_property
from airflow.providers.mysql.hooks.mysql import MySqlHook as _MySqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook as _PostgresHook

class MySqlHook(_MySqlHook):
    @cached_property
    def placeholder(self):
        return super().placeholder

class PostgresHook(_PostgresHook):
    @cached_property
    def placeholder(self):
        return super().placeholder
potiuk commented 4 months ago

> For anyone who has the same problem, this should work as a temporary fix while 2.9.3 is not out. Just import these hooks instead of the ones from airflow.providers.

You can also downgrade the common.sql provider to 1.11.1, which did not have the placeholder configurable (it was added in 1.12.0), or upgrade to a new common.sql provider that will be released soon. (I am thinking, @eladkal, maybe we should release an ad-hoc common.sql because of that before the 2.9.3 release?)

eladkal commented 4 months ago

I plan to cut a provider wave tomorrow.

dabla commented 3 months ago

@potiuk @eladkal PR 40615 partially fixes the issue; it doesn't fix it for the JdbcHook. It seems the following properties of the JdbcHook have the same issue as the placeholder property:

    [2024-07-11, 09:38:56 UTC] {o365.py:64} INFO - Called by method: __get__
    [2024-07-11, 09:38:56 UTC] {o365.py:67} INFO - Using connection ID 'jdbc_hana' for task execution.
    [2024-07-11, 09:38:56 UTC] {o365.py:64} INFO - Called by method: _create_autocommit_connection
    [2024-07-11, 09:38:56 UTC] {o365.py:67} INFO - Using connection ID 'jdbc_hana' for task execution.
    [2024-07-11, 09:38:56 UTC] {o365.py:64} INFO - Called by method: driver_class
    [2024-07-11, 09:38:56 UTC] {o365.py:67} INFO - Using connection ID 'jdbc_hana' for task execution.
    [2024-07-11, 09:38:56 UTC] {o365.py:64} INFO - Called by method: driver_path
    [2024-07-11, 09:38:56 UTC] {o365.py:67} INFO - Using connection ID 'jdbc_hana' for task execution.
    [2024-07-11, 09:38:56 UTC] {o365.py:64} INFO - Called by method: driver_path
    [2024-07-11, 09:38:56 UTC] {o365.py:67} INFO - Using connection ID 'jdbc_hana' for task execution.

Maybe we should cache the connection within the Hook instance so it can be reused without having to worry about which property is using it? The problem is that get_connection is a classmethod, and I would not want to cache the result of the lookup in a static class variable, which isn't a good idea. It would be better if it were cached at the instance level of the Hook, but that would mean we would need to change the signature of the get_connection method in BaseHook.

The following methods would need to be changed from:

    @classmethod
    def get_connections(cls, conn_id: str) -> list[Connection]:
        """
        Get all connections as an iterable, given the connection id.

        :param conn_id: connection id
        :return: array of connections
        """
        warnings.warn(
            "`BaseHook.get_connections` method will be deprecated in the future."
            "Please use `BaseHook.get_connection` instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        return [cls.get_connection(conn_id)]

    @classmethod
    def get_connection(cls, conn_id: str) -> Connection:
        """
        Get connection, given connection id.

        :param conn_id: connection id
        :return: connection
        """
        from airflow.models.connection import Connection

        conn = Connection.get_connection_from_secrets(conn_id)
        log.info("Using connection ID '%s' for task execution.", conn.conn_id)
        return conn

To:

    def get_connections(self, conn_id: str) -> list[Connection]:
        """
        Get all connections as an iterable, given the connection id.

        :param conn_id: connection id
        :return: array of connections
        """
        warnings.warn(
            "`BaseHook.get_connections` method will be deprecated in the future."
            "Please use `BaseHook.get_connection` instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        return [self.get_connection(conn_id)]

    def get_connection(self, conn_id: str) -> Connection:
        """
        Get connection, given connection id.

        :param conn_id: connection id
        :return: connection
        """
        # Assumes self._connection is initialized to None in __init__.
        if self._connection is None:
            from airflow.models.connection import Connection

            self._connection = Connection.get_connection_from_secrets(conn_id)
            log.info("Using connection ID '%s' for task execution.", self._connection.conn_id)
        return self._connection
potiuk commented 3 months ago

Ah ... bummer. But we can fix it in the next round - it's very localized, and as we know it's just the lower performance of `insert_rows`.

dabla commented 3 months ago

> Ah ... bummer. But we can fix it in the next round - it's very localized, and as we know it's just the lower performance of `insert_rows`.

What do you think of the proposed solution above? Or is it too invasive?

potiuk commented 3 months ago

> What do you think of the proposed solution above? Or is it too invasive?

A bit too invasive, I think. This actually changes the semantics of the methods - someone could rely on the fact that they return a new Connection object every time.

I think maybe a variation of that: add an optional `reuse_connection` flag (or similar), defaulting to False, and set it to True when you access the connection just to retrieve the extra value?
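
A hypothetical sketch of that idea (the flag name, the `_cached_connection` attribute, and the instance-method signature are all illustrative; this is not what was ultimately merged):

```python
class BaseHookSketch:
    def __init__(self):
        self._cached_connection = None  # illustrative cache slot

    def get_connection(self, conn_id: str, reuse_connection: bool = False):
        """Return the Airflow Connection, optionally reusing a cached one."""
        if reuse_connection and self._cached_connection is not None:
            return self._cached_connection
        from airflow.models.connection import Connection

        conn = Connection.get_connection_from_secrets(conn_id)
        if reuse_connection:
            self._cached_connection = conn
        return conn
```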

potiuk commented 3 months ago

Or even better - add a `get_connection_extra` method that sets that flag - this way anyone who wants to just retrieve the extra will use that method, and then we will not have to remember to set the flag to True.

dabla commented 3 months ago

> Or even better - add a `get_connection_extra` method that sets that flag - this way anyone who wants to just retrieve the extra will use that method, and then we will not have to remember to set the flag to True.

Good idea - I think I saw something similar in JdbcHook already, so I will do that instead.

dabla commented 3 months ago

Something like this in DbApiHook, maybe:

    @cached_property
    def connection_extra(self) -> dict:
        conn = self.get_connection(getattr(self, self.conn_name_attr))  # TODO: replace getattr with get_conn_id
        return conn.extra_dejson

Will wait until 40665 is merged, as then I can also use the `get_conn_id` method, which is cleaner.

dabla commented 3 months ago

PR 40751 goes even further and caches the connection on the DbApiHook instance; some hooks were already doing this, and it has now become a property in DbApiHook.
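
A minimal sketch of that pattern (stub class; the actual PR may differ in details such as naming and cache invalidation):

```python
from functools import cached_property

class DbApiHookSketch:
    conn_name_attr = "conn_id"  # DbApiHook subclasses name their conn-id attribute

    def __init__(self, conn_id: str = "my_db_default"):
        self.conn_id = conn_id

    def get_connection(self, conn_id):
        # Stub: in Airflow this resolves the Connection from the secrets
        # backend / metadata DB (and logs the INFO line once).
        return {"conn_id": conn_id}

    @cached_property
    def connection(self):
        # Resolved once per hook instance; placeholder, driver_class, etc.
        # can all reuse it instead of re-fetching from the secrets backend.
        return self.get_connection(getattr(self, self.conn_name_attr))
```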

dabla commented 2 months ago

Some help/hints/tips on the randomly failing tests in PR 40751 would be handy, so we can merge the PR and also close this issue with it.

potiuk commented 2 months ago

They look like real compatibility issues with Airflow 2.7 - 2.9.