astronomer / astronomer-providers

Airflow Providers containing Deferrable Operators & Sensors from Astronomer
https://astronomer-providers.rtfd.io/
Apache License 2.0

Invalid dependency graph for tasks #1499

Closed singhsatnam closed 3 months ago

singhsatnam commented 3 months ago

Describe the bug When creating a dependency between two tasks built with DatabricksTaskOperator(), the dependency does not use the task_key specified in task_config; it uses dagName__groupId__taskKey instead. This is inconsistent with the tasks created on Databricks, which correctly use the specified task_key.

To Reproduce Steps to reproduce the behavior:

  1. Run the following code with a valid cluster config, updating the notebook paths to point at two Databricks notebooks that simply print "hello".
from airflow.decorators import dag
from astro_databricks.operators.common import DatabricksTaskOperator
from astro_databricks.operators.workflow import DatabricksWorkflowTaskGroup
from pendulum import datetime

DATABRICKS_JOB_CLUSTER_KEY: str = "Airflow_Shared_job_cluster"
DATABRICKS_CONN_ID: str = "databricks_default"

job_cluster_spec: list[dict] = [
# A valid cluster config
]

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def dynamic_template():
    task_group = DatabricksWorkflowTaskGroup(
        group_id="projectv2",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_clusters=job_cluster_spec,
    )
    with task_group:
        print_1 = DatabricksTaskOperator(
            task_id="print_1",
            databricks_conn_id=DATABRICKS_CONN_ID,
            job_cluster_key=DATABRICKS_JOB_CLUSTER_KEY,
            task_config={
                "task_key": "print_1",
                "notebook_task": {
                    "notebook_path": "path_to_notebook/print_test1",
                    "source": "WORKSPACE",
                },
            },
        )

        print_2 = DatabricksTaskOperator(
            task_id="print_2",
            databricks_conn_id=DATABRICKS_CONN_ID,
            job_cluster_key=DATABRICKS_JOB_CLUSTER_KEY,
            task_config={
                "task_key": "print_2",
                "notebook_task": {
                    "notebook_path": "path_to_notebook/print_test2",
                    "source": "WORKSPACE",
                },
            },
        )
        print_2.set_upstream(print_1)
dynamic_template()

Expected behavior This should create a DAG with two tasks - print_1 and print_2 - and print_2 should be dependent on print_1.
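To make the mismatch concrete, here is a minimal sketch of the two key formats involved. The helper below is illustrative only (it is not the provider's actual implementation); it assumes the dependency edge is composed as dag_id__group_id__task_id, per the behavior described above:

```python
def namespaced_key(dag_id: str, group_id: str, task_id: str) -> str:
    # Hypothetical reconstruction of how the dependency edge's key
    # appears to be composed (dagName__groupId__taskKey).
    return f"{dag_id}__{group_id}__{task_id}"

# The task created on Databricks uses the plain task_key from task_config:
created_task_key = "print_1"

# ...but the depends_on entry for print_2 references the namespaced form,
# so the two never match:
depends_on_key = namespaced_key("dynamic_template", "projectv2", "print_1")

print(created_task_key)  # print_1
print(depends_on_key)    # dynamic_template__projectv2__print_1
```

With the DAG above, the expected depends_on reference would be the plain "print_1", matching the task_key actually registered on Databricks.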

Screenshots: two attached images of the rendered dependency graph, showing the tasks keyed as dagName__groupId__taskKey rather than the specified task_key.


singhsatnam commented 3 months ago

Created bug in the correct repo: https://github.com/astronomer/astro-provider-databricks/issues/71