apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Getting Unauthorized error messages with GKEStartPodOperator if pod execution is over 1 hour #31648

Closed sgomezf closed 1 year ago

sgomezf commented 1 year ago

Apache Airflow version

2.6.1

What happened

After installing 2.6.1 with the fix from https://github.com/apache/airflow/pull/31391, our DAGs run normally, except that when a task takes more than 60 minutes, it stops reporting logs and status. Even if the work completes inside the job pod, the task is always marked as failed due to the "Unauthorized" errors.

Log of the job starting and authenticating (some info redacted):

[2023-05-31, 07:00:17 UTC] {base.py:73} INFO - Using connection ID 'gcp_conn' for task execution.
[2023-05-31, 07:00:17 UTC] {kubernetes_engine.py:288} INFO - Fetching cluster (project_id=<PROJECT-ID>, location=<REGION>, cluster_name=<CLUSTER-NAME>)
[2023-05-31, 07:00:17 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-05-31, 07:00:17 UTC] {_default.py:213} DEBUG - Checking None for explicit credentials as part of auth process...
[2023-05-31, 07:00:17 UTC] {_default.py:186} DEBUG - Checking Cloud SDK credentials as part of auth process...
[2023-05-31, 07:00:17 UTC] {_default.py:192} DEBUG - Cloud SDK credentials not found on disk; not using them
[2023-05-31, 07:00:17 UTC] {_http_client.py:104} DEBUG - Making request: GET http://169.254.169.254
[2023-05-31, 07:00:17 UTC] {_http_client.py:104} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/project/project-id
[2023-05-31, 07:00:17 UTC] {requests.py:192} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/?recursive=true
[2023-05-31, 07:00:17 UTC] {requests.py:192} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/airflow@<PROJECT-ID>.iam.gserviceaccount.com/token?scopes=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform
[2023-05-31, 07:00:17 UTC] {pod.py:769} DEBUG - Creating pod for KubernetesPodOperator task update
[2023-05-31, 07:00:17 UTC] {pod.py:850} INFO - Building pod mydag-vfqdiqzm with labels: {'dag_id': 'mydag', 'task_id': 'update', 'run_id': 'scheduled__2023-05-30T0700000000-fa9d70c83', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2023-05-31, 07:00:17 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-05-31, 07:00:17 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-05-31, 07:00:17 UTC] {_default.py:213} DEBUG - Checking None for explicit credentials as part of auth process...
[2023-05-31, 07:00:17 UTC] {_default.py:186} DEBUG - Checking Cloud SDK credentials as part of auth process...
[2023-05-31, 07:00:17 UTC] {_default.py:192} DEBUG - Cloud SDK credentials not found on disk; not using them
[2023-05-31, 07:00:17 UTC] {_http_client.py:104} DEBUG - Making request: GET http://169.254.169.254
[2023-05-31, 07:00:17 UTC] {_http_client.py:104} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/project/project-id
[2023-05-31, 07:00:17 UTC] {requests.py:192} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/?recursive=true
[2023-05-31, 07:00:17 UTC] {requests.py:192} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/airflow@<PROJECT-ID>.iam.gserviceaccount.com/token?scopes=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform
[2023-05-31, 07:00:17 UTC] {rest.py:231} DEBUG - response body: {"kind":"PodList","apiVersion":"v1","metadata":{"resourceVersion":"797683242"},"items":[]}
[2023-05-31, 07:00:17 UTC] {pod.py:500} DEBUG - Starting pod:
api_version: v1
kind: Pod
metadata:
  annotations: {}
  cluster_name: null
...

Periodically we can see heartbeats and status:

[2023-05-31, 07:06:48 UTC] {rest.py:231} DEBUG - response body: {"kind":"Pod","apiVersion":"v1","metadata":{"name":"mydag-vfqdiqzm","namespace":"airflow-namespace","uid":"314deb6c-3c2b-41ae-b49f-1e0c89cf6950","resourceVersion":"797683421","creationTimestamp":"2023-05-31T07:00:17Z"...<REDACTING DETAILS POD>}
[2023-05-31, 07:06:48 UTC] {taskinstance.py:789} DEBUG - Refreshing TaskInstance <TaskInstance: mydag.update scheduled__2023-05-30T07:00:00+00:00 [running]> from DB
[2023-05-31, 07:06:48 UTC] {job.py:213} DEBUG - [heartbeat]

Exactly at the 1 hour mark this error occurs:

[2023-05-31, 08:00:55 UTC] {rest.py:231} DEBUG - response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}
[2023-05-31, 08:00:56 UTC] {rest.py:231} DEBUG - response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}
[2023-05-31, 08:00:58 UTC] {taskinstance.py:789} DEBUG - Refreshing TaskInstance <TaskInstance: mydag.update scheduled__2023-05-30T07:00:00+00:00 [running]> from DB
[2023-05-31, 08:00:58 UTC] {rest.py:231} DEBUG - response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}
[2023-05-31, 08:00:58 UTC] {job.py:213} DEBUG - [heartbeat]
[2023-05-31, 08:00:58 UTC] {rest.py:231} DEBUG - response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}
[2023-05-31, 08:00:58 UTC] {pod.py:905} ERROR - (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'a9cfb5cd-9915-4490-8813-e8392a0e20d2', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Wed, 31 May 2023 08:00:58 GMT', 'Content-Length': '129'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 543, in execute_sync
    self.pod_manager.fetch_container_logs(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 361, in fetch_container_logs
    last_log_time = consume_logs(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 339, in consume_logs
    for raw_line in logs:
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 166, in __iter__
    if not self.logs_available():
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 182, in logs_available
    remote_pod = self.read_pod()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 200, in read_pod
    self.read_pod_cache = self.pod_manager.read_pod(self.pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 289, in wrapped_f
    return self(f, *args, **kw)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 379, in __call__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 325, in iter
    raise retry_exc.reraise()
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 158, in reraise
    raise self.last_attempt.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 382, in __call__
    result = fn(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 490, in read_pod
    return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py", line 23483, in read_namespaced_pod
    return self.read_namespaced_pod_with_http_info(name, namespace, **kwargs)  # noqa: E501
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py", line 23570, in read_namespaced_pod_with_http_info
    return self.api_client.call_api(
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 240, in GET
    return self.request("GET", url,
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 234, in request
    raise ApiException(http_resp=r)

The job retries with the same result and is marked as failed.
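As a quick check of the timing, the gap between the token request in the first log excerpt (07:00:17) and the first 401 shown above (08:00:55) can be computed from the log timestamps. This is only a sketch: it assumes those two lines really are the token fetch and the first failure, and the helper name `elapsed` is made up for illustration.

```python
from datetime import datetime, timedelta

# Timestamp layout used by the Airflow task log lines quoted above.
LOG_TIME_FORMAT = "%Y-%m-%d, %H:%M:%S UTC"


def elapsed(start: str, end: str) -> timedelta:
    """Return the time between two log timestamps (hypothetical helper)."""
    return datetime.strptime(end, LOG_TIME_FORMAT) - datetime.strptime(
        start, LOG_TIME_FORMAT
    )


token_requested = "2023-05-31, 07:00:17 UTC"     # metadata-server token request
first_unauthorized = "2023-05-31, 08:00:55 UTC"  # first 401 response body

gap = elapsed(token_requested, first_unauthorized)
print(gap, gap.total_seconds())  # 1:00:38 3638.0
```

The failure shows up 38 seconds past the hour, which would be consistent with an access token that lives for about one hour and is never renewed (an assumption at this point in the report).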

What you think should happen instead

The pod's logs and status continue to be reported until completion (even if it takes over 1 hour), and the job is marked as successful.

How to reproduce

Create a DAG that makes use of GKEStartPodOperator with a task that will take over one hour.

Operating System

cos_containerd

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==5.2.2
apache-airflow-providers-google==10.1.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

hussein-awala commented 1 year ago

I believe #31651 could help by increasing the lifetime of the GCP token. However, I have labeled it as a new feature related to this issue rather than a fix that closes it. Can you try it?

While addressing this, I am trying to find a clean method for refreshing the GCP token within the GKEPodHook when it's expired.

sgomezf commented 1 year ago

I got some errors on DAG import regarding deprecated objects in cncf, so I upgraded to apache-airflow-providers-cncf-kubernetes==7.0.0. I updated the connection to be (I use workload identity):

google-cloud-platform://?lifetime=7200&num_retries=5

I installed the google provider with the code from the fix, and got the following error:

[2023-06-01, 15:48:04 UTC] {__init__.py:155} DEBUG - inlets: [], outlets: []
[2023-06-01, 15:48:04 UTC] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.
[2023-06-01, 15:48:04 UTC] {base.py:73} INFO - Using connection ID 'gcp_conn' for task execution.
[2023-06-01, 15:48:04 UTC] {kubernetes_engine.py:288} INFO - Fetching cluster (project_id=<PROJECT-ID>, location=<LOCATION>, cluster_name=<CLUSTER-NAME>)
[2023-06-01, 15:48:04 UTC] {credentials_provider.py:364} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2023-06-01, 15:48:04 UTC] {_default.py:213} DEBUG - Checking None for explicit credentials as part of auth process...
[2023-06-01, 15:48:04 UTC] {_default.py:186} DEBUG - Checking Cloud SDK credentials as part of auth process...
[2023-06-01, 15:48:04 UTC] {_default.py:192} DEBUG - Cloud SDK credentials not found on disk; not using them
[2023-06-01, 15:48:04 UTC] {_http_client.py:104} DEBUG - Making request: GET http://169.254.169.254
[2023-06-01, 15:48:04 UTC] {_http_client.py:104} DEBUG - Making request: GET http://metadata.google.internal/computeMetadata/v1/project/project-id
[2023-06-01, 15:48:04 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/tmp/apache-airflow-providers-google-10.1.1/airflow/providers/google/cloud/operators/kubernetes_engine.py", line 501, in execute
    self.fetch_cluster_info()
  File "/tmp/apache-airflow-providers-google-10.1.1/airflow/providers/google/cloud/operators/kubernetes_engine.py", line 506, in fetch_cluster_info
    cluster = self.cluster_hook.get_cluster(
  File "/tmp/apache-airflow-providers-google-10.1.1/airflow/providers/google/common/hooks/base_google.py", line 484, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/tmp/apache-airflow-providers-google-10.1.1/airflow/providers/google/cloud/hooks/kubernetes_engine.py", line 295, in get_cluster
    return self.get_cluster_manager_client().get_cluster(
  File "/tmp/apache-airflow-providers-google-10.1.1/airflow/providers/google/cloud/hooks/kubernetes_engine.py", line 95, in get_cluster_manager_client
    self._client = ClusterManagerClient(credentials=self.get_credentials(), client_info=CLIENT_INFO)
  File "/tmp/apache-airflow-providers-google-10.1.1/airflow/providers/google/common/hooks/base_google.py", line 297, in get_credentials
    credentials, _ = self.get_credentials_and_project_id()
  File "/tmp/apache-airflow-providers-google-10.1.1/airflow/providers/google/common/hooks/base_google.py", line 273, in get_credentials_and_project_id
    credentials, project_id = get_credentials_and_project_id(
  File "/tmp/apache-airflow-providers-google-10.1.1/airflow/providers/google/cloud/utils/credentials_provider.py", line 373, in get_credentials_and_project_id
    return _CredentialProvider(*args, **kwargs).get_credentials_and_project()
  File "/tmp/apache-airflow-providers-google-10.1.1/airflow/providers/google/cloud/utils/credentials_provider.py", line 269, in get_credentials_and_project
    source_credentials=credentials.source_credentials,
AttributeError: 'Credentials' object has no attribute 'source_credentials'

joonsan-kim-moloco commented 1 year ago

I got some errors on DAG import regarding deprecated objects in cncf, so I upgraded to apache-airflow-providers-cncf-kubernetes==7.0.0. I updated the connection to be (I use workload identity):

google-cloud-platform://?lifetime=7200&num_retries=5

I installed the google provider with the code from the fix, and got the following error:

...
AttributeError: 'Credentials' object has no attribute 'source_credentials'

FYI, https://github.com/apache/airflow/pull/31651/files#r1213959948

sgomezf commented 1 year ago

For now we have downgraded to 2.5.1. But I'm guessing the issue is that credentials are not being refreshed. Is there a workaround for this issue?

hussein-awala commented 1 year ago

For now we have downgraded to 2.5.1. But I'm guessing the issue is that credentials are not being refreshed. Is there a workaround for this issue?

I'm working on a new solution, but I have a small question: does it work with 2.5.1?

Jlrine2 commented 1 year ago

I'm on Google Cloud Composer with Airflow version 2.5.1 and am still seeing this issue.

chasdevs commented 1 year ago

We are on Google Cloud Composer and experience this issue on composer-2.3.1-airflow-2.5.1 but not on composer-2.1.15-airflow-2.5.1, without a consistent pattern in terms of time. The tasks work for a while and stream logs back from the pods, but then we get this error message. The launched pod continues to run successfully but the connection to the Airflow worker is broken and the task fails.

nanchano commented 1 year ago

Same issue on composer-2.3.2-airflow-2.5.1. Given it's not possible to downgrade the environment, we'll need to wait for a fix.

glevineLeap commented 1 year ago

We are on Google Cloud Composer and experience this issue on composer-2.3.1-airflow-2.5.1 but not on composer-2.1.15-airflow-2.5.1, without a consistent pattern in terms of time. The tasks work for a while and stream logs back from the pods, but then we get this error message. The launched pod continues to run successfully but the connection to the Airflow worker is broken and the task fails.

Want to confirm the latest "safe" version. Looks like the issue was introduced somewhere between 2.1.15 and 2.3.1. Anyone running a 2.2.x release able to vouch for its stability with regards to this?

potiuk commented 1 year ago

We are on Google Cloud Composer and experience this issue on composer-2.3.1-airflow-2.5.1 but not on composer-2.1.15-airflow-2.5.1, without a consistent pattern in terms of time. The tasks work for a while and stream logs back from the pods, but then we get this error message. The launched pod continues to run successfully but the connection to the Airflow worker is broken and the task fails.

Want to confirm the latest "safe" version. Looks like the issue was introduced somewhere between 2.1.15 and 2.3.1. Anyone running a 2.2.x release able to vouch for its stability with regards to this?

Question: Did anyone raise a ticket with Composer about this one? To me this looks awfully like a change in Composer networking, not in Airflow. Did anyone try to downgrade (or upgrade) the google provider on Composer to the version that was installed in the "working" version of Composer?

I guess in any case - if you have a problem with a specific version of the google provider, the whole idea of providers is that you should be able to upgrade/downgrade providers to a different version rather independently of the Airflow core version.

Can anyone in the conversation make such a check and narrow it down - whether it is tied to the "Composer release of a certain Airflow version" or to a "specific version of the google provider" - and report back which combinations of Airflow/google provider work?

I have a feeling that the discussion is not happening where it should (but I saw @hussein-awala chiming in before, so maybe it has already been settled that this is the fix referred to at the very beginning?). Do you think this is it, @hussein-awala?

hussein-awala commented 1 year ago

I have a feeling that the discussion is not happening where it should (but I saw @hussein-awala chiming in before, so maybe it has already been settled that this is the fix referred to at the very beginning?). Do you think this is it, @hussein-awala?

@potiuk I believe there's a bug in GKEStartPodOperator related to #29266.

Before the PR mentioned, we used to create a temporary configuration file by running the command gcloud container clusters get-credentials. We would then use this file in the Kubernetes client. The get-credentials command configures kubectl to automatically refresh its credentials using the same identity as gcloud.

However, the PR introduced a new pod GKE hook that uses the following client code:

ApiClient(
    configuration,
    header_name="Authorization",
    header_value=f"Bearer {access_token}",
)

This client uses static oauth2 credentials and cannot refresh them when they expire. Since the default lifetime for GCP credentials is 3600 seconds, the operator fails with an Unauthorized exception after one hour.
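To illustrate the failure mode described above, here is a minimal, self-contained simulation. It does not use the real Kubernetes or Google clients: `FakeApiServer`, `StaticHeaderClient`, and the timestamps are hypothetical stand-ins, and the 3600-second lifetime is the default mentioned above.

```python
from dataclasses import dataclass

TOKEN_LIFETIME_S = 3600  # default GCP access-token lifetime cited above


@dataclass(frozen=True)
class Token:
    value: str
    issued_at: float

    def is_valid(self, now: float) -> bool:
        return now - self.issued_at < TOKEN_LIFETIME_S


class FakeApiServer:
    """Hypothetical stand-in for an API server that checks bearer tokens."""

    def __init__(self) -> None:
        self._issued = {}

    def issue_token(self, now: float) -> Token:
        token = Token(f"token-{len(self._issued)}", now)
        self._issued[token.value] = token
        return token

    def read_pod(self, authorization_header: str, now: float) -> int:
        # Strip the "Bearer " prefix and look the token up.
        _, _, value = authorization_header.partition(" ")
        token = self._issued.get(value)
        if token is None or not token.is_valid(now):
            return 401  # Unauthorized
        return 200


class StaticHeaderClient:
    """Builds its Authorization header once and never refreshes it."""

    def __init__(self, server: FakeApiServer, now: float) -> None:
        self._server = server
        self._header = f"Bearer {server.issue_token(now).value}"

    def read_pod(self, now: float) -> int:
        return self._server.read_pod(self._header, now)


server = FakeApiServer()
client = StaticHeaderClient(server, now=0.0)
print(client.read_pod(now=391.0))   # 200: token is still valid
print(client.read_pod(now=3638.0))  # 401: token expired, header unchanged
```

Because the header is computed once in the constructor, nothing in the client can react to expiry; any fix has to recompute the header at request time.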

I'm currently working on fixing this issue and hope to have it resolved before the next providers wave.

Want to confirm the latest "safe" version. Looks like the issue was introduced somewhere between 2.1.15 and 2.3.1. Anyone running a 2.2.x release able to vouch for its stability with regards to this?

@glevineLeap This issue is not related to the Airflow version, but rather to the version of the Google provider. All versions prior to 9.0.0 should work without any problems.

hussein-awala commented 1 year ago

I created the draft PR #32333. I will not have time to test it before Wednesday, so it would be great if someone could test it on GKE. I will also test whether it masks the passed kubeconfig dict in the logs and the webserver UI.

fdemiane commented 1 year ago

@hussein-awala I tested your fix, but it didn't work. I created the draft PR #32673, where I override the refresh_api_key_hook method to refresh the tokens before API calls. It fixed the issue on my side. I wasn't able to reproduce the issue in deferrable mode, so I didn't add the fix there.
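For readers unfamiliar with the pattern, the idea of refreshing a token from a hook that runs before each API call can be sketched roughly as follows. This is not the code from #32673. `StandInConfiguration` is a simplified, hypothetical stand-in loosely modeled on a client configuration object that calls `refresh_api_key_hook` before reading the API key; `make_refresh_hook`, `fetch_token`, and the 60-second margin are assumptions made for illustration.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class StandInConfiguration:
    """Simplified, hypothetical stand-in for a client configuration object."""

    def __init__(self) -> None:
        self.api_key: Dict[str, str] = {}
        self.api_key_prefix: Dict[str, str] = {}
        self.refresh_api_key_hook: Optional[
            Callable[["StandInConfiguration"], None]
        ] = None

    def get_api_key_with_prefix(self, identifier: str) -> Optional[str]:
        # The hook runs before every lookup, so it can swap in a fresh token.
        if self.refresh_api_key_hook is not None:
            self.refresh_api_key_hook(self)
        key = self.api_key.get(identifier)
        if not key:
            return None
        prefix = self.api_key_prefix.get(identifier)
        return f"{prefix} {key}" if prefix else key


def make_refresh_hook(
    fetch_token: Callable[[], Tuple[str, float]],
    clock: Callable[[], float] = time.monotonic,
    margin_s: float = 60.0,
) -> Callable[[StandInConfiguration], None]:
    """Build a hook that refetches the token shortly before it expires.

    fetch_token is assumed to return (token, lifetime_in_seconds).
    """
    state = {"expires_at": float("-inf")}

    def hook(config: StandInConfiguration) -> None:
        now = clock()
        if now >= state["expires_at"] - margin_s:
            token, lifetime_s = fetch_token()
            config.api_key["authorization"] = token
            config.api_key_prefix["authorization"] = "Bearer"
            state["expires_at"] = now + lifetime_s

    return hook
```

With a hook like this installed, a request made 3638 seconds after the first one would pick up a new token instead of reusing the expired one. Injecting `clock` keeps the sketch testable without waiting an hour.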

JonnyDaenen commented 1 year ago

FYI, for other people who end up here, I posted on SO how to fix this using the latest release candidate: https://stackoverflow.com/questions/76812106/cloud-composer-gke-coupling-broken-since-composer-2-3-5-airflow-2-5-3/76817137#76817137