This dbt package contains Snowflake macros and models that can be (re)used across dbt projects with Snowflake as the target database.
Add the package to your project.
Example: packages.yml
- git: "https://github.com/entechlog/dbt-snow-utils.git"
  revision: 0.1.0

- package: entechlog/dbt_snow_utils
  version: 0.1.0
✅ The package can be added to your project using either of the above options.
✅ Refer to the releases of this repo / dbt Hub for the latest version; the version number shown above may not be the latest.
Install the package by running the command below:
dbt deps
Add the following model configuration under the models section of dbt_project.yml. This lets you customize the target database and schema; edit them as needed.
models:
  dbt_snow_utils:
    staging:
      database: "DEMO_DB"
      schema: staging
    marts:
      database: "DEMO_DB"
      schema: marts
Snowpipe is Snowflake's continuous data ingestion service. Currently there is no consolidated dashboard in Snowflake that summarizes Snowpipe activity. Copy history in Snowsight provides a dashboard for table-level copy history. The table functions INFORMATION_SCHEMA.PIPE_USAGE_HISTORY and INFORMATION_SCHEMA.COPY_HISTORY expose copy history, but it is retained for only 14 days. The table functions avoid the 10,000-row limitation of the LOAD_HISTORY view but are also slow operations, so adjust the SQL predicates to filter the data based on your volume.
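For reference, an ad-hoc query against COPY_HISTORY might look like the sketch below; the database, table name, and 36-hour window are placeholders, not values required by this package:
-- Recent Snowpipe loads for one table (placeholder names)
SELECT file_name, pipe_name, row_count, error_count, status, last_load_time
FROM TABLE(demo_db.information_schema.copy_history(
       table_name => 'STAGING.MY_TABLE',
       start_time => DATEADD('hours', -36, CURRENT_TIMESTAMP())))
ORDER BY last_load_time DESC;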
This process materializes data from PIPE_USAGE_HISTORY and COPY_HISTORY into Snowflake tables. The target tables can be used to visualize Snowpipe copy history and usage history with the help of the dbt macro get_snowpipe_details and the dbt models tagged snowpipe (selected with +tag:snowpipe).
Add the following variables under the vars section of dbt_project.yml. This lets you customize the data retrieval filters.
vars:
  dbt_snow_utils:
    pipe_databases: "ALL"
    filter_by_date:
    pipe_copy_history_filter_key: "hours"
    pipe_copy_history_filter_value: -36
    pipe_usage_history_filter_key: "day"
    pipe_usage_history_filter_value: -2
pipe_databases (optional): The databases with Snowpipes. Valid values are the string "ALL" or a list of database names.
filter_by_date (optional): The date used to filter data for incremental loads, specified in YYYY-MM-DD format. If none is specified, the process uses the current date.
pipe_copy_history_filter_key (optional): The filter key for the table function COPY_HISTORY. Valid values include day, hour, minute, second, etc. See the Snowflake documentation for the full list of date and time parts.
pipe_copy_history_filter_value (optional): The filter value for the table function COPY_HISTORY. Should be a negative value compatible with the chosen filter key.
pipe_usage_history_filter_key (optional): The filter key for the table function PIPE_USAGE_HISTORY. Valid values include day, hour, minute, second, etc. See the Snowflake documentation for the full list of date and time parts.
pipe_usage_history_filter_value (optional): The filter value for the table function PIPE_USAGE_HISTORY. Should be a negative value compatible with the chosen filter key.
Run the models using the command below:
dbt run --select +tag:snowpipe --vars '{"filter_by_date": "2022-03-22"}'
OR
dbt run --select +tag:snowpipe --full-refresh
This should create snowpipe__usage_history and snowpipe__copy_history, which can be integrated with BI tools to build Snowpipe monitoring dashboards.
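As an illustration, pipe-level credit and file counts can also be pulled ad hoc from the raw table function; the materialized model simply keeps this history beyond the table function's retention window. The two-day window below is a placeholder:
-- Credits and files ingested per pipe over the last 2 days (placeholder window)
SELECT pipe_name,
       SUM(credits_used)   AS credits_used,
       SUM(files_inserted) AS files_inserted,
       SUM(bytes_inserted) AS bytes_inserted
FROM TABLE(information_schema.pipe_usage_history(
       date_range_start => DATEADD('day', -2, CURRENT_TIMESTAMP())))
GROUP BY pipe_name;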
This process materializes data from QUERY_HISTORY into a Snowflake table.
The role used by dbt should have MONITOR access on the warehouses so it can fetch queries executed by all users:
GRANT MONITOR ON WAREHOUSE <ALL-SNOWFLAKE-WAREHOUSE> to role <DBT-ROLE>;
Add the following variables under the vars section of dbt_project.yml. This lets you customize the data retrieval filters.
vars:
  dbt_snow_utils:
    filter_by_date:
    query_history_filter_key: "hours"
    query_history_filter_value: -144
    query_history_result_limit: 10000
filter_by_date (optional): The date used to filter data for incremental loads, specified in YYYY-MM-DD format. If none is specified, the process uses the current date.
query_history_filter_key (optional): The filter key for the table function QUERY_HISTORY. Valid values include day, hour, minute, second, etc. See the Snowflake documentation for the full list of date and time parts.
query_history_filter_value (optional): The filter value for the table function QUERY_HISTORY. Should be a negative value compatible with the chosen filter key.
Run the models using the command below:
dbt run --select +tag:snowflake --vars '{"filter_by_date": "2022-03-30"}'
OR
dbt run --select +tag:snowflake --full-refresh
This should create snowflake__query_history, which can be integrated with BI tools to build Snowflake monitoring dashboards.
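For comparison, the raw table function can be queried directly; a minimal sketch (the 24-hour window and row limit are placeholders) that surfaces the slowest recent queries:
-- Slowest queries completed in the last 24 hours (placeholder window and limit)
SELECT query_id, user_name, warehouse_name, execution_status, total_elapsed_time
FROM TABLE(information_schema.query_history(
       end_time_range_start => DATEADD('hours', -24, CURRENT_TIMESTAMP()),
       result_limit => 10000))
ORDER BY total_elapsed_time DESC
LIMIT 20;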
This macro clones the source schema/schemas into the destination database.
source_schemas (required): The list of source schema names
destination_postfix (required): The destination schema name postfix
source_database (optional): The source database name
destination_database (optional): The destination database name
dbt run-operation dbt_snow_utils.clone_schemas --args "{'source_database': 'demo_db', 'source_schemas': ['dim', 'fact', 'utils'], 'destination_database': 'demo_db', 'destination_postfix': '_20220323_01'}"
pre_hook="{{ dbt_snow_utils.clone_schemas(['dim', 'fact', 'utils'], '_backup', 'demo_db', this.database) }}"
This macro clones the source table into the destination database/schema.
source_table (required): The source table name
destination_table (required): The destination table name
source_database (optional): The source database name
source_schema (optional): The source schema name
destination_database (optional): The destination database name
destination_schema (optional): The destination schema name
dbt run-operation clone_table --args '{"source_table": "COUNTRY_CODE", "destination_table": "COUNTRY_CODE_BKP"}'
post_hook="{{ dbt_snow_utils.clone_table(this.identifier,this.identifier~'_temp', this.database, this.schema, this.database, this.schema ) }}"
This macro clones all the tables from the source database/schema into the destination database/schema. It also provides an option to truncate the tables after cloning, if you just need the table structure and not the data.
source_schemas (required): The list of source schema names
source_database (optional): The source database name
destination_database (optional): The destination database name
truncate_table_flag (optional): Flag to truncate the tables after cloning; when enabled, only the table structure is copied and not the data
dbt run-operation clone_tables --args "{'source_database': 'DEV_ENTECHLOG_DW_DB', 'source_schemas': ['dim', 'fact', 'utils'], 'destination_database': 'DEV_ENTECHLOG_DEMO_DB', 'truncate_table_flag': 'True'}"
pre_hook="{{ dbt_snow_utils.clone_tables(['dim', 'fact', 'utils'], 'DEV_ENTECHLOG_DW_DB', 'DEV_ENTECHLOG_DEMO_DB', 'True') }}"
This macro deletes data from a table based on a WHERE clause. It is often used as a pre-hook in incremental loads to delete data before reloading it.
del_key (required): The column name used in the WHERE clause of the delete
del_value (required): The value of that column used in the WHERE clause of the delete
database (optional): The database name
schema (optional): The schema name
table (optional): The table name
dbt run-operation delete_records_by_column --args '{"del_key": "payment_date", "del_value": "2005-05-25", "database": "DBT_DEMO", "schema": "MARTS", "table": "tmp_store_revenue"}'
post_hook="{{ dbt_snow_utils.delete_records_by_column('payment_date', '2005-05-24') }}"
post_hook="{{ dbt_snow_utils.delete_records_by_column('payment_date', var('start_date')) }}"
This macro deletes orphaned tables/views, i.e. relations that exist in the database but are not part of the current dbt project.
databases_list (optional): A list of databases (and optionally schemas in the format database.schema) to search for orphaned tables/views. Defaults to the target database defined in your dbt profile.
dry_run (optional): If set to True, the macro logs the tables/views that would be deleted without actually performing the deletion. Defaults to False.
To perform a dry run (log but do not delete) on specific databases:
dbt run-operation delete_orphaned_tables --args "{databases_list: ['DATABASE1', 'DATABASE2'], dry_run: True}"
To delete orphaned tables/views in the default target database:
dbt run-operation delete_orphaned_tables --args "{dry_run: False}"
To delete orphaned tables/views in specific databases (e.g., 'DATABASE1' and 'DATABASE2'):
dbt run-operation delete_orphaned_tables --args "{databases_list: ['DATABASE1', 'DATABASE2'], dry_run: False}"
To delete orphaned tables/views in specific databases/schemas (e.g., 'DATABASE1' and 'DATABASE2.SCHEMA1'):
dbt run-operation delete_orphaned_tables --args "{databases_list: ['DATABASE1', 'DATABASE2.SCHEMA1'], dry_run: False}"
This materialization strategy loads data from a configured external stage (cloud storage location) directly into a Snowflake table using the COPY INTO command or an INSERT statement.
stage_name (optional): The name of the Snowflake stage object to be used or created. Defaults to the model's identifier with a _stage postfix.
url (required): The URL of the external stage (cloud storage path).
file_format (optional): The file format options for loading data. Defaults to (type = PARQUET) if not specified.
mode (optional): The loading mode, either COPY or INSERT. Defaults to COPY.
pattern (optional): A regex pattern to match files in the external stage.
Configure this materialization in your model's configuration block in dbt_project.yml or within the model SQL file itself using {{ config(materialized = 'stage2table', ...) }}.
{{
config(
materialized="stage2table",
url="s3://" ~ var("s3_bucket") ~ "/cricsheet/all_player_data/",
file_format="(TYPE = CSV SKIP_HEADER = 1 TRIM_SPACE = TRUE ESCAPE_UNENCLOSED_FIELD = NONE)",
mode="INSERT",
tags=["source", "cricsheet"],
pre_hook="{{ delete_data('FILE_LAST_MODIFIED_DT', var('batch_cycle_date'), this) }}",
)
}}
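For orientation, in COPY mode the materialization boils down to creating (or reusing) a stage over the configured URL and copying from it into the model's table. A rough sketch under assumed names, with any required credentials or storage integration omitted:
-- Stage pointing at the configured cloud storage path (illustrative names)
CREATE STAGE IF NOT EXISTS my_schema.all_player_data_stage
  URL = 's3://my-bucket/cricsheet/all_player_data/'
  FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1 TRIM_SPACE = TRUE ESCAPE_UNENCLOSED_FIELD = NONE);

-- Load the staged files into the model's table; the optional pattern config maps to COPY's PATTERN option
COPY INTO my_schema.all_player_data
FROM @my_schema.all_player_data_stage;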
Contributions to this package are welcome. Please create issues for bugs or enhancement ideas, and open PRs for any enhancement contributions.