apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow doesn't fail fast in case of invalid executor_config or mutation hook #10278

Open aneesh-joseph opened 4 years ago

aneesh-joseph commented 4 years ago

Apache Airflow version: 1.10.10

Kubernetes version (if you are using kubernetes) (use kubectl version):

Environment:

What happened:

Airflow makes far too many requests to the k8s API when the executor_config or pod mutation hook is invalid.

What you expected to happen:

Airflow should fail fast instead of repeatedly sending the same request to the k8s API.

How to reproduce it:

Update your pod mutation hook or executor_config so that it produces an invalid Pod request to the k8s API. For example, use an executor_config like:

executor_config = {
  "KubernetesExecutor": {
    "volumes": [
      {
        "name": "correct_name",
        "emptyDir": {}
      }
    ],
    "volume_mounts": [
      {
        "name": "incorrect_name",  # does not match any volume declared under "volumes"
        "mountPath": "/opt/airflow/mount"
      }
    ]
  }
}
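Until Airflow rejects such configurations itself, a DAG author can catch this class of mistake before the task is ever queued. The following is a minimal sketch under stated assumptions: `validate_executor_config` is a hypothetical helper, not an Airflow API; it assumes the legacy `KubernetesExecutor` dict format shown above and reuses the lowercase RFC 1123 label pattern that the Kubernetes API server reports when it rejects a volume name. It looks at the executor_config in isolation, so a mount that intentionally targets a volume defined in the pod template would be reported as a false positive.

```python
import re

# Lowercase RFC 1123 label pattern, as reported by the Kubernetes API server
# when it rejects a volume name. Labels are also limited to 63 characters.
RFC1123_LABEL = re.compile(r"[a-z0-9]([-a-z0-9]*[a-z0-9])?")
MAX_LABEL_LENGTH = 63


def validate_executor_config(executor_config):
    """Return a list of problems in a legacy KubernetesExecutor executor_config.

    Hypothetical helper (not part of Airflow): it only checks that volume names
    are valid RFC 1123 labels and that every volume mount refers to a volume
    declared in the same executor_config.
    """
    k8s_config = executor_config.get("KubernetesExecutor", {})
    problems = []

    # Collect declared volume names and check each one against the label rules.
    declared_volumes = set()
    for volume in k8s_config.get("volumes", []):
        name = volume.get("name", "")
        declared_volumes.add(name)
        if len(name) > MAX_LABEL_LENGTH or not RFC1123_LABEL.fullmatch(name):
            problems.append(f"volume name {name!r} is not a valid RFC 1123 label")

    # Every mount must point at one of the volumes declared above.
    for mount in k8s_config.get("volume_mounts", []):
        name = mount.get("name", "")
        if name not in declared_volumes:
            problems.append(f"volume mount {name!r} does not match any declared volume")

    return problems


if __name__ == "__main__":
    executor_config = {
        "KubernetesExecutor": {
            "volumes": [{"name": "correct_name", "emptyDir": {}}],
            "volume_mounts": [{"name": "incorrect_name", "mountPath": "/opt/airflow/mount"}],
        }
    }
    # Prints one line per problem found.
    for problem in validate_executor_config(executor_config):
        print(problem)
```

Run directly, it prints `volume name 'correct_name' is not a valid RFC 1123 label` and `volume mount 'incorrect_name' does not match any declared volume`. Raising an exception from the DAG file when the list is non-empty would surface the mistake as a DAG import error instead of a stream of rejected API requests.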

Airflow builds the Pod request and sends it to the k8s API, which responds that the Pod request is invalid:

[2020-08-10 15:42:16,631] {rest.py:228} DEBUG - response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod \"newdagtesttask1-92fff529de5b42cdbb153cc7bbfade9c\" is invalid: spec.containers[0].volumeMounts[3].name: Not found: \"incorrect_name\"","reason":"Invalid","details":{"name":"newdagtesttask1-92fff529de5b42cdbb153cc7bbfade9c","kind":"Pod","causes":[{"reason":"FieldValueNotFound","message":"Not found: \"incorrect_name\"","field":"spec.containers[0].volumeMounts[3].name"}]},"code":422}
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod \"newdagtesttask1-92fff529de5b42cdbb153cc7bbfade9c\" is invalid: spec.containers[0].volumeMounts[3].name: Not found: \"incorrect_name\"","reason":"Invalid","details":{"name":"newdagtesttask1-92fff529de5b42cdbb153cc7bbfade9c","kind":"Pod","causes":[{"reason":"FieldValueNotFound","message":"Not found: \"incorrect_name\"","field":"spec.containers[0].volumeMounts[3].name"}]},"code":422}

Airflow, however, keeps retrying the same request. I counted up to 250 requests within a couple of minutes and then deleted my test deployment :) It would be great if Airflow failed fast in cases like this, where the k8s API response indicates that the request itself is invalid.
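The fail-fast behaviour requested here comes down to telling a rejected Pod spec apart from a transient API failure. A minimal sketch, assuming that HTTP 400 and 422 mean the request itself is invalid while anything else (for example 429 or 5xx) may succeed on a later attempt; `classify_pod_creation_error` is hypothetical and is not how Airflow handles `ApiException` in the versions discussed in this issue:

```python
import json

# Assumption: these statuses mean the Pod spec itself was rejected, so
# re-submitting the identical request cannot succeed.
NON_RETRYABLE_STATUSES = {400, 422}


def classify_pod_creation_error(status, body):
    """Return (retryable, causes) for a failed Pod creation request.

    Hypothetical helper: `status` is the HTTP status code and `body` the raw
    JSON Status object returned by the Kubernetes API server.
    """
    try:
        payload = json.loads(body)
    except (TypeError, ValueError):
        payload = {}
    if not isinstance(payload, dict):
        payload = {}

    # Each cause names the offending field, e.g. spec.containers[0].volumeMounts[3].name
    details = payload.get("details") or {}
    causes = [
        f"{cause.get('field', '?')}: {cause.get('message', '')}"
        for cause in details.get("causes") or []
    ]
    # Fall back to the top-level message when no structured causes are present.
    if not causes and payload.get("message"):
        causes = [payload["message"]]

    retryable = status not in NON_RETRYABLE_STATUSES
    return retryable, causes
```

Given the 422 body from the log above, it returns `retryable=False` together with the single `spec.containers[0].volumeMounts[3].name: Not found: "incorrect_name"` cause, which an executor could attach to a failed task instead of re-queueing it. Wiring this into the KubernetesExecutor is not shown.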

Anything else we need to know:

kaxil commented 3 years ago

cc @Dr-Denzy

Dr-Denzy commented 3 years ago

I am on it.

kaxil commented 3 years ago

This might be fixed by https://github.com/apache/airflow/pull/14323 for Airflow >= 2.0.2. @Dr-Denzy will verify whether it is fixed for 2.0.2 and above.

Dr-Denzy commented 3 years ago

Reproduced this issue with Airflow 2.0.2 and 2.1.0dev0, which produced the error log below:


...
                "name": "airflow-fernet-key"
              }
            }
          },
          {
            "name": "AIRFLOW__CORE__SQL_ALCHEMY_CONN",
            "valueFrom": {
              "secretKeyRef": {
                "key": "connection",
                "name": "airflow-airflow-metadata"
              }
            }
          },
          {
            "name": "AIRFLOW_CONN_AIRFLOW_DB",
            "valueFrom": {
              "secretKeyRef": {
                "key": "connection",
                "name": "airflow-airflow-metadata"
              }
            }
          },
          {
            "name": "AIRFLOW_IS_K8S_EXECUTOR_POD",
            "value": "True"
          }
        ],
        "image": "k8s-dags:1.0.8",
        "imagePullPolicy": "IfNotPresent",
        "name": "base",
        "volumeMounts": [
          {
            "mountPath": "/opt/airflow/logs",
            "name": "airflow-logs"
          },
          {
            "mountPath": "/opt/airflow/airflow.cfg",
            "name": "config",
            "readOnly": true,
            "subPath": "airflow.cfg"
          },
          {
            "name": "incorrect_name",
            "mountPath": "/opt/airflow/mount"
          }
        ]
      }
    ],
    "restartPolicy": "Never",
    "securityContext": {
      "fsGroup": 0,
      "runAsUser": 50000
    },
    "serviceAccountName": "airflow-worker",
    "volumes": [
      {
        "emptyDir": {},
        "name": "airflow-logs"
      },
      {
        "configMap": {
          "name": "airflow-airflow-config"
        },
        "name": "config"
      },
      {
        "name": "correct_name",
        "emptyDir": {}
      }
    ]
  }
}
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/kubernetes/pod_launcher.py", line 81, in run_pod_async
    resp = self._client.create_namespaced_pod(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 6174, in create_namespaced_pod
    (data) = self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 6251, in create_namespaced_pod_with_http_info
    return self.api_client.call_api(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 340, in call_api
    return self.__call_api(resource_path, method,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 172, in __call_api
    response_data = self.request(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 382, in request
    return self.rest_client.POST(url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 272, in POST
    return self.request("POST", url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (422)
Reason: Unprocessable Entity
HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'f9015e00-ae0c-4c93-ac2e-511661d3ef91', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'ac9cdcce-9878-432a-a4a3-e0b76e5acc54', 'Date': 'Mon, 26 Apr 2021 13:00:31 GMT', 'Content-Length': '1152'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod \"kpopodtemplatefiledagexecutorconfig10278.fc985e1fce7243b49f7bd6929b1b77f7\" is invalid: [spec.volumes[2].name: Invalid value: \"correct_name\": a lowercase RFC 1123 label must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name',  or '123-abc', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?'), spec.containers[0].volumeMounts[2].name: Not found: \"incorrect_name\"]","reason":"Invalid","details":{"name":"kpopodtemplatefiledagexecutorconfig10278.fc985e1fce7243b49f7bd6929b1b77f7","kind":"Pod","causes":[{"reason":"FieldValueInvalid","message":"Invalid value: \"correct_name\": a lowercase RFC 1123 label must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name',  or '123-abc', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?')","field":"spec.volumes[2].name"},{"reason":"FieldValueNotFound","message":"Not found: \"incorrect_name\"","field":"spec.containers[0].volumeMounts[2].name"}]},"code":422}
[2021-04-26 13:00:31,317] {kubernetes_executor.py:563} WARNING - ApiException when attempting to run task, re-queueing. Message: Pod "kpopodtemplatefiledagexecutorconfig10278.fc985e1fce7243b49f7bd6929b1b77f7" is invalid: [spec.volumes[2].name: Invalid value: "correct_name": a lowercase RFC 1123 label must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name',  or '123-abc', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?'), spec.containers[0].volumeMounts[2].name: Not found: "incorrect_name"]
[2021-04-26 13:00:31,318] {dag_processing.py:385} DEBUG - Received message of type DagParsingStat
[2021-04-26 13:00:31,318] {dag_processing.py:385} DEBUG - Received message of type DagParsingStat
[2021-04-26 13:00:31,328] {scheduler_job.py:1399} DEBUG - Next timed event is in 0.360804
[2021-04-26 13:00:31,328] {scheduler_job.py:1401} DEBUG - Ran scheduling loop in 0.07 seconds
[2021-04-26 13:00:31,692] {scheduler_job.py:1588} DEBUG - Running SchedulerJob._create_dagruns_for_dags with retries. Try 1 of 3
[2021-04-26 13:00:31,706] {scheduler_job.py:1569} DEBUG - Running SchedulerJob._get_dagmodels_and_create_dagruns with retries. Try 1 of 3
[2021-04-26 13:00:31,721] {scheduler_job.py:1795} DEBUG - DAG kpo-pod-template-file-dag not changed structure, skipping dagrun.verify_integrity
[2021-04-26 13:00:31,725] {dagrun.py:491} DEBUG - number of tis tasks for <DagRun kpo-pod-template-file-dag @ 2021-01-01 00:00:00+00:00: scheduled__2021-01-01T00:00:00+00:00, externally triggered: False>: 4 task(s)
[2021-04-26 13:00:31,725] {dagrun.py:506} DEBUG - number of scheduleable tasks for <DagRun kpo-pod-template-file-dag @ 2021-01-01 00:00:00+00:00: scheduled__2021-01-01T00:00:00+00:00, externally triggered: False>: 1 task(s)
[2021-04-26 13:00:31,725] {taskinstance.py:887} DEBUG - <TaskInstance: kpo-pod-template-file-dag.end 2021-01-01 00:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2021-04-26 13:00:31,725] {taskinstance.py:887} DEBUG - <TaskInstance: kpo-pod-template-file-dag.end 2021-01-01 00:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2021-04-26 13:00:31,726] {taskinstance.py:887} DEBUG - <TaskInstance: kpo-pod-template-file-dag.end 2021-01-01 00:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 2, 'successes': 1, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids={'task-one', 'executor_config_10278'}
[2021-04-26 13:00:31,726] {taskinstance.py:867} DEBUG - Dependencies not met for <TaskInstance: kpo-pod-template-file-dag.end 2021-01-01 00:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 2, 'successes': 1, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids={'task-one', 'executor_config_10278'}
[2021-04-26 13:00:31,728] {taskinstance.py:887} DEBUG - <TaskInstance: kpo-pod-template-file-dag.executor_config_10278 2021-01-01 00:00:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
[2021-04-26 13:00:31,729] {taskinstance.py:887} DEBUG - <TaskInstance: kpo-pod-template-file-dag.executor_config_10278 2021-01-01 00:00:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2021-04-26 13:00:31,729] {taskinstance.py:877} DEBUG - Dependencies all met for <TaskInstance: kpo-pod-template-file-dag.executor_config_10278 2021-01-01 00:00:00+00:00 [queued]>
[2021-04-26 13:00:31,729] {scheduler_job.py:1823} DEBUG - Skipping SLA check for <DAG: kpo-pod-template-file-dag> because no tasks in DAG have SLAs
[2021-04-26 13:00:31,738] {scheduler_job.py:940} DEBUG - No tasks to consider for execution.
[2021-04-26 13:00:31,740] {base_executor.py:150} DEBUG - 1 running task instances
[2021-04-26 13:00:31,740] {base_executor.py:151} DEBUG - 0 in queue
[2021-04-26 13:00:31,741] {base_executor.py:152} DEBUG - 31 open slots
[2021-04-26 13:00:31,741] {base_executor.py:161} DEBUG - Calling the <class 'airflow.executors.kubernetes_executor.KubernetesExecutor'> sync method
[2021-04-26 13:00:31,741] {kubernetes_executor.py:510} DEBUG - self.running: {TaskInstanceKey(dag_id='kpo-pod-template-file-dag', task_id='executor_config_10278', execution_date=datetime.datetime(2021, 1, 1, 0, 0, tzinfo=Timezone('UTC')), try_number=1)}
[2021-04-26 13:00:31,741] {kubernetes_executor.py:335} DEBUG - Syncing KubernetesExecutor
[2021-04-26 13:00:31,741] {kubernetes_executor.py:261} DEBUG - KubeJobWatcher alive, continuing
[2021-04-26 13:00:31,743] {kubernetes_executor.py:275} INFO - Kubernetes job is (TaskInstanceKey(dag_id='kpo-pod-template-file-dag', task_id='executor_config_10278', execution_date=datetime.datetime(2021, 1, 1, 0, 0, tzinfo=Timezone('UTC')), try_number=1), ['airflow', 'tasks', 'run', 'kpo-pod-template-file-dag', 'executor_config_10278', '2021-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/pod_template_dag.py'], {'api_version': 'v1',
 'kind': 'Pod',
 'metadata': {'annotations': None,
              'cluster_name': None,
              'creation_timestamp': None,
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': None,
              'initializers': None,
              'labels': None,
              'managed_fields': None,
              'name': None,
              'namespace': None,
              'owner_references': None,
              'resource_version': None,
              'self_link': None,
              'uid': None},
 'spec': {'active_deadline_seconds': None,
          'affinity': None,
          'automount_service_account_token': None,
          'containers': [{'args': [],
                          'command': [],
                          'env': [],
                          'env_from': [],
                          'image': None,
                          'image_pull_policy': None,
                          'lifecycle': None,
                          'liveness_probe': None,
                          'name': 'base',
                          'ports': [],
                          'readiness_probe': None,
                          'resources': None,
                          'security_context': None,
                          'stdin': None,
                          'stdin_once': None,
                          'termination_message_path': None,
                          'termination_message_policy': None,
                          'tty': None,
                          'volume_devices': None,
                          'volume_mounts': [{'mountPath': '/opt/airflow/mount',
                                             'name': 'incorrect_name'}],
                          'working_dir': None}],
          'dns_config': None,
          'dns_policy': None,
          'enable_service_links': None,
          'host_aliases': None,
          'host_ipc': None,
          'host_network': False,
          'host_pid': None,
          'hostname': None,
          'image_pull_secrets': [],
          'init_containers': None,
          'node_name': None,
          'node_selector': None,
          'preemption_policy': None,
          'priority': None,
          'priority_class_name': None,
          'readiness_gates': None,
          'restart_policy': None,
          'runtime_class_name': None,
          'scheduler_name': None,
          'security_context': None,
          'service_account': None,
          'service_account_name': None,
          'share_process_namespace': None,
          'subdomain': None,
          'termination_grace_period_seconds': None,
          'tolerations': None,
          'volumes': [{'emptyDir': {}, 'name': 'correct_name'}]},
 'status': None}, None)
  },
  "spec": {
    "affinity": {},
    "containers": [
      {
        "args": [
          "airflow",
          "tasks",
          "run",
          "kpo-pod-template-file-dag",
          "executor_config_10278",
          "2021-01-01T00:00:00+00:00",
          "--local",
          "--pool",
          "default_pool",
          "--subdir",
          "/opt/airflow/dags/pod_template_dag.py"
        ],
        "env": [
          {
            "name": "AIRFLOW__CORE__EXECUTOR",
            "value": "LocalExecutor"
          },
          {
            "name": "AIRFLOW__CORE__FERNET_KEY",
            "valueFrom": {
              "secretKeyRef": {
                "key": "fernet-key",
                "name": "airflow-fernet-key"
              }
            }
          },
          {
            "name": "AIRFLOW__CORE__SQL_ALCHEMY_CONN",
            "valueFrom": {
              "secretKeyRef": {
                "key": "connection",
                "name": "airflow-airflow-metadata"
              }
            }
          },
          {
            "name": "AIRFLOW_CONN_AIRFLOW_DB",
            "valueFrom": {
              "secretKeyRef": {
                "key": "connection",
                "name": "airflow-airflow-metadata"
              }
            }
          },
          {
            "name": "AIRFLOW_IS_K8S_EXECUTOR_POD",
            "value": "True"
          }
        ],
        "image": "k8s-dags:1.0.8",
        "imagePullPolicy": "IfNotPresent",
        "name": "base",
        "volumeMounts": [
          {
            "mountPath": "/opt/airflow/logs",
            "name": "airflow-logs"
          },
          {
            "mountPath": "/opt/airflow/airflow.cfg",
            "name": "config",
            "readOnly": true,
            "subPath": "airflow.cfg"
          },
          {
            "name": "incorrect_name",
            "mountPath": "/opt/airflow/mount"
          }
        ]
      }
    ],
    "restartPolicy": "Never",
    "securityContext": {
      "fsGroup": 0,
      "runAsUser": 50000
    },
    "serviceAccountName": "airflow-worker",
    "volumes": [
      {
        "emptyDir": {},
        "name": "airflow-logs"
      },
      {
        "configMap": {
          "name": "airflow-airflow-config"
        },
        "name": "config"
      },
      {
        "name": "correct_name",
        "emptyDir": {}
      }
    ]
  }
}
          },
          {
            "name": "AIRFLOW__CORE__FERNET_KEY",
            "valueFrom": {
              "secretKeyRef": {
                "key": "fernet-key",
                "name": "airflow-fernet-key"
              }
            }
          },
          {
            "name": "AIRFLOW__CORE__SQL_ALCHEMY_CONN",
            "valueFrom": {
              "secretKeyRef": {
                "key": "connection",
                "name": "airflow-airflow-metadata"
              }
            }
          },
          {
            "name": "AIRFLOW_CONN_AIRFLOW_DB",
            "valueFrom": {
              "secretKeyRef": {
                "key": "connection",
                "name": "airflow-airflow-metadata"
              }
            }
          },
          {
            "name": "AIRFLOW_IS_K8S_EXECUTOR_POD",
            "value": "True"
          }
        ],
        "image": "k8s-dags:1.0.8",
        "imagePullPolicy": "IfNotPresent",
        "name": "base",
        "volumeMounts": [
          {
            "mountPath": "/opt/airflow/logs",
            "name": "airflow-logs"
          },
          {
            "mountPath": "/opt/airflow/airflow.cfg",
            "name": "config",
            "readOnly": true,
            "subPath": "airflow.cfg"
          },
          {
            "name": "incorrect_name",
            "mountPath": "/opt/airflow/mount"
          }
        ]
      }
    ],
    "restartPolicy": "Never",
    "securityContext": {
      "fsGroup": 0,
      "runAsUser": 50000
    },
    "serviceAccountName": "airflow-worker",
    "volumes": [
      {
        "emptyDir": {},
        "name": "airflow-logs"
      },
      {
        "configMap": {
          "name": "airflow-airflow-config"
        },
        "name": "config"
      },
      {
        "name": "correct_name",
        "emptyDir": {}
      }
    ]
  }
}
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/kubernetes/pod_launcher.py", line 81, in run_pod_async
    resp = self._client.create_namespaced_pod(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 6174, in create_namespaced_pod
    (data) = self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 6251, in create_namespaced_pod_with_http_info
    return self.api_client.call_api(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 340, in call_api
    return self.__call_api(resource_path, method,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 172, in __call_api
    response_data = self.request(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 382, in request
    return self.rest_client.POST(url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 272, in POST
    return self.request("POST", url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (422)
Reason: Unprocessable Entity
HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'f9015e00-ae0c-4c93-ac2e-511661d3ef91', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'ac9cdcce-9878-432a-a4a3-e0b76e5acc54', 'Date': 'Mon, 26 Apr 2021 13:00:31 GMT', 'Content-Length': '1152'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod \"kpopodtemplatefiledagexecutorconfig10278.46f14279bf9c48799b809e17d7d72eea\" is invalid: [spec.volumes[2].name: Invalid value: \"correct_name\": a lowercase RFC 1123 label must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name',  or '123-abc', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?'), spec.containers[0].volumeMounts[2].name: Not found: \"incorrect_name\"]","reason":"Invalid","details":{"name":"kpopodtemplatefiledagexecutorconfig10278.46f14279bf9c48799b809e17d7d72eea","kind":"Pod","causes":[{"reason":"FieldValueInvalid","message":"Invalid value: \"correct_name\": a lowercase RFC 1123 label must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name',  or '123-abc', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?')","field":"spec.volumes[2].name"},{"reason":"FieldValueNotFound","message":"Not found: \"incorrect_name\"","field":"spec.containers[0].volumeMounts[2].name"}]},"code":422}
[2021-04-26 13:00:31,769] {kubernetes_executor.py:563} WARNING - ApiException when attempting to run task, re-queueing. Message: Pod "kpopodtemplatefiledagexecutorconfig10278.46f14279bf9c48799b809e17d7d72eea" is invalid: [spec.volumes[2].name: Invalid value: "correct_name": a lowercase RFC 1123 label must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name',  or '123-abc', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?'), spec.containers[0].volumeMounts[2].name: Not found: "incorrect_name"]
[2021-04-26 13:00:31,792] {scheduler_job.py:1399} DEBUG - Next timed event is in 1.342707
[2021-04-26 13:00:31,792] {scheduler_job.py:1401} DEBUG - Ran scheduling loop in 0.10 seconds
[2021-04-26 13:00:32,120] {settings.py:292} DEBUG - Disposing DB connection pool (PID 261)
[2021-04-26 13:00:32,127] {scheduler_job.py:310} DEBUG - Waiting for <ForkProcess name='DagFileProcessor18-Process' pid=261 parent=43 stopped exitcode=0>
[2021-04-26 13:00:32,148] {settings.py:292} DEBUG - Disposing DB connection pool (PID 265)
[2021-04-26 13:00:32,154] {scheduler_job.py:310} DEBUG - Waiting for <ForkProcess name='DagFileProcessor19-Process' pid=265 parent=43 stopped exitcode=0>
[2021-04-26 13:00:32,794] {scheduler_job.py:1588} DEBUG - Running SchedulerJob._create_dagruns_for_dags with retries. Try 1 of 3
[2021-04-26 13:00:32,807] {scheduler_job.py:1569} DEBUG - Running SchedulerJob._get_dagmodels_and_create_dagruns with retries. Try 1 of 3
[2021-04-26 13:00:32,828] {scheduler_job.py:1795} DEBUG - DAG kpo-pod-template-file-dag not changed structure, skipping dagrun.verify_integrity
[2021-04-26 13:00:32,833] {dagrun.py:491} DEBUG - number of tis tasks for <DagRun kpo-pod-template-file-dag @ 2021-01-01 00:00:00+00:00: scheduled__2021-01-01T00:00:00+00:00, externally triggered: False>: 4 task(s)
[2021-04-26 13:00:32,833] {dagrun.py:506} DEBUG - number of scheduleable tasks for <DagRun kpo-pod-template-file-dag @ 2021-01-01 00:00:00+00:00: scheduled__2021-01-01T00:00:00+00:00, externally triggered: False>: 1 task(s)
[2021-04-26 13:00:32,833] {taskinstance.py:887} DEBUG - <TaskInstance: kpo-pod-template-file-dag.end 2021-01-01 00:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2021-04-26 13:00:32,833] {taskinstance.py:887} DEBUG - <TaskInstance: kpo-pod-template-file-dag.end 2021-01-01 00:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2021-04-26 13:00:32,834] {taskinstance.py:887} DEBUG - <TaskInstance: kpo-pod-template-file-dag.end 2021-01-01 00:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 2, 'successes': 1, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids={'task-one', 'executor_config_10278'}
[2021-04-26 13:00:32,834] {taskinstance.py:867} DEBUG - Dependencies not met for <TaskInstance: kpo-pod-template-file-dag.end 2021-01-01 00:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 2, 'successes': 1, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids={'task-one', 'executor_config_10278'}
[2021-04-26 13:00:32,837] {taskinstance.py:887} DEBUG - <TaskInstance: kpo-pod-template-file-dag.executor_config_10278 2021-01-01 00:00:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
[2021-04-26 13:00:32,837] {taskinstance.py:887} DEBUG - <TaskInstance: kpo-pod-template-file-dag.executor_config_10278 2021-01-01 00:00:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2021-04-26 13:00:32,837] {taskinstance.py:877} DEBUG - Dependencies all met for <TaskInstance: kpo-pod-template-file-dag.executor_config_10278 2021-01-01 00:00:00+00:00 [queued]>
[2021-04-26 13:00:32,837] {scheduler_job.py:1823} DEBUG - Skipping SLA check for <DAG: kpo-pod-template-file-dag> because no tasks in DAG have SLAs
[2021-04-26 13:00:32,848] {scheduler_job.py:940} DEBUG - No tasks to consider for execution.
[2021-04-26 13:00:32,850] {base_executor.py:150} DEBUG - 1 running task instances
[2021-04-26 13:00:32,851] {base_executor.py:151} DEBUG - 0 in queue
[2021-04-26 13:00:32,851] {base_executor.py:152} DEBUG - 31 open slots
[2021-04-26 13:00:32,851] {base_executor.py:161} DEBUG - Calling the <class 'airflow.executors.kubernetes_executor.KubernetesExecutor'> sync method
[2021-04-26 13:00:32,851] {kubernetes_executor.py:510} DEBUG - self.running: {TaskInstanceKey(dag_id='kpo-pod-template-file-dag', task_id='executor_config_10278', execution_date=datetime.datetime(2021, 1, 1, 0, 0, tzinfo=Timezone('UTC')), try_number=1)}
[2021-04-26 13:00:32,851] {kubernetes_executor.py:335} DEBUG - Syncing KubernetesExecutor
[2021-04-26 13:00:32,851] {kubernetes_executor.py:261} DEBUG - KubeJobWatcher alive, continuing
[2021-04-26 13:00:32,853] {kubernetes_executor.py:275} INFO - Kubernetes job is (TaskInstanceKey(dag_id='kpo-pod-template-file-dag', task_id='executor_config_10278', execution_date=datetime.datetime(2021, 1, 1, 0, 0, tzinfo=Timezone('UTC')), try_number=1), ['airflow', 'tasks', 'run', 'kpo-pod-template-file-dag', 'executor_config_10278', '2021-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/pod_template_dag.py'], {'api_version': 'v1',
 'kind': 'Pod',
 'metadata': {'annotations': None,
              'cluster_name': None,
              'creation_timestamp': None,
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': None,
              'initializers': None,
              'labels': None,
              'managed_fields': None,
              'name': None,
              'namespace': None,
              'owner_references': None,
              'resource_version': None,
              'self_link': None,
              'uid': None},
 'spec': {'active_deadline_seconds': None,
          'affinity': None,
          'automount_service_account_token': None,
          'containers': [{'args': [],
                          'command': [],
                          'env': [],
                          'env_from': [],
                          'image': None,
                          'image_pull_policy': None,
                          'lifecycle': None,
                          'liveness_probe': None,
                          'name': 'base',
                          'ports': [],
                          'readiness_probe': None,
                          'resources': None,
                          'security_context': None,
                          'stdin': None,
                          'stdin_once': None,
                          'termination_message_path': None,
                          'termination_message_policy': None,
                          'tty': None,
                          'volume_devices': None,
                          'volume_mounts': [{'mountPath': '/opt/airflow/mount',
                                             'name': 'incorrect_name'}],
                          'working_dir': None}],
          'dns_config': None,
          'dns_policy': None,
          'enable_service_links': None,
          'host_aliases': None,
          'host_ipc': None,
          'host_network': False,
          'host_pid': None,
          'hostname': None,
          'image_pull_secrets': [],
          'init_containers': None,
          'node_name': None,
          'node_selector': None,
          'preemption_policy': None,
          'priority': None,
          'priority_class_name': None,
          'readiness_gates': None,
          'restart_policy': None,
          'runtime_class_name': None,
          'scheduler_name': None,
          'security_context': None,
          'service_account': None,
          'service_account_name': None,
          'share_process_namespace': None,
          'subdomain': None,
          'termination_grace_period_seconds': None,
          'tolerations': None,
          'volumes': [{'emptyDir': {}, 'name': 'correct_name'}]},
 'status': None}, None)
  },
  "spec": {
    "affinity": {},
    "containers": [
      {
        "args": [
          "airflow",
          "tasks",
          "run",
          "kpo-pod-template-file-dag",
          "executor_config_10278",
          "2021-01-01T00:00:00+00:00",
          "--local",
          "--pool",
          "default_pool",
          "--subdir",
          "/opt/airflow/dags/pod_template_dag.py"
        ],
        "env": [
          {
            "name": "AIRFLOW__CORE__EXECUTOR",
            "value": "LocalExecutor"
          },
          {
            "name": "AIRFLOW__CORE__FERNET_KEY",
            "valueFrom": {
              "secretKeyRef": {
                "key": "fernet-key",
                "name": "airflow-fernet-key"
              }
            }
          },
          {
            "name": "AIRFLOW__CORE__SQL_ALCHEMY_CONN",
            "valueFrom": {
              "secretKeyRef": {
                "key": "connection",
                "name": "airflow-airflow-metadata"
              }
            }
          },
          {
            "name": "AIRFLOW_CONN_AIRFLOW_DB",
            "valueFrom": {
              "secretKeyRef": {
                "key": "connection",
                "name": "airflow-airflow-metadata"
              }
            }
          },
          {
            "name": "AIRFLOW_IS_K8S_EXECUTOR_POD",
            "value": "True"
          }
        ],
        "image": "k8s-dags:1.0.8",
        "imagePullPolicy": "IfNotPresent",
        "name": "base",
        "volumeMounts": [
          {
            "mountPath": "/opt/airflow/logs",
            "name": "airflow-logs"
          },
          {
            "mountPath": "/opt/airflow/airflow.cfg",
            "name": "config",
            "readOnly": true,
            "subPath": "airflow.cfg"
          },
          {
            "name": "incorrect_name",
            "mountPath": "/opt/airflow/mount"
          }
        ]
      }
    ],
    "restartPolicy": "Never",
    "securityContext": {
      "fsGroup": 0,
      "runAsUser": 50000
    },
    "serviceAccountName": "airflow-worker",
    "volumes": [
      {
        "emptyDir": {},
        "name": "airflow-logs"
      },
      {
        "configMap": {
          "name": "airflow-airflow-config"
        },
        "name": "config"
      },
      {
        "name": "correct_name",
        "emptyDir": {}
      }
    ]
  }
}
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/kubernetes/pod_launcher.py", line 81, in run_pod_async
    resp = self._client.create_namespaced_pod(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 6174, in create_namespaced_pod
    (data) = self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 6251, in create_namespaced_pod_with_http_info
    return self.api_client.call_api(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 340, in call_api
    return self.__call_api(resource_path, method,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 172, in __call_api
    response_data = self.request(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 382, in request
    return self.rest_client.POST(url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 272, in POST
    return self.request("POST", url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (422)
Reason: Unprocessable Entity
HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'f9015e00-ae0c-4c93-ac2e-511661d3ef91', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'ac9cdcce-9878-432a-a4a3-e0b76e5acc54', 'Date': 'Mon, 26 Apr 2021 13:00:32 GMT', 'Content-Length': '1152'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod \"kpopodtemplatefiledagexecutorconfig10278.752db5fe1a6849b18bf853e89d6ba727\" is invalid: [spec.volumes[2].name: Invalid value: \"correct_name\": a lowercase RFC 1123 label must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name',  or '123-abc', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?'), spec.containers[0].volumeMounts[2].name: Not found: \"incorrect_name\"]","reason":"Invalid","details":{"name":"kpopodtemplatefiledagexecutorconfig10278.752db5fe1a6849b18bf853e89d6ba727","kind":"Pod","causes":[{"reason":"FieldValueInvalid","message":"Invalid value: \"correct_name\": a lowercase RFC 1123 label must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name',  or '123-abc', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?')","field":"spec.volumes[2].name"},{"reason":"FieldValueNotFound","message":"Not found: \"incorrect_name\"","field":"spec.containers[0].volumeMounts[2].name"}]},"code":422}
[2021-04-26 13:00:32,869] {kubernetes_executor.py:563} WARNING - ApiException when attempting to run task, re-queueing. Message: Pod "kpopodtemplatefiledagexecutorconfig10278.752db5fe1a6849b18bf853e89d6ba727" is invalid: [spec.volumes[2].name: Invalid value: "correct_name": a lowercase RFC 1123 label must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name',  or '123-abc', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?'), spec.containers[0].volumeMounts[2].name: Not found: "incorrect_name"]
[2021-04-26 13:00:32,870] {dag_processing.py:385} DEBUG - Received message of type DagParsingStat
[2021-04-26 13:00:32,870] {dag_processing.py:385} DEBUG - Received message of type DagParsingStat
[2021-04-26 13:00:32,870] {dag_processing.py:385} DEBUG - Received message of type DagParsingStat
[2021-04-26 13:00:32,870] {dag_processing.py:385} DEBUG - Received message of type DagParsingStat
[2021-04-26 13:00:32,880] {scheduler_job.py:1399} DEBUG - Next timed event is in 0.254282
[2021-04-26 13:00:32,881] {scheduler_job.py:1401} DEBUG - Ran scheduling loop in 0.09 seconds
[2021-04-26 13:00:33,136] {scheduler_job.py:1588} DEBUG - Running SchedulerJob._create_dagruns_for_dags with retries.
...
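Both causes in the 422 response above could in principle be caught before any request reaches the API server. The sketch below is a hypothetical pre-flight check (validate_volume_config is not an Airflow function) that assumes the legacy {"KubernetesExecutor": {...}} dict form of executor_config used in this issue and covers only two rules: every volume name must be an RFC 1123 label (the regex quoted in the 422 message, plus the 63-character limit Kubernetes applies to such labels), and every volume_mounts entry must reference a declared volume. It only sees the volumes in executor_config itself, so its indices differ from the merged pod spec (spec.volumes[2] in the log), a mount that refers to a volume from the base pod template would be wrongly flagged, and the API server enforces many more rules than these.

```python
import re
from typing import Any, Dict, List

# Regex quoted in the Kubernetes 422 message; Kubernetes additionally
# limits RFC 1123 labels to 63 characters.
RFC1123_LABEL = re.compile(r"[a-z0-9]([-a-z0-9]*[a-z0-9])?")
MAX_LABEL_LENGTH = 63


def validate_volume_config(executor_config: Dict[str, Any]) -> List[str]:
    """Hypothetical pre-flight check (not an Airflow API) for the legacy
    {"KubernetesExecutor": {...}} form of executor_config.

    Returns human-readable problems; an empty list means only that these
    two checks passed, not that the API server will accept the pod.
    """
    k8s = executor_config.get("KubernetesExecutor", {})
    errors: List[str] = []

    # Rule 1: every volume name must be a valid RFC 1123 label.
    volume_names = set()
    for i, volume in enumerate(k8s.get("volumes", [])):
        name = volume.get("name", "")
        volume_names.add(name)
        if len(name) > MAX_LABEL_LENGTH or not RFC1123_LABEL.fullmatch(name):
            errors.append(f"volumes[{i}].name: invalid RFC 1123 label {name!r}")

    # Rule 2: every mount must reference a volume declared above.
    # Volumes coming from the base pod template are not visible here.
    for i, mount in enumerate(k8s.get("volume_mounts", [])):
        name = mount.get("name", "")
        if name not in volume_names:
            errors.append(f"volume_mounts[{i}].name: no volume named {name!r}")

    return errors


# The executor_config from the reproduction DAG in this issue.
executor_config = {
    "KubernetesExecutor": {
        "volumes": [{"name": "correct_name", "emptyDir": {}}],
        "volume_mounts": [
            {"name": "incorrect_name", "mountPath": "/opt/airflow/mount"}
        ],
    }
}

for problem in validate_volume_config(executor_config):
    print(problem)
# volumes[0].name: invalid RFC 1123 label 'correct_name'
# volume_mounts[0].name: no volume named 'incorrect_name'
```

Raising an error on a non-empty result would surface the mistake once, instead of as a stream of identical API calls.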
Dr-Denzy commented 3 years ago

DAG file used:


from airflow import DAG
from datetime import datetime, timedelta
from airflow.example_dags.libs.helper import print_stuff
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow import configuration as conf

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

namespace = conf.get('kubernetes', 'NAMESPACE')

if namespace == 'default':
    config_file = '/usr/local/airflow/include/.kube/config'
    in_cluster = False
else:
    in_cluster = True
    config_file = None

with DAG('kpo-pod-template-file-dag', schedule_interval='@once', default_args=default_args) as dag:
    start = DummyOperator(task_id='start')

    kpo = KubernetesPodOperator(
        namespace=namespace,
        image="hello-world",
        pod_template_file="/opt/airflow/dags/privileged_runner.yaml",
        labels={"foo": "bar"},
        name="airflow-test-pod",
        task_id="task-one",
        in_cluster=in_cluster, 
        cluster_context='kind', 
        config_file=config_file,
        is_delete_operator_pod=False,
        get_logs=True)

    executor_config_10278 = PythonOperator(
        task_id="executor_config_10278",
        python_callable=print_stuff,
        executor_config={
            "KubernetesExecutor": {
                "volumes": [
                    {
                        "name": "correct_name",
                        "emptyDir": {}
                    }
                ],
                "volume_mounts": [
                    {
                        "name": "incorrect_name",
                        "mountPath": "/opt/airflow/mount"
                    }
                ]
            }
        }
    )

    end = DummyOperator(task_id='end')

    start >> kpo >> end
    start >> executor_config_10278 >> end
eladkal commented 3 years ago

@Dr-Denzy, is there still an issue here, or can we close this as can't reproduce?

JieChenAtPonyai commented 2 years ago

This might be fixed by #14323 for Airflow >= 2.0.2; @Dr-Denzy will verify whether it is fixed for 2.0.2 and above.

I encountered this issue too. #14323 doesn't fail fast when self._client.create_namespaced_pod() in run_pod_async (File "/home/airflow/.local/lib/python3.8/site-packages/airflow/kubernetes/pod_launcher.py", line 81) throws an exception.
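The WARNING line in the log above shows the executor re-queueing the task after the 422 ("ApiException when attempting to run task, re-queueing"), so the same invalid request is sent again. A minimal sketch of a fail-fast policy, assuming the caller catches kubernetes.client.rest.ApiException and passes its status attribute together with an attempt counter: 400 and 422 mean the API server rejected the pod spec itself and will reject it again, so the task fails immediately, while throttling and 5xx responses are retried up to a cap. decide_on_api_error, LaunchDecision, the two status sets, and max_attempts are all hypothetical, not Airflow's actual API or behavior.

```python
from dataclasses import dataclass

# Hypothetical policy, not Airflow's actual behavior.
# The API server rejected the request body itself: resubmitting the
# identical pod spec cannot succeed, so fail the task right away.
PERMANENT_ERRORS = frozenset({400, 422})
# Throttling and server-side failures may clear up on their own.
TRANSIENT_ERRORS = frozenset({429, 500, 502, 503, 504})


@dataclass(frozen=True)
class LaunchDecision:
    requeue: bool
    reason: str


def decide_on_api_error(status: int, attempt: int, max_attempts: int = 3) -> LaunchDecision:
    """Decide what to do after pod creation failed with HTTP `status`.

    `attempt` is the 1-based number of the launch attempt that just failed.
    """
    if status in PERMANENT_ERRORS:
        return LaunchDecision(False, f"HTTP {status}: pod spec rejected, failing task")
    if status not in TRANSIENT_ERRORS:
        return LaunchDecision(False, f"HTTP {status}: unexpected error, failing task")
    if attempt >= max_attempts:
        return LaunchDecision(False, f"HTTP {status}: giving up after {attempt} attempts")
    return LaunchDecision(True, f"HTTP {status}: transient error, re-queueing")


# The 422 from this issue fails immediately instead of looping.
print(decide_on_api_error(422, attempt=1))
# LaunchDecision(requeue=False, reason='HTTP 422: pod spec rejected, failing task')
```

In an executor, this would sit in an except ApiException as e: branch as decide_on_api_error(e.status, attempt). Whether 400 should count as permanent, and which retry cap to use, are judgment calls.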

potiuk commented 2 years ago

Would you like to provide a fix for that, @JieChenAtPonyai? It seems you have an easy way to test and reproduce it, and you could become one of the more than 2000 contributors to Airflow.