apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Timeout on Dynamic Task Mapping: skipped inner task, task status is still success #37332

Open raphaelauv opened 9 months ago

raphaelauv commented 9 months ago

Apache Airflow version

2.8.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Screenshot from 2024-02-10 22-05-57

This is confusing: it is not obvious how to find the problem when just looking at the grid view of a complex DAG.

What you think should happen instead?

maybe we should mark the task as skipped or failed

How to reproduce

from datetime import timedelta

from airflow.operators.python import PythonOperator
from pendulum import today
from airflow import DAG

dag = DAG(
    dag_id="test_dag",
    schedule_interval=None,
    dagrun_timeout=timedelta(seconds=5),
    start_date=today("UTC").add(days=-1)
)

with dag:
    def something(arg):
        import time
        time.sleep(int(arg))

    PythonOperator.partial(
        task_id="toto",
        python_callable=something,
    ).expand(op_args=[("10",), ("5",), ("4",)])
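
To see the per-map-index states once the run has timed out, the task instances of the run can also be listed outside the grid view, for example through the stable REST API. A minimal sketch, where the base URL, credentials and run id are placeholders for a local docker-compose setup:

```python
# Sketch: list the state of every task instance (including mapped ones) for one run.
# BASE_URL, AUTH and RUN_ID are placeholders, adjust them to your deployment.
import requests

BASE_URL = "http://localhost:8080/api/v1"  # placeholder: your webserver URL
AUTH = ("airflow", "airflow")              # placeholder: basic-auth credentials
DAG_ID = "test_dag"
RUN_ID = "<run_id of the timed-out run>"   # placeholder

resp = requests.get(
    f"{BASE_URL}/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances",
    auth=AUTH,
)
resp.raise_for_status()

for ti in resp.json()["task_instances"]:
    # map_index is -1 for regular tasks and 0..n-1 for mapped instances
    print(ti["task_id"], ti["map_index"], ti["state"])
```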

Operating System

ubuntu 22.04

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

ephraimbuddy commented 7 months ago

The problem here is that you have a 5-second timeout on the DAG. Not sure why it didn't log but I'll investigate.

By definition, skipped is a success state (see https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dag-run.html#dag-run-status), so the DAG should be marked success, but because it timed out, it was marked as failed.
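
In other words, the final state of a run is derived from its leaf tasks, with skipped counted on the success side, and dagrun_timeout overrides that. Roughly, as an illustrative sketch (not the actual scheduler code):

```python
# Illustrative only: how skipped leaves are counted when deciding a run's final
# state, per the dag-run-status docs linked above. Not Airflow's code.

FAILURE_STATES = {"failed", "upstream_failed"}
SUCCESS_LIKE_STATES = {"success", "skipped"}

def dag_run_final_state(leaf_task_states, timed_out=False):
    if timed_out:
        # dagrun_timeout overrides everything else: the run is marked failed
        return "failed"
    if any(s in FAILURE_STATES for s in leaf_task_states):
        return "failed"
    if all(s in SUCCESS_LIKE_STATES for s in leaf_task_states):
        return "success"
    return "running"

# Without the timeout, a run whose tasks ended up skipped would be "success";
# here it is "failed" only because dagrun_timeout fired.
print(dag_run_final_state(["skipped", "skipped", "success"]))        # success
print(dag_run_final_state(["skipped", "skipped", "success"], True))  # failed
```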

ephraimbuddy commented 7 months ago

It's actually there in the log that it has timed out. Please confirm.

raphaelauv commented 7 months ago

I don't understand, what is your question?

--

Also, the problem is not present without dynamic task mapping (screenshot attached):


from datetime import timedelta
from airflow.operators.python import PythonOperator
from pendulum import today
from airflow import DAG

dag = DAG(
    dag_id="test_dag",
    schedule_interval=None,
    dagrun_timeout=timedelta(seconds=5),
    start_date=today("UTC").add(days=-1)
)

with dag:
    def something(arg):
        import time
        time.sleep(int(arg))

    PythonOperator(
        task_id="toto",
        python_callable=something,
        op_args=["10"])
    PythonOperator(
        task_id="tata",
        python_callable=something,
        op_args=["10"])

github-actions[bot] commented 7 months ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

raphaelauv commented 7 months ago

@ephraimbuddy could you please remove the tag wait for response , thanks

raphaelauv commented 4 months ago

hey @ephraimbuddy , could you please update the tags of the issue , thanks

ephraimbuddy commented 4 months ago

@raphaelauv https://github.com/apache/airflow/blob/9901a065fcd93307d8e1d69e34621966d7313511/airflow/jobs/scheduler_job_runner.py#L1444-L1455. The dagrun timeout you have on your DAG causes the DAG to fail. You can read it at the link above.
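
Paraphrasing what happens there as a simplified sketch (illustrative only, not the verbatim code): once a running dag run exceeds dagrun_timeout, the run is set to failed and its still-unfinished task instances are set to skipped.

```python
# Simplified sketch of the dagrun_timeout handling discussed above
# (illustrative only; dag_run here is a plain dict, not Airflow's model).
from datetime import datetime, timedelta, timezone

def handle_dagrun_timeout(dag_run: dict, dagrun_timeout: timedelta) -> None:
    """Fail a timed-out run and mark its unfinished task instances as skipped."""
    now = datetime.now(timezone.utc)
    if dag_run["state"] == "running" and now - dag_run["start_date"] > dagrun_timeout:
        dag_run["state"] = "failed"
        for ti in dag_run["task_instances"]:
            if ti["state"] in {"scheduled", "queued", "running", None}:
                ti["state"] = "skipped"  # this is why the mapped instances end up SKIPPED

# Example: a run started 10s ago with a 5s timeout
run = {
    "state": "running",
    "start_date": datetime.now(timezone.utc) - timedelta(seconds=10),
    "task_instances": [{"task_id": "toto", "map_index": 0, "state": "running"}],
}
handle_dagrun_timeout(run, timedelta(seconds=5))
print(run["state"], run["task_instances"][0]["state"])  # -> failed skipped
```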

raphaelauv commented 4 months ago

Yes, the dag_run being failed is fine; the issue is not there.

The issue is that the task status is not failed (which is the case when not using DTM), so if I have 10 DTM tasks I can't know where a task was skipped.

ephraimbuddy commented 4 months ago

The issue is that the task status is not failed (which is the case when not using DTM), so if I have 10 DTM tasks I can't know where a task was skipped.

From the above, all the running tasks would be skipped (SKIPPED state). Do you mean that for a dynamically mapped task, the states of the unfinished tasks are not SKIPPED?

raphaelauv commented 4 months ago

(screenshot attached)

If I have 10 DTM tasks like toto (tata, tutu, ...), I can't know where a task was skipped because the DTM task status is success.

from datetime import timedelta

from airflow.operators.python import PythonOperator
from pendulum import today
from airflow import DAG

dag = DAG(
    dag_id="test_dag",
    schedule_interval=None,
    dagrun_timeout=timedelta(seconds=5),
    start_date=today("UTC").add(days=-1)
)

with dag:
    def something(arg_1, arg_2):
        import time
        if arg_2 == "titi":
            time.sleep(int(arg_1))

    for i in ["tata", "tutu", "toto", "titi"]:
        PythonOperator.partial(
            task_id=i,
            python_callable=something,
        ).expand(op_args=[("10", i), ("5", i), ("4", i)])
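
For now, the only way I can find the skipped mapped instances is to query the task instances directly instead of relying on the UI, for example with a sketch like this (it assumes an environment where the Airflow metadata DB is reachable, e.g. the scheduler container; the run id is a placeholder):

```python
# Sketch: list the mapped instances that ended up SKIPPED for one run of test_dag.
from airflow.models import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState

RUN_ID = "<run_id of the timed-out run>"  # placeholder

with create_session() as session:
    skipped = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "test_dag",
            TaskInstance.run_id == RUN_ID,
            TaskInstance.state == TaskInstanceState.SKIPPED,
        )
        .all()
    )
    for ti in skipped:
        # task_id tells which DTM task is affected, map_index which instance
        print(ti.task_id, ti.map_index)
```
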
ephraimbuddy commented 4 months ago

What if you click through to the logs? Do you see the failed/skipped tasks that way?

raphaelauv commented 4 months ago

When a task is skipped, Airflow marks the task FAILED. When a DTM task has a skipped subtask, Airflow marks the task SUCCESS.


What if you click through to the logs?

I can't know from the UI which DTM task has a subtask that was skipped.

ephraimbuddy commented 4 months ago

When a task is skipped, Airflow marks the task FAILED.

This is strange, and I doubt a skipped task is marked failed. Without a clear illustration, this issue is confusing. Can you make a short screen-capture video of this?

raphaelauv commented 4 months ago

Already did -> https://github.com/apache/airflow/issues/37332#issuecomment-2018836100

github-actions[bot] commented 2 months ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

raphaelauv commented 2 months ago

no stale

github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

raphaelauv commented 1 month ago

no stale

github-actions[bot] commented 1 week ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

raphaelauv commented 1 week ago

no stale