apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

API Bug: patching a task instance state to skipped crashes. #37034

Open avandierast opened 7 months ago

avandierast commented 7 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.2

What happened?

Hi :)

When calling TaskInstanceApi.patch_task_instance with new_state=skipped, the API crashes with a 500 and the following error message: ValueError: skipped is not a valid DagRunState.

What you think should happen instead?

Since we are patching a task instance, the request shouldn't be rejected just because the chosen state is not a valid state for a DAG run.

How to reproduce

Create a simple dag like:

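import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

# Manually triggered DAG with a single task that sleeps for a minute,
# which leaves time to patch the task instance while it is running.
with DAG(
    dag_id="sleeping_dag",
    catchup=False,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
) as sleeping_dag:
    sleeping_task = BashOperator(
        task_id="sleeping_task",
        bash_command="sleep 60",
    )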

Then send a PATCH request with curl to set the task instance state to skipped (I used the Swagger UI):

curl -X 'PATCH' \
  'http://localhost:8080/api/v1/dags/sleeping_dag/dagRuns/manual__2024-01-26T16%3A16%3A06.275444%2B00%3A00/taskInstances/sleeping_task' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "dry_run": false,
  "new_state": "skipped"
}'
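For anyone reproducing this from a script rather than the Swagger UI, the same request can be sketched in Python with only the standard library. The helper name build_patch_request is made up for this illustration, and authentication is left out just as in the curl call, so a real deployment will probably need credentials added. The sketch only builds the request; sending it is left as a comment.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8080/api/v1"  # same host as in the curl call


def build_patch_request(
    dag_id: str, dag_run_id: str, task_id: str, new_state: str
) -> urllib.request.Request:
    """Build (but do not send) the PATCH request for one task instance.

    Hypothetical helper for illustration; not part of Airflow or its client.
    """
    # The run id contains ':' and '+', so it is percent-encoded for the path.
    encoded_run_id = urllib.parse.quote(dag_run_id, safe="")
    url = f"{BASE_URL}/dags/{dag_id}/dagRuns/{encoded_run_id}/taskInstances/{task_id}"
    body = json.dumps({"dry_run": False, "new_state": new_state}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PATCH",
        headers={"accept": "application/json", "Content-Type": "application/json"},
    )


request = build_patch_request(
    "sleeping_dag",
    "manual__2024-01-26T16:16:06.275444+00:00",
    "sleeping_task",
    "skipped",
)
# To actually send it: urllib.request.urlopen(request)
# Authentication is omitted here (assumption: the server accepts the call as-is).
```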

Operating System

Red Hat Enterprise Linux 8.6

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else?

This problem happens every time.

It seems that the problem comes from the following line of code: https://github.com/apache/airflow/blob/bc0e0d95d471868cd4878064fb547aa431506eb8/airflow/api/common/mark_tasks.py#L147 There, a DagRunState is built from a state that is actually a task instance state. When that state is SKIPPED, which has no equivalent in DagRunState, the conversion throws an exception.
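The failure mode can be illustrated without Airflow installed. The two enums below are simplified stand-ins written for this sketch (they are not imported from Airflow and list only a few states), and to_dag_run_state is a hypothetical name for the conversion described above.

```python
from enum import Enum


class DagRunState(str, Enum):
    """Simplified stand-in for Airflow's DagRunState (assumption: four states)."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


class TaskInstanceState(str, Enum):
    """Simplified stand-in listing only a few task instance states."""

    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


def to_dag_run_state(state: TaskInstanceState) -> DagRunState:
    # Value-based enum lookup: raises ValueError when the value has no
    # matching DagRunState member.
    return DagRunState(state.value)


# A state shared by both enums converts without trouble.
converted = to_dag_run_state(TaskInstanceState.SUCCESS)

# "skipped" exists only for task instances, so the lookup fails.
try:
    to_dag_run_state(TaskInstanceState.SKIPPED)
    conversion_failed = False
except ValueError:
    conversion_failed = True
```

Any task instance state without a DAG run counterpart would hit the same error if it reached this conversion.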

The skipped status was not possible originally but was added by this PR: https://github.com/apache/airflow/pull/31421

I hope we can make this endpoint work with the status skipped. I was glad to see this possibility in the openapi documentation :)

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 7 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

killua1zoldyck commented 7 months ago

Hi, Can I work on this?

eladkal commented 7 months ago

> Hi, Can I work on this?

Assigned

killua1zoldyck commented 7 months ago

Hey! As stated above, the issue we're facing is with the DagRunState conversion that is applied when dealing with subDAG runs. One idea is to set the state to "success" for subDAGs, but that is counter-intuitive.

Another thought is to check whether the DAG has any subDAGs before calling the DagRunState function, so that it is only invoked for DAGs with subDAGs. The 500s would then come up only when the API is called with a state that is not part of DagRunState and the DAG has a subDAG, in which case we would raise an exception (something like "Cannot skip a task in a DAG with a subDAG"). It still feels a bit counter-intuitive, but it is the option I prefer so far.
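As a rough sketch of that second idea (not the actual patch): the DagRunState enum here is a simplified stand-in rather than the Airflow class, and subdag_run_state and its has_subdags parameter are hypothetical names chosen for illustration.

```python
from enum import Enum
from typing import Optional


class DagRunState(str, Enum):
    """Simplified stand-in for Airflow's DagRunState (not imported from Airflow)."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


def subdag_run_state(new_state: str, has_subdags: bool) -> Optional[DagRunState]:
    """Convert a task state to a DAG run state only when subDAG runs exist.

    Hypothetical helper: returns None when there is nothing to convert.
    """
    if not has_subdags:
        # No subDAG runs to update, so the conversion is skipped entirely.
        return None
    try:
        return DagRunState(new_state)
    except ValueError:
        # Replace the bare enum error with a message that explains the limit.
        raise ValueError(
            f"Cannot set state {new_state!r} on a task in a DAG with a subDAG"
        ) from None
```

With this shape, subdag_run_state("skipped", has_subdags=False) returns None instead of failing, while the same call with has_subdags=True still raises, only with a more descriptive message. An API layer would presumably map that exception to a 4xx response rather than a 500, though that part is outside this sketch.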

I'm open to brainstorming better solutions or hearing your thoughts on handling subDags and their states more smoothly! Any ideas?

ehellmann-nydig commented 2 months ago

Any updates on this? I'm attempting to mark a task group as skipped if all of its tasks are skipped. The group is effectively skipped since the first operator is a short_circuit operator, but unfortunately that operator gets marked as success, so the whole group is marked as success. My idea is to mark the short_circuit instance as skipped if its result is skip, so that the group can be marked as skipped. This is a blocker for me.