apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

KubernetesPodOperator inconsistently returns logs #38003

Open itsnotapt opened 6 months ago

itsnotapt commented 6 months ago

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

No response

Apache Airflow version

2.8.2

Operating System

apache/airflow:2.8.2-python3.10

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

There seems to be a 50/50 chance that the correct logs will be returned by the pod.

I'm expecting the following:

[2024-03-08, 22:18:33 UTC] {pod.py:778} INFO - Container logs: /opt/airflow
[2024-03-08, 22:18:33 UTC] {pod.py:778} INFO - Container logs: Hello world!

Successful log:

[2024-03-08, 22:16:28 UTC] {pod.py:1057} INFO - Building pod airflow-pod-uvaridfc with labels: {'dag_id': 'example_python_pod', 'task_id': 'run_pod', 'run_id': 'manual__2024-03-08T221625.9866800000-564be90aa', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-03-08, 22:16:29 UTC] {taskinstance.py:2367} INFO - Pausing task as DEFERRED. dag_id=example_python_pod, task_id=run_pod, execution_date=20240308T221625, start_date=20240308T221627
[2024-03-08, 22:16:29 UTC] {local_task_job_runner.py:231} INFO - Task exited with return code 100 (task deferral)
[2024-03-08, 22:18:31 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_python_pod.run_pod manual__2024-03-08T22:16:25.986680+00:00 [queued]>
[2024-03-08, 22:18:31 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_python_pod.run_pod manual__2024-03-08T22:16:25.986680+00:00 [queued]>
[2024-03-08, 22:18:31 UTC] {taskinstance.py:2191} INFO - Resuming after deferral
[2024-03-08, 22:18:31 UTC] {taskinstance.py:2214} INFO - Executing <Task(KubernetesPodOperator): run_pod> on 2024-03-08 22:16:25.986680+00:00
[2024-03-08, 22:18:31 UTC] {standard_task_runner.py:60} INFO - Started process 200 to run task
[2024-03-08, 22:18:31 UTC] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'example_python_pod', 'run_pod', 'manual__2024-03-08T22:16:25.986680+00:00', '--job-id', '468', '--raw', '--subdir', 'DAGS_FOLDER/example/example_python_pod.py', '--cfg-path', '/tmp/tmpov5tx0h_']
[2024-03-08, 22:18:31 UTC] {standard_task_runner.py:88} INFO - Job 468: Subtask run_pod
[2024-03-08, 22:18:31 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_python_pod.run_pod manual__2024-03-08T22:16:25.986680+00:00 [running]> on host ***
[2024-03-08, 22:18:33 UTC] {pod.py:778} INFO - Container logs: /opt/airflow
[2024-03-08, 22:18:33 UTC] {pod.py:778} INFO - Container logs: Hello world!
[2024-03-08, 22:18:33 UTC] {pod.py:778} INFO - Container logs: 
[2024-03-08, 22:18:33 UTC] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-03-08, 22:18:33 UTC] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-03-08, 22:18:34 UTC] {pod.py:559} INFO - xcom result file is empty.
[2024-03-08, 22:18:34 UTC] {pod_manager.py:616} INFO - Pod airflow-pod-uvaridfc has phase Running
[2024-03-08, 22:18:36 UTC] {pod_manager.py:616} INFO - Pod airflow-pod-uvaridfc has phase Running
[2024-03-08, 22:18:38 UTC] {pod.py:914} INFO - Skipping deleting pod: airflow-pod-uvaridfc
[2024-03-08, 22:18:38 UTC] {taskinstance.py:1149} INFO - Marking task as SUCCESS. dag_id=example_python_pod, task_id=run_pod, execution_date=20240308T221625, start_date=20240308T221627, end_date=20240308T221838
[2024-03-08, 22:18:38 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-03-08, 22:18:38 UTC] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check

Unsuccessful log:

[2024-03-08, 22:20:23 UTC] {pod.py:1057} INFO - Building pod airflow-pod-4aauulaa with labels: {'dag_id': 'example_python_pod', 'task_id': 'run_pod', 'run_id': 'manual__2024-03-08T222021.2242180000-5c0bad58f', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-03-08, 22:20:23 UTC] {taskinstance.py:2367} INFO - Pausing task as DEFERRED. dag_id=example_python_pod, task_id=run_pod, execution_date=20240308T222021, start_date=20240308T222022
[2024-03-08, 22:20:24 UTC] {local_task_job_runner.py:231} INFO - Task exited with return code 100 (task deferral)
[2024-03-08, 22:22:26 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_python_pod.run_pod manual__2024-03-08T22:20:21.224218+00:00 [queued]>
[2024-03-08, 22:22:26 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_python_pod.run_pod manual__2024-03-08T22:20:21.224218+00:00 [queued]>
[2024-03-08, 22:22:26 UTC] {taskinstance.py:2191} INFO - Resuming after deferral
[2024-03-08, 22:22:26 UTC] {taskinstance.py:2214} INFO - Executing <Task(KubernetesPodOperator): run_pod> on 2024-03-08 22:20:21.224218+00:00
[2024-03-08, 22:22:26 UTC] {standard_task_runner.py:60} INFO - Started process 218 to run task
[2024-03-08, 22:22:26 UTC] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'example_python_pod', 'run_pod', 'manual__2024-03-08T22:20:21.224218+00:00', '--job-id', '470', '--raw', '--subdir', 'DAGS_FOLDER/example/example_python_pod.py', '--cfg-path', '/tmp/tmprocxttfb']
[2024-03-08, 22:22:26 UTC] {standard_task_runner.py:88} INFO - Job 470: Subtask run_pod
[2024-03-08, 22:22:26 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_python_pod.run_pod manual__2024-03-08T22:20:21.224218+00:00 [running]> on host ***
[2024-03-08, 22:22:27 UTC] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-03-08, 22:22:28 UTC] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-03-08, 22:22:28 UTC] {pod.py:559} INFO - xcom result file is empty.
[2024-03-08, 22:22:28 UTC] {pod_manager.py:616} INFO - Pod airflow-pod-4aauulaa has phase Running
[2024-03-08, 22:22:30 UTC] {pod_manager.py:616} INFO - Pod airflow-pod-4aauulaa has phase Running
[2024-03-08, 22:22:32 UTC] {pod.py:914} INFO - Skipping deleting pod: airflow-pod-4aauulaa
[2024-03-08, 22:22:32 UTC] {taskinstance.py:1149} INFO - Marking task as SUCCESS. dag_id=example_python_pod, task_id=run_pod, execution_date=20240308T222021, start_date=20240308T222022, end_date=20240308T222232
[2024-03-08, 22:22:32 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-03-08, 22:22:32 UTC] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check

What you think should happen instead

No response

How to reproduce

The example code that is being used:

    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

    KubernetesPodOperator(
        name="airflow-pod",
        task_id="run_pod",
        # forward pod logs back to Airflow for viewing
        get_logs=True,
        # output results from the pod by writing to /airflow/xcom/return.json
        do_xcom_push=True,
        # keep the pod for troubleshooting, a cleanup job will automatically remove it later
        on_finish_action="keep_pod",
        # if the pod is likely to run for an extended period of time use deferrable=True
        deferrable=True,
        # if running within kubernetes cluster vs local
        in_cluster=True,
        # how often to check the pod status
        poll_interval=120,
        # how often to check for logs
        # logging_interval=120,
        # default is 2 minutes, however this might not be enough time to get the image and initialize the containers
        startup_timeout_seconds=300,
        cmds=["/bin/bash", "-c", "--"],
        arguments=[
            # "while true; do sleep 1; done;"
            "source /vault/secrets/env-secrets && "
            # Python reads PYTHONPATH (not PYTHON_PATH); export it so the python process sees it
            "export PYTHONPATH=/git/airflow/dags && "
            "python /git/airflow/dags/python_scripts/hello_world.py"
        ],
    )

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 6 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

pankajastro commented 5 months ago

What version of apache-airflow-providers-cncf-kubernetes is in your environment?

VincentChantreau commented 5 months ago

When trigger_reentry receives a "success" status from the trigger, the operator reads the logs by calling self.write_logs: https://github.com/apache/airflow/blob/191b5c30e68566a75f67aefc860f59573b79bed6/airflow/providers/cncf/kubernetes/operators/pod.py#L745

When reading the logs, the follow parameter is hardcoded to False: https://github.com/apache/airflow/blob/191b5c30e68566a75f67aefc860f59573b79bed6/airflow/providers/cncf/kubernetes/operators/pod.py#L781

Maybe the bug is coming from here?
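The success path described above can be modeled roughly as follows. This is a minimal sketch, not the provider's code: `emit_container_logs` is a hypothetical helper, the log stream is simulated as a list of byte strings, and only the `Container logs: %s` message format is taken from the output above. It shows that when the single non-following read returns nothing, the task still finishes but no `Container logs:` lines are written, which matches the unsuccessful log.

```python
import logging
from typing import Iterable, List

logger = logging.getLogger("pod_sketch")


def emit_container_logs(raw_lines: Iterable[bytes]) -> List[str]:
    """Decode each raw log line and log it in the 'Container logs:' format.

    Hypothetical helper: returns the formatted messages so they can be inspected.
    """
    messages = []
    for raw_line in raw_lines:
        # Undecodable bytes are escaped instead of raising UnicodeDecodeError.
        line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
        logger.info("Container logs: %s", line)
        messages.append(f"Container logs: {line}")
    return messages


# A one-shot read either returns whatever the API still serves...
ok = emit_container_logs([b"/opt/airflow\n", b"Hello world!\n"])
# ...or nothing at all, in which case no container log lines appear.
missed = emit_container_logs([])
```

Nothing in this loop can recover lines that the read itself did not return, so the question is why the read sometimes comes back empty.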

itsnotapt commented 5 months ago

apache-airflow-providers-cncf-kubernetes==8.0.1

pankajastro commented 5 months ago

Ok, so I did some investigation. POST_TERMINATION_TIMEOUT is set to 120 seconds, which means the pod's logs remain available for retrieval for up to 120 seconds after the pod terminates. https://github.com/apache/airflow/blob/8fc984873aab3424df0d44351da136e5c65b81e2/airflow/providers/cncf/kubernetes/operators/pod.py#L235 However, your task stays deferred in the trigger for more than 120 seconds after the pod terminates, so by the time it resumes the logs are no longer available. To test this, try reducing poll_interval to around 60 seconds (your script probably finishes quickly), and your logs should then be available. I'll see whether it makes sense to parameterize this timeout.
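The timing argument can be checked with a little arithmetic. This is a minimal sketch, assuming (as described above) that logs count as retrievable only while `finished_at + POST_TERMINATION_TIMEOUT` is still in the future; `logs_still_available` is a hypothetical name, and the termination timestamps below are made up for illustration rather than taken from the reporter's pod. In the successful run above, the task resumed about 122 seconds after deferral (22:16:29 to 22:18:31), so with `poll_interval=120` a difference of a few seconds in when the container exits decides the outcome.

```python
from datetime import datetime, timedelta

# Assumed retention window from the discussion above (seconds).
POST_TERMINATION_TIMEOUT = 120


def logs_still_available(finished_at: datetime, now: datetime,
                         timeout: int = POST_TERMINATION_TIMEOUT) -> bool:
    """Hypothetical check: logs are served only within `timeout` seconds of termination."""
    return finished_at + timedelta(seconds=timeout) > now


# "Resuming after deferral" time from the successful run.
resumed_at = datetime(2024, 3, 8, 22, 18, 31)

# Made-up termination times, five seconds apart:
print(logs_still_available(datetime(2024, 3, 8, 22, 16, 35), resumed_at))  # True: 116 s earlier
print(logs_still_available(datetime(2024, 3, 8, 22, 16, 30), resumed_at))  # False: 121 s earlier

# If the trigger notices termination within about 60 s, there is ample headroom.
print(logs_still_available(resumed_at - timedelta(seconds=60), resumed_at))  # True
```

This is why the behavior looks like a coin flip: the polling interval and the retention window are both 120 seconds.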

I also found one more small bug while debugging and created a fix: https://github.com/apache/airflow/pull/38075

itsnotapt commented 5 months ago

That appears to do the trick. Thanks for looking into this so quickly.

pankajastro commented 5 months ago


Hmm, since follow is False, it can either miss some logs or produce duplicate logs during the termination steps. I have created https://github.com/apache/airflow/pull/38081 to fix it.
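Why a one-shot read near termination can either duplicate or miss lines is easiest to see in a generic form. The sketch below is an illustration only, not the code in PR 38081: `new_lines_since` is a hypothetical helper, and it assumes each line carries a sortable ISO 8601 timestamp and that the reader remembers the timestamp of the last line it already emitted. Without any filter, re-reading the full log repeats lines already emitted; with a strict "newer than last seen" filter, a line sharing the last-seen timestamp is skipped.

```python
from typing import List, Optional, Tuple

LogLine = Tuple[str, str]  # (ISO 8601 timestamp, message)


def new_lines_since(snapshot: List[LogLine], last_seen: Optional[str]) -> List[LogLine]:
    """Return lines strictly newer than `last_seen` (all lines if nothing was seen yet)."""
    if last_seen is None:
        return list(snapshot)
    # Same-format, same-timezone ISO 8601 strings compare correctly as plain strings.
    return [(ts, msg) for ts, msg in snapshot if ts > last_seen]


# Already emitted while the pod was running:
first_read = [("2024-01-01T00:00:01Z", "line A")]
last_seen = first_read[-1][0]

# A later non-following read returns the whole log again:
snapshot = [
    ("2024-01-01T00:00:01Z", "line A"),
    ("2024-01-01T00:00:01Z", "line B"),  # same timestamp as the last emitted line
    ("2024-01-01T00:00:02Z", "line C"),
]

print(new_lines_since(snapshot, None))       # re-emits "line A": a duplicate
print(new_lines_since(snapshot, last_seen))  # drops "line B": a missed line
```

Switching the filter to `>=` trades the missed line for a duplicate, which is the same tension described in the comment above.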

eladkal commented 5 months ago

@pankajastro will https://github.com/apache/airflow/pull/38081 resolve this issue?

pankajastro commented 5 months ago

No, it does not fix this issue, and I'm not sure we even need to work on this one. https://github.com/apache/airflow/pull/38081 addresses the comment https://github.com/apache/airflow/issues/38003#issuecomment-1987238944.

The suggestion in https://github.com/apache/airflow/issues/38003#issuecomment-1991780598 worked for the user: https://github.com/apache/airflow/issues/38003#issuecomment-1992122986