LamaAni / KubernetesJobOperator

An Airflow operator that executes a task in a Kubernetes cluster, given a Kubernetes YAML configuration or an image reference.

Running task logs is incomplete #91

Closed zhangqinyang closed 11 months ago

zhangqinyang commented 11 months ago

Describe the bug

Airflow: 2.5.0
KubernetesJobOperator: 2.0.12

Description: The running task's log output is incomplete. When a task runs for more than 4 hours, the Airflow task log stops updating and shows the error "Task is not able to be run", but the container in Kubernetes keeps writing logs normally.

[screenshot of the Airflow task log]

LamaAni commented 11 months ago

Hi @zhangqinyang

I could not reproduce the error. I may be able to check this more later on, but it may take a couple of weeks.

Looking at the log, I think there are two possible scenarios:

  1. The kube watch was disconnected from the cluster, thus stopping the log stream. I am not seeing any warnings about this, so that is less plausible.
  2. The underlying airflow task (in taskinstance.py) timed out or has not received a task update. But you said the pod is still running. This is odd, since I would expect airflow to send a "Stop" command to the task, therefore triggering the task cleanup and killing all the resources.

This leads me to think that you may have some time limit in the underlying airflow, or maybe there is a missing command in the operator to update the task in the case of long runs... not sure.

Also, I find the 4-hour mark very strange - if there was an error/disconnect and not a timeout, I would expect random execution times rather than a stable time span. Is there a max connection time there? Were there other runs that lasted less/more time?

To check that the timeout is not coming from airflow:

  1. Create an airflow Python task (PythonOperator) or Bash task (BashOperator) which sleeps for 8 hours.
  2. The task should emit a log line every 10 seconds (so: log + sleep 10).
  3. Check the log output and see whether the task completes (a minimal sketch follows this list).
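
For reference, a minimal control DAG along these lines might look like the sketch below (the DAG id, callable, and timings are illustrative and not taken from this thread):

import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def sleep_and_log(total_seconds: int = 8 * 60 * 60, interval: int = 10):
    # Print a log line every `interval` seconds until `total_seconds` have passed.
    elapsed = 0
    while elapsed < total_seconds:
        time.sleep(interval)
        elapsed += interval
        print(f"Elapsed {elapsed} [seconds]")


with DAG(
    "airflow_only_long_sleep_test",  # illustrative DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    max_active_runs=1,
) as dag:
    PythonOperator(
        task_id="sleep_and_log",
        python_callable=sleep_and_log,
    )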

To check that the timeout is a connection/kubernetes time:

  1. Run a task - a single task in a dag - for more than 4 hours using the KubernetesJobOperator.
  2. Run a task - a single task in a dag - for more than 4 hours using the KubernetesPodOperator (different connection scheme).
  3. Make sure the task does the same as above (sleep and log), in bash if possible.

Also, can you specify what kind of cloud infrastructure and which cloud provider you are running on? I know that there is an issue with the AWS Kubernetes cloud described in this thread: https://github.com/LamaAni/KubernetesJobOperator/issues/54

zhangqinyang commented 11 months ago

Following your suggestion, I ran three tasks: BashOperator, KubernetesPodOperator, and KubernetesJobOperator. Based on the log results, only the KubernetesJobOperator task experienced the four-hour log interruption. Here are my dag.py files and the corresponding pod and job YAML files.

DAG file for the BashOperator

from airflow import DAG
from datetime import datetime
from airflow.operators.bash import BashOperator

dag = DAG(
    "c36e2d9000c54489a6fdd6b5f4e8bash",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    is_paused_upon_creation=False,
    max_active_runs=1,
    default_args={
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0
    },
    tags=["test_dag"]
)

Bh6miAZv = BashOperator(
    task_id="Bh6miAZv",
    dag=dag,
    bash_command="while true;do echo 1 && sleep 10;done",
)

DAG file for the KubernetesPodOperator

from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

config_file = "/opt/airflow/dags/files/kubernetes-admin.kubeconfig"

dag = DAG(
    "c36e2d9000c54489a6fdd6b5f4e86pod",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    is_paused_upon_creation=False,
    max_active_runs=1,
    default_args={
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0
    },
    tags=["test_dag"]
)

Bh8miAZv = KubernetesPodOperator(
    task_id="Bh8miAZv",
    namespace="queue-eva-cuda11-2gpu",
    in_cluster=False,
    config_file=config_file,
    dag=dag,
    pod_template_file="/opt/airflow/dags/yaml/pod.yaml",
)

pod.yaml

apiVersion: v1
kind: Pod
metadata:
  name: test-pod
  namespace: queue-eva-cuda11-2gpu
spec:
  containers:
  - name: cont-test-pod
    image: time_sleep_test:v1.1
    imagePullPolicy: IfNotPresent
    command: ["sh", "-c", "python3 run.py"]
    resources:
      limits:
        cpu: 1
        memory: 2G
        nvidia.com/gpu: 0
      requests:
        cpu: 1
        memory: 2G
        nvidia.com/gpu: 0
  imagePullSecrets:
  - name: registrykey
  nodeSelector:
    ns: queue-eva-cuda11-2gpu

DAG file for the KubernetesJobOperator

from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
from datetime import datetime

config_file = "/opt/airflow/dags/files/kubernetes-admin.kubeconfig"

dag = DAG(
    "c36e2d9000c54489a6fdd6b5f4e86bfd",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    is_paused_upon_creation=False,
    max_active_runs=1,
    default_args={
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0
    },
    tags=["test_dag"]
)

bh7miazv = KubernetesJobOperator(
    task_id="Bh7miAZv",
    namespace="queue-eva-cuda11-2gpu",
    in_cluster=False,
    config_file=config_file,
    body_filepath="/opt/airflow/dags/yaml/kube.yaml",
    dag=dag,
    image="dynamic_input_new:0.0.1",
    command=["bash", "-c", "while true;do echo 1 && sleep 10;done"],
)

job.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: test-job
  namespace: queue-eva-cuda11-2gpu
spec:
  suspend: true
  template:
    spec:
      containers:
        - name: cont-test-job
          image: dynamic_input_new:0.0.1
          command: ["bash", "-c", "while 1 do echo 1; sleep 1000; done"]
          resources:
            requests:
              cpu: 1
              nvidia.com/gpu: 0
              memory: 2Gi
              ephemeral-storage: 10Gi
            limits:
              cpu: 1
              nvidia.com/gpu: 0
              memory: 2Gi
              ephemeral-storage: 10Gi
          volumeMounts:
            - name: localtime
              mountPath: "/usr/share/zoneinfo/UTC"
              readOnly: True
            - name: timezone
              mountPath: "/etc/timezone"
              readOnly: True
      priorityClassName: high-priority
      restartPolicy: Never
      imagePullSecrets:
        - name: registrykey
      nodeSelector:
        ns: queue-eva-cuda11-2gpu
      volumes:
        - name: localtime
          hostPath:
            path: /usr/share/zoneinfo/Asia/Shanghai
        - name: timezone
          hostPath:
            path: /etc/timezone

I don't know if there are any other suggestions; I look forward to your reply.

LamaAni commented 11 months ago

Hi. Let me set up the system and give that a go. Seems to be coming from the operator. May take a few weeks since I'm dealing with some things at work.

Can you share all of the cloud/os/airflow details?

zhangqinyang commented 11 months ago

> Hi. Let me set up the system and give that a go. Seems to be coming from the operator. May take a few weeks since I'm dealing with some things at work.
>
> Can you share all of the cloud/os/airflow details?

Cloud: BCC (Baidu Cloud Compute), OS: CentOS 7.5

# uname -a
Linux 0i 3.10.0-1160.80.1.el7.x86_64 #1 SMP Tue Nov 8 15:48:59 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux

Airflow: 2.5.0, KubernetesJobOperator: 2.0.12

LamaAni commented 11 months ago

I was able to reproduce the issue; not sure of the cause yet.

Using: k0s (local server). Running:

from datetime import timedelta
from utils import default_args, name_from_file
from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import (
    KubernetesJobOperator,
)

dag = DAG(
    name_from_file(__file__),
    default_args=default_args,
    description="Test base job operator",
    schedule_interval=None,
    catchup=False,
)

envs = {
    "PASS_ARG": "a test",
}

total_time_seconds = round(timedelta(hours=4.5).total_seconds())

KubernetesJobOperator(
    task_id="test-long-job-success",
    body_filepath="./templates/test_long_job.yaml",
    envs={
        "PASS_ARG": "a long test",
        "TIC_COUNT": str(total_time_seconds),
    },
    dag=dag,
)

if __name__ == "__main__":
    dag.test()

templates/test_long_job.yaml

apiVersion: batch/v1
kind: Job
metadata: {}
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: job-executor
          image: ubuntu
          command:
            - bash
            - -c
            - |
              #!/usr/bin/env bash
              : "${SLEEP_INTERVAL:=10}"
              echo "Starting $PASS_ARG (Sleep interval $SLEEP_INTERVAL)"
              elapsed_time=0
              while true; do
                  sleep $SLEEP_INTERVAL
                  elapsed_time=$((elapsed_time + $SLEEP_INTERVAL))
                  echo "Elapsed $elapsed_time [seconds]"
                  if [ "$elapsed_time" -ge "$TIC_COUNT" ]; then
                      break
                  fi

              done
              echo "Complete"
          env:
            - name: TIC_COUNT
              value: '10800' # 3 hrs

            - name: SLEEP_INTERVAL
              value: '60'
  backoffLimit: 0

zhangqinyang commented 11 months ago

I have made no progress on this issue yet. Thank you for the information, and I hope to find a solution soon. Thanks again.

LamaAni commented 11 months ago

Found and (hopefully) fixed the issue in https://github.com/LamaAni/KubernetesJobOperator/pull/93 and merged it to master.

The issue was that once 4 hours have passed, Kubernetes closes the connection on its side but returns no error. Since it is a follow query, the query should be restarted. So:

  1. Added a query restart in this case.
  2. Added support for a proper sinceTime requirement on the query.

This allows the log query to essentially last forever. It does add some call overhead while Kubernetes is deleting the resources, but that should not be much (maybe 10 more calls at the end of the sequence).
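
For context, the restart-with-a-since-window idea looks roughly like the sketch below. This is illustrative only: it uses the official kubernetes Python client (approximating sinceTime with since_seconds) rather than the operator's internal watcher, and the pod name and namespace are placeholders.

import time

from kubernetes import client, config

config.load_kube_config()
core = client.CoreV1Api()

POD, NAMESPACE = "test-job-xxxxx", "default"  # placeholders
since_seconds = None  # first query: fetch the full log
last_read = time.time()

while True:
    resp = core.read_namespaced_pod_log(
        name=POD,
        namespace=NAMESPACE,
        follow=True,
        since_seconds=since_seconds,
        timestamps=True,
        _preload_content=False,  # stream the raw response instead of buffering it
    )
    for chunk in resp.stream():
        print(chunk.decode(), end="")
        last_read = time.time()

    # The server closed the follow stream without an error (here: after ~4 hours).
    # If the pod has finished, stop; otherwise restart the query, asking only for
    # lines newer than the last successful read.
    pod = core.read_namespaced_pod(name=POD, namespace=NAMESPACE)
    if pod.status.phase not in ("Pending", "Running"):
        break
    since_seconds = max(1, int(time.time() - last_read) + 1)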

Please, if you can, test by installing the master branch as the package and let me know.

LamaAni commented 11 months ago

My tests were successful: a 4.5 hr run completed without issue. Once you confirm on your end, I'll make another release.

zhangqinyang commented 11 months ago

Thank you very much for your support. I will quickly use the master branch to verify the issue, and I will respond to you with the exact results as soon as possible.

LamaAni commented 11 months ago

Hi, any update? I want to create a new release.

zhangqinyang commented 11 months ago

I apologize for the delayed response as I was validating multiple long tasks. I have now verified in the environment, and the issue has been resolved. Thank you very much for your support.

LamaAni commented 11 months ago

New release fix: https://github.com/LamaAni/KubernetesJobOperator/releases/tag/2.0.14