apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Bigquery operator gets "[Errno 32] Broken pipe" after getting 401 error #16288

Closed eladshabi closed 3 years ago

eladshabi commented 3 years ago

Hi,

When using the Bigquery operator on Cloud Composer, and the query takes more than 1 hour, we get an "[Errno 32] Broken pipe" after getting a 401 error.

The 401 error appeared 1 hour after the task was created, and it looks like the root cause is an expired API token.
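Google OAuth2 access tokens expire after 3600 seconds by default, which matches the one-hour timeline above. A minimal, self-contained sketch of why a credential held for the whole poll loop goes stale (the `Token` class here is illustrative, not the real oauth2client type):

```python
from datetime import datetime, timedelta, timezone

# Illustrative stand-in for an OAuth2 access token; real Google access
# tokens default to a 3600-second lifetime.
class Token:
    def __init__(self, issued_at, lifetime_s=3600):
        self.issued_at = issued_at
        self.lifetime_s = lifetime_s

    def is_expired(self, now):
        # A token acquired when the task started is stale after one hour,
        # even though the BigQuery job it launched is still running.
        return now - self.issued_at >= timedelta(seconds=self.lifetime_s)

start = datetime(2021, 5, 27, 2, 25, tzinfo=timezone.utc)  # token acquired
check = datetime(2021, 5, 27, 3, 25, tzinfo=timezone.utc)  # 401 observed

token = Token(issued_at=start)
print(token.is_expired(check))  # → True: the server now rejects it with 401
```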

It is important to note that the BQ job itself keeps running even though the Airflow task failed.
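Since the BQ job outlives the failed Airflow task, the job id from the log can be used to check on (or cancel) the orphaned job. A hedged sketch of such a check, written against the interface of the modern `google-cloud-bigquery` client (the helper accepts any client-like object, so the example runs without GCP credentials; the stub classes are purely illustrative):

```python
def check_orphaned_job(client, job_id, location="US"):
    """Return the state of a BigQuery job that a failed task left behind.

    `client` is expected to behave like google.cloud.bigquery.Client,
    whose get_job() returns a job object exposing a .state attribute
    ("PENDING", "RUNNING", or "DONE").
    """
    job = client.get_job(job_id, location=location)
    return job.state

# Stub objects standing in for the real client, so the sketch is runnable
# without credentials.
class StubJob:
    state = "RUNNING"

class StubClient:
    def get_job(self, job_id, location=None):
        return StubJob()

print(check_orphaned_job(StubClient(), "job_123"))  # → RUNNING

# Real usage (requires GCP credentials) would look roughly like:
#   from google.cloud import bigquery
#   state = check_orphaned_job(bigquery.Client(), "<BQ-JOB-ID>")
```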

Please see the relevant log excerpts from the moment the BQ job was triggered (irrelevant information was removed):

[2021-05-27 02:30:26,577] {base_task_runner.py:98} INFO - Subtask: [2021-05-27 02:30:26,576] {discovery.py:852} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/<PROJECT-ID>/jobs/<BQ-JOB-ID>?alt=json

[2021-05-27 02:30:26,633] {base_task_runner.py:98} INFO - Subtask: [2021-05-27 02:30:26,633] {bigquery_hook.py:856} INFO - Waiting for job to complete : <PROJECT-ID>, <BQ-JOB-ID>
.
.
.
[2021-05-27 03:25:09,683] {base_task_runner.py:98} INFO - Subtask: [2021-05-27 03:25:09,682] {discovery.py:852} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/<PROJECT-ID>/jobs/<BQ-JOB-ID>?alt=json
[2021-05-27 03:25:09,711] {base_task_runner.py:98} INFO - Subtask: [2021-05-27 03:25:09,689] {client.py:631} INFO - Refreshing due to a 401 (attempt 1/2)
[2021-05-27 03:25:09,858] {base_task_runner.py:98} INFO - Subtask: [2021-05-27 03:25:09,858] {sendgrid.py:126} INFO - Email with subject Airflow alert: <...> [failed]> is successfully sent to recipients: <...>
[2021-05-27 03:25:09,898] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 7, in <module>
[2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask:     exec(compile(f.read(), __file__, 'exec'))
[2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/bin/airflow", line 27, in <module>
[2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2021-05-27 03:25:09,899] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/bin/cli.py", line 392, in run
[2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
[2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/models.py", line 1492, in _run_raw_task
[2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2021-05-27 03:25:09,900] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 98, in execute
[2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:     self.create_disposition, self.query_params)
[2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 499, in run_query
[2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:     return self.run_with_configuration(configuration)
[2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 846, in run_with_configuration
[2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:     jobId=self.running_job_id).execute()
[2021-05-27 03:25:09,901] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/oauth2client/util.py", line 135, in positional_wrapper
[2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:     return wrapped(*args, **kwargs)
[2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 833, in execute
[2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:     method=str(self.method), body=self.body, headers=self.headers)
[2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 173, in _retry_request
[2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask:     raise exception
[2021-05-27 03:25:09,902] {base_task_runner.py:98} INFO - Subtask: socket.error: [Errno 32] Broken pipe

Cloud Composer version: composer-1.7.2, Airflow version: 1.9.0.
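For context, the `Refreshing due to a 401 (attempt 1/2)` log line shows the Google API client trying to refresh its credentials and retry; in this failure the retry itself then died on the broken socket. A simplified, self-contained sketch of that retry-on-401 pattern (all names are illustrative, not the actual googleapiclient internals):

```python
# Illustrative retry-on-401 loop; not the actual googleapiclient code.
class Unauthorized(Exception):
    pass

def execute_with_refresh(request, credentials, max_attempts=2):
    """Run request(); on a 401, refresh the credentials and retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return request(credentials)
        except Unauthorized:
            if attempt == max_attempts:
                raise
            credentials.refresh()  # fetch a fresh access token

# Demo with stubs: the first call fails with 401 because the token is
# stale, the retry succeeds after the refresh.
class StubCredentials:
    def __init__(self):
        self.fresh = False
    def refresh(self):
        self.fresh = True

def stub_request(creds):
    if not creds.fresh:
        raise Unauthorized("401")
    return "job status: DONE"

creds = StubCredentials()
print(execute_with_refresh(stub_request, creds))  # → job status: DONE
```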

Thanks

boring-cyborg[bot] commented 3 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

eladkal commented 3 years ago

You are using an old Airflow version with an old version of the operator. It's possible that the issue has been resolved since. Can you please check if the issue is reproducible with the latest operator version in the Google provider?

eladshabi commented 3 years ago

> You are using an old Airflow version with an old version of the operator. It's possible that the issue has been resolved since. Can you please check if the issue is reproducible with the latest operator version in the Google provider?

@eladkal The issue is hard to reproduce, since it requires a BQ query that runs for over an hour and the runtime depends on the allocated BQ slots. Normally, the query takes 20 minutes. I'll try to reproduce it and will update this thread.

Moreover, I've looked for a relevant bug fix in newer versions, but didn't find any.

eladshabi commented 3 years ago

@eladkal I've reproduced the issue on the BigQuery operator by running the SQL loop [1] for 80 minutes.

On version 1.9.0, the task failed with the same error.

On version 1.10.15, the task ran successfully without any errors.

Looks like this bug was fixed in a newer Airflow version.

Thanks!

[1]:

DECLARE x TIMESTAMP;
SET x = CURRENT_TIMESTAMP();
-- Busy-wait until 80 minutes have elapsed, to keep the BQ job
-- running well past the one-hour token lifetime.
LOOP
  IF TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), x, MINUTE) >= 80 THEN
    LEAVE;
  END IF;
END LOOP;