astronomer / astro-provider-databricks

Orchestrate your Databricks notebooks in Airflow and execute them as Databricks Workflows
Apache License 2.0
21 stars 11 forks source link

Bug - Invalid dependency graph for tasks #71

Open singhsatnam opened 6 months ago

singhsatnam commented 6 months ago

Describe the bug

While creating a dependency between two tasks created using DatabricksTaskOperator() does not use the task_key specified, but uses dagName__groupId__taskKey. This is inconsistent with the tasks created on Databricks because they correctly use the task_key specified.

To Reproduce

Steps to reproduce the behavior:

Run the following code with a valid cluster config and update the path to two notebooks on databricks which could simply print hello.

from airflow.decorators import dag
from astro_databricks.operators.common import DatabricksTaskOperator
from astro_databricks.operators.workflow import DatabricksWorkflowTaskGroup
from pendulum import datetime

DATABRICKS_JOB_CLUSTER_KEY: str = "Airflow_Shared_job_cluster"
DATABRICKS_CONN_ID: str = "databricks_default"

job_cluster_spec: list[dict] = [
# A valid cluster config
]

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def dynamic_template():
    task_group = DatabricksWorkflowTaskGroup(
        group_id="projectv2",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_clusters=job_cluster_spec,
    )
    with task_group:
        print_1 = DatabricksTaskOperator(
            task_id="print_1",
            databricks_conn_id=DATABRICKS_CONN_ID,
            job_cluster_key=DATABRICKS_JOB_CLUSTER_KEY,
            task_config={
                "task_key": "print_1",
                "notebook_task": {
                    "notebook_path": "path_to_notebook/print_test1",
                    "source": "WORKSPACE",
                },
            },
        )

        print_2 = DatabricksTaskOperator(
            task_id="print_2",
            databricks_conn_id=DATABRICKS_CONN_ID,
            job_cluster_key=DATABRICKS_JOB_CLUSTER_KEY,
            task_config={
                "task_key": "print_2",
                "notebook_task": {
                    "notebook_path": "path_to_notebook/print_test2",
                    "source": "WORKSPACE",
                },
            },
        )
        print_2.set_upstream(print_1)
dynamic_template()

Screenshots

image image

Expected behavior

This should create a DAG with two tasks - print_1 and print_2 - and print_2 should be dependent on print_1.

Desktop (please complete the following information):

OS: macos Ventura 13.6.1 Browser Firefox Version 123.0.1

tatiana commented 4 months ago

Thanks for reporting this issue, @singhsatnam ! Would you be up for contributing to the project and fixing this issue?