apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

GKEStartPodOperator Error pod "No agent available" #40995

Open Cir02 opened 1 month ago

Cir02 commented 1 month ago

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.12.0

Apache Airflow version

2.8.0

Operating System

Windows 11

Deployment

Docker-Compose

Deployment details

Docker Compose setup from the official Airflow website

What happened

I try to launch the GKEStartPodOperator to run a simple test against a GKE cluster in GCP, using a service account with the GKE Admin role.

Task code:

from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartPodOperator

JOB_NAME = "test-pi"
JOB_NAME_DEF = "test-pi-def"
JOB_NAMESPACE = "default"

# GCP_PROJECT_ID, GCP_LOCATION and CLUSTER_NAME are defined elsewhere in the DAG file.
job_task_def = GKEStartPodOperator(
    task_id="job_task_def",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="bash:5.2.26",
    cmds=["echo"],
    arguments=["Hello world"],
    get_logs=True,
    name=JOB_NAME_DEF,
    in_cluster=True,
    on_finish_action="delete_pod",
)

The connection to the cluster works, but the task then fails with the following error:

7edbcaf9e48a
*** Found local files:
***   * /opt/airflow/logs/dag_id=kubernetes_engine_job/run_id=manual__2024-07-24T11:08:43.561181+00:00/task_id=job_task_def/attempt=1.log
[2024-07-24, 11:08:46 UTC] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: kubernetes_engine_job.job_task_def manual__2024-07-24T11:08:43.561181+00:00 [queued]>
[2024-07-24, 11:08:46 UTC] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: kubernetes_engine_job.job_task_def manual__2024-07-24T11:08:43.561181+00:00 [queued]>
[2024-07-24, 11:08:46 UTC] {taskinstance.py:2171} INFO - Starting attempt 1 of 1
[2024-07-24, 11:08:46 UTC] {taskinstance.py:2192} INFO - Executing <Task(GKEStartPodOperator): job_task_def> on 2024-07-24 11:08:43.561181+00:00
[2024-07-24, 11:08:46 UTC] {standard_task_runner.py:60} INFO - Started process 3212 to run task
[2024-07-24, 11:08:46 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'kubernetes_engine_job', 'job_task_def', 'manual__2024-07-24T11:08:43.561181+00:00', '--job-id', '28', '--raw', '--subdir', 'DAGS_FOLDER/test_gke.py', '--cfg-path', '/tmp/tmpo1z_1xcf']
[2024-07-24, 11:08:46 UTC] {standard_task_runner.py:88} INFO - Job 28: Subtask job_task_def
[2024-07-24, 11:08:46 UTC] {task_command.py:423} INFO - Running <TaskInstance: kubernetes_engine_job.job_task_def manual__2024-07-24T11:08:43.561181+00:00 [running]> on host 7edbcaf9e48a
[2024-07-24, 11:08:46 UTC] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='kubernetes_engine_job' AIRFLOW_CTX_TASK_ID='job_task_def' AIRFLOW_CTX_EXECUTION_DATE='2024-07-24T11:08:43.561181+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-07-24T11:08:43.561181+00:00'
[2024-07-24, 11:08:46 UTC] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
[2024-07-24, 11:08:46 UTC] {connection.py:234} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-07-24, 11:08:46 UTC] {base.py:83} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-07-24, 11:08:46 UTC] {kubernetes_engine.py:285} INFO - Fetching cluster (project_id=***, location=***, cluster_name=***)
[2024-07-24, 11:08:47 UTC] {pod.py:974} INFO - Building pod test-pi-def with labels: {'dag_id': 'kubernetes_engine_job', 'task_id': 'job_task_def', 'run_id': 'manual__2024-07-24T110843.5611810000-715622171', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-07-24, 11:08:47 UTC] {connection.py:234} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-07-24, 11:08:47 UTC] {base.py:83} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-07-24, 11:08:48 UTC] {pod.py:549} INFO - Found matching pod test-pi-def with labels {'airflow_kpo_in_cluster': 'False', 'airflow_version': '2.8.0', 'dag_id': 'kubernetes_engine_job', 'kubernetes_pod_operator': 'True', 'run_id': 'manual__2024-07-24T110843.5611810000-715622171', 'task_id': 'job_task_def', 'try_number': '1'}
[2024-07-24, 11:08:48 UTC] {pod.py:550} INFO - `try_number` of task_instance: 1
[2024-07-24, 11:08:48 UTC] {pod.py:551} INFO - `try_number` of pod: 1
[2024-07-24, 11:08:48 UTC] {pod_manager.py:359} WARNING - Pod not yet started: test-pi-def
[2024-07-24, 11:08:49 UTC] {pod_manager.py:359} WARNING - Pod not yet started: test-pi-def
[2024-07-24, 11:08:50 UTC] {pod_manager.py:359} WARNING - Pod not yet started: test-pi-def
[2024-07-24, 11:08:51 UTC] {pod_manager.py:359} WARNING - Pod not yet started: test-pi-def
[2024-07-24, 11:08:55 UTC] {pod.py:841} INFO - Skipping deleting pod: test-pi-def
[2024-07-24, 11:08:55 UTC] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 613, in execute_sync
    self.pod_manager.fetch_requested_container_logs(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 537, in fetch_requested_container_logs
    status = self.fetch_container_logs(pod=pod, container_name=c, follow=follow_logs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 462, in fetch_container_logs
    last_log_time = consume_logs(since_time=last_log_time)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 409, in consume_logs
    logs = self.read_pod_logs(
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 289, in wrapped_f
    return self(f, *args, **kw)
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 379, in __call__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 325, in iter
    raise retry_exc.reraise()
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 158, in reraise
    raise self.last_attempt.result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 382, in __call__
    result = fn(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 623, in read_pod_logs
    logs = self._client.read_namespaced_pod_log(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 23747, in read_namespaced_pod_log
    return self.read_namespaced_pod_log_with_http_info(name, namespace, **kwargs)  # noqa: E501
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 23866, in read_namespaced_pod_log_with_http_info
    return self.api_client.call_api(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 240, in GET
    return self.request("GET", url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 234, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'Audit-Id': '***', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Wed, 24 Jul 2024 11:08:55 GMT', 'Content-Length': '224'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Get \\"***/containerLogs/default/test-pi-def/base?follow=true\\u0026timestamps=true\\": No agent available","code":500}\n'
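
The 500 is returned by the Kubernetes API server while the operator streams the pod logs (the pod itself is found; the failure is inside fetch_requested_container_logs, which only runs because get_logs=True). A minimal sketch to check whether the same error is reproducible outside Airflow, assuming local kubeconfig credentials for the Autopilot cluster (e.g. from gcloud container clusters get-credentials) and that the test-pi-def pod still exists (the log above shows it was not deleted):

from kubernetes import client, config

# Use the local kubeconfig for the GKE Autopilot cluster.
config.load_kube_config()
v1 = client.CoreV1Api()

# Mirrors the failing request from the traceback:
# GET .../containerLogs/default/test-pi-def/base?follow=true&timestamps=true
print(
    v1.read_namespaced_pod_log(
        name="test-pi-def",
        namespace="default",
        container="base",
        follow=True,
        timestamps=True,
    )
)

If this call returns the same 500 "No agent available", the error comes from the cluster itself (on GKE this message usually points at the Konnectivity agents that proxy API-server-to-kubelet traffic) rather than from the provider; setting get_logs=False on the operator would skip this log-streaming call entirely.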

What you think should happen instead

No response

How to reproduce

Create an Autopilot cluster in GCP

Create a service account with the GKE Admin role

Set up Airflow locally with Docker Compose and add the service account key

Create a DAG with GKEStartPodOperator (a minimal sketch follows below)
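
A minimal sketch of such a reproduction DAG; the operator arguments are the ones from the report above, while the project, location, and cluster name are hypothetical placeholders:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartPodOperator

# Hypothetical placeholders; replace with real values for your project and cluster.
GCP_PROJECT_ID = "my-project"
GCP_LOCATION = "europe-west1"
CLUSTER_NAME = "my-autopilot-cluster"

with DAG(
    dag_id="kubernetes_engine_job",
    start_date=datetime(2024, 7, 1),
    schedule=None,
    catchup=False,
) as dag:
    GKEStartPodOperator(
        task_id="job_task_def",
        project_id=GCP_PROJECT_ID,
        location=GCP_LOCATION,
        cluster_name=CLUSTER_NAME,
        namespace="default",
        image="bash:5.2.26",
        cmds=["echo"],
        arguments=["Hello world"],
        get_logs=True,
        name="test-pi-def",
        in_cluster=True,
        on_finish_action="delete_pod",
    )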

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 month ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.