LamaAni / KubernetesJobOperator

An airflow operator that executes a task in a kubernetes cluster, given a kubernetes yaml configuration or an image reference.

Any specific setup on kubernetes? #74

Closed Elsayed91 closed 1 year ago

Elsayed91 commented 1 year ago

I developed a pipeline using this lovely package locally and truly loved it; however, upon transitioning all my code to run on Kubernetes, I have not had much success. I get this error (official Airflow Docker image + pip3 install airflow_kubernetes_job_operator):

Broken DAG: [/git/repo/components/airflow/dags/full_refresh_dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kubernetes_legacy_job_operator.py", line 15, in <module>
    from airflow_kubernetes_job_operator.kubernetes_legacy_pod_generators import (
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kubernetes_legacy_pod_generators.py", line 21, in <module>
    from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
ModuleNotFoundError: No module named 'airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env'

Now, I have tried different Airflow and Python versions, to no avail. I have also tried to patch these files (the ones in the backcompat dir) by loading them into a Dockerfile. Not optimal, but it did in fact bypass this error; however, upon running the task I got: return [k for k in all_kinds if k.api_version == "v1" or k.api_version in apis] TypeError: argument of type 'NoneType' is not iterable (in queries.py). I have tried different files, pods, and jobs; nothing works.

Everything still works smoothly with the same code, synced from the same GitHub repo, on my Windows PC and my Linux laptop, which makes me suspect that the image itself might be the issue; however, I've had no time to test out that theory. I have sadly reverted to the KubernetesPodOperator, but I hope you find this helpful.

Thank you for your wonderful work.

LamaAni commented 1 year ago

Ah, this is actually a bug. The issue is with the legacy operator: it is looking for a deprecated package. The error is that when loading PodRuntimeInfoEnv, it does not use the proper location for the pod.
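
For context, a minimal sketch of the failure mode (the import path is taken from the traceback above; the guard is illustrative only, not the package's actual code):

try:
    # This backcompat module was removed from newer cncf.kubernetes
    # provider releases, so an unconditional import fails at DAG load time.
    from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import (
        PodRuntimeInfoEnv,
    )
except ModuleNotFoundError:
    # A guard like this would keep the legacy path optional.
    PodRuntimeInfoEnv = None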

Questions:

  1. What operator are you using?
  2. What airflow version are you using?
  3. What image are you using?
Elsayed91 commented 1 year ago

  1. What operator are you using? KubernetesJobOperator, no legacy operators.
  2. What airflow version are you using? I have tried 2.5.0 and 2.4.1 (which I have locally).
  3. What image are you using? The public Airflow image.

I have just tried with the following image (I don't normally use slim, but I opted for it to be able to install the same packages I have locally, since they are usually pre-installed in the normal version), and I did not get the legacy error:

FROM apache/airflow:slim-2.5.0-python3.10
RUN pip install kubernetes==23.6.0 apache-airflow-providers-cncf-kubernetes==4.4.0 airflow-kubernetes-job-operator==2.0.8 psycopg2-binary==2.9.5

However, the second error still happens:

airflow-6b946445bd-d6gsl
*** Log file does not exist: /opt/airflow/logs/dag_id=full-refresh/run_id=manual__2022-12-15T20:39:54.994293+00:00/task_id=example_kubernetes_job_operator/attempt=1.log
*** Fetching from: http://airflow-6b946445bd-d6gsl:8793/log/dag_id=full-refresh/run_id=manual__2022-12-15T20:39:54.994293+00:00/task_id=example_kubernetes_job_operator/attempt=1.log

[2022-12-15, 20:39:56 UTC] {taskinstance.py:1087} INFO - Dependencies all met for <TaskInstance: full-refresh.example_kubernetes_job_operator manual__2022-12-15T20:39:54.994293+00:00 [queued]>
[2022-12-15, 20:39:56 UTC] {taskinstance.py:1087} INFO - Dependencies all met for <TaskInstance: full-refresh.example_kubernetes_job_operator manual__2022-12-15T20:39:54.994293+00:00 [queued]>
[2022-12-15, 20:39:56 UTC] {taskinstance.py:1283} INFO - 
--------------------------------------------------------------------------------
[2022-12-15, 20:39:56 UTC] {taskinstance.py:1284} INFO - Starting attempt 1 of 1
[2022-12-15, 20:39:56 UTC] {taskinstance.py:1285} INFO - 
--------------------------------------------------------------------------------
[2022-12-15, 20:39:56 UTC] {taskinstance.py:1304} INFO - Executing <Task(KubernetesJobOperator): example_kubernetes_job_operator> on 2022-12-15 20:39:54.994293+00:00
[2022-12-15, 20:39:56 UTC] {standard_task_runner.py:55} INFO - Started process 453 to run task
[2022-12-15, 20:39:56 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'full-refresh', 'example_kubernetes_job_operator', 'manual__2022-12-15T20:39:54.994293+00:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/full_refresh_dag.py', '--cfg-path', '/tmp/tmpzdpue9_q']
[2022-12-15, 20:39:56 UTC] {standard_task_runner.py:83} INFO - Job 5: Subtask example_kubernetes_job_operator
[2022-12-15, 20:39:56 UTC] {task_command.py:389} INFO - Running <TaskInstance: full-refresh.example_kubernetes_job_operator manual__2022-12-15T20:39:54.994293+00:00 [running]> on host airflow-6b946445bd-d6gsl
[2022-12-15, 20:39:56 UTC] {taskinstance.py:1511} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=full-refresh
AIRFLOW_CTX_TASK_ID=example_kubernetes_job_operator
AIRFLOW_CTX_EXECUTION_DATE=2022-12-15T20:39:54.994293+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-12-15T20:39:54.994293+00:00
[2022-12-15, 20:39:56 UTC] {taskinstance.py:1772} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_kubernetes_job_operator/kubernetes_job_operator.py", line 373, in execute
    rslt = self.job_runner.execute_job(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_kubernetes_job_operator/job_runner.py", line 302, in execute_job
    watchable_kinds = GetAPIVersions.get_existing_api_kinds(self.client, all_kinds)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 410, in get_existing_api_kinds
    return [k for k in all_kinds if k.api_version == "v1" or k.api_version in apis]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 410, in <listcomp>
    return [k for k in all_kinds if k.api_version == "v1" or k.api_version in apis]
TypeError: argument of type 'NoneType' is not iterable

The YAML file looks like this:

apiVersion: v1
kind: Pod
metadata:
  name: transferpod
spec:
  template:
    metadata:
      labels:
        app: transferpod
  nodeSelector:
    iam.gke.io/gke-metadata-server-enabled: "true"
  containers:
    - name: transferpod
      image: "{{ job.IMAGE }}" 
      resources:  {}

and the DAG like this:

    t1 = KubernetesJobOperator(
        task_id="example_kubernetes_job_operator",
        body_filepath=f"{dir_path}/manifests/transferjob.yaml",
        in_cluster=True,
        jinja_job_args={
            "IMAGE": f"eu.gcr.io/{GOOGLE_CLOUD_PROJECT}/transferpod",
        },
    )

Let me know if you need further info.

LamaAni commented 1 year ago

Hi, I think this is still an issue. Legacy support was extended back to before Airflow 2.0.0, but that legacy period should now be over (otherwise I'll be chasing this all my life). I have therefore removed support for versions before 2.0.0.
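
As a rough sketch, such a cut-off could be enforced at import time like this (illustrative only; the actual change is in the PR linked below):

import airflow
from packaging.version import Version

# Hypothetical guard: refuse to load on Airflow versions older than 2.0.0
# instead of trying to support deprecated legacy APIs.
if Version(airflow.__version__) < Version("2.0.0"):
    raise ImportError(
        "airflow_kubernetes_job_operator no longer supports Airflow < 2.0.0"
    )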

The fix is in this PR (may take an hour or two): https://github.com/LamaAni/KubernetesJobOperator/pull/75

(The PR contains a linting fix as well, so it may be a little cluttered.)

LamaAni commented 1 year ago

Fixed in https://github.com/LamaAni/KubernetesJobOperator/releases/tag/2.0.9

Please report back once you have tested, and then close this issue.

LamaAni commented 1 year ago

I was able to execute with the following dockerfile:

FROM apache/airflow

RUN pip3 install --user airflow-kubernetes-job-operator
Elsayed91 commented 1 year ago

Hello! I can confirm that with the latest version the legacy issue has stopped happening across different setups. Thanks a lot! However, I am still facing the issue with all_kinds being None.

I have tried the following to troubleshoot this:

  1. Using the body option with a YAML string.
  2. Using the example in the repo:
    # First resource: this resource will be tracked by the operator. Other resources will not be tracked.
    apiVersion: batch/v1
    kind: Job
    metadata:
      name: test-job # not required. Will be a prefix to task name
      finalizers:
        - foregroundDeletion
    spec:
      template:
        metadata:
          labels:
            app: test-task-pod
        spec:
          restartPolicy: Never
          containers:
            - name: job-executor
              image: ubuntu
              command:
                - bash
                - -c
                - |
                  #!/usr/bin/env bash
                  echo "OK"
      backoffLimit: 0
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: test-service # not required, will be a prefix to task name.
    spec:
      selector:
        app: test-task-pod
      ports:
        - port: 8080
          targetPort: 8080
  3. I have forgone the YAMLs altogether and tried this:
    job_task = KubernetesJobOperator(
        task_id="from-image",
        dag=dag,
        image="ubuntu",
        command=["bash", "-c", 'echo "all ok"'],
    )

    It still fails instantly with the same message.

  4. I have even disabled git-sync and baked the DAGs into the image, but nothing. All of this while using the following, of course:

    FROM apache/airflow

    RUN pip3 install --user airflow-kubernetes-job-operator


After a lot of squinting, I have nothing to show for it. I cannot think of why two identical images would behave differently; I keep thinking it is my code, but then again, I have tried dummy code and dummy YAMLs, and it is always the same issue.

Let me know if you need any more input, and thanks again for your prompt resolution of the first issue!

Extra background info:
- Running on Google GKE
- Official Apache Airflow Helm chart with some modifications
- A regular KubernetesPodOperator works
LamaAni commented 1 year ago

Hmm... all_kinds is a collection that is initialized when the operator is loaded; it defines which kinds are to be expected when deploying to the cluster. I'm going to add a full example with a Docker image to the repo. Can you try and run that?
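
For reference, this is roughly the check that fails (the list comprehension is copied from the traceback above; the surrounding function is paraphrased):

def get_existing_api_kinds(all_kinds, apis):
    # all_kinds: the resource kinds the operator knows how to track.
    # apis: the API versions reported by the cluster. If the versions
    # query silently fails, apis is None and the membership test
    # "k.api_version in apis" raises:
    #   TypeError: argument of type 'NoneType' is not iterable
    return [k for k in all_kinds if k.api_version == "v1" or k.api_version in apis]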

LamaAni commented 1 year ago

Please see the example under examples/docker for an example Docker deployment. Can you run the run_in_docker script?

The script also requires you to have a valid kubernetes config under "$HOME/.kube/config".

Elsayed91 commented 1 year ago

Will give it a try ASAP and report back. Sorry for the delay, I am out of office right now.

arnor2000 commented 1 year ago

Hi, I have the same issue (TypeError: argument of type 'NoneType' is not iterable) with:

apache-airflow==2.4.3
airflow-kubernetes-job-operator==2.0.9
kubernetes==23.6.0

There was no problem before the Airflow upgrade, with:

apache-airflow==2.2.5
airflow-kubernetes-job-operator==2.0.8
kubernetes==11.0.0
LamaAni commented 1 year ago

@arnor2000 Can you please also run the example and report back? I'll try your Airflow versions.

LamaAni commented 1 year ago

Wow, that was a weird one.

You both probably have an error when connecting to the kube service (please see the RBAC section in the help). This error was not reported properly and was returned without being raised. That means the error you both experienced is actually an API connection error, i.e. the connection to the API could not be established and a None value was returned instead of an error being thrown.
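
In other words, the fix makes the failed query raise instead of handing None back to its callers; a hedged sketch (hypothetical names, not the PR's actual diff):

class KubeApiClientException(Exception):
    """Raised when the kubernetes API cannot be reached or rejects the client."""

def get_api_versions(client):
    response = client.request("GET", "/apis")  # hypothetical client call
    if response is None:
        # Previously a failed connection fell through silently; callers then
        # crashed later with "argument of type 'NoneType' is not iterable".
        raise KubeApiClientException(
            "Could not query cluster API versions; check RBAC rules and connectivity"
        )
    return response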

Resolved in this PR: https://github.com/LamaAni/KubernetesJobOperator/pull/81

Please validate and let me know.

LamaAni commented 1 year ago

Any update on this?

Elsayed91 commented 1 year ago

Apologies once more for opening an issue and then appearing to desert it. I will be in the office tomorrow morning and will give it a try and let you know. Sorry for the delay.

Elsayed91 commented 1 year ago

None of the previous issues occur anymore, and the error message has indeed been updated. Now I get:

airflow_kubernetes_job_operator.kube_api.exceptions.KubeApiClientException: airflow_kubernetes_job_operator.kube_api.queries.GetAPIVersions, Unauthorized: Unauthorized

I have created the exact RBAC role from the main page and bound it to the default service account, which was already admin of everything, but the error above sadly persists. The thing is, running a KubernetesPodOperator on the same instance works fine, meaning it is able to connect to the k8s APIs in general.

LamaAni commented 1 year ago

> None of the previous issues occur anymore, and the error message has indeed been updated. Now I get:
>
> airflow_kubernetes_job_operator.kube_api.exceptions.KubeApiClientException: airflow_kubernetes_job_operator.kube_api.queries.GetAPIVersions, Unauthorized: Unauthorized
>
> I have created the exact RBAC role from the main page and bound it to the default service account, which was already admin of everything, but the error above sadly persists. The thing is, running a KubernetesPodOperator on the same instance works fine, meaning it is able to connect to the k8s APIs in general.

This means that the client does not have access, or it cannot find the right config. You need to define the right RBAC configuration for the service account used by the pod. You can find the RBAC config in the help here: https://github.com/LamaAni/KubernetesJobOperator#kubernetes-rbac-rules

You can also find how to configure a kubernetes service account here: https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/
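
For anyone landing here, a generic example of the kind of grant described above (the names, namespace, and rule list below are placeholders; use the exact rules from the linked RBAC section):

# Placeholder Role/RoleBinding: grants the pod's service account access
# to the resources the operator creates and watches.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: airflow-job-operator
  namespace: airflow
rules:
  - apiGroups: ["", "apps", "batch"]
    resources: ["pods", "pods/log", "jobs", "services", "events"]
    verbs: ["get", "list", "watch", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-job-operator
  namespace: airflow
subjects:
  - kind: ServiceAccount
    name: default
    namespace: airflow
roleRef:
  kind: Role
  name: airflow-job-operator
  apiGroup: rbac.authorization.k8s.io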

Elsayed91 commented 1 year ago

As mentioned, I have already done so, but I will play around a bit and see; I will update with my findings.

LamaAni commented 1 year ago

Oh, sorry, I missed that. Then it's the auto-detection of the credentials. I will try to test that locally and report back.

LamaAni commented 1 year ago

Found the issue. The automatic bearer token string was invalid: the 'bearer' prefix was already in there. Perhaps because of a code change in Airflow 2.5.0?
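
A minimal sketch of the failure mode, assuming the duplicated scheme prefix described above (hypothetical helper, not the actual code):

def build_authorization_header(token: str) -> str:
    token = token.strip()
    # If the token string already carries the "Bearer " scheme, naively
    # prepending it again yields "Authorization: Bearer Bearer <token>",
    # which the API server rejects with 401 Unauthorized.
    if token.lower().startswith("bearer "):
        return token
    return f"Bearer {token}"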

Apologies; I have the kube config deployed with my configuration, and therefore my tests did not catch this. The bearer token is now correctly identified and added.

I hope this is the last issue with this.

The PR is here: https://github.com/LamaAni/KubernetesJobOperator/pull/83/files
The release is here: https://github.com/LamaAni/KubernetesJobOperator/releases/tag/2.0.12

Elsayed91 commented 1 year ago

Good day! This was indeed the issue; upon upgrading to 2.0.12, everything worked flawlessly. I will mark this as solved. Thanks a lot for your hard work.

LamaAni commented 1 year ago

> Hi, I have the same issue (TypeError: argument of type 'NoneType' is not iterable) with:
>
> apache-airflow==2.4.3
> airflow-kubernetes-job-operator==2.0.9
> kubernetes==23.6.0
>
> There was no problem before the Airflow upgrade, with:
>
> apache-airflow==2.2.5
> airflow-kubernetes-job-operator==2.0.8
> kubernetes==11.0.0

@arnor2000 Does this work for you as well?

arnor2000 commented 1 year ago

Sorry, I didn't have an accessible environment to test this easily.

Yes, it's fixed, thank you!