apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

SnowflakeSqlApiOperator not resolving parameters in SQL #42033

Closed chris-okorodudu closed 1 month ago

chris-okorodudu commented 1 month ago

Apache Airflow version

2.9.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The SnowflakeSqlApiOperator does not resolve parameters in SQL despite accepting this param:

:param parameters: (optional) the parameters to render the SQL query with.

This is because the operator's execute initializes a SnowflakeSqlApiHook and then runs the queries without ever passing the parameters:

        self._hook = SnowflakeSqlApiHook(
            snowflake_conn_id=self.snowflake_conn_id,
            token_life_time=self.token_life_time,
            token_renewal_delta=self.token_renewal_delta,
            deferrable=self.deferrable,
            **self.hook_params,
        )
        self.query_ids = self._hook.execute_query(
            self.sql,  # type: ignore[arg-type]
            statement_count=self.statement_count,
            bindings=self.bindings,
        )

This means that parameters passed in and then referenced the way they would be in other Snowflake operators - %(param)s - are never resolved, and execution fails.
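
For context, here is a minimal sketch (not provider code; the credentials, table, and parameter names are placeholders) of how %(param)s placeholders are normally resolved in the DB-API based Snowflake operators: the parameters are forwarded down to cursor.execute(), where the Snowflake connector's default "pyformat" paramstyle substitutes them. Since SnowflakeSqlApiHook.execute_query never receives the parameters, that substitution step never happens on the SQL API path.

    import snowflake.connector

    # Placeholder credentials for illustration only.
    conn = snowflake.connector.connect(
        account="my_account", user="my_user", password="my_password"
    )
    cur = conn.cursor()
    # The connector, not Airflow, renders %(my_id)s here:
    cur.execute("SELECT * FROM my_table WHERE id = %(my_id)s", {"my_id": 42})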

What you think should happen instead?

The parameters should be resolved either before the SQL is passed to the SnowflakeSqlApiHook, or within the SnowflakeSqlApiHook itself.

How to reproduce

To reproduce, pass any parameter and reference it in your SQL via the %(param)s syntax, for example with the task shown below.
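
A minimal task definition that triggers the failure (the connection id, table, and parameter names are illustrative):

    from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

    failing_task = SnowflakeSqlApiOperator(
        task_id="sql_api_with_parameters",
        snowflake_conn_id="snowflake_default",
        sql="SELECT * FROM my_table WHERE run_date = %(run_date)s",
        parameters={"run_date": "2024-09-01"},
        statement_count=1,
    )
    # The statement still contains the literal "%(run_date)s" when it is sent to
    # the Snowflake SQL API, so the query fails.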

Operating System

all

Versions of Apache Airflow Providers

Tested with multiple versions, most recently:

apache-airflow-providers-snowflake==5.6.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

Tested this both with the official Helm chart locally and with MWAA; the issue occurred in both.

Anything else?

I was able to get it working by adding these lines
    # Quote each parameter value as a string literal and interpolate the
    # %(param)s placeholders before the SQL is handed to the hook.
    quoted_params = {k: f"'{v}'" for k, v in self.parameters.items()}
    rendered_sql = self.sql % quoted_params
    self.sql = rendered_sql
before the call to 
    self.query_ids = self._hook.execute_query(
        self.sql,  # type: ignore[arg-type]
        statement_count=self.statement_count,
        bindings=self.bindings,
    )


However, it may make more sense for the parameters to be passed to execute_query and resolved there, which would require an update to the SnowflakeSqlApiHook instead (a rough sketch of that follows). Let me know if there's some other way to pass the params that I'm missing.
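
A rough sketch of that hook-side resolution; the render_parameters helper is hypothetical and simply mirrors the naive quoting from the workaround above (it does not escape quotes in values or handle non-string types specially):

    from typing import Any, Mapping, Optional


    def render_parameters(sql: str, parameters: Optional[Mapping[str, Any]]) -> str:
        """Resolve %(param)s placeholders by quoting each value as a string literal."""
        if not parameters:
            return sql
        quoted = {k: f"'{v}'" for k, v in parameters.items()}
        return sql % quoted


    # The hook would call this right before building the SQL API request, e.g.:
    print(render_parameters("SELECT * FROM t WHERE id = %(id)s", {"id": 42}))
    # -> SELECT * FROM t WHERE id = '42'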

Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

boring-cyborg[bot] commented 1 month ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

harjeevanmaan commented 1 month ago

@chris-okorodudu You can add bindings as a keyword argument within self._hook.execute_query.

Here is an example:

"bindings": {
  "1": {
    "type": "FIXED",
    "value": "123"
  }
}

For more details on the correct format, please refer to the following article: sql-api-bind-variables. Please be mindful that Snowflake does not currently support bindings in multi-statement SQL requests.
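
For illustration, a hypothetical task using this bindings approach (the connection id, table, and values are placeholders); bind variables are referenced with ? in the SQL and supplied positionally via bindings:

    from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

    bound_task = SnowflakeSqlApiOperator(
        task_id="sql_api_with_bindings",
        snowflake_conn_id="snowflake_default",
        sql="SELECT * FROM my_table WHERE id = ?",
        statement_count=1,
        bindings={"1": {"type": "FIXED", "value": "123"}},
    )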

potiuk commented 1 month ago

While not entirely fixed - it seems that this is on the Snowflake side, and #42719 at least provides an explanation.