astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0
601 stars 152 forks source link

[Bug] Tried to set relationships between tasks in more than one DAG #1218

Open davidmorosini opened 4 hours ago

davidmorosini commented 4 hours ago

Astronomer Cosmos Version

Other Astronomer Cosmos version (please specify below)

If "Other Astronomer Cosmos version" selected, which one?

1.6.0

dbt-core version

1.7.16

Versions of dbt adapters

dbt-databricks==1.7.16

LoadMode

DBT_LS_MANIFEST

ExecutionMode

VIRTUALENV

InvocationMode

SUBPROCESS

airflow version

2.8.2

Operating System

Debian GNU/Linux 12 (bookworm) (Local using docker)

If a you think it's an UI issue, what browsers are you seeing the problem on?

No response

Deployment

Other

Deployment details

No response

What happened?

Hello everyone,

We are trying to update the Cosmos version to 1.6.0 and have encountered a problem. We would like to request your help in understanding what might be happening.

Before describing the issue, please consider the code snippet below. The object partial_dbt_env is an instance of <class 'airflow.models.xcom_arg.PlainXComArg'>, similar to the example: {{ task_instance.xcom_pull(task_ids='TASK_ID', dag_id='DAG_NAME', key='return_value') }}.

from cosmos import DbtTaskGroup, ProjectConfig

project_config = ProjectConfig(
    dbt_project_path=<DBT PROJECT PATH>,
    manifest_path=<MANIFEST PATH>,
    env_vars=partial_dbt_env
)

transform_task_group = DbtTaskGroup(
    group_id=<GROUP ID>,
    project_config=project_config,
    operator_args=<OPERATOR ARGS>,
    profile_config=<PROFILE CONFIG>,
    execution_config=<EXECUTION CONFIG>,
    render_config=<RENDER CONFIG>
)

When attempting to update from version 1.5.1 to version 1.6.0 using the code above, we encountered a significant issue. The exception raised is:

airflow.exceptions.AirflowException: Tried to set relationships between tasks in more than one DAG: {<DAG: DAG_NAME>, <DAG: DAG_NAME>}.

After investigating, we noticed that the way some assignments are made in the cosmos/converter.py code has changed. For example:

This change means that Cosmos uses both the original objects and the copies created internally. Since these objects carry a reference to the DAG they belong to, the duplication results in two distinct objects, each referencing a DAG, which is identified by the Airflow code when performing a set of all objects with the DAG identification. Previously, with reference passing, the result was the same object. Now, with the copy, the set operation performed by Airflow considers them as distinct objects, leading to the exception.

Note: If the env_vars attribute of the ProjectConfig object is removed, the issue is immediately resolved. However, this leads to other problems, are need the value from partial_dbt_env because it is the result of a task, and thus it is a templated value. Note: Up to version 1.5.1, the code works without issues; this problem occurs only in version 1.6.0.

We would like to understand the reason for this change in how these variable assignments were made (so that we can adopt the best practices established by you) and whether there is a way to work around this issue in version 1.6.0.

Thank you in advance for your attention and assistance.

Relevant log output

Traceback (most recent call last):
  File "/opt/airflow/dags/core_operators/dbt_run.py", line 109, in dbt_run
    transform_task_group = DbtTaskGroup(
  File "/home/airflow/.local/lib/python3.10/site-packages/cosmos/airflow/task_group.py", line 28, in __init__
    DbtToAirflowConverter.__init__(self, *args, **specific_kwargs(**kwargs))
  File "/home/airflow/.local/lib/python3.10/site-packages/cosmos/converter.py", line 295, in __init__
    build_airflow_graph(
  File "/home/airflow/.local/lib/python3.10/site-packages/cosmos/airflow/graph.py", line 303, in build_airflow_graph
    task_or_group = conversion_function(  # type: ignore
  File "/home/airflow/.local/lib/python3.10/site-packages/cosmos/airflow/graph.py", line 251, in generate_task_or_group
    task_or_group = create_airflow_task(task_meta, dag, task_group=task_group)
  File "/home/airflow/.local/lib/python3.10/site-packages/cosmos/core/airflow.py", line 33, in get_airflow_task
    airflow_task = Operator(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 445, in apply_defaults
    self.set_xcomargs_dependencies()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1159, in set_xcomargs_dependencies
    XComArg.apply_upstream_relationship(self, arg)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom_arg.py", line 129, in apply_upstream_relationship
    op.set_upstream(operator)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 270, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 228, in _set_relatives
    raise AirflowException(f"Tried to set relationships between tasks in more than one DAG: {dags}")
airflow.exceptions.AirflowException: Tried to set relationships between tasks in more than one DAG: {<DAG: DAG_NAME>, <DAG: DAG_NAME>}

How to reproduce

Assemble the provided code by filling in the values according to the runtime we use. The key point to reproducing this error is using the XCom return from a Python operator as the dictionary for partial_dbt_env.

from airflow.decorators import task
from cosmos import DbtTaskGroup, ProjectConfig

@task(task_id="build_partial_dbt_env_vars_operator")
def build_partial_dbt_env() -> Dict[str, str | bool]:
    # some code
    # This return is for demonstration purposes only
    return {
        "ENV_VAR_NAME": "value",
        "ENV_VAR_NAME_2": False
    }

partial_dbt_env = build_partial_dbt_env()

project_config = ProjectConfig(
    dbt_project_path=<DBT PROJECT PATH>,
    manifest_path=<MANIFEST PATH>,
    env_vars=partial_dbt_env
)

transform_task_group = DbtTaskGroup(
    group_id=<GROUP ID>,
    project_config=project_config,
    operator_args=<OPERATOR ARGS>,
    profile_config=<PROFILE CONFIG>,
    execution_config=<EXECUTION CONFIG>,
    render_config=<RENDER CONFIG>
)

partial_dbt_env >> transform_task_group

Anything else :)?

No response

Are you willing to submit PR?

dosubot[bot] commented 4 hours ago

Hey @davidmorosini! 👋 I'm here to help you with your issue.

Updating Astronomer Cosmos from version 1.5.1 to 1.6.0 introduces stricter validation checks to ensure that tasks are not shared between multiple DAGs or TaskGroups. This is likely causing the AirflowException you are encountering. The deepcopy function is used in the cosmos/converter.py file to create independent copies of configuration objects, which can lead to the issue of tasks being associated with more than one DAG [1].

To resolve this issue, ensure that each task is only associated with a single DAG or TaskGroup. Carefully review your DAG and TaskGroup configurations to ensure that tasks are not reused across different DAGs or TaskGroups. Here is a snippet from the build_airflow_graph function that highlights the validation logic:

def build_airflow_graph(
    nodes: dict[str, DbtNode],
    dag: DAG,  # Airflow-specific - parent DAG where to associate tasks and (optional) task groups
    execution_mode: ExecutionMode,  # Cosmos-specific - decide what which class to use
    task_args: dict[str, Any],  # Cosmos/DBT - used to instantiate tasks
    test_indirect_selection: TestIndirectSelection,  # Cosmos/DBT - used to set test indirect selection mode
    dbt_project_name: str,  # DBT / Cosmos - used to name test task if mode is after_all,
    render_config: RenderConfig,
    task_group: TaskGroup | None = None,
    on_warning_callback: Callable[..., Any] | None = None,  # argument specific to the DBT test command
) -> None:
    # ...
    for node_id, node in nodes.items():
        conversion_function = node_converters.get(node.resource_type, generate_task_or_group)
        task_or_group = conversion_function(
            dag=dag,
            task_group=task_group,
            dbt_project_name=dbt_project_name,
            execution_mode=execution_mode,
            task_args=task_args,
            test_behavior=test_behavior,
            source_rendering_behavior=source_rendering_behavior,
            test_indirect_selection=test_indirect_selection,
            on_warning_callback=on_warning_callback,
            node=node,
        )
        if task_or_group is not None:
            tasks_map[node_id] = task_or_group

    create_airflow_task_dependencies(nodes, tasks_map)

Ensure that the configurations for project_config, render_config, and execution_config are set correctly and that tasks are not being reused across multiple DAGs or TaskGroups [2].

To continue talking to Dosu, mention @dosu.