snowplow / dbt-snowplow-utils

Snowplow utility functions to be used in conjunction with the snowplow-web dbt package.


snowplow-utils

This package contains a mix of functionality to be used with the other Snowplow dbt packages, or to be used within your own packages/projects.

Includes:

Macros

This package contains many macros, the majority of which are designed for internal use at Snowplow.

There is, however, a selection intended for public use that can assist you in modelling Snowplow data. The documentation for these macros can be found below.

get_columns_in_relation_by_column_prefix (source)

This macro returns an array of column objects within a relation that start with the given column prefix. This is useful when you have multiple versions of a column within a table and want to dynamically identify all versions.

Arguments:

Returns:

Usage:

{% set matched_columns = snowplow_utils.get_columns_in_relation_by_column_prefix(
                          relation=ref('snowplow_web_base_events_this_run'),
                          column_prefix='custom_context_1_0_'
                          ) %}

{% for column in matched_columns %}
  {{ column.name }}
{% endfor %}

# Renders to something like:
'custom_context_1_0_1'
'custom_context_1_0_2'
'custom_context_1_0_3'

The order of the matched columns is denoted by their ordinal position.
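The selection and ordering behaviour can be pictured with a short Python sketch. This is only an illustration of the idea, not the macro's implementation: `Column` and `columns_by_prefix` are hypothetical names, and a plain prefix match on the column name is assumed.

```python
from typing import NamedTuple


class Column(NamedTuple):
    """Hypothetical stand-in for a column object in a relation."""
    name: str
    ordinal_position: int


def columns_by_prefix(columns, prefix):
    """Keep columns whose name starts with prefix, ordered by ordinal position."""
    matched = [c for c in columns if c.name.startswith(prefix)]
    return sorted(matched, key=lambda c: c.ordinal_position)


columns = [
    Column("custom_context_1_0_2", 7),
    Column("event_id", 1),
    Column("custom_context_1_0_1", 5),
]
matched_names = [c.name for c in columns_by_prefix(columns, "custom_context_1_0_")]
```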

combine_column_versions (source)

BigQuery Only. This macro is designed primarily for combining versions of custom context or an unstructured event column from the Snowplow events table in BigQuery.

As your schemas for such columns evolve, multiple versions of the same column will be created in your events table e.g. custom_context_1_0_0, custom_context_1_0_1. These columns contain nested fields i.e. are of a datatype RECORD. When modeling Snowplow data it can be useful to combine or coalesce each nested field across all versions of the column for a continuous view over time. This macro mitigates the need to update your coalesce statement each time a new version of the column is created.

Fields can be selected using 4 methods:

By default any returned fields will be assigned an alias matching the field name e.g. coalesce(<col_v2>.product, <col_v1>.product) as product. For heavily nested fields, the alias will be the field's path with . replaced with _ e.g. the field product.size.height will have the alias product_size_height. A custom alias can be supplied with the required_fields arg (see below).
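The aliasing rule and the shape of the generated expression can be sketched in Python. This is a minimal illustration under stated assumptions, not the macro's code: `field_alias` and `coalesce_field` are hypothetical helpers, and the caller is assumed to pass only the column versions that contain the field, newest first.

```python
def field_alias(path, custom_alias=None):
    """Default alias: the field's path with '.' replaced by '_'."""
    return custom_alias or path.replace(".", "_")


def coalesce_field(path, versioned_columns, custom_alias=None):
    """Build a coalesce over the same field path in each column version.

    versioned_columns: column expressions, newest first (an assumption).
    """
    args = ", ".join(f"{col}.{path}" for col in versioned_columns)
    return f"coalesce({args}) as {field_alias(path, custom_alias)}"


versions = ["product_v2[safe_offset(0)]", "product_v1[safe_offset(0)]"]
default_sql = coalesce_field("specs.power_rating", versions)
renamed_sql = coalesce_field("specs.power_rating", versions, "product_power_rating")
```

Because the list of versions is an input, a new column version only changes the data passed in, not the expression-building logic.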

Arguments:

Returns:

Usage:

The following examples assume two 'product' context columns with the following schemas:

Example nested fields

All fields

{%- set all_fields = snowplow_utils.combine_column_versions(
                                relation=ref('snowplow_web_base_events_this_run'),
                                column_prefix='product_v'
                                ) -%}

select
{% for field in all_fields %}
  {{field}} {%- if not loop.last %},{% endif %}
{% endfor %}

# Renders to:
select
  coalesce(product_v2[safe_offset(0)].name, product_v1[safe_offset(0)].name) as name,
  coalesce(product_v2[safe_offset(0)].specs, product_v1[safe_offset(0)].specs) as specs,
  coalesce(product_v2[safe_offset(0)].specs.power_rating, product_v1[safe_offset(0)].specs.power_rating) as specs_power_rating,
  coalesce(product_v2[safe_offset(0)].specs.volume) as specs_volume,
  coalesce(product_v2[safe_offset(0)].specs.accessories, product_v1[safe_offset(0)].specs.accessories) as specs_accessories

Note fields within accessories are not unnested as accessories is of mode REPEATED. See limitations section below.

Fields filtered by name

{%- set required_fields = snowplow_utils.combine_column_versions(
                                relation=ref('snowplow_web_base_events_this_run'),
                                column_prefix='product_v',
                                required_fields=['name', ('specs.power_rating', 'product_power_rating')]
                                ) -%}

select
{% for field in required_fields %}
  {{field}} {%- if not loop.last %},{% endif %}
{% endfor %}

# Renders to:
select
  coalesce(product_v2[safe_offset(0)].name, product_v1[safe_offset(0)].name) as name,
  coalesce(product_v2[safe_offset(0)].specs.power_rating, product_v1[safe_offset(0)].specs.power_rating) as product_power_rating

Note we have renamed the power rating field by passing a tuple of the field name and desired field alias.

Fields filtered by level

{%- set fields_by_level = snowplow_utils.combine_column_versions(
                                relation=ref('snowplow_web_base_events_this_run'),
                                column_prefix='product_v',
                                nested_level=1
                                ) -%}

select
{% for field in fields_by_level %}
  {{field}} {%- if not loop.last %},{% endif %}
{% endfor %}

# Renders to:
select
  coalesce(product_v2[safe_offset(0)].name, product_v1[safe_offset(0)].name) as name,
  coalesce(product_v2[safe_offset(0)].specs, product_v1[safe_offset(0)].specs) as specs

Limitations

is_run_with_new_events (source)

This macro is designed for use with Snowplow data modelling packages like snowplow-web. It can be used in any incremental model to block that model from being updated with old data which it has already consumed. This saves cost and prevents historical data from being overwritten with partially complete data (due to a batch back-fill for instance).

The macro utilizes the snowplow_[platform]_incremental_manifest table to determine whether the model from which the macro is called, i.e. {{ this }}, has already consumed the data in the given run. If it has, it returns false. If the data in the run contains new data, true is returned.

Arguments:

Returns:

Usage:

{{
  config(
    materialized='incremental',
    unique_key='screen_view_id',
    upsert_date_key='start_tstamp'
  )
}}

select
  ...

from {{ ref('snowplow_mobile_base_events_this_run' ) }}
where {{ snowplow_utils.is_run_with_new_events('snowplow_mobile') }} --returns false if run doesn't contain new events.

snowplow_web_delete_from_manifest (source)

The snowplow-web package makes use of a centralised manifest system to record the current state of the package. There may be times when you want to remove the metadata associated with particular models from the manifest, for instance to replay events through a particular model.

This can be performed as part of the run-start operation of the snowplow-web package, as described in the docs. You can however perform this operation independently using the snowplow_web_delete_from_manifest macro.

Arguments:

Usage:

dbt run-operation snowplow_web_delete_from_manifest --args "models: ['snowplow_web_page_views','snowplow_web_sessions']"
# or
dbt run-operation snowplow_web_delete_from_manifest --args "models: snowplow_web_page_views"

snowplow_mobile_delete_from_manifest (source)

The snowplow-mobile package makes use of a centralised manifest system to record the current state of the package. There may be times when you want to remove the metadata associated with particular models from the manifest, for instance to replay events through a particular model.

This can be performed as part of the run-start operation of the snowplow-mobile package, as described in the docs. You can however perform this operation independently using the snowplow_mobile_delete_from_manifest macro.

Arguments:

Usage:

dbt run-operation snowplow_mobile_delete_from_manifest --args "models: ['snowplow_mobile_screen_views','snowplow_mobile_sessions']"
# or
dbt run-operation snowplow_mobile_delete_from_manifest --args "models: snowplow_mobile_screen_views"

get_value_by_target (source)

This macro is designed to dynamically return values based on the target (target.name) you are running against. Your target names are defined in your profiles.yml file. This can be useful for dynamically changing variables within your project, depending on whether you are running in dev or prod.
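The selection logic amounts to a single comparison, sketched here in Python. The function below is a hypothetical stand-in, not the macro itself: `target_name` plays the role of dbt's target.name, which the real macro reads from the run context rather than taking as an argument.

```python
def get_value_by_target(dev_value, default_value, dev_target_name, target_name):
    """Return dev_value when running against the dev target, else default_value."""
    return dev_value if target_name == dev_target_name else default_value


# Back-fill limit in days for two different targets.
limit_in_dev = get_value_by_target(1, 30, dev_target_name="dev", target_name="dev")
limit_in_prod = get_value_by_target(1, 30, dev_target_name="dev", target_name="prod")
```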

Arguments:

Usage:

# dbt_project.yml
...
vars:
  snowplow_web:
    snowplow__backfill_limit_days: "{{ snowplow_utils.get_value_by_target(dev_value=1, default_value=30, dev_target_name='dev') }}"

Returns:

set_query_tag (source)

This macro takes a provided statement as argument and generates the SQL command to set this statement as the query_tag for Snowflake databases, and does nothing otherwise. It can be used to safely set the query_tag regardless of database type.

Arguments:

Usage:

{{ snowplow_utils.set_query_tag('snowplow_query_tag') }}

Returns:

get_sde_or_context (source)

This macro exists for Redshift and Postgres users to more easily select their self-describing event and context tables and apply de-duplication before joining onto their (already de-duplicated) events table. The root_id and root_tstamp columns are by default returned as schema_name_id and schema_name_tstamp respectively, where schema_name is the value in the schema_name column of the table. Where multiple entities may be sent in the context (e.g. products in a search result), you should set the single_entity argument to false and use an additional criterion in your join (see the snowplow docs for further details).

Note that it is the responsibility of the user to ensure they have no duplicate names when using this macro multiple times or when a schema column name matches a column already in the events table. In this case the prefix argument should be used and aliasing applied to the output.
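The de-duplication idea for the single-entity case can be sketched in Python. This is an illustration only: `dedupe_context_rows` is a hypothetical helper, and it assumes that duplicates are rows sharing the same root_id and root_tstamp and that the first such row is the one kept; the macro's actual partitioning and ordering may differ.

```python
def dedupe_context_rows(rows):
    """Keep the first row seen for each (root_id, root_tstamp) pair."""
    seen = set()
    deduped = []
    for row in rows:
        key = (row["root_id"], row["root_tstamp"])
        if key not in seen:
            seen.add(key)
            deduped.append(row)
    return deduped


context_rows = [
    {"root_id": "e1", "root_tstamp": "2023-01-05 10:00:00", "agent_class": "Browser"},
    {"root_id": "e1", "root_tstamp": "2023-01-05 10:00:00", "agent_class": "Browser"},
    {"root_id": "e2", "root_tstamp": "2023-01-06 11:30:00", "agent_class": "Robot"},
]
deduped_rows = dedupe_context_rows(context_rows)
```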

Usage:

With at most one entity per context:

with {{ snowplow_utils.get_sde_or_context('atomic', 'nl_basjes_yauaa_context_1', "'2023-01-01'", "'2023-02-01'")}}

select
...
from my_events_table a
left join nl_basjes_yauaa_context_1 b on
    a.event_id = b.yauaa_context__id
    and a.collector_tstamp = b.yauaa_context__tstamp

With the possibility of multiple entities per context, your events table must already be de-duped but still have a field with the number of duplicates:

with {{ snowplow_utils.get_sde_or_context('atomic', 'nl_basjes_yauaa_context_1', "'2023-01-01'", "'2023-02-01'", single_entity = false)}}

select
...,
count(*) over (partition by a.event_id) as duplicate_count
from my_events_table a
left join nl_basjes_yauaa_context_1 b on
    a.event_id = b.yauaa_context__id
    and a.collector_tstamp = b.yauaa_context__tstamp
    and mod(b.yauaa_context__index, a.duplicate_count) = 0

Returns:

CTE sql for deduplicated records from the schema table, without the schema details columns. The final CTE is the name of the original table. e.g.

dd_my_context_table as (
  select ..., ... as dedupe_index from my_schema.my_context_table
),

my_context_table as (
  select ... from dd_my_context_table where dedupe_index = 1
)

With at most one entity per context:

dd_my_context_table as (
  select ..., ... as dedupe_index from my_schema.my_context_table
),

my_context_table as (
  select ..., root_id as my_context_table__id, root_tstamp as my_context_table__tstamp from dd_my_context_table where dedupe_index = 1
)

With the possibility of multiple entities per context, your events table must already be de-duped but still have a field with the number of duplicates:

dd_my_context_table as (
  select ..., ... as dedupe_index from my_schema.my_context_table
),

my_context_table as (
  select ..., root_id as my_context_table__id, root_tstamp as my_context_table__tstamp, ... as my_context_table__index from dd_my_context_table
)

get_field (source)

This macro exists to make it easier to extract a field from our unstruct_ and contexts_ type columns for users in Snowflake, Databricks, and BigQuery (using a wildcard version number is only possible for BigQuery e.g. column_name = 'contexts_nl_basjes_yauaa_context_1_*'). The macro can handle type casting and selecting from arrays.

Returns:

sql line to select the field specified from the column

Usage:

Extracting a single field


select
{{ snowplow_utils.get_field(column_name = 'contexts_nl_basjes_yauaa_context_1',
                            field_name = 'agent_class',
                            table_alias = 'a',
                            type = 'string',
                            array_index = 0)}} as yauaa_agent_class
from
    my_events_table a

Extracting multiple fields


select
{% for field in [('field1', 'string'), ('field2', 'numeric'), ...] %}
  {{ snowplow_utils.get_field(column_name = 'contexts_nl_basjes_yauaa_context_1',
                            field_name = field[0],
                            table_alias = 'a',
                            type = field[1],
                            array_index = 0)}} as {{ field[0] }}{%- if not loop.last %},{% endif %}
{% endfor %}

from
    my_events_table a

Array functions

get_array_to_string (source)

This macro takes care of harmonizing cross-db functions that flatten an array to a string. It takes an array column, a column prefix and a delimiter as arguments.

Usage:

{{ snowplow_utils.get_array_to_string('array_column', 'column_prefix', 'delimiter') }}

Returns:

get_split_to_array (source)

This macro takes care of harmonizing cross-db functions that create an array out of a string. It takes a string column, a column prefix and a delimiter as arguments.

Usage:

{{ snowplow_utils.get_split_to_array('string_column', 'column_prefix', 'delimiter') }}

Returns:

get_string_agg (source)

This macro takes care of harmonizing cross-db list_agg, string_agg type functions. These are aggregate functions that take all expressions from rows and concatenate them into a single string.

A base column and its prefix have to be provided, the separator is optional (default is ',').

By default ordering is defined by sorting the base column in ascending order. If you wish to order on a different column, the order_by_column and order_by_column_prefix have to be provided. If you wish to order in descending order, then set order_desc to true.

If the field used for sorting holds numeric values (regardless of whether it is stored as a string or as a numeric type), the sort_numeric parameter should be set to true, which takes care of conversions from string to numeric if needed.

There is also an optional boolean parameter called is_distinct which, when enabled, takes care of deduping individual elements within the array.
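The combined effect of these options can be pictured with a small Python sketch that works on in-memory rows. It is not the SQL the macro generates: `string_agg` here is a hypothetical helper, the prefix arguments are omitted, and the interaction of is_distinct with ordering on a different column (first occurrence kept) is an assumption that may differ between warehouses.

```python
def string_agg(rows, base_column, separator=",", order_by_column=None,
               sort_numeric=False, is_distinct=False, order_desc=False):
    """Concatenate base_column values from rows into one string."""
    order_by_column = order_by_column or base_column

    def sort_key(row):
        value = row[order_by_column]
        # Convert to a number when sorting numerically, even if stored as text.
        return float(value) if sort_numeric else value

    ordered = sorted(rows, key=sort_key, reverse=order_desc)
    values = [str(row[base_column]) for row in ordered]
    if is_distinct:
        # Drop repeats while keeping the first occurrence of each value.
        values = list(dict.fromkeys(values))
    return separator.join(values)


rows = [
    {"page": "b", "n": "10"},
    {"page": "a", "n": "9"},
    {"page": "b", "n": "2"},
]
by_text_order = string_agg(rows, "page", ";", order_by_column="n")
by_numeric_order = string_agg(rows, "page", ";", order_by_column="n", sort_numeric=True)
```

The two results differ because "10" sorts before "2" as text but after it as a number, which is the case sort_numeric is meant to handle.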

Usage:

{{ snowplow_utils.get_string_agg('base_column', 'column_prefix', ';', 'order_by_col', sort_numeric=true, order_by_column_prefix='order_by_column_prefix', is_distinct=True, order_desc=True) }}

Returns:

get_array_size (source)

This macro takes care of harmonizing cross-db get_array type functions. It returns the number of elements in an array.

Usage:

{{ snowplow_utils.get_array_size('array_column') }}

Returns:

unnest (source)

This macro takes care of unnesting arrays regardless of the data warehouse. An id column and the column to base the unnesting on need to be specified, as well as a field alias and the source table. Optionally you can extract the indexed element number (starting from 0) to maintain the original order by setting the 'with_index' variable to true.
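What unnesting produces can be sketched in Python on in-memory rows. This is an illustration rather than the generated SQL: `unnest` below is a hypothetical helper, the index column name `source_index` is an assumption, and rows with empty arrays are assumed to produce no output rows.

```python
def unnest(rows, id_column, array_column, field_alias, with_index=False):
    """Emit one output row per array element, keyed by the id column."""
    out = []
    for row in rows:
        for index, element in enumerate(row[array_column]):
            record = {id_column: row[id_column], field_alias: element}
            if with_index:
                # Zero-based position of the element in the original array.
                record["source_index"] = index
            out.append(record)
    return out


source_rows = [
    {"session_id": "s1", "pages": ["home", "cart"]},
    {"session_id": "s2", "pages": []},
]
flat = unnest(source_rows, "session_id", "pages", "page", with_index=True)
```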

Usage:

{{ snowplow_utils.unnest('id_column', 'array_to_be_unnested', 'field_alias', 'source_table', with_index= false) }}

Returns:

Date & time functions

timestamp_diff (source)

This macro mimics the dbt_utils version. For BigQuery, however, it ensures that the timestamp difference is calculated, as on the other database engines, which is not the case in the dbt_utils macro. The macro calculates the difference between two dates. Note: The datepart argument is database-specific.

Arguments:

Usage:

{{ snowplow_utils.timestamp_diff('2022-01-10 10:23:02', '2022-01-14 09:40:56', 'day') }}

Returns:

timestamp_add (source)

This macro mimics the dbt_utils version. For BigQuery, however, it ensures that the timestamp difference is calculated, as on the other database engines, which is not the case in the dbt_utils macro. The macro adds a date/time interval to the supplied date/timestamp. Note: The datepart argument is database-specific.

Arguments:

Usage:

{{ snowplow_utils.timestamp_add('day', 5, '2022-02-01 10:05:32') }}

Returns:

cast_to_tstamp (source)

This macro casts a column to a timestamp across databases. It is an adaptation of the type_timestamp() macro from dbt-core.

Arguments:

Usage:

{{ snowplow_utils.cast_to_tstamp('events.collector_tstamp') }}

Returns:

to_unixtstamp (source)

This macro casts a column to a unix timestamp across databases.

Arguments:

Usage:

{{ snowplow_utils.to_unixtstamp('events.collector_tstamp') }}

Returns:

current_timestamp_in_utc (source)

This macro returns the current timestamp in UTC.

Usage:

{{ snowplow_utils.current_timestamp_in_utc() }}

Returns: The current timestamp in UTC.

n_timedeltas_ago (source)

This macro takes the current timestamp and subtracts n units, as defined by the timedelta_attribute, from it. This is achieved using the Python datetime module, rather than querying your database.
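The underlying arithmetic can be sketched directly with the Python datetime module. The function below is a hypothetical stand-in for the macro; the optional `now` argument is an addition made here so the example is reproducible, and timedelta_attribute is assumed to be any keyword accepted by datetime.timedelta (for example 'weeks', 'days' or 'hours').

```python
import datetime


def n_timedeltas_ago(n, timedelta_attribute, now=None):
    """Subtract n units of timedelta_attribute from the current timestamp."""
    now = now or datetime.datetime.now()
    return now - datetime.timedelta(**{timedelta_attribute: n})


fixed_now = datetime.datetime(2022, 2, 8, 10, 0, 0)
one_week_ago = n_timedeltas_ago(1, "weeks", now=fixed_now)
```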

Arguments:

Usage:

{{ snowplow_utils.n_timedeltas_ago(1, 'weeks') }}

Returns:

By combining this with the get_value_by_target macro, you can dynamically set dates depending on your environment:

# dbt_project.yml
...
vars:
  snowplow_mobile:
    snowplow__start_date: "{{ snowplow_utils.get_value_by_target(
                                      dev_value=snowplow_utils.n_timedeltas_ago(1, 'weeks'),
                                      default_value='2020-01-01',
                                      dev_target_name='dev') }}"

Materializations

Optimized incremental

This package provides an enhanced version of the standard incremental materialization. It builds upon the out-of-the-box incremental materialization provided by dbt by limiting the length of the table scans on the destination table, which both improves performance and reduces cost. The following methodology is used to calculate the limit of the table scan:

To enable this optimized version you must add snowplow_optimize=true to the config of any model using it, and add the following once to your dbt_project.yml file:

dispatch:
  - macro_namespace: dbt
    search_order: ['snowplow_utils', 'dbt']

This optimization adds an additional predicate, based on the logic above, to the per-warehouse sql generated by dbt, and we use the default incremental strategy for each warehouse.

Because we only override get_merge_sql/get_delete_insert_merge_sql, all options and features of the standard incremental materialization remain available, including on_schema_change and incremental_predicates.

Each config must contain, in addition to snowplow_optimize, an upsert_date_key and a unique_key. We support Snowflake, BigQuery, Redshift, Postgres, Spark, and Databricks, however some warehouses have some additional config options that we recommend using to get the most out of the optimization.

BigQuery

For BigQuery it is advised (and required for the optimization to work) to add a partition_by to the config.

Usage:

{{
  config(
    materialized='incremental',
    unique_key='page_view_id', # Required: the primary key of your model
    upsert_date_key='start_tstamp', # Required: The date key to be used to calculate the limits
    partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={
      "field": "start_tstamp",
      "data_type": "timestamp",
      "granularity": "day" # Only introduced in dbt v0.19.0+. Defaults to 'day' for dbt v0.18 or earlier
    }), # Adds partitions to destination table.
    snowplow_optimize=true # Required: enables the optimized materialization
  )
}}

Snowflake

During testing we found that providing the upsert_date_key as a cluster key results in more effective partition pruning. This does add overhead however as the dataset needs to be sorted before being upserted. In our testing this was a worthwhile trade off, reducing overall costs. Your mileage may vary, as variables like row count can affect this.

Usage:

{{
  config(
    materialized='incremental',
    unique_key='page_view_id', # Required: the primary key of your model
    upsert_date_key='start_tstamp', # Required: The date key to be used to calculate the limits
    cluster_by='to_date(start_tstamp)', # Optional.
    snowplow_optimize=true # Required: enables the optimized materialization
  )
}}

Notes

Base Macro

Create Snowplow Quarantined Sessions (source)

This macro generates a table which contains a column named session_identifier, containing all session identifiers of sessions that have been quarantined due to exceeding the maximum session length, to avoid long table scans.

Usage:

{% set quarantined_query = snowplow_utils.base_create_snowplow_quarantined_sessions() %}

{{ quarantined_query }}

Returns:

Create Snowplow Incremental Manifest (source)

This macro generates the incremental manifest table that Snowplow leverages, and does not require any arguments.

Usage:

{% set incremental_manifest_query = snowplow_utils.base_create_snowplow_incremental_manifest() %}

{{ incremental_manifest_query }}

Returns:

Create Snowplow Sessions Lifecycle Manifest (source)

This macro generates the sessions lifecycle manifest table that Snowplow leverages.

Arguments:

Usage:

{% set sessions_lifecycle_manifest_query = snowplow_utils.base_create_snowplow_sessions_lifecycle_manifest(
    var('snowplow__session_identifiers', '[{"schema": "atomic", "field": "domain_sessionid"}]'),
    var('snowplow__session_timestamp', 'collector_tstamp'),
    var('snowplow__user_identifiers', '[{"schema": "atomic", "field": "domain_userid"}]'),
    var('snowplow__quarantined_sessions', 'snowplow_base_quarantined_sessions'),
    var('snowplow__derived_tstamp_partitioned', true),
    var('snowplow__days_late_allowed', 3),
    var('snowplow__max_session_days', 3),
    var('snowplow__app_ids', []),
    var('snowplow__events_schema', 'atomic'),
    var('snowplow__events_table', 'events'),
    var('snowplow__event_limits', 'snowplow_base_new_event_limits'),
    var('snowplow__incremental_manifest', 'snowplow_incremental_manifest'),
    var('snowplow__package_name', 'snowplow')
 ) %}

{{ sessions_lifecycle_manifest_query }}

Returns:

Create Snowplow Sessions This Run (source)

This macro generates the sessions this run table that Snowplow leverages.

Arguments:

Usage:

{% set sessions_query = snowplow_utils.base_create_snowplow_sessions_this_run(
    lifecycle_manifest_table='snowplow_base_sessions_lifecycle_manifest',
    new_event_limits_table='snowplow_base_new_event_limits') %}

{{ sessions_query }}

Returns:

Create Snowplow Events This Run (source)

This macro generates the events this run table that Snowplow leverages.

Arguments:

Usage:

{% set base_events_query = snowplow_utils.base_create_snowplow_events_this_run(
    var('snowplow__base_sessions', 'snowplow_base_sessions_this_run'),
    var('snowplow__session_identifiers', '[{"schema": "atomic", "field": "domain_sessionid"}]'),
    var('snowplow__session_timestamp', 'collector_tstamp'),
    var('snowplow__derived_tstamp_partitioned', true),
    var('snowplow__days_late_allowed', 3),
    var('snowplow__max_session_days', 3),
    var('snowplow__app_ids', []),
    var('snowplow__events_schema', 'atomic'),
    var('snowplow__events_table', 'events')) %}

{{ base_events_query }}

Returns:

Join the Snowplow community

We welcome all ideas, questions and contributions!

For support requests, please use our community support Discourse forum.

If you find a bug, please report an issue on GitHub.

Copyright and license

The snowplow-utils package is Copyright 2021-present Snowplow Analytics Ltd.

This distribution is all licensed under the Snowplow Personal and Academic License. (If you are uncertain how it applies to your use case, check our answers to frequently asked questions.)