astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0
626 stars 159 forks source link

How to store all test results #801

Open DanMawdsleyBA opened 9 months ago

DanMawdsleyBA commented 9 months ago

I'm looking to store test results (Both the successful and failures so https://docs.getdbt.com/reference/resource-configs/store_failures won't be sufficient). I'm considering using a dbt package such as https://github.com/brooklyn-data/dbt_artifacts which works with an on run end which will upload all the results at the end. As far as I'm aware airflow doesn't allow for always run a task on run end like how dbt does. So is there any way that I can materialize the results of all the tests using Cosmos? If not currently would it be possible to do somehow?

For added context we're also looking to integrate these test results into Atlan.

dosubot[bot] commented 9 months ago

🤖

Hi @DanMawdsleyBA! I'm Dosu, a friendly bot here to assist you with your questions and issues related to Cosmos while we wait for a human maintainer. Don't worry, although I'm a bot, I'm here to make your experience smoother and easier. Looking forward to helping you out!

Yes, it is possible to materialize and store all test results, both successful and failures, using Cosmos. You can achieve this by using the on_warning_callback parameter in the create_test_task_metadata function. This callback function is called on warnings with additional Context variables “test_names” and “test_results” of type List.

You can define a callback function that logs all test results and stores them in a desired location. This location can then be integrated with Atlan.

Here is an example of how you can define the callback function:

def log_test_results(context):
    test_names = context['test_names']
    test_results = context['test_results']

    for test_name, test_result in zip(test_names, test_results):
        # Log the test result
        logger.info(f"Test {test_name} result: {test_result}")

        # Store the test result in a desired location
        # This can be a database, a file, etc.
        store_test_result(test_name, test_result)

You can then pass this function as the on_warning_callback parameter when calling create_test_task_metadata:

test_meta = create_test_task_metadata(
    "test",
    execution_mode,
    test_indirect_selection,
    task_args=task_args,
    node=node,
    on_warning_callback=log_test_results,
)

This will ensure that all test results are logged and stored, and can then be integrated into Atlan.

The create_test_task_metadata function in the Cosmos codebase does support the on_warning_callback parameter for logging and storing all test results. This is evident in the DbtTestKubernetesOperator class where the on_warning_callback parameter is used in the __init__ method. If the on_warning_callback is provided, it is stored as an instance variable and used in the _handle_warnings method to process warnings from the logs.

In the _handle_warnings method, if there are warnings in the logs, the on_warning_callback is called with the updated context that includes the test names and results.

You can find more details in the cosmos/airflow/graph.py and cosmos/operators/kubernetes.py files in the Cosmos repository.

I hope this helps! If you have any other questions, feel free to ask.

-Dosu

Sources


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

Help me be more useful! Please leave a đź‘Ť if this is helpful and đź‘Ž if it is irrelevant.

If you want to continue the conversation, start your reply with @dosu-bot.

DanMawdsleyBA commented 9 months ago

@tatiana / any thoughts on the above?

It might also be desirable to have a task run on end that syncs the run_results.json with an S3 bucket (Perhaps something similar to the docs operator) as Atlan can interpret it directly. This task would have to always run regardless of if previous tasks have failed (In order to capture test results that are failures)

Although my concern with the above would be what happens if an airflow deployment gets redeployed would those run_results.json files be overidden/deleted.

tatiana commented 9 months ago

Hi @DanMawdsleyBA, this is a very valid use case - and it would be great if Cosmos could support it.

At the moment, Cosmos 1.3 only offers a built-in "post command" customisation via the callback feature, which only works on failed tests.

The other possibility you'd have is to customise how you'd like dbt test to behave in Cosmos, using this feature.

How are you running your tests in Cosmos? Are you using TestBehavior.AFTER_ALL or TestBehavior.AFTER_EACH (default)?

From an Atlan perspective, would it be okay if those test results files were sent on a per-task basis, considering you're running using TestBehavior.AFTER_EACH?

Have you tried using https://github.com/brooklyn-data/dbt_artifacts with Cosmos? From what I understood from the documentation, it would be triggered as part of the dbt command execution itself, so I'd expect it to work.

Another feature we could consider implementing is to expose in Cosmos the ability to upload the artefacts generated in target to {s3, gcs, azure, etc). What do you think? One challenge with this approach may be how we'd like to differentiate between DAG runs/task runs.

Metystes commented 8 months ago

Another feature we could consider implementing is to expose in Cosmos the ability to upload the artefacts generated in target to {s3, gcs, azure, etc). What do you think?

That would be great functionality, and it could be useful in another scenarios. For example, when running a bigger DBT project as a single task (to avoid overhead with starting new pods), and one of the tasks fails, we could use the retry command to rerun only the failed models. To do that, we would need to have run_results.json. Plus, it would help with the debugging to see the compiled code.

ms32035 commented 6 months ago

@tatiana I will add one thing to the problem.

In case tests fail, https://github.com/astronomer/astronomer-cosmos/blob/7c4d7d6e0c680a4847414ed4eb525b7baa27c8c1/cosmos/operators/local.py#L342 this line will actually throw an exception before the callback is executed, so run_results.json is actually lost and in many cases there's no way to report on failed tests.

fabiomx commented 6 months ago

@ms32035,

yes, FYI, I raised the same issue here: https://github.com/astronomer/astronomer-cosmos/issues/867