apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.49k stars 14.13k forks source link

BigQueryUpdateTableSchemaOperator does not accept "location" parameter #40235

Closed mariayanovskaya closed 3 months ago

mariayanovskaya commented 3 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.6.3

What happened?

I am working in Cloud Composer on GCP. We are using composer-2.4.6-airflow-2.6.3.

I am building a script that is expected to use the BigQueryUpdateTableSchemaOperator to change the schema of a BigQuery table. The company I work for has very strict rule that all resources and jobs used in GCP are located in Europe, in respect of GDPR regulations. The BigQueryUpdateTableSchemaOperator in documentation is meant to accept "location" as one of the parameters. When I try to pass the code below Airflow raises DAG Import Error:

Broken DAG: [/home/airflow/gcs/dags/dags/ingestion_pipelines/ingestion_pipelines.py] Traceback (most recent call last): File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 429, in apply_defaults result = func(self, kwargs, default_args=default_args) File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 788, in init raise AirflowException( airflow.exceptions.AirflowException: Invalid arguments were passed to BigQueryUpdateTableSchemaOperator (task_id: update_schema). Invalid arguments were: kwargs: {'location': 'EU'}

Code:

table_schema = {
    "name": "column",
    "type": "TIMESTAMP",
    "description": "column",
}

update_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_schema",
    dataset_id="dataset",
    table_id="table_name",
    project_id="PROJECT_ID",
    schema_fields_updates=table_schema,
    include_policy_tags=True,
    location="EU",
    impersonation_chain="SERVICE_ACCOUNT",
    retries=0,
)

What you think should happen instead?

The DAG should load without errors.

How to reproduce

upload this code into the bucket that is connected in your Cloud Composer.

from airflow.decorators import dag
from airflow.decorators import task
from airflow.decorators import task_group
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.bigquery import BigQueryUpdateTableSchemaOperator

@dag(
    dag_id="update_table",
    max_active_runs=1,
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def task_flow():
    @task_group(group_id="database")
    def task_groups():

        #this is the bigquery table
        dataset="covid19_ecdc_eu"
        table_name="covid_19_geographic_distribution_worldwide"
        project_id="bigquery-public-data"

        table_schema = {
            "name": "date",
            "description": "date column",
        }

        update_schema = BigQueryUpdateTableSchemaOperator(
                        task_id="update_bronze_schema_for_incr",
                        dataset_id=dataset,
                        table_id=table_name,
                        project_id=project_id,
                        schema_fields_updates=table_schema,
                        include_policy_tags=True,
                        location="EU",
                        retries=0,
                    )
        chain(
            update_schema
        )

    task_groups()

task_flow()

Operating System

macOS Sonoma 14.5 23F79

Versions of Apache Airflow Providers

https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#:~:text=composer%2D2.4.6%2Dairflow%2D2.6.3

Deployment

Google Cloud Composer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 3 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.