apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

BigQueryOperator never uses the location parameter #10016

Closed muscovitebob closed 4 years ago

muscovitebob commented 4 years ago

Apache Airflow version: composer-1.10.4-airflow-1.10.6

Kubernetes version (if you are using kubernetes) (use kubectl version):

Server Version: version.Info{Major:"1", Minor:"14+", GitVersion:"v1.14.10-gke.42", GitCommit:"42bef28c2031a74fc68840fce56834ff7ea08518", GitTreeState:"clean", BuildDate:"2020-06-02T16:07:00Z", GoVersion:"go1.12.12b4", Compiler:"gc", Platform:"linux/amd64"}

What happened and what you think went wrong:

BigQueryOperator does not use the location parameter to specify the query job location. Instead, it takes whatever location BigQuery auto-detected, read back from the HTTP response.

This happens because of the following code:

        jobs = self.service.jobs()
        job_data = {'configuration': configuration}

        # Send query and wait for reply.
        query_reply = jobs \
            .insert(projectId=self.project_id, body=job_data) \
            .execute(num_retries=self.num_retries)
        self.running_job_id = query_reply['jobReference']['jobId']
        if 'location' in query_reply['jobReference']:
            location = query_reply['jobReference']['location']
        else:
            location = self.location

The configuration block does not contain a location. The subsequent jobs.insert call apparently triggers some internal BigQuery logic to auto-detect the location, which in practice falls back to US more often than not, causing the job to fail with an error saying the datasets/tables referenced in the query do not exist. Specifying the location argument, e.g. location='EU', on the operator is thus not obeyed.
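
For context, here is a minimal sketch of how the hook could pin the job to the configured location by setting it in the jobReference of the insert body (the BigQuery Job resource does accept a jobReference.location field). This mirrors the snippet above and is not necessarily the exact change made on master:

        jobs = self.service.jobs()
        # Hypothetical adaptation: pass the user-supplied location explicitly
        # instead of letting BigQuery auto-detect it from the referenced tables.
        job_data = {
            'configuration': configuration,
            'jobReference': {
                'projectId': self.project_id,
                'location': self.location,  # e.g. 'EU'
            },
        }

        # Send query and wait for reply.
        query_reply = jobs \
            .insert(projectId=self.project_id, body=job_data) \
            .execute(num_retries=self.num_retries)
        self.running_job_id = query_reply['jobReference']['jobId']
        # The reply now echoes back the requested location.
        location = query_reply['jobReference'].get('location', self.location)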

What you expected to happen: Specifying location as a BigQueryOperator argument leads to execution of the query job in the correct location.

How to reproduce it: Set up a project and dataset in EU containing an example table.

Then, with an initialised local Airflow (airflow initdb) that has been supplied with GCP/BigQuery default connection details, you may run the following code:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.models import TaskInstance

dag = DAG(dag_id="anydag", start_date=datetime.now())
task = BigQueryOperator(
    dag=dag,
    task_id="query_task",
    write_disposition="WRITE_TRUNCATE",
    use_legacy_sql=False,
    destination_dataset_table="example_project.example_dataset.example_table",
    sql="select * from `example_project.example_dataset.example_input_table`",
    location="US",
)
ti = TaskInstance(task=task, execution_date=datetime.now())
task.execute(ti.get_template_context())

The location parameter will probably not be respected; instead, the job will execute in EU, where the dataset lives.

Occasionally, regardless of the location specified, the job will execute in US. This is difficult to reproduce reliably, as it appears to be flaky and to depend on which location the BigQuery service itself decides the query should run in.

boring-cyborg[bot] commented 4 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

turbaszek commented 4 years ago

Thanks for reporting this issue. We are aware of this limitation and we believe it's already solved on master. If you wish to use the new operators, please check https://github.com/apache/airflow#backport-packages

Here's a guide for the new operators: https://airflow.readthedocs.io/en/latest/howto/operator/google/cloud/bigquery.html
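
For illustration, a minimal sketch of the equivalent task using the backport operator (parameter names as documented for BigQueryExecuteQueryOperator; the dag object and the project/dataset names are the placeholders from the reproduction snippet above):

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

task = BigQueryExecuteQueryOperator(
    dag=dag,
    task_id="query_task",
    sql="select * from `example_project.example_dataset.example_input_table`",
    destination_dataset_table="example_project.example_dataset.example_table",
    write_disposition="WRITE_TRUNCATE",
    use_legacy_sql=False,
    location="EU",  # honoured when the query job is submitted
)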

muscovitebob commented 4 years ago

Thanks for getting back to me so quickly @turbaszek. I will try installing the backport package for the Google operators on my Cloud Composer instance.

muscovitebob commented 4 years ago

I seem to be unable to import from the backport package after installing it for local testing.

(.venv) user@IMB dir % pip freeze
airflow-plugins==0.0.0
alembic==1.4.2
apache-airflow==1.10.11
apache-airflow-backport-providers-google==2020.6.24
...
(.venv) user@IMB dir % python
Python 3.7.7 (default, Mar 10 2020, 15:43:33) 
[Clang 11.0.0 (clang-1100.0.33.17)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ModuleNotFoundError: No module named 'airflow.providers'

Is there a trick or step I am missing here?

I've installed the packages via python setup.py install; my setup.py looks like:

import setuptools

dependencies = [
    "apache-airflow[gcp]~=1.10.6",
    "pymongo~=3.10.1",
    "google-cloud-bigquery~=1.25.0",
    "google-cloud-storage~=1.25.0",
    "apache-airflow-backport-providers-google==2020.6.24"
]

setuptools.setup(
    install_requires=dependencies,
    packages=setuptools.find_packages(),
    python_requires=">=3.7",
    version="0.0.0",
)

I see this is mentioned in the readme, actually. Looking into it.

muscovitebob commented 4 years ago

I fixed this error by switching to a requirements.txt file and installing these dependencies via pip install -r requirements.txt, for now.
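
For reference, the requirements.txt equivalent of the dependency list from the setup.py above is roughly:

apache-airflow[gcp]~=1.10.6
pymongo~=3.10.1
google-cloud-bigquery~=1.25.0
google-cloud-storage~=1.25.0
apache-airflow-backport-providers-google==2020.6.24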

muscovitebob commented 4 years ago

I see that the location attribute is indeed obeyed correctly by the drop-in replacement for BigQueryOperator in the backport package, BigQueryExecuteQueryOperator, in my local testing.

muscovitebob commented 4 years ago

I am facing the same ModuleNotFoundError after installing the backport package onto Cloud Composer via the standard PyPI package installation mechanism and attempting to import it in a DAG. Looking into how to apply the symlink workaround from the readme on the Composer environment.

muscovitebob commented 4 years ago

At the moment I have a scheduler pod and several Celery executor worker pods running in Composer. I have created a symlink inside the scheduler pod to Google's custom mounted Airflow version:

airflow@airflow-scheduler-7bd674b5d9-sb68h:/usr/local/lib/airflow/airflow$ pip freeze | grep airflow
# Editable install with no version control (apache-airflow===1.10.6-composer)
-e /usr/local/lib/airflow
apache-airflow-backport-providers-google==2020.6.24
airflow@airflow-scheduler-7bd674b5d9-sb68h:~$ cd /usr/local/lib/airflow/airflow
airflow@airflow-scheduler-7bd674b5d9-sb68h:/usr/local/lib/airflow/airflow$ sudo ln -s /opt/python3.6/lib/python3.6/site-packages/airflow/providers providers

This gets rid of the import warning, but I now get "DAG seems to be missing." in the GUI when I try to navigate to the DAG page. In the scheduler logs I see instances of the following uninformative error:

[2020-07-27 14:56:55,216] {dagbag.py:246} ERROR - Failed to import: /home/airflow/gcs/dags/dag.py
  File "/home/airflow/gcs/dags/dag.py", line 6, in <module>
[2020-07-27 14:57:07,527] {dagbag.py:407} INFO - Filling up the DagBag from /home/airflow/gcs/dags/dag.py

There are no errors specific to the DAG in question in the workers, as far as I can see. I suspect that I also need to create the same symlink on the workers for this to work.

Of course, this setup is rather brittle, as the symlink will be destroyed with each pod rotation, which is especially common for the workers.

I know this is now a Cloud Composer-specific issue, so I will migrate further exploration of making this work to the Cloud Composer User Group. Please see here for the discussion.

turbaszek commented 4 years ago

Closing as the reported problem itself seems to be solved.

mik-laj commented 4 years ago

@muscovitebob please upgrade Cloud Composer to the latest version.

Old environments do not support these packages.

June 24, 2020: Airflow Providers can now be installed inside Cloud Composer.

muscovitebob commented 4 years ago

Thanks for the heads up @mik-laj, I indeed had some success after upgrading as I mentioned in the linked Composer User Group post :)