apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Deferrable KPO - stuck with do_xcom_push=True #32458

Closed raphaelauv closed 1 year ago

raphaelauv commented 1 year ago

Apache Airflow version

2.6.2

What happened

Version 7.2.0 of the cncf-kubernetes provider never marks the task as successful when the KPO is used with do_xcom_push=True and deferrable=True.

The airflow-xcom-sidecar container stays in the running state in K8S. The operator log:

 {taskinstance.py:1308} INFO - Starting attempt 1 of 1
 {taskinstance.py:1327} INFO - Executing <Mapped(KubernetesPodOperator): task-one> on 2023-07-09 15:01:01.455462+00:00
 {standard_task_runner.py:57} INFO - Started process 157 to run task
 {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'kubernetes_dag', 'task-one', 'manual__2023-07-09T15:01:01.455462+00:00', '--job-id', '8', '--raw', '--subdir', 'DAGS_FOLDER/kubernetes_dag.py', '--cfg-path', '/tmp/tmpve9e4m0j', '--map-index', '0']
 {standard_task_runner.py:85} INFO - Job 8: Subtask task-one
 {task_command.py:410} INFO - Running <TaskInstance: kubernetes_dag.task-one manual__2023-07-09T15:01:01.455462+00:00 map_index=0 [running]> on host efa1d3dea00b
 {logging_mixin.py:149} WARNING - /home/airflow/.local/lib/python3.11/site-packages/airflow/models/mappedoperator.py:615 AirflowProviderDeprecationWarning: `is_delete_operator_pod` parameter is deprecated, please use `on_finish_action`
 {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='kubernetes_dag' AIRFLOW_CTX_TASK_ID='task-one' AIRFLOW_CTX_EXECUTION_DATE='2023-07-09T15:01:01.455462+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-07-09T15:01:01.455462+00:00'
 {pod.py:878} INFO - Building pod airflow-test-pod-q3ez8146 with labels: {'dag_id': 'kubernetes_dag', 'task_id': 'task-one', 'run_id': 'manual__2023-07-09T150101.4554620000-198a0a929', 'kubernetes_pod_operator': 'True', 'map_index': '0', 'try_number': '1'}
 {base.py:73} INFO - Using connection ID 'kubernetes_default' for task execution.
 {taskinstance.py:1415} INFO - Pausing task as DEFERRED. dag_id=kubernetes_dag, task_id=task-one, execution_date=20230709T150101, start_date=20230709T150102
 {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
 {pod.py:142} INFO - Checking pod 'airflow-test-pod-q3ez8146' in namespace 'default'.
 {base.py:73} INFO - Using connection ID 'kubernetes_default' for task execution.
 {pod.py:175} INFO - Container is not completed and still working.
 {pod.py:194} INFO - Sleeping for 2 seconds.
 {pod.py:175} INFO - Container is not completed and still working.
 {pod.py:194} INFO - Sleeping for 2 seconds.
 {pod.py:175} INFO - Container is not completed and still working.
 {pod.py:194} INFO - Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.
 {pod.py:158} INFO - Pod airflow-test-pod-q3ez8146 is still running. Sleeping for 2 seconds.

What you think should happen instead

No response

How to reproduce

from pendulum import today
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

dag = DAG(
    dag_id="kubernetes_dag",
    schedule_interval="0 0 * * *",
    start_date=today("UTC").add(days=-1)
)

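with dag:
    # The base container prints two lines and exits after about 4 seconds.
    cmd = "echo toto && sleep 4 && echo finish"

    # Mapped KPO with a single map index (see `--map-index 0` in the log above).
    KubernetesPodOperator.partial(
        task_id="task-one",
        namespace="default",
        kubernetes_conn_id="kubernetes_default",
        name="airflow-test-pod",
        image="alpine:3.16.2",
        cmds=["sh", "-c", cmd],
        is_delete_operator_pod=True,
        deferrable=True,  # hand the wait over to the triggerer
        poll_interval=2,  # seconds between two trigger checks
        do_xcom_push=True,  # adds the airflow-xcom-sidecar container to the pod
        get_logs=True,
    ).expand(env_vars=[{"a": "a"} for _ in range(1)])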

Operating System

Ubuntu 22.04

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

hussein-awala commented 1 year ago

Could you provide the pod and container states while the task is running and logging this line?
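For anyone wanting to collect those states programmatically, here is a minimal sketch using the official `kubernetes` Python client. The helper names `describe_state`, `container_states` and `fetch_pod` are hypothetical and not part of Airflow or the provider; the stand-in pod at the end uses illustrative values so the helpers can be tried without a cluster.

```python
from types import SimpleNamespace


def describe_state(state):
    """Return 'running', 'terminated' or 'waiting' for a V1ContainerState-like object."""
    for name in ("running", "terminated", "waiting"):
        if getattr(state, name, None) is not None:
            return name
    return "unknown"


def container_states(pod):
    """Map each container name to its current state for a V1Pod-like object."""
    statuses = pod.status.container_statuses or []
    return {s.name: describe_state(s.state) for s in statuses}


def fetch_pod(name, namespace):
    """Read the pod from the cluster (needs the `kubernetes` package and a kubeconfig)."""
    # Imported lazily so the two helpers above work without the package installed.
    from kubernetes import client, config

    config.load_kube_config()
    return client.CoreV1Api().read_namespaced_pod(name=name, namespace=namespace)


# Stand-in object with illustrative values (not real cluster output).
demo_pod = SimpleNamespace(
    status=SimpleNamespace(
        phase="Running",
        container_statuses=[
            SimpleNamespace(
                name="base",
                state=SimpleNamespace(
                    running=None, terminated=SimpleNamespace(exit_code=0), waiting=None
                ),
            ),
            SimpleNamespace(
                name="airflow-xcom-sidecar",
                state=SimpleNamespace(
                    running=SimpleNamespace(), terminated=None, waiting=None
                ),
            ),
        ],
    )
)

demo_states = container_states(demo_pod)
```

Against a real cluster, `container_states(fetch_pod("airflow-test-pod-q3ez8146", "default"))` would report the states for the pod named in the log, assuming the local kubeconfig points at that cluster.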

raphaelauv commented 1 year ago

The pod is in the running state.

--

The base container is in the completed state -> logs `toto\nfinish`

The airflow-xcom-sidecar container is in the running state -> no logs
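
The states reported above suggest why a wait based on the pod phase would not finish: a pod stays in the `Running` phase while any of its containers is still running. The sketch below only illustrates that distinction with plain values. It is not the provider's code; `pod_is_done` and `base_is_done` are hypothetical helpers, and mapping the reported "completed" state to a terminated container is an assumption.

```python
BASE_CONTAINER = "base"  # container name as reported in this thread


def pod_is_done(phase: str) -> bool:
    """A pod-level check: only terminal pod phases count as finished."""
    return phase in ("Succeeded", "Failed")


def base_is_done(states: dict, base: str = BASE_CONTAINER) -> bool:
    """A container-level check: look only at the base container."""
    return states.get(base) == "terminated"


# Values taken from the report: pod running, base completed, sidecar running.
observed_phase = "Running"
observed_states = {"base": "terminated", "airflow-xcom-sidecar": "running"}

pod_level = pod_is_done(observed_phase)         # False: the sidecar keeps the pod running
container_level = base_is_done(observed_states)  # True: the actual workload has finished
```

With these inputs the pod-level check keeps reporting "still running" while the container-level check sees the work as finished, which matches the repeated "is still running" lines in the log.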