apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Use only single execution date in ExternalTaskSensor #12577

Open turbaszek opened 3 years ago

turbaszek commented 3 years ago

Description

In general, we encourage operators to be designed to perform a single atomic action. However, ExternalTaskSensor is not always atomic. Users can pass an execution_date_fn that returns multiple dates. When that happens, the sensor checks the state of multiple DAG/task instances rather than a single one. Even so, users only get a binary (1/0) response: the task/DAG failed or it did not.

In my opinion the proper way to handle the multiple dates case would be to have multiple instances of the sensor, each checking for a task/dag for a given date.
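To illustrate the difference in plain Python (this is not Airflow code), the sketch below compares one aggregated check over several dates with one check per date. The names `aggregate_check`, `per_date_checks`, and the `is_done` callback are hypothetical stand-ins for the sensor's state lookup.

```python
from datetime import datetime
from typing import Callable, Dict, List


def aggregate_check(dates: List[datetime], is_done: Callable[[datetime], bool]) -> bool:
    """One sensor over many dates: the caller only learns a single True/False."""
    return all(is_done(d) for d in dates)


def per_date_checks(dates: List[datetime], is_done: Callable[[datetime], bool]) -> Dict[datetime, bool]:
    """One check per date: each result can be reported separately."""
    return {d: is_done(d) for d in dates}


# Hypothetical state: only the run for 1 January has finished.
finished = {datetime(2020, 1, 1)}
dates = [datetime(2020, 1, 1), datetime(2020, 1, 2)]

combined = aggregate_check(dates, finished.__contains__)
separate = per_date_checks(dates, finished.__contains__)
```

With the aggregated check, the caller cannot tell which date is still pending; the per-date mapping keeps that information, which is roughly what one sensor instance per date would provide.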

Use case / motivation

Improve atomicity of ExternalTaskSensor.

Related Issues

_Originally posted by @turbaszek in https://github.com/apache/airflow/pull/12574#discussion_r528967901_

ManiBharataraju commented 3 years ago

@turbaszek - I think it would be good to allow passing user-defined arguments as a dict to execution_date_fn. Right now it accepts only two arguments: the date and the context. Extra arguments would help when, for example, we want to change the hour of the execution date based on user input. Since the function cannot return a list now, checking multiple hours of the same date requires a separate function per hour. Instead, we could pass the hour as an argument and manipulate the date with common logic in a single function. What do you think?
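The "common logic" idea can be sketched in plain Python with a small factory that bakes the hour into a one-argument function. `make_hour_shifter` is a hypothetical helper, not part of Airflow, and whether the returned function is accepted as execution_date_fn may depend on the Airflow version in use.

```python
from datetime import datetime
from typing import Callable


def make_hour_shifter(hour: int) -> Callable[[datetime], datetime]:
    """Return a function that moves a given date to the requested hour."""

    def shift(execution_date: datetime) -> datetime:
        # Only the hour changes; the other fields are left untouched.
        return execution_date.replace(hour=hour)

    return shift


# One shared implementation, two differently configured callables.
check_03 = make_hour_shifter(3)
check_15 = make_hour_shifter(15)

base = datetime(2020, 11, 24, 9, 30)
shifted_03 = check_03(base)
shifted_15 = check_15(base)
```

Each returned callable takes a single date, so the hour is configured once per sensor instead of writing a separate function for every hour.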

turbaszek commented 3 years ago

@ManiBharataraju I would say that you can do python_operator >> external_sensor and pass to the sensor whatever execution date you want. In my opinion this serves 100% of cases. And with TaskFlow API this is dead simple:

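from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.sensors.external_task import ExternalTaskSensor

@task
def my_custom_exec_date() -> datetime:
    ctx = get_current_context()
    # here do what you want; as a placeholder, start from the current run's date
    exec_date = ctx["execution_date"]
    return exec_date

with DAG(...) as dag:
    exec_date = my_custom_exec_date()
    # assumes the sensor accepts a single `execution_date` (the change discussed in this issue)
    sensor = ExternalTaskSensor(task_id="sensor", execution_date=exec_date)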

If you don't know TaskFlow API: