tnightengale / dbt-activity-schema

A dbt-Core package for generating models from an activity stream.
GNU General Public License v3.0

Add Snowflake-specific implementation of `min_or_max` #32

Open wylbee opened 1 year ago

wylbee commented 1 year ago

Per #26, the generic implementation of the `_min_or_max` macro does not work correctly on Snowflake. The root cause is that dbt's `safe_cast` compiles to a Snowflake function whose limitations make it unsuitable for this purpose.

Given that this macro is intended to create the equivalent of Snowflake's min_by/max_by functions, the work of this PR is to:

Test 1: Integration Suite

image

Test 2: Prod project

dbt_project.yml vars

vars:
  dbt_activity_schema:
    included_columns:
      - activity_id
      - customer
      - ts
      - activity
      - anonymous_customer_id
      - feature_json
      - link
      - revenue_impact
      - activity_occurrence
      - activity_repeated_at

query

with

    stream_query as (
        {{
            dbt_activity_schema.dataset(
                ref("account_stream"),
                dbt_activity_schema.activity(
                    dbt_activity_schema.all_ever(), "originates_loan"
                ),
                [
                    dbt_activity_schema.activity(
                        dbt_activity_schema.last_before(), "updates_autopay"
                    )
                ],
            )
        }}

    ),

    final as (

        select * from stream_query

    )

select *
from final

compiled query


        create or replace transient table analytics.dev_wbrown.ee_loan_at_origination  as
        (with

    stream_query as (

with

filter_activity_stream_using_primary_activity as (
    select

        stream.activity_id,

        stream.customer,

        stream.ts,

        stream.activity,

        stream.anonymous_customer_id,

        stream.feature_json,

        stream.link,

        stream.revenue_impact,

        stream.activity_occurrence,

        stream.activity_repeated_at

    from analytics.dev_wbrown.account_stream as stream

    where stream.activity = 'originates_loan'
        and 
(true)

),

append_and_aggregate__1__last_before
 as (
    select

        -- Primary Activity Columns

        stream.activity_id,

        stream.customer,

        stream.ts,

        stream.activity,

        stream.anonymous_customer_id,

        stream.feature_json,

        stream.link,

        stream.revenue_impact,

        stream.activity_occurrence,

        stream.activity_repeated_at,

        max_by(
            appended.activity_id
            , appended.ts)
 as 
last_before_updates_autopay_activity_id
            ,

        max_by(
            appended.customer
            , appended.ts)
 as 
last_before_updates_autopay_customer
            ,

        max_by(
            appended.ts
            , appended.ts)
 as 
last_before_updates_autopay_ts
            ,

        max_by(
            appended.activity
            , appended.ts)
 as 
last_before_updates_autopay_activity
            ,

        max_by(
            appended.anonymous_customer_id
            , appended.ts)
 as 
last_before_updates_autopay_anonymous_customer_id
            ,

        max_by(
            appended.feature_json
            , appended.ts)
 as 
last_before_updates_autopay_feature_json
            ,

        max_by(
            appended.link
            , appended.ts)
 as 
last_before_updates_autopay_link
            ,

        max_by(
            appended.revenue_impact
            , appended.ts)
 as 
last_before_updates_autopay_revenue_impact
            ,

        max_by(
            appended.activity_occurrence
            , appended.ts)
 as 
last_before_updates_autopay_activity_occurrence
            ,

        max_by(
            appended.activity_repeated_at
            , appended.ts)
 as 
last_before_updates_autopay_activity_repeated_at

    from filter_activity_stream_using_primary_activity as stream

    left join analytics.dev_wbrown.account_stream as appended
        on (
            -- Join on Customer UUID Column
            appended.customer = stream.customer

            -- Join the Correct Activity
            and appended.activity = 'updates_autopay'

            -- Relationship Specific Join Conditions
            and (

(
    appended.ts <= coalesce(stream.ts, '1900-01-01'::timestamp)
)

            )
            -- Additional Join Condition
            and ( true )
        )

    group by

        stream.activity_id,

        stream.customer,

        stream.ts,

        stream.activity,

        stream.anonymous_customer_id,

        stream.feature_json,

        stream.link,

        stream.revenue_impact,

        stream.activity_occurrence,

        stream.activity_repeated_at

),

rejoin_aggregated_activities as (
    select

        stream.activity_id,

        stream.customer,

        stream.ts,

        stream.activity,

        stream.anonymous_customer_id,

        stream.feature_json,

        stream.link,

        stream.revenue_impact,

        stream.activity_occurrence,

        stream.activity_repeated_at,

append_and_aggregate__1__last_before
.
last_before_updates_autopay_activity_id,

append_and_aggregate__1__last_before
.
last_before_updates_autopay_customer,

append_and_aggregate__1__last_before
.
last_before_updates_autopay_ts,

append_and_aggregate__1__last_before
.
last_before_updates_autopay_activity,

append_and_aggregate__1__last_before
.
last_before_updates_autopay_anonymous_customer_id,

append_and_aggregate__1__last_before
.
last_before_updates_autopay_feature_json,

append_and_aggregate__1__last_before
.
last_before_updates_autopay_link,

append_and_aggregate__1__last_before
.
last_before_updates_autopay_revenue_impact,

append_and_aggregate__1__last_before
.
last_before_updates_autopay_activity_occurrence,

append_and_aggregate__1__last_before
.
last_before_updates_autopay_activity_repeated_at

    from filter_activity_stream_using_primary_activity as stream

    left join 
append_and_aggregate__1__last_before

        on 
append_and_aggregate__1__last_before
.activity_id = stream.activity_id

)

select * from rejoin_aggregated_activities

    ),

    final as (

        select * from stream_query

    )

select *
from final
        );
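As a reading aid for the compiled query above, here is a minimal Python sketch of what each `max_by(appended.<column>, appended.ts)` expression computes once it is combined with the `appended.ts <= stream.ts` join condition: for one primary activity row, take the column value from the latest appended row at or before it. The helper names `max_by` and `last_before` and the sample rows are illustrative assumptions, not code from this package, and the sketch does not model Snowflake's NULL or tie-breaking behavior.

```python
from datetime import datetime
from typing import Any, Optional


def max_by(rows: list[dict], value_col: str, order_col: str) -> Optional[Any]:
    """Return value_col from the row with the largest order_col (None if no rows)."""
    if not rows:
        return None
    return max(rows, key=lambda r: r[order_col])[value_col]


def last_before(primary: dict, stream: list[dict], activity: str, column: str) -> Optional[Any]:
    """Mimic the join conditions of the compiled query, then aggregate with max_by."""
    candidates = [
        r for r in stream
        if r["customer"] == primary["customer"]  # join on customer
        and r["activity"] == activity            # join the correct activity
        and r["ts"] <= primary["ts"]             # last_before relationship
    ]
    return max_by(candidates, column, "ts")


# Hypothetical sample rows, for illustration only.
stream = [
    {"customer": "c1", "activity": "updates_autopay", "ts": datetime(2023, 1, 5), "activity_id": "a1"},
    {"customer": "c1", "activity": "updates_autopay", "ts": datetime(2023, 2, 1), "activity_id": "a2"},
    {"customer": "c1", "activity": "updates_autopay", "ts": datetime(2023, 4, 1), "activity_id": "a3"},
    {"customer": "c2", "activity": "updates_autopay", "ts": datetime(2023, 1, 1), "activity_id": "b1"},
]
primary = {"customer": "c1", "activity": "originates_loan", "ts": datetime(2023, 3, 1), "activity_id": "p1"}

result = last_before(primary, stream, "updates_autopay", "activity_id")
print(result)  # a2: the latest updates_autopay row for c1 at or before 2023-03-01
```

When no appended row qualifies, the sketch returns `None`, which corresponds to the NULL columns produced by the left join in the compiled query.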

image

bcodell commented 1 year ago

Hey @brown5628 - thanks for opening this! For the sake of auditing, would you mind installing your fork in your Snowflake-based project and running a query that uses the `dataset` macro to validate that it compiles and runs successfully, then pasting a screenshot of the dbt CLI logs or the compiled query here for reference? Sorry for the tedious ask, but since the project's CI pipeline only runs on DuckDB and this PR is a Snowflake-specific fix, it'd be nice to have evidence of the code change working as expected.

wylbee commented 1 year ago

@bcodell Should be ready for review. Thanks for your patience on this one with the slow turnaround time. Quick callouts:

Two questions from me:

  1. Does the project have a style guide? I belatedly realized that my IDE reformatted everything using sqlfmt. Happy to turn that off and resubmit a cleaner diff if that is preferred, just let me know.
  2. Is there utility in maintaining a parallel set of integration tests for non-DuckDB connectors, even if they are not being run automatically? If so I can open a new PR with a cleaned-up version of what I did to get the tests running on Snowflake that can sit in a separate folder from the integration tests. Changes needed were:
    • In the models, change every instance of `json_extract({{ dbt_activity_schema.primary() }}.feature_json, 'type') = json_extract({{ dbt_activity_schema.appended() }}.feature_json, 'type')` to `parse_json({{ dbt_activity_schema.primary() }}.feature_json):"type" = parse_json({{ dbt_activity_schema.appended() }}.feature_json):"type"`
    • Point to the Snowflake profile
    • Remove the model configs in the dbt_project.yml