I am experiencing issues with task group dependencies and dynamic task outputs when using the .expand method in Airflow. Below is the DAG code snippet that illustrates the problem:
```python
from airflow.decorators import dag, task_group, task
from pendulum import datetime
from airflow.operators.empty import EmptyOperator
from constants.common_constants import TASK_BEGIN  # project-specific constant (task ID of the start task)


@task_group(group_id="Fetch_and_Process_Data", tooltip="This task group is very important!")
def demo_tash_group(my_num):
    @task
    def fetch_data(num):
        print(num)

    @task
    def process_data(num):
        print(num)

    @task
    def copy_s3_to_ticket_staging(num):
        print(num)

    @task
    def copy_s3_to_transaction_staging(num):
        print(num)

    @task
    def copy_s3_to_payment_staging(num):
        print(num)

    @task
    def copy_s3_into_main_tables(num):
        print(num)

    # Setting dependencies (every task in the group receives my_num)
    fetch_data(my_num) >> process_data(my_num) >> [
        copy_s3_to_ticket_staging(my_num),
        copy_s3_to_transaction_staging(my_num),
        copy_s3_to_payment_staging(my_num),
    ] >> copy_s3_into_main_tables(my_num)


@dag(
    start_date=datetime(2022, 12, 1),
    schedule=None,
    catchup=False,
    tags=["task"],
)
def task_group_mapping():
    begin = EmptyOperator(task_id=TASK_BEGIN)

    @task()
    def get_config_data():
        # It will be a list of dictionaries, fetched at the time of DAG execution from my db
        return [19, 23, 42, 8, 7, 108]

    task_group = demo_tash_group.expand(my_num=get_config_data())
    end = EmptyOperator(task_id="end")

    # Setting dependencies
    begin >> task_group >> end


task_group_mapping()
```
Problem Scenarios:
Scenario 1:
Issue: When I define get_config_data as a normal function (without the @task decorator), the DAG is created correctly, with the task chain as defined. However, this approach isn't viable: the function then runs every time the scheduler parses the DAG file, and in my case that means a database query on every parse, which degrades performance.
This is how the DAG looks: [screenshot of the Scenario 1 graph view]
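To make the cost concrete, here is a toy model in plain Python (no Airflow involved) of calling the lookup at parse time versus deferring it to task execution. It assumes only what is stated above: the scheduler re-executes the file's top-level code on every parse, while a @task body runs only inside a DAG run. `query_config_from_db` is a hypothetical stand-in for the real database query, and the figure of 10 parses per run is arbitrary.

```python
# Toy model, not Airflow internals: top-level code runs on every parse,
# a deferred callable runs only when a DAG run executes it.
db_calls = 0


def query_config_from_db():
    """Hypothetical stand-in for the real database lookup."""
    global db_calls
    db_calls += 1
    return [19, 23, 42, 8, 7, 108]


def parse_dag_file_eager():
    # Scenario 1: the plain function is called while the file is parsed.
    return query_config_from_db()


def parse_dag_file_deferred():
    # Scenario 2: parsing only records which callable to run later.
    return query_config_from_db


def simulate(parses, runs, parse_fn):
    """Parse the file `parses` times, then execute `runs` DAG runs; return DB calls."""
    global db_calls
    db_calls = 0
    work = None
    for _ in range(parses):
        work = parse_fn()
    for _ in range(runs):
        if callable(work):  # deferred: the query happens only at run time
            work()
    return db_calls


eager = simulate(parses=10, runs=1, parse_fn=parse_dag_file_eager)
deferred = simulate(parses=10, runs=1, parse_fn=parse_dag_file_deferred)
print(eager, deferred)  # 10 1
```

The eager variant pays for one query per parse regardless of how often the DAG actually runs, which is the performance problem described in this scenario.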
Scenario 2:
Issue: To address the performance issue, I added the @task decorator to get_config_data. This solved the first problem, because the query now runs only when the DAG runs. However, it introduced a new issue: every task in the group that uses my_num (the output of get_config_data) is now attached directly to get_config_data in the graph, whereas I want only the first task of the task group (fetch_data) to be attached to it.
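A possible explanation, offered as an assumption rather than a confirmed diagnosis: with TaskFlow, passing another task's output as an argument also makes that task an upstream, and inside the expanded group my_num is a reference to get_config_data's output. Since all six tasks receive my_num, each would get get_config_data as a direct upstream in addition to the `>>` chain. The toy model below (plain Python, not Airflow's implementation; `Ref`, `Graph` and `build` are hypothetical names) reproduces that inference and compares it with a variant where only fetch_data consumes my_num and later tasks consume their predecessor's return value instead.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Ref:
    """Toy stand-in for 'the output of task <source>' (hypothetical, not Airflow's XComArg)."""
    source: str


@dataclass
class Graph:
    edges: set = field(default_factory=set)

    def call(self, task_id, arg):
        # Receiving another task's output implies an edge source -> task_id.
        if isinstance(arg, Ref):
            self.edges.add((arg.source, task_id))
        return Ref(task_id)

    def chain(self, *steps):
        # Explicit `a >> b >> [c, d] >> e`; each step is a task ID or a list of them.
        for up, down in zip(steps, steps[1:]):
            ups = up if isinstance(up, list) else [up]
            downs = down if isinstance(down, list) else [down]
            self.edges.update((u, d) for u in ups for d in downs)

    def downstream_of(self, task_id):
        return {d for u, d in self.edges if u == task_id}


COPIES = ["copy_s3_to_ticket_staging", "copy_s3_to_transaction_staging", "copy_s3_to_payment_staging"]


def build(only_first_task_gets_my_num):
    g = Graph()
    my_num = Ref("get_config_data")
    fetched = g.call("fetch_data", my_num)
    if only_first_task_gets_my_num:
        processed = g.call("process_data", fetched)
        for c in COPIES:
            g.call(c, processed)
        g.call("copy_s3_into_main_tables", processed)
    else:  # the DAG as written in this issue
        g.call("process_data", my_num)
        for c in COPIES:
            g.call(c, my_num)
        g.call("copy_s3_into_main_tables", my_num)
    g.chain("fetch_data", "process_data", COPIES, "copy_s3_into_main_tables")
    return g


print(sorted(build(False).downstream_of("get_config_data")))  # six task IDs: every task in the group
print(sorted(build(True).downstream_of("get_config_data")))   # ['fetch_data']
```

In the toy model, only the variant that threads values through the group keeps get_config_data attached to the first task alone; whether that maps cleanly onto Airflow 2.6.3 is exactly what this issue is asking.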
Scenario 3:
Attempted solution: I tried passing the output of get_config_data only to the first task of demo_tash_group and pushing my_num to XCom there, expecting the remaining tasks to pull it from XCom. This didn’t work: when pulling the XCom value, I get a list of the values pushed by every mapped instance, not just the one belonging to the current task group instance, so I cannot get the value for a specific task group instance.
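The result in Scenario 3 is consistent with XCom rows for mapped tasks being keyed per (task, map index). As far as I can tell from the Airflow 2.x API, `TaskInstance.xcom_pull` returns the values of all mapped instances when `map_indexes` is not given and a single value when it is given an integer, so `ti.xcom_pull(task_ids=..., map_indexes=ti.map_index)` might select the value for the current task group instance (not verified on 2.6.3). The toy model below illustrates only that keying; `push` and `pull` are hypothetical helpers, not Airflow functions.

```python
# Toy XCom table keyed by (task_id, map_index): one row per mapped instance.
# push/pull are hypothetical helpers, not Airflow's API.
xcom = {}


def push(task_id, map_index, value):
    xcom[(task_id, map_index)] = value


def pull(task_id, map_index=None):
    indexes = sorted(i for (t, i) in xcom if t == task_id)
    if map_index is None:
        # No index: every mapped instance's value, ordered by map index
        # (the list observed in Scenario 3).
        return [xcom[(task_id, i)] for i in indexes]
    # With an index: only the value from the same task group instance.
    return xcom.get((task_id, map_index))


# Six expanded task group instances, one per element of get_config_data's output.
for index, num in enumerate([19, 23, 42, 8, 7, 108]):
    push("Fetch_and_Process_Data.fetch_data", index, num)

# process_data running in the instance with map_index 2:
print(pull("Fetch_and_Process_Data.fetch_data"))     # [19, 23, 42, 8, 7, 108]
print(pull("Fetch_and_Process_Data.fetch_data", 2))  # 42
```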
What you think should happen instead?
Expected Behavior:
I expect the tasks within the task group to keep the dependencies defined inside the group, with only the first task attached to get_config_data, while each task group instance still receives its own element of get_config_data's output.
How to reproduce
To reproduce the issue, follow these steps:
Set up the Airflow environment:
Ensure you have a working Airflow environment. The issue has been observed on Airflow 2.6.3 (Amazon MWAA). Make sure your setup matches, or is compatible with, that version.
Create a New DAG:
Create a new DAG file in your Airflow DAGs folder using the code snippet above. Make sure the project-specific modules it relies on (constants.common_constants, helper.base_helper) are available, or remove those dependencies, for example by replacing TASK_BEGIN with a literal task ID.
Scenario 1 - Define get_config_data as a Normal Function:
In the DAG file, define get_config_data as a normal function without the @task decorator.
Load the DAG in the Airflow UI. You should see that the task chain is created as expected, with the task group dependencies preserved.
Note: While this approach works, it's not ideal because the function executes every time the scheduler parses the file, leading to potential performance degradation.
Scenario 2 - Use the @task Decorator for get_config_data:
Now, add the @task decorator to get_config_data in the DAG file to ensure it only runs when the DAG is triggered.
Load the DAG in the Airflow UI again.
Observe that the tasks using the output of get_config_data are now directly attached to it in the DAG chain, breaking the intended task group dependencies.
Scenario 3 - Attempt to Use XCom:
To try and solve the issue from Scenario 2, modify the DAG so that the output of get_config_data is used in the first task of demo_tash_group.
Push the value of my_num into XCom within the first task and attempt to retrieve it in the subsequent tasks.
Observe that when fetching the XCom value, you receive a list of all XComs for every dynamic task, rather than the specific XCom value for the intended task group.
Document the Observations:
Take screenshots of the DAG graph in the Airflow UI for Scenario 1 and Scenario 2 to illustrate the difference in task dependencies.
By following these steps, you should be able to reproduce the bug and observe the issues with task group dependencies and dynamic task outputs when using the .expand method in Airflow.
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.6.3
What happened?
Environment: Ubuntu 20.1, Python 3.10.14
Operating System
Ubuntu
Versions of Apache Airflow Providers
No response
Deployment
Amazon (AWS) MWAA
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
Code of Conduct