dbt-labs / dbt-bigquery

dbt-bigquery contains all of the code required to make dbt operate on a BigQuery database.
https://github.com/dbt-labs/dbt-bigquery
Apache License 2.0

[ADAP-629] [BQ] Support new partition configs with all incremental strategies #24

Open jtcohen6 opened 3 years ago

jtcohen6 commented 3 years ago

Describe the feature

Picking up from dbt-labs/dbt#2928, which added support for two new configs in dbt-bigquery: require_partition_filter and partition_expiration_days.

Let's ensure that require_partition_filter works with all the permutations of incremental models on BigQuery. Anyone is welcome to pick this up as a contribution for v0.20.0!

The merge strategy

We need the merge condition to be

on 
            DBT_INTERNAL_SOURCE.[unique_key] = DBT_INTERNAL_DEST.[unique_key]
            and DBT_INTERNAL_DEST.[partition_col] is not null

This could be accomplished by passing an additional predicate to get_merge_sql here, something like:

{% set is_partition_filter_required = config.get('require_partition_filter', false) %}
{% set predicates = [] %}
{% if is_partition_filter_required %}
  {% set partition_filter %} ({{ partition_by.field }} is not null or {{ partition_by.field }} is null) {% endset %}
  {% do predicates.append(partition_filter) %}
{% endif %}

{% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns, predicates) %}

This is a bit of a hack, since the added condition is a filter in name only, but to be honest there isn't any straightforward way for dbt to know in advance which specific partitions it's merging into. For that, you should use...

The insert_overwrite strategy

The require_partition_filter config works just fine with the "static" insert_overwrite strategy, where the user supplies the partition values in advance via the partitions config; this is also the most performant option for updating very large datasets. (It would still be a good idea to add a test for this.)
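For illustration, a static insert_overwrite config with require_partition_filter might look like the sketch below (the column name is made up, and the partitions entries are raw BigQuery date expressions):

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {"field": "session_date", "data_type": "date"},
    partitions = ["date_sub(current_date, interval 1 day)", "current_date"],
    require_partition_filter = true
) }}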

For the "dynamic" insert_overwrite strategy, the current error comes in step 2:

https://github.com/fishtown-analytics/dbt/blob/1f927a374c8bd52a12a20d892fed9d59cffd04f4/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql#L63-L68

We need to either:

Describe alternatives you've considered

Related unsolved questions

Who will this benefit?

prratek commented 3 years ago

@jtcohen6 I can take a stab at this!

jtcohen6 commented 3 years ago

Thanks for answering the call, @prratek!

The change in dbt-labs/dbt#3386 looks great at first glance. I'll take a more detailed look tomorrow.

joshpeng commented 3 years ago

Hi @prratek and @jtcohen6, thanks so much for working on this. I'm wondering if we could solve this for an even more flexible use case though.

My team is running across a scenario where we want to use an incremental strategy in a hybrid kind of mode. Our dataset is very large, on the order of many TBs per daily partition. We also have late-arriving data for about 3 days, and so need an incremental solution.

The current merge strategy without a partition filter on DBT_INTERNAL_DEST causes the query to scan PBs across the full table, making it untenable as an option. If we tried insert_overwrite, the sheer size of replacing 3 full partitions also takes too long because it processes so many rows. The thing is, the older 2 dates only need a small fraction of their rows processed, so almost 66% of the 3-day range is unnecessary work. The merge strategy is so close to being feasible for us if there were just a way to pass dbt a partition filter to use on DBT_INTERNAL_DEST when merging. The current proposal here, a static DBT_INTERNAL_DEST.[partition_col] is not null, is a good start, but if we could just tell it to only load 3 days, we would have a great solution. We don't need dbt to auto-guess which partitions; we would be very happy telling it.

As for how this helps when the query will still pull 3 full days' worth of bytes: we are on slot billing, so even with the extra bytes pulled, any additional row pruning will save significant slot time. For people on bytes billing, it would still benefit query time despite costing the same.

jtcohen6 commented 3 years ago

Hey @joshpeng, thanks for all the context there!

I think https://github.com/fishtown-analytics/dbt/issues/3293 + https://github.com/fishtown-analytics/dbt/pull/3294 may actually be the change you're looking for: the ability to pass custom filters to a predicates config, and have them populate in the merge DML statement, alongside the "built-in" predicates such as unique_key match. Per the proposed syntax, you'd be able to do something like:

{{ config(
    partition_by = {"field": "my_partition_col"},
    incremental_strategy = "merge",
    incremental_predicates = [
        "my_partition_col > dateadd(current_date, interval -3 days)"
      ]
) }}
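Assuming a unique_key of user_id, the rendered merge condition would then look roughly like this (a sketch of the proposal; note that in practice the predicate likely needs the DBT_INTERNAL_DEST prefix, as shown here, to avoid ambiguous column errors):

on
    DBT_INTERNAL_SOURCE.user_id = DBT_INTERNAL_DEST.user_id
    and DBT_INTERNAL_DEST.my_partition_col > date_sub(current_date(), interval 3 day)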

The goal of this issue, in the meantime, is to make sure that all incremental strategies + require_partition_filter "just work" for users who don't want to have to think about supplying custom predicates/filters.

joshpeng commented 3 years ago

@jtcohen6 Amazing. Love how you're always 20 steps ahead of my asks 😄

thucnc commented 3 years ago

I have a use case that doesn't fit any of the available incremental strategies: I have a table with 3 columns as its key, and all of them are strings. I can't use merge with 3 columns, and I can't use insert_overwrite because all the key columns are string type.

I hope the new improvement will help.

meurant-naude commented 3 years ago

I'm trying this new feature with the merge strategy, and there are a few things I'd like to mention:

{{config (
    ...
    partition_filter_field  = 'PARTITION_DATE',
    partition_filter_source = 'dbt_model_name',
    ...
)}}

MERGE code changes

We need to change the standard dbt MERGE code to make use of the new options available in the model's {{ config() }} block. A copy of the standard macro was placed in the macros folder.

The logic flow is basically:

  1. Get the partition field name from partition_filter_field
  2. Get the view/table name to use for partition selection from partition_filter_source
  3. Get a distinct list of values for the partition field. Using a distinct list means that even if old data gets reprocessed for some reason, no duplicates will occur, as all valid partition field values will be part of the check.
  4. Build the partition WHERE clause. MERGE statements can't use IN, so an OR list is generated instead.
  5. Add the partition WHERE clause to the standard predicate.
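For illustration, steps 3-5 could be sketched in a helper macro roughly like this (not our exact implementation; the config names come from the block above, run_query is the standard dbt macro, and the quoting assumes date partitions):

{% macro build_partition_filter_predicate() %}
  {% set filter_field = config.get('partition_filter_field') %}
  {% set filter_source = config.get('partition_filter_source') %}

  {# step 3: distinct list of partition values from the configured source model #}
  {# note: ref() with a variable does not register a graph dependency #}
  {% set values_query %}
    select distinct {{ filter_field }} from {{ ref(filter_source) }}
  {% endset %}

  {% set clauses = [] %}
  {% if execute %}
    {% set results = run_query(values_query) %}
    {# step 4: build an OR list rather than an IN list #}
    {% for row in results.rows %}
      {% do clauses.append("DBT_INTERNAL_DEST." ~ filter_field ~ " = '" ~ row[0] ~ "'") %}
    {% endfor %}
  {% endif %}

  {# step 5: the caller appends this string to the standard merge predicate #}
  {{ return("( " ~ (clauses | join(" or ")) ~ " )") }}
{% endmacro %}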

Before:

merge into `project`.`schema`.`table` as DBT_INTERNAL_DEST
  using (
  SELECT 
  ...
         ) as DBT_INTERNAL_SOURCE
        on DBT_INTERNAL_SOURCE.UNIQUE_KEY = DBT_INTERNAL_DEST.UNIQUE_KEY

After:

merge into `project`.`schema`.`table` as DBT_INTERNAL_DEST
  using (
  SELECT 
  ...
         ) as DBT_INTERNAL_SOURCE
            on DBT_INTERNAL_SOURCE.UNIQUE_KEY = DBT_INTERNAL_DEST.UNIQUE_KEY 
        and ( DBT_INTERNAL_DEST.PARTITION_DATE = "2020-11-07" or 
              DBT_INTERNAL_DEST.PARTITION_DATE = "2020-10-28")

Our change caters for integer partitions as well.

jeremyyeo commented 2 years ago

I had a bit of a struggle trying to make the following work:

I was getting an "ambiguous column names" error with dbt run, and when pasting the compiled SQL into the BQ console I got the "Cannot query over table ... without a filter over column(s) ..." error.


The following worked for me:

(1) Override the bq_generate_incremental_build_sql() macro by adding the following macro to the project:

```jinja
-- macros/overrides.sql
{% macro bq_generate_incremental_build_sql(
    strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}
  {#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
  {% if strategy == 'insert_overwrite' %}

    {% set missing_partition_msg -%}
      The 'insert_overwrite' strategy requires the `partition_by` config.
    {%- endset %}
    {% if partition_by is none %}
      {% do exceptions.raise_compiler_error(missing_partition_msg) %}
    {% endif %}

    {% set build_sql = bq_insert_overwrite(
        tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
    ) %}

  {% else %} {# strategy == 'merge' #}
    {%- set source_sql -%}
      {%- if tmp_relation_exists -%}
        (
          select * from {{ tmp_relation }}
        )
      {%- else -%} {#-- wrap sql in parens to make it a subquery --#}
        (
          {{ sql }}
        )
      {%- endif -%}
    {%- endset -%}

    {% set is_partition_filter_required = config.get('require_partition_filter', False) %}
    {% set predicates = [] %}
    {% if is_partition_filter_required %}
      {% set partition_filter %}
        (DBT_INTERNAL_DEST.{{ partition_by.field }} is not null or DBT_INTERNAL_SOURCE.{{ partition_by.field }} is null)
      {% endset %}
      {% do predicates.append(partition_filter) %}
    {% endif %}

    {% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns, predicates) %}

  {% endif %}

  {{ return(build_sql) }}

{% endmacro %}
```

The only things I changed from the suggestions above were prefixing the columns in the partition_filter jinja variable with DBT_INTERNAL_DEST / DBT_INTERNAL_SOURCE, and making sure the {% set is_partition_filter_required = config.get('require_partition_filter', false) %} line includes the set keyword.

(2) And then to test the above override out, add a simple incremental model like so:

-- models/my_inc_model.sql
{{ 
  config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = 'user_id',
    pre_hook = "alter table if exists {{ this }} set options (require_partition_filter = false)",
    post_hook = "alter table if exists {{ this }} set options (require_partition_filter = true)",
    partition_by = {
      "field": "updated_at",
      "data_type": "date",
      "granularity": "month"
    },
    require_partition_filter = true
  )
}}

select 1 as user_id, 100 as sales, parse_date('%Y-%m-%d', '2021-01-01') as updated_at
union all 
select 2 as user_id, 200 as sales, parse_date('%Y-%m-%d', '2021-02-01') as updated_at
union all 
select 3 as user_id, 300 as sales, parse_date('%Y-%m-%d', '2021-03-01') as updated_at

(3) Do a dbt run --full-refresh and inspect the table in BigQuery by selecting from it.
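Note that once require_partition_filter is set, selecting from the table requires a filter on the partition column, for example something like this (the project/dataset names are placeholders):

select * from `my-project`.`my_dataset`.`my_inc_model`
where updated_at is not null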

(4) Modify the incremental model above to simulate new incoming data like so:

-- models/my_inc_model.sql
{{ 
  config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = 'user_id',
    pre_hook = "alter table if exists {{ this }} set options (require_partition_filter = false)",
    post_hook = "alter table if exists {{ this }} set options (require_partition_filter = true)",
    partition_by = {
      "field": "updated_at",
      "data_type": "date",
      "granularity": "month"
    },
    require_partition_filter = true
  )
}}

select 1 as user_id, 100 as sales, parse_date('%Y-%m-%d', '2021-01-01') as updated_at
union all 
select 2 as user_id, 200 as sales, parse_date('%Y-%m-%d', '2021-02-01') as updated_at
union all 
select 3 as user_id, 400 as sales, parse_date('%Y-%m-%d', '2021-03-01') as updated_at
union all 
select 4 as user_id, 400 as sales, parse_date('%Y-%m-%d', '2021-04-01') as updated_at

(5) Do a dbt run and inspect the table in BigQuery by selecting from it.


Certainly not ideal, since the pre/post hooks seem to duplicate what the require_partition_filter config would have done, but this could work for someone experiencing the same errors.

github-actions[bot] commented 2 years ago

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.

jeremyyeo commented 1 year ago

Reopening issue on behalf of an enterprise customer.

github-actions[bot] commented 1 year ago

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.

patkearns10 commented 1 year ago

Reopening issue on behalf of an enterprise customer.

Fleid commented 1 year ago

Hey @github-christophe-oudar, was that ever on your radar? Just wondering if you're smelling something fishy here - I trust your instincts ;)

github-christophe-oudar commented 1 year ago

@Fleid That's a lot of history to read! I know there are still issues with require_partition_filter, and my workaround so far is to use copy_partitions: true to avoid running into the MERGE-related issues for insert_overwrite mode. However, for merge mode it might be a problem...
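For reference, that workaround is roughly the following model config (field names are illustrative, and copy_partitions needs a reasonably recent dbt-bigquery with the insert_overwrite strategy):

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
      "field": "event_date",
      "data_type": "date",
      "granularity": "day",
      "copy_partitions": true
    },
    require_partition_filter = true
) }}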

To be honest, I think it would be interesting to create test cases in dbt-bigquery's integration tests to see what's failing, and move forward with a TDD approach here, since there are a lot of possibilities... (and some are thorny, e.g. I hit a case where it failed when the input dataset had no data to merge...)

Fleid commented 1 year ago

I hope we'll be able to carve out some time to make incremental processing better in the back half of the year. In the meantime we can keep this one open, and thanks for the workaround!