astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0
776 stars 171 forks

Use callback in case of command failure #867

Open fabiomx opened 9 months ago

fabiomx commented 9 months ago

As mentioned in this Slack thread with @jlaneve, I've built a custom local operator, based on the DbtLocalBaseOperator, to upload the run_results.json file to GCS after running the test command, using the callback parameter.

Everything works fine when the tests pass, but if any test fails, an Airflow exception is raised and the callback is never called (https://github.com/astronomer/astronomer-cosmos/blob/main/cosmos/operators/local.py#L321-L344).

[2024-02-28, 15:26:13 CET] {subprocess.py:94} INFO - 14:26:13  Done. PASS=114 WARN=0 ERROR=1 SKIP=0 TOTAL=115
[2024-02-28, 15:26:14 CET] {subprocess.py:98} INFO - Command exited with return code 1
...
[2024-02-28, 15:26:14 CET] {taskinstance.py:1826} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/cosmos/operators/local.py", line 366, in execute
    self.build_and_run_cmd(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/cosmos/operators/local.py", line 361, in build_and_run_cmd
    result = self.run_command(cmd=dbt_cmd, env=env, context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/cosmos/operators/local.py", line 247, in run_command
    self.exception_handling(result)
  File "/opt/python3.8/lib/python3.8/site-packages/cosmos/operators/local.py", line 128, in exception_handling
    raise AirflowException(
airflow.exceptions.AirflowException: ('dbt command failed. The command returned a non-zero exit code 1.
...

At least for the test command, I would still need to upload the run_results.json produced in the tmp_project_dir, even in the case of failure. Indeed, a test failure is precisely when the information in run_results.json is most critical. Moreover, after the failure I can no longer access the tmp_project_dir, so I haven't been able to use the on_failure_callback either.

dosubot[bot] commented 9 months ago

🤖

Hey @fabiomx, long time no see! Hope you've been coding away without too many bugs. 😉

To make sure the callback is still called, and run_results.json is uploaded to GCS, even when the test command fails and DbtLocalBaseOperator raises an Airflow exception, you can modify the exception handling logic in both the handle_exception_subprocess and handle_exception_dbt_runner methods. Wrap the existing checks in a try block and, in a finally block, call the callback function if it's not None and then upload run_results.json to GCS.

Here's how you can modify the handle_exception_subprocess and handle_exception_dbt_runner methods:

def handle_exception_subprocess(self, result: FullOutputSubprocessResult) -> None:
    try:
        if self.skip_exit_code is not None and result.exit_code == self.skip_exit_code:
            raise AirflowSkipException(f"dbt command returned exit code {self.skip_exit_code}. Skipping.")
        elif result.exit_code != 0:
            raise AirflowException(
                f"dbt command failed. The command returned a non-zero exit code {result.exit_code}. Details: ",
                *result.full_output,
            )
    finally:
        # Runs on success, skip, and failure alike, before any exception propagates.
        # Caveat: the issue above reports that run_results.json is written under
        # tmp_project_dir, so self.project_dir may not be the right path to pass here.
        if self.callback:
            self.callback(self.project_dir)
        self.upload_run_results_to_gcs(self.project_dir)

def handle_exception_dbt_runner(self, result: dbtRunnerResult) -> None:
    try:
        if not result.success:
            if result.exception:
                raise AirflowException(f"dbt invocation did not complete with unhandled error: {result.exception}")
            else:
                node_names, node_results = extract_dbt_runner_issues(result, ["error", "fail", "runtime error"])
                # Use a distinct loop variable so it does not shadow the `result` argument.
                error_message = "\n".join(
                    [f"{name}: {node_result}" for name, node_result in zip(node_names, node_results)]
                )
                raise AirflowException(f"dbt invocation completed with errors: {error_message}")
    finally:
        # Runs whether or not an exception was raised above.
        if self.callback:
            self.callback(self.project_dir)
        self.upload_run_results_to_gcs(self.project_dir)

def upload_run_results_to_gcs(self, project_dir: str) -> None:
    # Implement the logic to upload run_results.json to GCS
    # This is a placeholder function. You need to replace it with actual code to upload to GCS.
    pass

In this modification, the finally block ensures that the callback function is called and the run_results.json file is uploaded to GCS regardless of whether an exception was raised due to a failed dbt command. You need to implement the upload_run_results_to_gcs method with the actual logic to upload the file to GCS, which is not provided here.
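As a rough illustration of what the missing upload logic could look like, here is a minimal sketch written as a callback factory rather than an operator method. It assumes the google-cloud-storage package is installed and credentials are configured, and that dbt wrote run_results.json to its default target/ folder inside the project directory. The function name, bucket name, and prefix are hypothetical and not part of Cosmos.

```python
from pathlib import Path
from typing import Callable


def make_gcs_upload_callback(bucket_name: str, prefix: str) -> Callable[[str], None]:
    """Build a callback that uploads <project_dir>/target/run_results.json to GCS.

    Hypothetical helper: bucket_name and prefix are placeholders chosen by the caller.
    """

    def upload_run_results(project_dir: str) -> None:
        # Assumption: dbt wrote its artifacts to the default "target" directory.
        local_path = Path(project_dir) / "target" / "run_results.json"
        if not local_path.exists():
            # dbt can fail before writing any artifact; nothing to upload then.
            return

        # Imported lazily so this module still loads without the GCS client installed.
        from google.cloud import storage

        blob_name = f"{prefix.rstrip('/')}/run_results.json"
        bucket = storage.Client().bucket(bucket_name)
        bucket.blob(blob_name).upload_from_filename(str(local_path))

    return upload_run_results
```

The returned function takes a single string path and returns None, so it could be passed as the callback parameter, for example `callback=make_gcs_upload_callback("my-bucket", "dbt/artifacts")`.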

This approach allows you to handle the dbt command failure gracefully by ensuring that necessary cleanup or follow-up actions (like calling a callback and uploading files to GCS) are still performed even in the event of a failure.
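The try/finally behaviour this relies on can be checked in isolation. The sketch below is not Cosmos code: CommandFailed is a hypothetical stand-in for AirflowException and run_with_callback is an invented helper, used only to show that the callback runs before the exception reaches the caller.

```python
from typing import Callable, List, Optional


class CommandFailed(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


def run_with_callback(
    exit_code: int,
    project_dir: str,
    callback: Optional[Callable[[str], None]] = None,
) -> None:
    """Raise on a non-zero exit code, but always invoke the callback first."""
    try:
        if exit_code != 0:
            raise CommandFailed(f"command returned non-zero exit code {exit_code}")
    finally:
        # Executed on both the success and the failure path, before the
        # exception (if any) propagates to the caller.
        if callback is not None:
            callback(project_dir)


# Usage: record the directories the callback was invoked with.
seen: List[str] = []
try:
    run_with_callback(1, "/tmp/project", seen.append)
except CommandFailed:
    pass  # the failure still surfaces, but the callback has already run
```

One design caveat: if the callback itself raises inside the finally block, that new exception replaces the original one, so a callback used this way should probably catch and log its own errors.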

In the current implementation, the callback parameter of DbtLocalBaseOperator accepts a callable. The callable is expected to take a single argument, a string containing the path to the dbt project directory, and return None. It is intended to be called after a dbt run completes.

Here is the relevant part of the constructor signature from the DbtLocalBaseOperator class that shows how the callback parameter is defined:

def __init__(
        self,
        profile_config: ProfileConfig,
        invocation_mode: InvocationMode | None = None,
        install_deps: bool = False,
        callback: Callable[[str], None] | None = None,
        should_store_compiled_sql: bool = True,
        **kwargs: Any,
    ) -> None:
        ...
        self.callback = callback
        ...

The callback parameter is therefore flexible: users of DbtLocalBaseOperator can pass any function that acts on the path to the dbt project directory in order to run custom logic after a dbt command completes.
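For illustration, a callback matching the `Callable[[str], None]` signature might look like the sketch below. It assumes run_results.json sits in the default target/ folder under the directory passed to the callback, and that each entry in its "results" list carries "unique_id" and "status" fields. The function names are hypothetical.

```python
import json
from pathlib import Path
from typing import List


def failed_nodes(project_dir: str) -> List[str]:
    """Return the unique_id of every node whose status is "fail" or "error"."""
    # Assumption: dbt wrote its artifacts to the default "target" directory.
    path = Path(project_dir) / "target" / "run_results.json"
    with path.open() as f:
        run_results = json.load(f)
    return [
        entry["unique_id"]
        for entry in run_results.get("results", [])
        if entry.get("status") in {"fail", "error"}
    ]


def log_failed_nodes(project_dir: str) -> None:
    """Callback-shaped wrapper: takes the project directory path and returns None."""
    for unique_id in failed_nodes(project_dir):
        print(f"dbt node failed: {unique_id}")
```

A function like log_failed_nodes could then be passed as `callback=log_failed_nodes` when constructing the operator.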
