astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0

Hook to be called after cosmos task execution #976

Open SiddiqueAhmad opened 4 months ago

SiddiqueAhmad commented 4 months ago

To pipe dbt test results (results.json) onward, we need to be able to register a hook that receives the results so users can use them for further integration. We need this for integration with OpenMetadata for data quality.
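To illustrate the kind of hook being asked for (the function name and signature below are hypothetical, not an existing Cosmos API): a user-supplied callable that receives the path to the dbt artifacts, parses run_results.json, and forwards each test outcome to an external system such as OpenMetadata.

import json
from pathlib import Path

def send_to_openmetadata(node_id: str, status: str) -> None:
    # Placeholder for a real OpenMetadata client call.
    print(f"{node_id}: {status}")

def publish_test_results(project_dir: str) -> None:
    # Hypothetical hook: receives the directory dbt ran in and reads its artifacts.
    run_results = json.loads(Path(project_dir, "target", "run_results.json").read_text())
    for result in run_results["results"]:
        # unique_id and status follow dbt's run_results.json schema.
        send_to_openmetadata(result["unique_id"], result["status"])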

tatiana commented 4 months ago

Thanks a lot, @SiddiqueAhmad, for writing this down!

For further context, this came from a discussion in the #airflow-dbt Slack channel: https://apache-airflow.slack.com/archives/C059CC42E9W/p1715857421750369

It is a valid use case, and we don't have an out-of-the-box solution at the moment.

Similar to how we're handling on_warning_callback: https://github.com/astronomer/astronomer-cosmos/blob/af11d348cd87bdbd7cdef52535aaca74287b18e8/cosmos/operators/local.py#L592

We could have a mechanism for users to define a function that would be called after Cosmos tasks are executed and that could receive the result/context. Ideally, this function would apply to all (or at least the Local/VirtualEnv) Cosmos operators. This may solve the problem described in https://github.com/astronomer/astronomer-cosmos/issues/903.
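As a very rough sketch of what that could look like (the class, parameter, and method names below are illustrative only, not the current Cosmos API): the operator would accept a callable and invoke it with the context and result once the dbt command has finished, much like on_warning_callback is invoked today.

from typing import Any, Callable, Optional

class DbtLocalOperatorSketch:
    """Illustrative only: how an after-execution hook could be wired into the local operator."""

    def __init__(self, on_execution_callback: Optional[Callable[..., None]] = None) -> None:
        self.on_execution_callback = on_execution_callback

    def _run_dbt(self) -> dict:
        # Stand-in for the real dbt invocation; returns whatever result object Cosmos produces.
        return {"status": "success"}

    def execute(self, context: dict) -> Any:
        result = self._run_dbt()
        if self.on_execution_callback:
            # Invoked after every task run, mirroring how on_warning_callback is handled today.
            self.on_execution_callback(context=context, result=result)
        return result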

Did you check with your team whether someone would be available/interested in implementing this, @SiddiqueAhmad?

ghjklw commented 3 months ago

Very interested in this feature 😊

Could it be an option to leverage/expose Airflow's built-in callbacks? Or the BaseOperator's pre_execute and post_execute hooks?

In either case, Airflow already has the logic to execute these callbacks after the task has run and to pass them the context (and, eventually, the result), and they are in a sense already implemented, since Cosmos operators inherit from Airflow's BaseOperator.

I believe it might actually already be possible using the operator arguments feature, but I haven't had the opportunity to test it yet.

If that is the case, then we could just document it and provide an example. It would also help a lot to type operator_args with a dataclass or TypedDict (OperatorArgs | dict[str, Any] for backward compatibility), to provide good autocomplete and documentation of these arguments.
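For example, typed operator_args could be sketched roughly like this (OperatorArgs and the particular set of keys are assumptions for illustration, not an existing Cosmos type):

from typing import Callable, TypedDict

class OperatorArgs(TypedDict, total=False):
    # Hypothetical subset of the keyword arguments Cosmos forwards to its operators;
    # the real set would mirror BaseOperator's parameters.
    on_success_callback: Callable[..., None]
    on_failure_callback: Callable[..., None]
    pre_execute: Callable[..., None]
    post_execute: Callable[..., None]
    retries: int
    queue: str

# A backward-compatible signature, as suggested above, could then be:
# operator_args: OperatorArgs | dict[str, Any]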

Update

I just ran a basic test with the following code:

from pprint import pprint
from typing import Any

from airflow.utils.context import Context
from cosmos import DbtTaskGroup

def test_on_success_callback(context: Context) -> None:
    # Standard Airflow on_success_callback: receives the task context after the task succeeds.
    print("Success callback")
    pprint(context)

def test_post_execute(context: Context, result: Any) -> None:
    # BaseOperator post_execute hook: receives the context and the operator's return value.
    print("Post execute")
    pprint(context)
    pprint(result)

DbtTaskGroup(
    group_id="test_dbt",
    project_config=project_config,
    profile_config=profile_config,
    execution_config=execution_config,
    render_config=render_config,
    operator_args={
        "on_success_callback": test_on_success_callback,
        "post_execute": test_post_execute,
    },
)

And it worked!

[2024-07-01, 16:21:19 CEST] {taskinstance.py:441} ▼ Post task execution logs
[2024-07-01, 16:21:19 CEST] {logging_mixin.py:188} INFO - Post execute
[2024-07-01, 16:21:19 CEST] {logging_mixin.py:188} INFO - {'conf': <***.configuration.AirflowConfigParser object at 0x7f1976297070>, ...}
[2024-07-01, 16:21:19 CEST] {logging_mixin.py:188} INFO - None
[2024-07-01, 16:21:19 CEST] {taskinstance.py:1206} INFO - Marking task as SUCCESS. dag_id=akheloos_test_dag, task_id=test_dbt.v_test.run, run_id=scheduled__2024-06-30T02:00:00+00:00, execution_date=20240630T020000, start_date=20240701T142110, end_date=20240701T142119
[2024-07-01, 16:21:19 CEST] {logging_mixin.py:188} INFO - Success callback
[2024-07-01, 16:21:19 CEST] {logging_mixin.py:188} INFO - {'conf': <***.configuration.AirflowConfigParser object at 0x7f1976297070>, ...}
[2024-07-01, 16:21:19 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 0

GabrielEisenbergOlympus commented 1 month ago

Hi @tatiana, thanks for looking into this. I also need this functionality, unfortunately quite urgently, to meet a requirement.

Is there anything that can be done in the meantime, before this becomes Cosmos-native functionality? In particular, I need to access run_results.json and manifest.json after each run; my main challenge is retrieving the /tmp subfolder in which the task runs. With that, I could build my own workaround until this functionality is available.

I attempted the post_execute behaviour that @ghjklw suggested; however, it points to a different subdirectory.

Thanks so much!
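For reference, one possible interim approach is sketched below. It assumes the Cosmos version in use exposes the callback argument on the local operators (invoked with the temporary dbt project directory after the dbt command runs; worth confirming against cosmos/operators/local.py for your version) and reuses the project_config/profile_config/execution_config/render_config objects from the earlier example.

import json
from pathlib import Path

from cosmos import DbtTaskGroup

def collect_dbt_artifacts(tmp_project_dir: str) -> None:
    # Assumption: Cosmos calls this with the temporary directory the dbt project was copied into.
    target = Path(tmp_project_dir) / "target"
    run_results = json.loads((target / "run_results.json").read_text())
    manifest = json.loads((target / "manifest.json").read_text())
    # Copy or forward the artifacts here, before the temporary directory is cleaned up.
    print(f"{len(run_results['results'])} results, {len(manifest['nodes'])} nodes")

DbtTaskGroup(
    group_id="test_dbt",
    project_config=project_config,
    profile_config=profile_config,
    execution_config=execution_config,
    render_config=render_config,
    operator_args={"callback": collect_dbt_artifacts},
)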