dbt-flow is a package that adds integration-testing functionality to dbt. Sources, tables (incremental or not) and seeds can be mocked with customized data, and target nodes are then compared against the expected results. In a nutshell, it's unit tests for data transformations.
Just add the following to your project's packages.yml:
packages:
  - git: "https://github.com/WillianFuks/dbt-flow.git"
    revision: 0.0.2
Python dependencies:
- dbt-postgres==1.5.3
- dbt-bigquery==1.5.3 (if using bigquery)
dbt-hub packages:
- dbt_utils>=1.0.0
- metrics==1.5.0
Postgres and BigQuery are officially supported. Adding new adapters should be straightforward; it's something we haven't had time to do yet.
dbt-flow was born out of a project delivered to a large company that involved lots of business rules and fairly complex transformations. As it was necessary to have some confirmation and quality control over the delivered data project, dbt-flow was first conceptualized and then implemented to offer that extra guidance.
Despite having been tested in production, the project is still in alpha development and is expected to be unstable in some corner cases.
Simply define your flow tests in the tests folder of your dbt project. Here's an example of a test for the model customers, as defined
in integration-tests/postgres/tests/flow/marts/tests.sql:
-- depends_on: {{ ref('dbt_metrics_default_calendar') }}
-- depends_on: {{ ref('stg__customers') }}
-- depends_on: {{ ref('stg__orders') }}
-- depends_on: {{ ref('int_payments_mapped') }}
-- depends_on: {{ ref('stg__payments') }}
-- depends_on: {{ ref('payments_mapping') }}
{{
config(
tags=['flow-test']
)
}}
{%- set test1 = dbt_flow.setup_test(
    target_model='customers',
    test_name='cust1',
    options={"drop_tables": false},
    test_description='simple flow test on customers node, without column "payment_method_name" in expected results.',
mocks={
"stg__orders": """
select 1 as order_id, 2 as customer_id, '2023-01-01'::Timestamp as order_date, 'statusB' as status
""",
"stg__payments": """
select 1 as payment_id, 1 as order_id, 1 as payment_method_id, 100 as amount
""",
"test_source_customers.test_customers": """
select 2 as customer_id, 'first_name' as first_name, 'last_name' as last_name
""",
"payments_mapping": """
select 1 as payment_method_id, 'payment_method_A' as name union all
select 2 as payment_method_id, 'payment_method_B' as name
"""
},
expected="""
select
2 as customer_id,
'first_name' as first_name,
'last_name' as last_name,
'2023-01-01'::Timestamp as first_order_date,
'2023-01-01'::Timestamp as last_order_date,
1 as number_of_orders,
100 as total_amount
"""
) -%}
{{ dbt_flow.run_tests([test1], global_options={"drop_tables": false}) }}
Let's break down each component of the test:
The first requirement in any dbt_flow test is to make explicit all nodes that are dependencies of the target node being tested (customers in this example).
-- depends_on: {{ ref('dbt_metrics_default_calendar') }}
-- depends_on: {{ ref('stg__customers') }}
-- depends_on: {{ ref('stg__orders') }}
-- depends_on: {{ ref('int_payments_mapped') }}
-- depends_on: {{ ref('stg__payments') }}
-- depends_on: {{ ref('payments_mapping') }}
It was an implementation decision not to take the route of mocking the macros ref() and source(), for the following reasons:
- By mocking ref and source, the code becomes tightly coupled with dbt's main runtime. Any bug or mistake introduced in the mocks could affect production systems.
- If dbt-core changes the contracts of ref() and source(), the mocks would break, preventing end users from upgrading their dbt version and locking them into the testing framework.
- The mocking approach also hinders the possibility of mocking metric nodes, as their ref function is defined in YAML files.
- Incremental tables cannot be tested solely with mocked macros.
dbt-flow performs the tests by reading the whole dependency graph as given by dbt, looping through each node and its dependencies, and building mocks when necessary. With this approach, all the above points are eliminated. But there are several downsides as well:
- As shown above, each node that takes part in the test must be referenced at the beginning of the test file (such as -- depends_on: {{ ref('stg__customers') }}). This is necessary because dbt-flow loops through all dependency nodes and renders their raw Jinja SQL, and dbt complains when it reaches a node for which it hasn't previously seen a reference. Making the dependencies explicit lets dbt see them at compile time.
- All materializations must be customized. That is, tables and incremental tables must receive new macros that replace dbt's original materialization ones. As it's not possible to trigger those macros from the test task runtime, this was the only way to properly mock and build the nodes.
- Each adapter must implement its own materialization strategies.
Each test can be set up with the helper macro dbt_flow.setup_test(). Its input parameters are:
- target_model: str, such as 'customers'. It represents the node that we want to test.
- test_name: str, an identifier for the test. This string is used when naming the mocked tables created in the database. For instance, if test_name='test1' then the mocked node "stg__customers" will be stored as "test1stg__customers".
- options: Dict[str, bool], general options for how to run each specific test. Available options include drop_tables (whether to delete the tables created for the test afterwards), verbose (if True, prints information about this test) and an incremental flag that, when True, mocks the is_incremental() macro as truthy so that tables are treated as incremental.
- test_description: str, mainly for explaining what the current test is about. Intended for documentation purposes.
- mocks: Dict[str, str], one of the most important inputs: it specifies how nodes are mocked through SQL queries. Keys must be the name of the node being mocked (just as in their filenames) or, for source nodes, the name of the source combined with the name of the table inside that source, joined by a dot ('.'). For instance, here's how the node "stg__orders" is mocked for the postgres adapter:
"stg__orders": """ select 1 as order_id, 2 as customer_id, '2023-01-01'::Timestamp as order_date, 'statusB' as status """
This tells dbt-flow to replace the original "stg__orders" file with the mocked data. For mocking the source "test_source_customers" and its table "test_customers", simply combine the two with a dot like so:
"test_source_customers.test_customers": """ select 2 as customer_id, 'first_name' as first_name, 'last_name' as last_name """
- expected: str, a SQL query that represents what the results of the transformations should be. Like mocks, but for the target node.
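Since both mocks and expected are plain SQL, multiple rows can be expressed with union all. For illustration (hypothetical columns and values):
select 2 as customer_id, 'first_name' as first_name, 100 as total_amount union all
select 3 as customer_id, 'other_name' as first_name, 250 as total_amount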
And then run dbt tests as you normally would:
dbt test -s tag:flow-test
If it succeeds then no error is returned.
In case of failure, dbt-utils returns the differences observed between the expected data (represented as "a") and the actual observed results ("b"):
In the image above, we can see that the expected input (a) has a total_amount of "200" whereas the obtained result was "100".
The final step is to trigger the test execution, which is done with the command:
{{ dbt_flow.run_tests([test1], global_options={"drop_tables": false}) }}
Tests are initiated by the macro dbt_flow.run_tests()
. It accepts two inputs:
- list_setups: List[Dict[str, Any]], a list of all the test_setups as previously defined. Notice that it's quite possible to perform several tests at once: simply define them and then run dbt_flow.run_tests([test1, test2, ...]), as sketched below.
- global_options: Dict[str, Any], similar to the per-test "options" key but applied globally to all tests. It has lower precedence than the more specific options defined on each test.
Notice that run_tests must be invoked with Jinja's double curly braces {{ ... }}; otherwise no mocking query is returned to the SQL file and dbt will crash when trying to execute it.
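As a rough sketch (test2 below simply mirrors the mocks and expectations of test1 with a different test_name and an illustrative verbose option), several tests can live in the same file and be triggered together:
{%- set test2 = dbt_flow.setup_test(
        target_model='customers',
        test_name='cust2',
        options={"verbose": true},
        test_description='hypothetical second test mirroring test1, printing verbose information.',
        mocks={
            "stg__orders": "select 1 as order_id, 2 as customer_id, '2023-01-01'::Timestamp as order_date, 'statusB' as status",
            "stg__payments": "select 1 as payment_id, 1 as order_id, 1 as payment_method_id, 100 as amount",
            "test_source_customers.test_customers": "select 2 as customer_id, 'first_name' as first_name, 'last_name' as last_name",
            "payments_mapping": "select 1 as payment_method_id, 'payment_method_A' as name union all select 2 as payment_method_id, 'payment_method_B' as name"
        },
        expected="""
            select
                2 as customer_id,
                'first_name' as first_name,
                'last_name' as last_name,
                '2023-01-01'::Timestamp as first_order_date,
                '2023-01-01'::Timestamp as last_order_date,
                1 as number_of_orders,
                100 as total_amount
        """
    ) -%}
{{ dbt_flow.run_tests([test1, test2], global_options={"drop_tables": false}) }}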
Running tests against an external database, as is the case with BigQuery, may slow things down. What we found useful is to have one file for each test and run dbt with multiple threads. For instance, for the eCommerce project that dbt-flow was implemented for, we designed tests such as (just make sure to give each file a different test_name):
test_orders.sql
test_incremental_orders.sql
test_origin_systems_collide.sql
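With one file per test, the whole suite can then be parallelized by raising dbt's thread count, for example (the thread count here is just illustrative):
dbt test -s tag:flow-test --threads 8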
The error messages as presented by dbt-flow may not be helpful. We recommend running your mocks directly against your database to confirm they work; we're still looking for ways to improve this. It might also help to set verbose: True in your run to see at which step the system crashes.
ARRAYs and STRUCTs are both converted to JSON strings! There is currently no other way to test both types of fields using dbt-utils, so the casting is mandatory. As an end user you don't have to change anything: express your expected data as you normally would (referencing them as ARRAYs and STRUCTs) and dbt-flow will carry out the necessary transformations for you (by applying the function TO_JSON_STRING()).
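As a sketch (hypothetical BigQuery columns; dbt-flow handles the TO_JSON_STRING() conversion behind the scenes), an expected query can simply be written as:
select
    2 as customer_id,
    [100, 250] as order_amounts,
    struct('card' as payment_method, 100 as amount) as last_payment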
As already stated, dbt-flow is still in alpha and was implemented in a demanding environment, which means parts of the code were hastily written.
Not all adapters could be implemented, but adding new ones should be straightforward (it does require understanding their materialization scripts, though).
Do consider opening issues or sending pull requests to help improve this project.