apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

control state of individual taskflow in mapped task-group #40543

Open potiuk opened 2 days ago

potiuk commented 2 days ago

Discussed in https://github.com/apache/airflow/discussions/33556

Originally posted by **ntnhaatj** August 20, 2023

### Description

Hi, as [my issue was raised here](https://github.com/apache/airflow/issues/25032#issuecomment-1684926654), which also included context on the depth-first execution model, I would like to open an issue to have more discussion on this matter.

### Use case/motivation

Motivated by the depth-first execution model implemented in mapped TaskGroups:

```
                    ┌──────────────┐ ┌────────────────┐ ┌───────────┐
                ┌──►│extract_file_1│►│transform_file_1│►│load_file_1│
                │   └──────────────┘ └────────────────┘ └───────────┘
                │
                │   ┌──────────────┐ ┌────────────────┐ ┌───────────┐
┌─────────────┐ ├──►│extract_file_2│►│transform_file_2│►│load_file_2│
│get_file_list├─┤   └──────────────┘ └────────────────┘ └───────────┘
└─────────────┘ │
                │        ...               ...              ...
                │
                │   ┌──────────────┐ ┌────────────────┐ ┌───────────┐
                └──►│extract_file_N│►│transform_file_N│►│load_file_N│
                    └──────────────┘ └────────────────┘ └───────────┘
```

At present, upstream/downstream dependency lists are applied only at the DAGNode level, i.e. to a whole TaskGroup or Operator. We could get better control over the desired task flow if we supported upstream/downstream dependencies at the granularity of individual mapped task instances, instead of applying them to the entire TaskGroup. In the model above, the downstream list of `extract_file_1` should be only `transform_file_1` followed by `load_file_1`, rather than encompassing the whole mapped groups `transform_file[]` and `load_file[]` as in the current implementation.
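To make the two dependency semantics concrete, here is a minimal sketch in plain Python (not Airflow's API; the function names `downstream_group_level` and `downstream_per_index` are hypothetical) contrasting today's group-level edges with the proposed per-map-index edges:

```python
# Model each mapped ETL task instance as a (task_name, map_index) pair.
ORDER = ["extract", "transform", "load"]


def downstream_group_level(task, index, n):
    """Current behavior: the downstream of extract[i] is EVERY mapped
    transform and load instance, because the dependency is attached to
    the whole mapped group."""
    later = ORDER[ORDER.index(task) + 1:]
    return [(t, i) for t in later for i in range(n)]


def downstream_per_index(task, index, n):
    """Proposed behavior: the downstream of extract[i] is only the
    transform and load instances that share the SAME map index."""
    later = ORDER[ORDER.index(task) + 1:]
    return [(t, index) for t in later]


# With 3 mapped files, extract[1] currently fans out to 6 instances...
print(downstream_group_level("extract", 1, 3))
# ...but under per-index dependencies it would reach only 2:
print(downstream_per_index("extract", 1, 3))
# [('transform', 1), ('load', 1)]
```

The per-index variant is exactly the depth-first branch structure shown in the diagram: each file's pipeline proceeds independently of its siblings.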
For instance, I scheduled this test DAG on `Airflow 2.7.0`:

```python
from airflow.decorators import dag, task_group, task
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

files = ["a", "b", "c"]


@dag(start_date=datetime(2022, 12, 1), schedule=None, catchup=False)
def task_group_mapping_example():
    @task_group(group_id="etl")
    def etl_pipeline(file):
        e = EmptyOperator(task_id="e")
        t = EmptyOperator(task_id="t")
        l = EmptyOperator(task_id="l")
        e >> t >> l

    etl = etl_pipeline.expand(file=files)
    etl


task_group_mapping_example()
```

Clearing the state of `etl.e[1]` with its downstream will trigger all mapped tasks in the `etl.t[]` and `etl.l[]` groups.

Screenshot 2023-08-19 at 18 46 42

Thanks,

### Related issues

_No response_

### Are you willing to submit a PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
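For context on why clearing `etl.e[1]` in the example DAG above cascades so widely, the group-level edges can be simulated outside Airflow. This is a standalone sketch (the edge dictionary and `clear_with_downstream` helper are hypothetical illustrations, not Airflow internals): with dependencies attached to the whole mapped group, a breadth-first walk from `e[1]` reaches every `t[]` and `l[]` instance.

```python
from collections import deque

n = 3  # three mapped files: "a", "b", "c"

# Group-level edges as built today: every e[i] feeds every t[j],
# and every t[i] feeds every l[j].
edges = {}
for i in range(n):
    edges[("e", i)] = [("t", j) for j in range(n)]
    edges[("t", i)] = [("l", j) for j in range(n)]
    edges[("l", i)] = []


def clear_with_downstream(start):
    """Collect start plus every task instance reachable downstream (BFS),
    i.e. the set that 'clear with downstream' would reset."""
    seen, queue = {start}, deque([start])
    while queue:
        for nxt in edges[queue.popleft()]:
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


cleared = clear_with_downstream(("e", 1))
# e[1] plus all three t[] and all three l[] instances: 7 in total,
# matching the behavior reported above.
print(sorted(cleared))
```

If the edges were instead per map index (`e[i] -> t[i] -> l[i]`), the same walk from `e[1]` would reset only `t[1]` and `l[1]`, which is the behavior this issue asks for.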
boraberke commented 2 days ago

I would like to give this one a try. Could you please assign me?

potiuk commented 2 days ago

Sure