apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Add support for SqlAlchemy statement in the SqlExecuteQueryOperator #40066

Open AlexisBRENON opened 4 months ago

AlexisBRENON commented 4 months ago

Description

Allow passing a SQLAlchemy statement (or a list of them), in addition to str, to the sql parameter of the SQLExecuteQueryOperator.

Use case/motivation

I need to execute a query either from Airflow or manually. So I build a SQLAlchemy statement (sa.select(...).where(...)) using bindparam. I can then do: session.execute(stmt, params=dict(data_interval_start=mydate)) for "manual" execution, and I expect to be able to do: SQLExecuteQueryOperator(sql=stmt, params=dict(data_interval_start="{{ data_interval_start }}")) for Airflow.

However, this construct fails because the Airflow operator expects a string. Passing sql=str(stmt) does not help either: it renders every bind parameter in the generic named style, such as :data_interval_start, which does not work with my PostgreSQL backend (psycopg2.errors.SyntaxError: syntax error at or near ":").

Related issues

No response

RNHTTR commented 4 months ago

This feels kinda risky from a security perspective. Could you use SqlAlchemy to dump the SA query to a SQL string instead?

AlexisBRENON commented 4 months ago

Can you explain what kind of security issues you see?

Actually yes, I finally found a way to dump the query as a SQL string. However, it requires knowing which kind of backend will execute the query.

import sqlalchemy as sa
from sqlalchemy.dialects import mysql, postgresql, sqlite

# A statement with a single named bind parameter.
stmt = sa.select(sa.bindparam("foo"))

# Without a dialect, the generic ":name" placeholder style is used.
print("str: ", str(stmt))
print("compile(): ", str(stmt.compile()))
# With a dialect, the placeholder follows that backend's driver paramstyle.
print("compile(postgresql): ", str(stmt.compile(dialect=postgresql.dialect())))
print("compile(mysql): ", str(stmt.compile(dialect=mysql.dialect())))
print("compile(sqlite): ", str(stmt.compile(dialect=sqlite.dialect())))

Will output:

str:  SELECT :foo AS anon_1
compile():  SELECT :foo AS anon_1
compile(postgresql):  SELECT %(foo)s AS anon_1
compile(mysql):  SELECT %s AS anon_1
compile(sqlite):  SELECT ? AS anon_1

As you can see, the generated string differs depending on the backend. I often use a SQLite backend during my dev/test process, while my production DB uses Postgres. So it would be nice to abstract away this implementation detail, as long as your Airflow connection is a valid DB-API connection.
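To make the idea concrete, here is a minimal sketch of what such an abstraction might look like outside of Airflow. It is not an existing Airflow API: the DIALECTS mapping and the render_sql helper are hypothetical names introduced for illustration, and the backend name is assumed to be known at call time. It compiles the statement for the chosen dialect and returns the SQL string together with parameters shaped for that dialect's placeholder style (a dict for named styles, a list for positional ones).

```python
import sqlalchemy as sa
from sqlalchemy.dialects import mysql, postgresql, sqlite

# Hypothetical mapping from a backend name to a SQLAlchemy dialect instance.
DIALECTS = {
    "postgresql": postgresql.dialect(),
    "mysql": mysql.dialect(),
    "sqlite": sqlite.dialect(),
}


def render_sql(stmt, backend, values):
    """Compile stmt for the given backend (hypothetical helper).

    Returns the SQL string and the bind parameters, as a dict for
    named placeholder styles or as an ordered list for positional ones.
    """
    compiled = stmt.compile(dialect=DIALECTS[backend])
    # Merge the supplied values into the statement's bind parameters.
    params = compiled.construct_params(values)
    if compiled.positional:
        # Positional styles ("?" or "%s") need values in placeholder order.
        params = [params[name] for name in compiled.positiontup]
    return str(compiled), params


stmt = sa.select(sa.bindparam("foo"))
for backend in DIALECTS:
    sql, params = render_sql(stmt, backend, {"foo": 42})
    print(backend, "|", sql, "|", params)
```

The returned pair could then be handed to whatever executes the query for that backend; how the backend name would be derived from an Airflow connection is left open here.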