astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0

[Feature] Invoking macro directly from a DAG file #1243

Open sangee14 opened 2 weeks ago

sangee14 commented 2 weeks ago

Description

I am trying to invoke a macro directly from the DAG file as a task, rather than having it invoked from any of the models. Below is the code I am using, which is not working as expected. Please help with this:

from cosmos import DbtTaskGroup, ProjectConfig, RenderConfig

# profile_config and execution_config are defined elsewhere in the DAG file
run_macro = DbtTaskGroup(
    group_id="dbt_run_macro",
    project_config=ProjectConfig(
        dbt_project_path="/usr/local/airflow/dags/dbt/project_folder"
    ),
    profile_config=profile_config,
    execution_config=execution_config,
    operator_args={"select": "run-operation"},
    render_config=RenderConfig(
        select=["path:macros"]
    ),
)

The code does not render any of the macros, but without render_config it selects all the models and snapshots. So the problem is that it is not picking up any macros from the macros folder, which holds 7-8 macros.

Use case/motivation

The code should pick up all macros, or a specific macro, and execute it successfully from the DAG file, rather than invoking it from any of the models. All the macros run perfectly outside of Airflow using the command dbt run-operation macro_name.

Related issues

No response

Are you willing to submit a PR?

tatiana commented 2 weeks ago

@sangee14, would it be possible to use DbtRunOperationLocalOperator directly within a traditional Airflow TaskGroup? https://github.com/astronomer/astronomer-cosmos/blob/main/cosmos/operators/local.py#L766

When you use DbtTaskGroup, Cosmos will attempt to run models (using the run command), seeds, snapshots, sources and tests - not necessarily macros.
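
For reference, a rough sketch of what that suggestion might look like; the parameter names (macro_name, args, project_dir, profile_config) are assumptions based on the linked operator and may need checking against your Cosmos version:

from airflow.utils.task_group import TaskGroup
from cosmos.operators.local import DbtRunOperationLocalOperator

with TaskGroup(group_id="dbt_run_macro") as dbt_run_macro:
    run_macro = DbtRunOperationLocalOperator(
        task_id="run_specific_macro",
        macro_name="macro_name",   # the macro to invoke, as in `dbt run-operation macro_name`
        # args={"key": "value"},   # optional payload, equivalent to dbt's --args
        project_dir="/usr/local/airflow/dags/dbt/project_folder",
        profile_config=profile_config,
    )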

sangee14 commented 2 weeks ago

DbtRunOperationLocalOperator does not help. I also tried the code below using PythonOperator, but the macro is not executed and no error is raised either.

import subprocess

from airflow.operators.python import PythonOperator


def run_dbt_macro(**kwargs):
    # dbt_project_path, dbt_profile_path and running_env are defined elsewhere in the DAG file
    dbt_executable = execution_config.dbt_executable_path
    print(f"the path is: {dbt_executable}")

    dbt_command = [
        dbt_executable,
        'run-operation',
        'macro_name', '-v',
        '--project-dir', dbt_project_path,
        '--profiles-dir', dbt_profile_path,
        '--profile', 'default',
        '--target', running_env,
        '--debug'
    ]
    print(f"the command is: {dbt_command}")

    try:
        process = subprocess.run(dbt_command, capture_output=True, text=True)
        print("STDOUT:", process.stdout)
        print("STDERR:", process.stderr)
        # Also tried subprocess.Popen(dbt_command, ...) with communicate(); same result.

        if process.returncode != 0:
            raise Exception(f"dbt command failed with exit code {process.returncode}")

    except Exception as e:
        raise ValueError(f"Error while running dbt macro: {e}")
    return process.stdout, process.stderr


# The operator must be created at module (DAG) level, not inside the callable,
# otherwise the task is never registered with the DAG.
run_macro = PythonOperator(
    task_id="run_specific_macro",
    python_callable=run_dbt_macro,
)
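
If the subprocess output keeps getting swallowed, the same command can also be issued through a BashOperator so dbt's stdout goes straight into the task log. A rough sketch, assuming the same execution_config, dbt_project_path, dbt_profile_path and running_env variables as above:

from airflow.operators.bash import BashOperator

run_macro_bash = BashOperator(
    task_id="run_specific_macro_bash",
    bash_command=(
        f"{execution_config.dbt_executable_path} run-operation macro_name "
        f"--project-dir {dbt_project_path} "
        f"--profiles-dir {dbt_profile_path} "
        f"--profile default --target {running_env} --debug"
    ),
)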