apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.16k stars 14.04k forks source link

Triggered tasks from web-ui are killed by scheduler because they are orphaned task instances #20982

Open nclaeys opened 2 years ago

nclaeys commented 2 years ago

Apache Airflow version

2.2.2

What happened

I have a dag which is scheduled every day and has depends_on_past set to true. When a task fails for a given date, which is expected as the required data is not there. If I want to manually run the next task (for the following day) the run always fails.

The reason for this is that airflow creates next task instances for the following days and sets their state to None as they cannot be scheduled because the previous task instance is in failed state, this is correct and expected behavior. If I now manually trigger the task_run from the web-ui, the task_instance data in the database is not updated and as a consequence the queued_by_job_id is not filled in.

Every 5 minutes the airflow scheduler queries for orphaned tasks, and since my manual run does not have queued_by_job_id filled in, it always gets killed as the scheduler thinks it is orphaned. The scheduler shows the following logs: airflow-scheduler [2022-01-20 11:15:41,784] {scheduler_job.py:1178} INFO - Reset the following 1 orphaned TaskInstances: airflow-scheduler <TaskInstance: testmanual.sample scheduled__2022-01-16T00:00:00+00:00 [running]>

What you expected to happen

I expect that the manual run will not be killed by the scheduler as it thinks it is orphaned and thus that my tasks can still succeed.

If this is expected behavior, It is best to show an error to the user when it tries to run the request stating: Running this task manually is not supported because... Then at least it is clear to the user, now the actual reason is really hidden and not obvious for most users I assume.

How to reproduce

Operating System

debian buster

Versions of Apache Airflow Providers

/

Deployment

Other Docker-based deployment

Deployment details

We run airflow on kubernetes and thus use the kubernetes_executor to schedule tasks.

Anything else

No response

Are you willing to submit PR?

Code of Conduct

potiuk commented 2 years ago

It does seam like pretty unintended behaviour.

potiuk commented 2 years ago

Do you want to attempt to fix it @nclaeys ?

potiuk commented 2 years ago

@ashb @uranusjr @ephraimbuddy @jedcunningham - I marked it for 2.2.4 - I've seen this one reported by others (I think so: https://github.com/apache/airflow/discussions/21047 ). I guess it might be a bad query somewhere in orphan detection which does not exclude the tasks that are run through other means from adoption by scheduler.

I might be wrong about it and I might take a look shortly, but maybe you know something that I would have to find out through digging in the code.

And I think this one is pretty disruptive as well.

ashb commented 2 years ago

I do remember we already made some changes in the orphan code, so there is a chance this has already been fixed.

nclaeys commented 2 years ago

@potiuk I will take a look at it. I am not that familiar with the inner workings of airflow, but I think that the query for orphaned tasks is wrong. At the moment it does:

session.query(TI)
                        .filter(TI.state.in_(resettable_states))
                        .outerjoin(TI.queued_by_job)
                        .filter(or_(TI.queued_by_job_id.is_(None), SchedulerJob.state != State.RUNNING))
                        .join(TI.dag_run)
                        .filter(
                            DagRun.run_type != DagRunType.BACKFILL_JOB,
                            DagRun.state == State.RUNNING,
                        )

and I think that in my case the DagRun.run_type == DagRunType.Manual. I will validate this and supply a fix for it.

potiuk commented 2 years ago

@potiuk I will take a look at it. I am not that familiar with the inner workings of airflow, but I think that the query for orphaned tasks is wrong. At the moment it does: session.query(TI) .filter(TI.state.in_(resettable_states)) .outerjoin(TI.queued_by_job) .filter(or_(TI.queued_by_job_id.is_(None), SchedulerJob.state != State.RUNNING)) .join(TI.dag_run) .filter( DagRun.run_type != DagRunType.BACKFILL_JOB, DagRun.state == State.RUNNING, ) and I think that in my case the DagRun.run_type == DagRunType.Manual. I will validate this and supply a fix for it.

I think it ain't so easy. Unlike backfills (controlled by the client airflow backfill command) , the manual runs are also "controlled" by scheduler - same as regular runs. That's why backfills are excluded in this query.

I have a feeling this is more a problem with kubernetes_executor's adoption code which uses labels: https://github.com/apache/airflow/blob/1169e3a81f5481e92d29e174163eb5e6d7f3fe93/airflow/executors/kubernetes_executor.py#L678

But this is more of an intuition than looking at the code yet.

nclaeys commented 2 years ago

Well I do not think the problem is in the adopt_task_instances but I am not sure what the expected behavior should be. It only tries adopting task instances that have the queued_by_job_id field filled in, in which case it changes the labels identifying the previous scheduler pod name with the new one in order to watch them (makes sense in my opinion). In the manually triggered run, we do not have a queued_by_job_id so it is not adopted.

If I am not mistaken the run button triggers a local execution of the task within the web pod and creates a local kubernetes executor to launch the task, which is different than a task that is triggered from the scheduler pod itself.

I am not sure what the expected behavior should be:

tanelk commented 2 years ago

I have never worked with kubernetes, but based on the triggering code: https://github.com/apache/airflow/blob/1f29d844411522e9a2cd4a14c3573e903b4e5074/airflow/www/views.py#L1786-L1794

I believe this diff could get you in the right direction.

index bdac0889f..fe25d25e8 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -685,9 +685,14 @@ class KubernetesExecutor(BaseExecutor):
         self.event_buffer[key] = state, None

     def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
+
+        # Tasks triggered through API will have no ti.queued_by_job_id
+        # and their pod will have label 'airflow-worker=manual'
+        if any(ti for ti in tis if not ti.queued_by_job_id):
+            scheduler_job_ids.add('manual')
+
+        pod_ids = {ti.key: ti for ti in tis}
         kube_client: client.CoreV1Api = self.kube_client
         for scheduler_job_id in scheduler_job_ids:
             scheduler_job_id = pod_generator.make_safe_label_value(str(scheduler_job_id))
@@ -696,8 +701,8 @@ class KubernetesExecutor(BaseExecutor):
             for pod in pod_list.items:
                 self.adopt_launched_task(kube_client, pod, pod_ids)
         self._adopt_completed_pods(kube_client)
-        tis_to_flush.extend(pod_ids.values())
-        return tis_to_flush
+
+        return list(pod_ids.values())

     def adopt_launched_task(
         self, kube_client: client.CoreV1Api, pod: k8s.V1Pod, pod_ids: Dict[TaskInstanceKey, k8s.V1Pod]

It would be nice if someone could try this out.

nclaeys commented 2 years ago

Thanks for the new input, I will take a look at it

hterik commented 1 year ago

@potiuk I will take a look at it. I am not that familiar with the inner workings of airflow, but I think that the query for orphaned tasks is wrong. At the moment it does:

session.query(TI)
                        .filter(TI.state.in_(resettable_states))
                        .outerjoin(TI.queued_by_job)
                        .filter(or_(TI.queued_by_job_id.is_(None), SchedulerJob.state != State.RUNNING))
                        .join(TI.dag_run)
                        .filter(
                            DagRun.run_type != DagRunType.BACKFILL_JOB,
                            DagRun.state == State.RUNNING,
                        )

and I think that in my case the DagRun.run_type == DagRunType.Manual. I will validate this and supply a fix for it.

~~I've been debugging this and #25021 for a while now and my prime suspect is also this query. But rather the SchedulerJob.state != State.RUNNING part. I think that condition should only be used on the periodic call of adopt_or_reset_orphaned_tasks(). But first time on scheduler startup, it should pick up everything, including things that were previously in running state. It might not be entirely same cause as for this issue but could be good to be aware of.~~ Edit: wrong conclusion