apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Print kubernetes failure Status and Reason on pod failures #37548

Open · hterik opened 9 months ago

hterik commented 9 months ago

Description

We occasionally see KubernetesExecutor tasks getting lost in cyberspace, with no logs in the Airflow UI describing why.

If admins look into the scheduler logs (Airflow 2.7.1), the following can be seen:

15:55:58 {scheduler_job_runner.py:636} INFO - Sending TaskInstanceKey(dag_id='G', task_id='S', run_id='RRR', try_number=1, map_index=-1) to executor with priority 31 and queue kubernetes
15:55:58 {base_executor.py:144} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'G', 'S', 'RRR', '--local', '--subdir', 'DAGS_FOLDER/dag_G.py.py']
15:55:58 {kubernetes_executor.py:319} INFO - Add task TaskInstanceKey(dag_id='G', task_id='S', run_id='RRR', try_number=1, map_index=-1) with command ['airflow', 'tasks', 'run', 'G', 'S', 'RRR', '--local', '--subdir', 'DAGS_FOLDER/dag_G.py.py']
15:55:58 {kubernetes_executor_utils.py:395} INFO - Creating kubernetes pod for job is TaskInstanceKey(dag_id='G', task_id='S', run_id='RRR', try_number=1, map_index=-1), with pod name PPP, annotations: <omitted>
15:55:58 {scheduler_job_runner.py:686} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='G', task_id='S', run_id='RRR', try_number=1, map_index=-1)
15:55:58 {scheduler_job_runner.py:713} INFO - Setting external_id for <TaskInstance: G.S RRR [queued]> to 800028

15:56:20 <<<OUTSIDE AIRFLOW>>>:  Kubernetes Eviction event: "The node was low on resource: memory. Threshold quantity: .....

15:56:23 {kubernetes_executor.py:363} INFO - Changing state of (TaskInstanceKey(dag_id='G', task_id='S', run_id='RRR', try_number=1, map_index=-1), <TaskInstanceState.FAILED: 'failed'>, 'PPP', 'default', '311004175') to failed
15:56:23 {kubernetes_executor.py:455} INFO - Patched pod TaskInstanceKey(dag_id='G', task_id='S', run_id='RRR', try_number=1, map_index=-1) in namespace default to mark it as done
15:56:23 {scheduler_job_runner.py:686} INFO - Received executor event with state failed for task instance TaskInstanceKey(dag_id='G', task_id='S', run_id='RRR', try_number=1, map_index=-1)
15:56:23 {scheduler_job_runner.py:723} INFO - TaskInstance Finished: dag_id=G, task_id=S, run_id=RRR, map_index=-1, run_start_date=None, run_end_date=None, run_duration=None, state=queued, executor_state=failed, try_number=1, max_tries=0, job_id=None, pool=default_pool, queue=kubernetes, priority_weight=31, operator=BranchPythonOperator, queued_dttm=2024-02-19 14:55:58.808536+00:00, queued_by_job_id=800028, pid=None
15:56:23 {scheduler_job_runner.py:771} ERROR - Executor reports task instance <TaskInstance: G.S RRR [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?
15:56:23 {taskinstance.py:1937} ERROR - Executor reports task instance <TaskInstance: G.S RRR [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?
15:56:25 {kubernetes_executor.py:363} INFO - Changing state of (TaskInstanceKey(dag_id='G', task_id='S', run_id='RRR', try_number=1, map_index=-1), <TaskInstanceState.FAILED: 'failed'>, 'PPP', 'default', '311004178') to failed
15:56:25 {kubernetes_executor.py:455} INFO - Patched pod TaskInstanceKey(dag_id='G', task_id='S', run_id='RRR', try_number=1, map_index=-1) in namespace default to mark it as done
15:56:25 {kubernetes_executor.py:363} INFO - Changing state of (TaskInstanceKey(dag_id='G', task_id='S', run_id='RRR', try_number=1, map_index=-1), <TaskInstanceState.FAILED: 'failed'>, 'PPP', 'default', '311004180') to failed
15:56:25 {kubernetes_executor.py:455} INFO - Patched pod TaskInstanceKey(dag_id='G', task_id='S', run_id='RRR', try_number=1, map_index=-1) in namespace default to mark it as done
15:56:25 {scheduler_job_runner.py:686} INFO - Received executor event with state failed for task instance TaskInstanceKey(dag_id='G', task_id='S', run_id='RRR', try_number=1, map_index=-1)
15:56:25 {scheduler_job_runner.py:723} INFO - TaskInstance Finished: dag_id=G, task_id=S, run_id=RRR, map_index=-1, run_start_date=None, run_end_date=2024-02-19 14:56:23.933950+00:00, run_duration=None, state=failed, executor_state=failed, try_number=1, max_tries=0, job_id=None, pool=default_pool, queue=kubernetes, priority_weight=31, operator=BranchPythonOperator, queued_dttm=2024-02-19 14:55:58.808536+00:00, queued_by_job_id=800028, pid=None

It would be a lot easier to debug such issues if A) the scheduler logs mentioned the pod failure Reason=Evicted and status=Failed, both of which are available on the V1Pod object returned by the Kubernetes API, and B) the Airflow UI surfaced this error instead of showing nothing at all.
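For reference, the fields in question are exposed by the Kubernetes Python client that the executor already uses. A minimal sketch follows; the pod name, namespace, and standalone client setup are illustrative and not the actual KubernetesExecutor code:

from kubernetes import client, config

# Illustrative only: the executor's pod watcher already holds the V1Pod object;
# this just shows where the eviction details live on it.
config.load_incluster_config()  # or config.load_kube_config() outside the cluster
v1 = client.CoreV1Api()

pod = v1.read_namespaced_pod(name="PPP", namespace="default")  # placeholder pod name
status = pod.status  # V1PodStatus

# For an evicted pod these typically read:
#   phase='Failed', reason='Evicted',
#   message='The node was low on resource: memory. ...'
print(f"phase={status.phase} reason={status.reason} message={status.message}")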

Use case/motivation

No response

Related issues

No response

Are you willing to submit a PR?


SamWheating commented 9 months ago

This is a good suggestion - we have the log_events_on_failure arg on the KubernetesPodOperator, which can be really useful for diagnosing pod startup issues like missing images or volumes - having similar behaviour in the KubernetesExecutor would be rad.
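For context, a hedged sketch of that existing operator behaviour; the task id, image, and import path here are placeholders and depend on the cncf.kubernetes provider version installed:

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# Hypothetical task: with log_events_on_failure=True the operator logs the pod's
# Kubernetes events (e.g. FailedMount, ImagePullBackOff, Evicted) when the pod
# fails, which is the behaviour the issue asks the KubernetesExecutor to mirror.
diagnose_pod_failure = KubernetesPodOperator(
    task_id="example_pod_task",   # placeholder task id
    name="example-pod",
    namespace="default",
    image="busybox:1.36",         # placeholder image
    cmds=["sh", "-c", "exit 1"],  # deliberately failing command for illustration
    log_events_on_failure=True,   # print pod events on failure
)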

Feel free to assign this to me and I can put a fix together next week 👍

eladkal commented 7 months ago

@SamWheating will you have time to work on it?

SamWheating commented 7 months ago

Ah sorry, I never got assigned and it fell off my radar.

I don't think I'll have time for this in the near future so feel free to assign to someone else.

tsafacjo commented 2 months ago

I will pick it up, can you assign it to me?