apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Task end_date prevents downstream tasks from running #43484

Open · Gollum999 opened 2 weeks ago

Gollum999 commented 2 weeks ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.1

What happened?

I have a task with multiple upstream dependencies, one of which produces a now-deprecated dataset. To retire this dataset, I set its task's end_date to the last day the data was available, and I updated the downstream task's code to stop looking for that data. Now the downstream task is stuck in "No status" and has to be started manually from the command line.

The only trigger_rules that work around this issue are ones that have the wrong semantics for my task (e.g. always). I also want to make sure we can still backfill the deprecated dataset if needed, so I want to keep the DAG structure and dependencies backward compatible.

What do you think should happen instead?

In descending order of personal preference:

  1. The downstream task should only be prevented from running if all of its upstream dependencies are outside of their start_date/end_date window, not just one.
  2. A task outside of its date window should behave like a "skipped" task, propagating down to children but allowing downstream behavior to be controlled via trigger_rules.
  3. If the scheduling behavior cannot be changed, perhaps a workaround would be to expose the logical_date at the DAG level via AirflowParsingContext, so that dependencies could be changed conditionally (see the sketch after this list):
    if deprecated_task.start_date <= get_parsing_context().logical_date <= deprecated_task.end_date:
        deprecated_task >> downstream_task
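
For illustration, a slightly fuller version of the option 3 sketch. get_parsing_context() currently exposes only dag_id and task_id, so the logical_date attribute used here is hypothetical and is exactly the functionality being requested; the dates are illustrative:

import datetime

from airflow.operators.empty import EmptyOperator
from airflow.utils.dag_parsing_context import get_parsing_context

# These definitions would live inside the @dag-decorated function.
deprecated_task = EmptyOperator(
    task_id='expired',
    start_date=datetime.datetime(2024, 1, 1),
    end_date=datetime.datetime(2024, 10, 24),
)
downstream_task = EmptyOperator(task_id='downstream')

ctx = get_parsing_context()
# Hypothetical attribute: AirflowParsingContext does not carry logical_date today.
if ctx.logical_date is not None and deprecated_task.start_date <= ctx.logical_date <= deprecated_task.end_date:
    # Only wire in the deprecated dependency for runs inside its date window (e.g. backfills).
    deprecated_task >> downstream_task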

How to reproduce

#!/usr/bin/env python3
import datetime
import logging

from airflow.decorators import dag
from airflow.models.baseoperator import cross_downstream
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

logger = logging.getLogger(__name__)

@dag(
    schedule='@daily',
    start_date=datetime.datetime(2024, 10, 21),
)
def test_task_end_date():
    # One upstream chain that passes through the expired task.
    with TaskGroup(group_id='single_dep'):
        (
            EmptyOperator(task_id='upstream')
            >> EmptyOperator(task_id='expired', end_date=datetime.datetime(2024, 10, 24))
            >> EmptyOperator(task_id='downstream')
        )

    # Multiple upstreams where only one has expired; this mirrors the reported setup.
    with TaskGroup(group_id='multiple_deps'):
        [
            EmptyOperator(task_id='upstream_1'),
            EmptyOperator(task_id='upstream_2'),
            EmptyOperator(task_id='expired', end_date=datetime.datetime(2024, 10, 24)),
        ] >> EmptyOperator(task_id='downstream')

    # Same fan-in, checking which trigger_rules still let the downstream tasks run.
    with TaskGroup(group_id='trigger_rules'):
        cross_downstream([
            EmptyOperator(task_id='upstream_1'),
            EmptyOperator(task_id='upstream_2'),
            EmptyOperator(task_id='expired', end_date=datetime.datetime(2024, 10, 24)),
        ], [
            EmptyOperator(task_id='all_success'),
            EmptyOperator(task_id='all_done', trigger_rule='all_done'),
            EmptyOperator(task_id='one_success', trigger_rule='one_success'),
            EmptyOperator(task_id='none_failed', trigger_rule='none_failed'),
            EmptyOperator(task_id='none_failed_min_one_success', trigger_rule='none_failed_min_one_success'),
            EmptyOperator(task_id='none_skipped', trigger_rule='none_skipped'),
            EmptyOperator(task_id='always', trigger_rule='always'),
        ])

dag = test_task_end_date()

if __name__ == '__main__':
    dag.cli()

(Screenshot from 2024-10-29 09-59-27)

Operating System

CentOS Stream 8

Versions of Apache Airflow Providers

N/A

Deployment

Other

Deployment details

Self-hosted/standalone

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

RNHTTR commented 1 week ago

Since this would be adding functionality, I think this is a feature request rather than a bug. Skipping the task instance whose end_date has passed makes sense to me.

To work around this, would it make sense to remove the deprecated task from the DAG? Also, logical_date is already available via the task instance context.

Gollum999 commented 1 week ago

In my case, I do not want to remove the deprecated task for at least a few months because we may still need to backfill the data that it produces.

Using logical_date inside the task instance is a good idea; I hadn't thought of that. I imagine I can conditionally raise AirflowSkipException to get the behavior I'm looking for. Thanks!
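
For reference, a minimal sketch of that workaround (the task name and cutoff below are illustrative, not taken from the real DAG): instead of setting end_date, the deprecated task compares the run's logical_date, injected from the task instance context, against a cutoff and raises AirflowSkipException once the window has passed:

import datetime

from airflow.decorators import task
from airflow.exceptions import AirflowSkipException

# Last day the deprecated data was available (illustrative value).
DATASET_CUTOFF = datetime.datetime(2024, 10, 24, tzinfo=datetime.timezone.utc)

@task
def deprecated_dataset(logical_date=None):
    # logical_date is injected from the task instance context by the TaskFlow API.
    if logical_date > DATASET_CUTOFF:
        raise AirflowSkipException(f'Dataset retired after {DATASET_CUTOFF:%Y-%m-%d}')
    ...  # produce the deprecated dataset as before

Since the task ends up skipped rather than never scheduled, downstream tasks see a normal upstream state; with the default all_success rule the skip propagates, so a downstream task that should keep running still needs a rule like none_failed.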