apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Build correct SQLAlchemy URI in hooks based on `DbApiHook` #38195

Open Taragolis opened 8 months ago

Taragolis commented 8 months ago

Original discussion: https://lists.apache.org/thread/8rhmz3qh30hvkondct4sfmgk4vd07mn5

tl;dr: DbApiHook assumes that the Airflow Connection URI representation is a valid SQLAlchemy (SA) URI, but that is not true and never was. It may work in simple cases, but it will fail in more complex ones.

https://github.com/apache/airflow/blob/2ab60812a040cb8b760acaf396f625d0d719e4be/airflow/providers/common/sql/hooks/sql.py#L184-L192
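A minimal, stdlib-only sketch of one way the two representations diverge. It assumes a PostgreSQL connection whose Airflow URI uses the connection type `postgres` as its scheme, whereas SQLAlchemy 1.4+ expects the dialect name `postgresql`. The URI and the `CONN_TYPE_TO_DIALECT` table are illustrative assumptions, not Airflow code.

```python
from urllib.parse import urlsplit

# Illustrative Airflow-style connection URI: the scheme is the Airflow
# connection type and the query string carries the connection extras.
airflow_uri = "postgres://user:secret@db.example.com:5432/analytics?sslmode=require"

# Hypothetical lookup from Airflow connection type to SQLAlchemy dialect name.
CONN_TYPE_TO_DIALECT = {"postgres": "postgresql", "mysql": "mysql", "sqlite": "sqlite"}

parts = urlsplit(airflow_uri)

# Passing the connection URI through unchanged hands SQLAlchemy the scheme
# "postgres", which SQLAlchemy 1.4+ no longer accepts as a dialect name.
scheme_as_is = parts.scheme
dialect = CONN_TYPE_TO_DIALECT[parts.scheme]

print(scheme_as_is, "->", dialect)
```

The same URI also carries extras in its query string (`sslmode=require` here), which a given driver may or may not accept under that name.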

This task is intended to track fixes to the hooks that are based on DbApiHook.

Hooks that do not override the invalid DbApiHook.get_uri implementation

Hooks that use Connection.get_uri to construct the SQLAlchemy URI

Bowrna commented 8 months ago

Hello @Taragolis, I was looking at this issue to see whether I could fix it.

https://github.com/apache/airflow/blob/0e8f108313d4af0b450581661aeb8ed35e82a8e6/airflow/models/connection.py#L271-L329

Is fixing the get_uri part so that Airflow Connection URI representation becomes a valid SA URI the right way to fix this issue?

Taragolis commented 8 months ago

As far as I remember, we recently discussed this case in Slack. I think it would be better to introduce an sa_uri property (or something similarly named) on DbApiHook that returns a sqlalchemy.engine.URL, with no default implementation (e.g. it raises NotImplementedError), and to implement it in each hook that has SQLAlchemy support.

get_uri: it is not clear which URI it implements. DB-API? SQLAlchemy? Or even something internal (ElasticsearchSQLHook)?
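A rough sketch of that idea, using only the standard library. `SketchURL` stands in for `sqlalchemy.engine.URL`, and `BaseSqlHook`, `PostgresLikeHook`, and `NoDialectHook` are hypothetical classes, not Airflow's actual hooks; the property name `sa_uri` is the one floated above.

```python
from typing import NamedTuple, Optional


class SketchURL(NamedTuple):
    """Stand-in for sqlalchemy.engine.URL so the sketch has no dependencies."""

    drivername: str
    username: Optional[str] = None
    password: Optional[str] = None
    host: Optional[str] = None
    port: Optional[int] = None
    database: Optional[str] = None


class BaseSqlHook:
    """Hypothetical base hook: declares the property but does not implement it."""

    @property
    def sa_uri(self) -> SketchURL:
        raise NotImplementedError(f"{type(self).__name__} does not provide a SQLAlchemy URL")


class PostgresLikeHook(BaseSqlHook):
    """Hypothetical hook for a database that has a SQLAlchemy dialect."""

    def __init__(self, login: str, password: str, host: str, port: int, schema: str):
        self.login, self.password = login, password
        self.host, self.port, self.schema = host, port, schema

    @property
    def sa_uri(self) -> SketchURL:
        return SketchURL("postgresql", self.login, self.password, self.host, self.port, self.schema)


class NoDialectHook(BaseSqlHook):
    """Hypothetical hook for a database without a SQLAlchemy dialect."""


url = PostgresLikeHook("user", "secret", "db.example.com", 5432, "analytics").sa_uri

# A hook without an implementation fails explicitly instead of returning a bad URI.
try:
    NoDialectHook().sa_uri
    unsupported = False
except NotImplementedError:
    unsupported = True
```

With this shape, a hook for a database that has no SQLAlchemy dialect raises a clear error rather than handing back a URI that only looks valid.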

cc: @potiuk @dstandish

rawwar commented 7 months ago

@Taragolis, I am working on this in PR https://github.com/apache/airflow/pull/38871. It is a separate PR that only updates DbApiHook to include a method returning a SQLAlchemy URL that can be reused.

dabla commented 7 months ago

> As far as I remember, we recently discussed this case in Slack. I think it would be better to introduce an sa_uri property (or something similarly named) on DbApiHook that returns a sqlalchemy.engine.URL, with no default implementation (e.g. it raises NotImplementedError), and to implement it in each hook that has SQLAlchemy support.
>
> get_uri: it is not clear which URI it implements. DB-API? SQLAlchemy? Or even something internal (ElasticsearchSQLHook)?
>
> cc: @potiuk @dstandish

@Taragolis Why not implement another method named get_url and use it to instantiate the SQLAlchemy engine, since the engine accepts a SQLAlchemy URL object? The SQLAlchemy URL class also has a factory method named create, which lets you build the URL without the hassle of string manipulation. This is how I did it for my custom SQL operator (the one I mentioned on Slack):

    def get_url(self) -> URL:
        return URL.create(
            self.scheme,
            username=self.connection.login,
            password=self.connection.password,
            host=self.connection.host,
            port=self.connection.port,
        )

Also see the SQLAlchemy documentation on creating URLs programmatically: https://docs.sqlalchemy.org/en/20/core/engines.html#creating-urls-programmatically
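As a small illustration of the string-manipulation hassle mentioned above, here is a stdlib-only sketch that assumes a made-up password containing `@` and `/`. Building the URL with plain string formatting silently changes what the host is, so the credentials have to be percent-encoded first; `URL.create` takes the parts separately, which avoids the problem. All values below are invented for the example.

```python
from urllib.parse import quote, unquote, urlsplit

user, password = "user", "p@ss/w0rd"  # made-up credentials with reserved characters
host, port, database = "db.example.com", 5432, "analytics"

# Naive formatting: the "/" in the password ends the authority part early,
# so the parser no longer sees the intended host.
naive = f"postgresql://{user}:{password}@{host}:{port}/{database}"
naive_host = urlsplit(naive).hostname

# Percent-encoding the credentials keeps the URL parseable.
encoded = f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}/{database}"
parsed = urlsplit(encoded)

print(naive_host)       # not db.example.com
print(parsed.hostname)  # db.example.com
```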

I see @rawwar mentioned this first. I think this is a good idea: it would simplify the code and make it less bug-prone.

Taragolis commented 7 months ago

There is a new property that is intended to return the SQLAlchemy URL. The current behaviour of get_url is ambiguous and not all hooks return an SA URI there; for some databases no SA dialect even exists. So it is better to keep get_url as is and migrate step by step to the appropriate ones. If a dialect exists and is installed with the provider, I have no objection to adding its implementation as sqlalchemy_url.

dabla commented 7 months ago

> There is a new property that is intended to return the SQLAlchemy URL. The current behaviour of get_url is ambiguous and not all hooks return an SA URI there; for some databases no SA dialect even exists. So it is better to keep get_url as is and migrate step by step to the appropriate ones. If a dialect exists and is installed with the provider, I have no objection to adding its implementation as sqlalchemy_url.

Hey Andrey, I think there is some confusion here. Yes, we should definitely leave the get_uri method as it is (not get_url, as it doesn't exist yet) and implement a new get_url method, which would then be called by the existing get_sqlalchemy_engine.

dabla commented 7 months ago

This is how I did it in my custom SQLRowsInsertOperator:

    @cached_property
    def connection(self) -> Connection:
        return BaseHook.get_connection(self.conn_id)  # type: ignore

    @cached_property
    def scheme(self) -> str:
        return self.connection.extra_dejson["sqlalchemy_scheme"]

    @cached_property
    def table_name(self) -> str:
        return self.get_columns().table_name_with_schema()

    @cached_property
    def driver(self) -> str:
        return self.connection.extra_dejson["driver"]

    # TODO: Maybe this should be moved to DbApiHook
    def get_url(self) -> URL:
        self.log.info("Connection schema: %s", self.schema)
        self.log.info("Connection scheme: %s", self.scheme)
        self.log.info("Connection driver: %s", self.driver)
        self.log.info("Connection type: %s", self.connection.conn_type)
        self.log.info("Connection login: %s", self.connection.login)
        # Never log the password itself; only record whether one is set.
        self.log.info("Connection password set: %s", self.connection.password is not None)
        self.log.info("Connection host: %s", self.connection.host)
        self.log.info("Connection port: %s", self.connection.port)

        return URL.create(
            self.scheme,
            username=self.connection.login,
            password=self.connection.password,
            host=self.connection.host,
            port=self.connection.port,
        )

    # TODO: Maybe this method in DbApiHook should be changed to this instead of calling get_uri() method
    def get_sqlalchemy_engine(self, engine_kwargs=None):
        if engine_kwargs is None:
            engine_kwargs = {}
        return create_engine(self.get_url(), **engine_kwargs)

    # TODO: Maybe this should be moved to DbApiHook
    @cached_property
    def inspector(self):
        engine = self.get_sqlalchemy_engine()
        self.log.debug("Engine drivername: %s", engine.url.drivername)
        self.log.debug("Engine username: %s", engine.url.username)
        self.log.debug("Engine password set: %s", engine.url.password is not None)
        self.log.debug("Engine host: %s", engine.url.host)
        self.log.debug("Engine port: %s", engine.url.port)
        self.log.debug("Engine database: %s", engine.url.database)
        self.log.debug("Engine dialect: %s", engine.dialect.name)

        return Inspector.from_engine(engine)

potiuk commented 7 months ago

> Hey Andrey, I think there is some confusion here. Yes, we should definitely leave the get_uri method as it is (not get_url, as it doesn't exist yet) and implement a new get_url method, which would then be called by the existing get_sqlalchemy_engine.

Yes, adding a new method is the way to go. How to do it should be proposed and discussed in the PR. We have 20+ SQL providers, and I think the proposal should be complete and explain how to deal with cases that can and cannot be mapped. Also, IMHO, the method name should explicitly mention SQLAlchemy (getSqlAlchemyURI?) and should likely allow variations: from what I understand, SQLAlchemy can have different URLs not only for different databases but also for different drivers. Some of our Connections also allow different kinds of extras and parameters, not only hosts but other parameters that might be used and passed differently to different drivers.

If we decide to introduce a new "standard" method in a product like Airflow and a common API like DbApiHook, the proposal should answer all the questions users might ask in order to be mergeable. It does not have to handle all of those cases, but it should at least have enough error handling and interface specification to explain the transition from the Connection form (in all the cases where we have a DBAPI implementation) to the SQLAlchemy URL: which variations are handled, how it should work, how to map engine args, etc.

I don't think it can be handled by a single implementation. It is more a matter of scaffolding plus an implementation for each hook, and it should be designed carefully with at least a handful (if not all) of the implementations from the above list, so that we can be reasonably sure we will be able to handle those cases.
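One way to picture the scaffolding described above, as a stdlib-only sketch: a lookup from (connection type, driver) to a SQLAlchemy `dialect+driver` name, with an explicit error for combinations that cannot be mapped, and extras passed through as query parameters. The table `DRIVERNAMES`, the exception `UnmappedConnectionError`, and the functions `resolve_drivername` and `extras_to_query` are hypothetical names, and the table is deliberately incomplete.

```python
from typing import Dict, Optional, Tuple
from urllib.parse import urlencode


class UnmappedConnectionError(ValueError):
    """Hypothetical error for connections with no known SQLAlchemy mapping."""


# Hypothetical, incomplete table: (Airflow conn_type, driver) -> "dialect+driver".
# A driver of None means "use the dialect's default driver".
DRIVERNAMES: Dict[Tuple[str, Optional[str]], str] = {
    ("postgres", None): "postgresql",
    ("postgres", "psycopg2"): "postgresql+psycopg2",
    ("mysql", "pymysql"): "mysql+pymysql",
    ("mssql", "pyodbc"): "mssql+pyodbc",
}


def resolve_drivername(conn_type: str, driver: Optional[str] = None) -> str:
    """Return the SQLAlchemy drivername, or fail loudly if none is known."""
    try:
        return DRIVERNAMES[(conn_type, driver)]
    except KeyError:
        raise UnmappedConnectionError(
            f"No SQLAlchemy mapping for conn_type={conn_type!r}, driver={driver!r}"
        ) from None


def extras_to_query(extras: Dict[str, str]) -> str:
    """Encode driver-specific extras as a URL query string (sorted for stability)."""
    return urlencode(sorted(extras.items()))


drivername = resolve_drivername("mssql", "pyodbc")
query = extras_to_query({"driver": "ODBC Driver 18 for SQL Server", "Encrypt": "yes"})
```

Which extras belong in the query string, and under which names, differs per driver, so a real proposal would need that decision made hook by hook.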

rawwar commented 7 months ago

> Hey Andrey, I think there is some confusion here. Yes, we should definitely leave the get_uri method as it is (not get_url, as it doesn't exist yet) and implement a new get_url method, which would then be called by the existing get_sqlalchemy_engine.

@dabla, I think we are indeed adding the get_url functionality through the sqlalchemy_url property.