apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

RedshiftDataOperator fails when `return_sql_result` is true and multiple SQL statements are provided #40427

Open thesuperzapper opened 3 days ago

thesuperzapper commented 3 days ago

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

Affects all current versions of apache-airflow-providers-amazon, including 8.24.0

Apache Airflow version

N/A - all versions

Operating System

N/A

Deployment

Other

Deployment details

No response

What happened

There is a bug in `RedshiftDataOperator`: if multiple SQL queries are passed and `return_sql_result` is set to `True`, you will get the following error:

```
An error occurred (ValidationException) when calling the GetStatementResult operation: BatchExecuteStatement result can only be retrieved with sub-statement id.
```

What you think should happen instead

No response

How to reproduce

Run a RedshiftDataOperator like this:


```python
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
run_query = RedshiftDataOperator(
    task_id="run_query",
    aws_conn_id="MY_AWS_CONNECTION",
    #
    # Redshift parameters
    cluster_identifier="MY_REDSHIFT_CLUSTER",
    region="MY_REGION",
    database="MY_DB",
    sql=[
        "SELECT 1;",
        "SELECT 2;",
        "SELECT 3;",
    ],
    #
    # Redshift Data API parameters
    return_sql_result=True,
    statement_name="SOME_NAME",
    secret_arn=(
        "arn:aws:secretsmanager"
        ":MY_REGION:MY_ACCOUNT"
        ":secret:MY_DATA_API_CREDENTIAL_SECRET"
    ),
    #
    # Trigger parameters
    wait_for_completion=True,
    poll_interval=10,
)
```

Anything else

To fix this, we need to record the length of the `sql` list and call `get_statement_result` once per sub-statement ID, because the Data API only returns the result of one statement at a time.

For example, with three statements you would fetch the results of the following sub-statement IDs, starting from `:1`: `<statement_id>:1`, `<statement_id>:2`, and `<statement_id>:3`.

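A minimal sketch of that loop, assuming a client that behaves like the boto3 `redshift-data` client (`get_statement_result(Id=..., NextToken=...)` returning `Records` and, while more pages remain, `NextToken`). The helper names `sub_statement_ids` and `get_all_statement_results`, and the in-memory `FakeRedshiftDataClient` used to exercise them, are hypothetical and not part of the provider:

```python
from typing import Any, Dict, List, Optional


def sub_statement_ids(statement_id: str, num_statements: int) -> List[str]:
    """Hypothetical helper: sub-statement IDs of a batch, numbered from :1."""
    return [f"{statement_id}:{i}" for i in range(1, num_statements + 1)]


def get_all_statement_results(
    client: Any, statement_id: str, num_statements: int
) -> List[Dict[str, Any]]:
    """Hypothetical helper: fetch every page of every sub-statement's result.

    `client` is assumed to behave like a boto3 "redshift-data" client, whose
    get_statement_result(Id=..., NextToken=...) returns "Records" and, while
    more pages remain, a "NextToken".
    """
    results: List[Dict[str, Any]] = []
    for sub_id in sub_statement_ids(statement_id, num_statements):
        first_page = client.get_statement_result(Id=sub_id)
        records = list(first_page.get("Records", []))
        token = first_page.get("NextToken")
        # Follow NextToken until this sub-statement's result is exhausted.
        while token:
            page = client.get_statement_result(Id=sub_id, NextToken=token)
            records.extend(page.get("Records", []))
            token = page.get("NextToken")
        # Keep the first page's other keys (e.g. ColumnMetadata), drop the
        # token, and attach all collected records.
        merged = {k: v for k, v in first_page.items() if k != "NextToken"}
        merged["Records"] = records
        merged["Id"] = sub_id  # added by this sketch for traceability
        results.append(merged)
    return results


class FakeRedshiftDataClient:
    """In-memory stand-in for the Data API, used only to exercise the loop above."""

    def __init__(self, pages: Dict[tuple, Dict[str, Any]]) -> None:
        self.pages = pages

    def get_statement_result(self, Id: str, NextToken: Optional[str] = None) -> Dict[str, Any]:
        # This fake only models a batch: an ID without a ":N" suffix is rejected,
        # mirroring the ValidationException reported in this issue.
        if ":" not in Id:
            raise ValueError("BatchExecuteStatement result can only be retrieved with sub-statement id.")
        return self.pages[(Id, NextToken)]


fake = FakeRedshiftDataClient({
    ("abc:1", None): {"Records": [[{"longValue": 1}]]},
    ("abc:2", None): {"Records": [[{"longValue": 2}]], "NextToken": "t1"},
    ("abc:2", "t1"): {"Records": [[{"longValue": 20}]]},
    ("abc:3", None): {"Records": [[{"longValue": 3}]]},
})
results = get_all_statement_results(fake, "abc", 3)
```

Following `NextToken` matters because `get_statement_result` pages large results; without it, only the first page of each sub-statement would be returned.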

There are TWO different places where we call `get_statement_result` that need to be updated:

  1. For non-deferred mode, in `execute()`
  2. For deferred mode, in `execute_complete()`

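One way to keep the two modes from drifting apart is to route both call sites through a single result-fetching helper. The sketch below is a heavily simplified, hypothetical stand-in: `SketchRedshiftDataOperator`, `FakeClient`, and its `submit` method are illustrative names, not the provider's code. It assumes that a list of SQL strings is submitted as one batch and that the deferred trigger event carries a `statement_id` key:

```python
from typing import Any, Dict, List, Optional, Union

Result = Dict[str, Any]


class SketchRedshiftDataOperator:
    """Hypothetical, heavily simplified stand-in for RedshiftDataOperator.

    Only illustrates routing execute() and execute_complete() through one
    result-fetching helper; it is not the provider's real class.
    """

    def __init__(self, client: Any, sql: Union[str, List[str]], return_sql_result: bool = True) -> None:
        self.client = client
        self.sql = sql
        self.return_sql_result = return_sql_result

    def _result_ids(self, statement_id: str) -> List[str]:
        # Assumption: a list of statements is submitted as one batch, so each
        # result must be read through its sub-statement ID (":1", ":2", ...).
        if isinstance(self.sql, list):
            return [f"{statement_id}:{i}" for i in range(1, len(self.sql) + 1)]
        return [statement_id]

    def _get_results(self, statement_id: str) -> Union[str, Result, List[Result]]:
        if not self.return_sql_result:
            return statement_id
        # First page only, for brevity; real code should also follow NextToken.
        results = [self.client.get_statement_result(Id=sub_id) for sub_id in self._result_ids(statement_id)]
        # One possible choice: a single result for a plain string, a list for a batch.
        return results if isinstance(self.sql, list) else results[0]

    def execute(self, context: Dict[str, Any]) -> Union[str, Result, List[Result]]:
        # Non-deferred path: submit, wait (elided here), then fetch results.
        statement_id = self.client.submit(self.sql)
        return self._get_results(statement_id)

    def execute_complete(self, context: Dict[str, Any], event: Optional[Dict[str, Any]] = None):
        # Deferred path: the trigger event is assumed to carry the statement ID.
        if event is None or "statement_id" not in event:
            raise ValueError("Trigger event did not include a statement_id")
        return self._get_results(event["statement_id"])


class FakeClient:
    """In-memory stand-in: `submit` is hypothetical, and results echo their ID."""

    def submit(self, sql: Union[str, List[str]]) -> str:
        return "abc"

    def get_statement_result(self, Id: str) -> Result:
        return {"Id": Id, "Records": []}


batch_op = SketchRedshiftDataOperator(FakeClient(), ["SELECT 1;", "SELECT 2;", "SELECT 3;"])
single_op = SketchRedshiftDataOperator(FakeClient(), "SELECT 1;")
```

Returning a single dict for a plain string and a list for a batch is only one option; always returning a list would be simpler, but it would change the XCom shape for existing single-statement DAGs.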

isatyamks commented 2 days ago

Hello @thesuperzapper,

I have made some changes to the RedshiftDataOperator to address the issue with handling multiple SQL statements. Please review my pull request https://github.com/apache/airflow/pull/40443 and let me know if my approach is correct.