dbt-labs / dbt-core


[CT-1851] [Feature] Re-render `incremental_predicates` at run-time, similar to hooks #6658

Open rlh1994 opened 1 year ago

rlh1994 commented 1 year ago

Describe the feature

Currently, as discussed in many other issues (https://github.com/dbt-labs/dbt-core/issues/3680, https://github.com/dbt-labs/dbt-core/issues/4023, https://github.com/dbt-labs/dbt-core/issues/2921, https://github.com/dbt-labs/dbt-core/issues/4692, https://github.com/dbt-labs/dbt-core/issues/2793, https://github.com/dbt-labs/dbt-core/issues/3985), a model's config is rendered entirely at parse time in order to build dependencies etc., with the exception of hooks, which are re-rendered at runtime (if you use the nested-curly approach). The new incremental_predicates config option in 1.4 could be really powerful for optimising merges, but this optimisation will often require some context about the data being run, such as values within the same or other models; currently it is not possible to use ref functions in the config and have them render correctly.
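
As a minimal illustration of that hook behaviour (the grant is just an example): a hook string defined in config() keeps its curly braces at parse time and is re-rendered when the hook executes, so Jinja like {{ this }} resolves against runtime context.

{{ config(
    materialized = 'table',
    post_hook = "grant select on {{ this }} to role reporting"
) }}

select 1 as id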

The main use case I can see for this is querying the source table for something like a date range and using that as a predicate on the merge into the destination table, to greatly reduce scanning (this is currently how the custom Snowplow materialization works; the hope is that with 1.4 the custom materialization wouldn't be needed anymore). But I am sure there are other times when being able to generate this predicate at run time would be useful.

The main request is for incremental predicates to be re-rendered at runtime the same way that hooks currently are.
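
To make the request concrete, a sketch of the kind of config this would enable (upstream_events and start_tstamp are made-up names). Today the braces inside the predicate string are never re-rendered, so the ref never resolves to the intended relation; the ask is for dbt to re-render it at runtime exactly as it would a hook string.

{{ config(
    materialized = 'incremental',
    unique_key = 'event_id',
    incremental_predicates = [
        "DBT_INTERNAL_DEST.start_tstamp >= (select min(start_tstamp) from {{ ref('upstream_events') }})"
    ]
) }}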

Describe alternatives you've considered

I have attempted the following, none of which works, for various reasons; and even if they did work, they might rely on a side-effect of the way the config uses placeholder values rather than on documented functionality:

1) Try using a ref to another model. Outcome: it uses the this placeholder, so it cannot reference another model.
2) Try directly using the DBT_INTERNAL_SOURCE alias in a subquery. Outcome: does not work; the alias can only be used for column references, not in subqueries within the on clause.
3) Try hardcoding a __dbt_tmp suffix after the ref name in the subquery. Outcome: does not work when the model has any +schema settings; the ref is rendered with the default connection schema and database rather than the one set in the config. It also relies on no changes to both the __dbt_tmp naming AND what the placeholder returns at parse time.

With the current limitation we cannot see a way to replace the optimisations we get from the Snowplow materialization by using the standard incremental materialization with predicates.

An example macro I have tried to use to return the predicate is below. The execute block does run (I can see it in the logs) but its result is not actually used; the non-execute block is what ends up in the merge predicate, but it does not render with the correct schema (it renders with the target connection schema only).

{% macro get_snowplow_incremental_predicates(source, upsert_date_key, disable_upsert_lookback) -%}
  {{ adapter.dispatch('get_snowplow_incremental_predicates', 'snowplow_utils')(source, upsert_date_key, disable_upsert_lookback) }}
{%- endmacro %}

{% macro default__get_snowplow_incremental_predicates(source, upsert_date_key, disable_upsert_lookback) -%}

{# Get the upsert limits from the changed file - need actual values here not calculated at run time #}
{% if execute %}
    {% set upsert_limits -%}
      {% if disable_upsert_lookback %}
        select
          cast(min({{ upsert_date_key }}) as {{ dbt.type_string() }}) as lower_limit,
          cast(max({{ upsert_date_key }}) as {{ dbt.type_string() }}) as upper_limit
        from {{ source }}
      {% else %}
        select
          cast({{ dateadd('day',
                          -var("snowplow__upsert_lookback_days", 30),
                          'min('~upsert_date_key~')') }} as {{ dbt.type_string() }}) as lower_limit,
          cast(max({{ upsert_date_key }}) as {{ dbt.type_string() }}) as upper_limit
        from {{ source }}
      {% endif %}
    {%- endset %}
    {# {{ snowplow_utils.log_message(upsert_limits) }} #}
    {% set limits = run_query(upsert_limits) %}

  {%- set snowplow_predicate -%}
    {{"DBT_INTERNAL_DEST."~upsert_date_key~" between timestamp '"~limits.rows[0][0]~"' and timestamp '"~limits.rows[0][1]~"'"}}
  {%- endset -%}
  {{ return(snowplow_predicate) }}
  {# {{ snowplow_utils.log_message(snowplow_predicate) }} #}

{% else %}
  {%- set snowplow_predicate -%}
    {{"DBT_INTERNAL_DEST."~upsert_date_key~" between (select min("~upsert_date_key~") from "~source~"__dbt_tmp) and (select max("~upsert_date_key~") from "~source~"__dbt_tmp)"}}
  {%- endset -%}
  {{ snowplow_utils.log_message(snowplow_predicate) }}
  {{ return(snowplow_predicate) }}

{% endif %}

{% endmacro %}

Who will this benefit?

Any users who wish to apply dynamic predicates to their upserts/merges based on values in their data processed as part of the run.

Are you interested in contributing this feature?

Possibly, if you can point me to where hooks are re-rendered and predicates aren't

Anything else?

https://discourse.getdbt.com/t/issue-passing-a-ref-table-name-to-a-custom-materialization/973/5

https://github.com/dbt-labs/docs.getdbt.com/issues/2492

jtcohen6 commented 1 year ago

@rlh1994 Thanks for opening, and for having done your research :)

As you've noted/linked, this is an area where users have regularly run up against the limitations of resolving (at parse time) configurations that are narrowly targeting a bit of runtime behavior.

The main request is for incremental predicates to be re-rendered at runtime the same way that hooks currently are.

I'm not strictly opposed to doing this—and doing the same for other "runtime-only" configs, such as merge_include_columns + merge_exclude_columns—with the caveat that the way we do this for hooks today is pretty gross, and so it's not a pattern I'd be thrilled about extending as-is.

Keep the easy things easy

The pattern you're pursuing here sounds a lot like insert_overwrite with "dynamic" partition replacement on BigQuery (docs). That level of precision is important on databases (such as BigQuery) where every gigabyte counts.

As a general rule, though, the trickiest thing about incremental models is debugging too-clever-by-half logic when it goes wrong. That costs a lot in human time & cognition, which is expensive! The simpler, the better.

For now, in the first cut, I expect there's quite a lot of mileage you'd get from incremental_predicates simply by limiting to a static lookback window ("always 30 days"), or the same window that you're using in your is_incremental logic (since {{ this }} should work just fine):

{% set lookback_subquery %}
    select dateadd('day', -30, max(date_day)) from {{ this }}
{% endset %}

{% set lookback_predicate %}
    DBT_INTERNAL_DEST.date_day between ({{ lookback_subquery }}) and current_date
{% endset %}

{{ config(
  materialized = 'incremental',
  unique_key = 'id',
  incremental_predicates = [lookback_predicate]
) }}

-- coordinate with the max lookback window
with upstream_table as (
    select * from {{ ref('upstream_table') }}
    {% if is_incremental() %}
    where date_day > ({{ lookback_subquery }})
    {% endif %}
)

select * from upstream_table

Easy to configure, easy to reason about, and your database should still thank you plenty if date_day is a cluster or sort key. (Though it's never guaranteed with a subquery, so a conservative static value may well be safer.)
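
If the subquery makes you nervous, a sketch of that conservative static variant (assuming a Snowflake-style dateadd and a fixed 30-day window; adjust both for your warehouse and data) could be as simple as:

{{ config(
    materialized = 'incremental',
    unique_key = 'id',
    incremental_predicates = ["DBT_INTERNAL_DEST.date_day >= dateadd('day', -30, current_date)"]
) }}

Because the bound is a plain expression rather than a subquery, pruning on a clustered or sorted date_day should be more predictable.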

Make hard(er) things possible

If you do need to get fancier, and dynamically produce an incremental predicate by first running queries that get min/max from the source table, this is possible (if trickier) with user-space code. You could do this by overriding the get_merge_sql macro to run some extra introspective queries at runtime. For example, incremental_predicates needs to be a list, but I could establish a (totally custom, just-for-me) convention whereby the first element is the string "dynamic_date_range", and the second element is the name of the date column.

{{ config(
  materialized = 'incremental',
  unique_key = 'id',
  incremental_predicates = ["dynamic_date_range", "date_day"]
) }}

select 1 as id, current_date as date_day

Then, I check for my custom condition within get_merge_sql, and recalculate the predicates, before passing all arguments on to the built-in macro. Borrowing from some of the code you've included above:

{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
    {% set predicate_override = "" %}
    {% if incremental_predicates[0] == "dynamic_date_range" %}
        -- run some queries to dynamically determine the min + max of this 'date_column' in the new data
        {% set date_column = incremental_predicates[1] %}
        {% set get_limits_query %}
            select
                min({{ date_column }}) as lower_limit,
                max({{ date_column }}) as upper_limit
            from {{ source }}
        {% endset %}
        {% set limits = run_query(get_limits_query).rows[0] %}
        {% set lower_limit, upper_limit = limits[0], limits[1] %}
        -- use those calculated min + max values to limit 'target' scan, to only the days with new data
        {% set predicate_override %}
            DBT_INTERNAL_DEST.{{ date_column }} between '{{ lower_limit }}' and '{{ upper_limit }}'
        {% endset %}
    {% endif %}
    {% set predicates = [predicate_override] if predicate_override else incremental_predicates %}
    -- standard merge from here
    {% set merge_sql = dbt.get_merge_sql(target, source, unique_key, dest_columns, predicates) %}
    {{ return(merge_sql) }}
{% endmacro %}

Result:

merge into analytics.dbt_jcohen.my_incremental_model as DBT_INTERNAL_DEST
        using analytics.dbt_jcohen.my_incremental_model__dbt_tmp as DBT_INTERNAL_SOURCE
        on (
            DBT_INTERNAL_DEST.date_day between '2023-01-19' and '2023-01-19'
        ) and (
                DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
            )

    when matched then update set
        "ID" = DBT_INTERNAL_SOURCE."ID","DATE_DAY" = DBT_INTERNAL_SOURCE."DATE_DAY"

    when not matched then insert
        ("ID", "DATE_DAY")
    values
        ("ID", "DATE_DAY")

;
rlh1994 commented 1 year ago

Thanks for the response @jtcohen6!

The pattern you're pursuing here sounds a lot like insert_overwrite with "dynamic" partition replacement on BigQuery (docs). That level of precision is important on databases (such as BigQuery) where every gigabyte counts.

You're right, this is pretty much the core idea behind the Snowplow Incremental Materialization; for example, the output below is from a merge on a Snowflake warehouse. For massive tables we get a lot of optimization out of this (because the tables are partitioned/clustered/etc. depending on the warehouse), but as I'm sure you'll appreciate, keeping a custom materialization up to date across 4 adapters is not an easy task, hence our hope to move back to the standard incremental method.

-- define upsert limits
      set (dbt_partition_lower_limit, dbt_partition_upper_limit) = (
            select
              dateadd(day, -30, min(start_tstamp)) as lower_limit,
              max(start_tstamp) as upper_limit
            from <database>.dbt_ryan_derived.snowplow_web_sessions__dbt_tmp
      );

  -- run the merge statement  
  merge into <database>.dbt_ryan_derived.snowplow_web_sessions as DBT_INTERNAL_DEST
  using (
    select * from <database>.dbt_ryan_derived.snowplow_web_sessions__dbt_tmp
  ) as DBT_INTERNAL_SOURCE
  on 
        DBT_INTERNAL_SOURCE.domain_sessionid = DBT_INTERNAL_DEST.domain_sessionid
    and DBT_INTERNAL_DEST.start_tstamp between $dbt_partition_lower_limit and $dbt_partition_upper_limit 

  when matched then update set
    /* cut for space */
  when not matched then insert
     /* cut for space */
;

  -- Unset variables
  unset (dbt_partition_lower_limit, dbt_partition_upper_limit);

Unfortunately I think the keep-things-easy approach is a little too easy for us: our packages are built to encourage multiple smaller runs when backdating a large volume of data, and the logic that defines what time range our processing tables will cover is not entirely simple (if you're interested in reading more), which means anything referencing a static window won't work.

(Though it's never guaranteed with a subquery, so a conservative static value may well be safer.)

I've had horrible optimizing sessions before where the solution turned out to be setting a value rather than generating it in a subquery, so I entirely agree with this, hence the preferred option of querying first and getting static values. One of those things that will probably be fine either way but the 1 time it isn't will upset some people...

The second option is definitely interesting - would it work in a package imported into a project? It would mean we only have to maintain support for a single macro (maybe 2 if we added delete+insert as well) instead of 4 entire materializations, it would still allow for additional predicates (with some tweaks) should anyone want them, and it would still preserve standard incremental models for anyone using them. I can definitely explore this for now, so long as it works from a package, thanks!

Overall I think it'd still be beneficial to have these runtime-only configs as you mentioned, with whatever pattern works for people; it opens up a lot more flexibility at a higher level for everyone, and I could see some really cool stuff coming out of it!

jtcohen6 commented 1 year ago

The second option is definitely interesting - would it work in a package imported into a project?

Check out dispatch for this: https://docs.getdbt.com/reference/dbt-jinja-functions/dispatch#overriding-global-macros

In projects that install snowplow_utils as a package, just add (once):

# dbt_project.yml
dispatch:
  - macro_namespace: dbt
    search_order: ['snowplow_utils', 'dbt']

Now, any time dbt looks for a "global" macro (e.g. get_merge_sql), it will prefer the version defined in the snowplow_utils package.
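
As a sketch of what that could look like from the package side (assuming snowplow_utils ships a default__-prefixed implementation and that the built-in default__get_merge_sql takes the same arguments as above; the interesting body is elided):

{# macros/get_merge_sql.sql, inside the snowplow_utils package #}
{% macro default__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
    {# ...recalculate predicates here, as in the earlier example... #}
    {# hand off to dbt's built-in implementation directly (not dbt.get_merge_sql), so dispatch doesn't route straight back to this macro #}
    {{ return(dbt.default__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates)) }}
{%- endmacro %}

On adapters that ship their own <adapter>__get_merge_sql, the hand-off would need to target that implementation instead.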

(The opt-in has to be explicit, and in case of a conflict between multiple packages, you have to specify a search order that yields a deterministic "winner.")

rlh1994 commented 1 year ago

Great, thanks! I'm assuming that if someone installs the snowplow_web package, which uses the snowplow_utils package, the dispatch needs to be set at the user project level (i.e. if we set it in snowplow_web it wouldn't work)? That might make it a little tricky to make sure people enable it, but it will work for some testing at least!

jtcohen6 commented 1 year ago

the dispatch needs to be set at the user project level

Correct - the dispatch config has to be set in the root-level (user) project. dbt needs an explicit order in which to resolve conflicts, in the event that multiple installed packages all try to override the same "global" macro. (There Can Only Be One!) The good news is, it's only a few lines of copy-paste code.

github-actions[bot] commented 1 year ago

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please comment on the issue or else it will be closed in 7 days.

rlh1994 commented 1 year ago

bdewilde commented 1 year ago

To show support for the original feature request -- and hold the stale label at bay -- I'd like to +1 this! I tried @jtcohen6 's "keep the easy things easy" approach, but {{ this }} did not reflect the model's custom schema; I then tried hard-coding the db.schema.table, but this caused errors in dbt unit ("functional"?) tests, which use a different, test-specific schema; I'm now planning to fall back to a fixed lookback window, though this has its own shortcomings. I think it would be very helpful if we could use the usual jinja magic in incremental predicates, as we do in hooks.

github-actions[bot] commented 9 months ago

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please comment on the issue or else it will be closed in 7 days.

rlh1994 commented 9 months ago


johnny-lt commented 9 months ago

Big +1 to this. I also tried @jtcohen6 's "keep the easy things easy" approach, as it seemed perfect for our use-case, but as @bdewilde mentioned, {{ this }} does not reflect the model's custom schema when used in the predicate.

jpburbank commented 8 months ago

I also need this. There are other cases that need jinja variables expanded in an incremental_predicates string. That all being said, I would be happy to work on this feature request myself, if that is an appropriate thing to do. Any guidance is appreciated. Thanks.