dbt-labs / dbt-bigquery

dbt-bigquery contains all of the code required to make dbt operate on a BigQuery database.
https://github.com/dbt-labs/dbt-bigquery
Apache License 2.0

[CT-2051] [Bug] New insert_overwrite Bigquery partitioning with integer keys can create huge temporary array variables, exceeding BQ limits #16

Open haukeduden opened 4 years ago

haukeduden commented 4 years ago

Describe the bug

When doing an incremental update of an integer-partitioned BigQuery table with the new insert_overwrite merge strategy, DBT calculates which partitions it should replace. In the process it can generate a huge BigQuery ARRAY value.

The cause is that DBT does not take the "interval" parameter of the partitioning specification into account. The generated SQL code selects "array_agg(distinct PARTITION_FIELD)" when calculating the partitions to replace. This selects ALL distinct partition field values of the incremental update, even if those values are actually in the same partition, so a potentially huge array is created. If there is enough data in the table, this even causes an error because BigQuery's maximum array size (100 MB) is exceeded.

Note that this bug is not triggered by time-partitioned tables, because for those the partition field is a date and the partition size is always one day (i.e. there is only one valid value per partition).
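
For illustration (hypothetical table and column names, interval = 1000), the generated statement looks roughly like this, collecting one array element per distinct value rather than one per partition:

    set (dbt_partitions_for_replacement) = (
        select as struct
            array_agg(distinct partkey)  -- up to 1000 elements per affected partition
        from `project.dataset.my_model__dbt_tmp`
    );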

Steps To Reproduce

1) Generate a model with:

{{ config(
    materialized = 'incremental',
    unique_key = 'partkey',
    partition_by = {
        "field": "partkey",
        "data_type": "int64",
        "range": {
            "start": 0,
            "end": 100000,
            "interval": 1000
        }
    },
    incremental_strategy = 'insert_overwrite'
) }}

2) Run the model once with empty source data

3) Add 1000 records in the source data with values for partkey from 0 to 999

4) Generate the incremental update code for the model and look at the values in the dbt_partitions_for_replacement internal variable.

5) You will see that it contains all 1000 values for partkey from 0 to 999, even though those are all inside the same single partition.

Expected behavior

DBT should ensure that no huge temporary array variable is generated. The dbt_partitions_for_replacement array should have at most as many elements as the number of partitions being updated. In my opinion the way to go would be to store only the starting values of each partition in the array and then modify the merge clause to use a range for each partition.

Screenshots and log output

N/A

System information

Which database are you using dbt with? BigQuery

The output of dbt --version:

installed version: 0.16.0
   latest version: 0.16.0

Up to date!

The operating system you're using:

macOS

The output of python --version: Python 3.7.4

Additional context

jtcohen6 commented 4 years ago

Thanks for laying this out @haukeduden! This issue is at the intersection of two dbt+BigQuery features that are net-new in 0.16.0: support for integer-range partitioning, and dynamic partition overwrite in incremental models. It's not surprising to hear that there's some roughness around the edges between them.

What do you think makes most sense for dbt's behavior here? It sounds from your description like, if the partition is an integer range, instead of the materialization script running

select as struct
    array_agg(distinct PARTITION_FIELD)
from TEMP_TABLE

you'd prefer that dbt run something like

with partition_range as (
    select generate_array(PARTITION_START, PARTITION_END, PARTITION_INTERVAL) as range_array
),

values_to_replace as (
    select distinct PARTITION_FIELD as int_value from TEMP_TABLE
),

joined as (
    select
        *,
        -- range_bucket returns the 0-based position of the value's upper bound in range_array,
        -- so the containing partition's start sits at offset(range_index - 1)
        range_bucket(values_to_replace.int_value, partition_range.range_array) as range_index
    from partition_range
    cross join values_to_replace
)

select as struct array_agg(distinct range_array[offset(range_index - 1)]) as partitions_for_replacement
from joined
where range_index between 1 and (array_length(range_array) - 1)

One conceptual hurdle is that the insert_overwrite incremental strategy drops and replaces entire partitions of data. It's up to the user to ensure that they're selecting all requisite data for that integer subset. Does that track with how you've been using the incremental strategy thus far?
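
For illustration (hypothetical model and column names, interval = 1000), a model filter that only ever selects complete integer partitions could round its incremental watermark down to a partition boundary:

    select *
    from {{ ref('events') }}
    {% if is_incremental() %}
    -- round the watermark down to a 1000-wide partition boundary,
    -- so the run never picks up a partition only partially
    where partkey >= (
        select coalesce(max(partkey) - mod(max(partkey), 1000), 0)
        from {{ this }}
    )
    {% endif %}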

haukeduden commented 4 years ago

Hello @jtcohen6. Could you not simply add a filter for the field values to calculate a representative value for the partition (e.g. the first value of the partition)? This would match what is done for timestamp values with time partitioned tables, which are sent through the 'date' filter. I am thinking of something like this:

set (dbt_partitions_for_replacement) = ( select as struct array_agg(distinct get_partition(PARTFIELD)) from TABLE );

The get_partition macro would simply calculate this:

PARTFIELD - MOD(PARTFIELD, PARTITION_INTERVAL)

And then in the merge clause you could write:

.... when not matched by source and get_partition(DBT_INTERNAL_DEST.PARTFIELD) in unnest(dbt_partitions_for_replacement) then delete

Wouldn't that work? The only necessary modification would be to send the integer values through the get_partition filter macro, otherwise the code could stay exactly the same.
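
A minimal sketch of such a get_partition macro (hypothetical name and signature, with the interval passed in explicitly) could be:

    {% macro get_partition(field, interval) -%}
        ({{ field }} - mod({{ field }}, {{ interval }}))
    {%- endmacro %}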

haukeduden commented 4 years ago

@jtcohen6 I just realized that I have only answered half your question ;). Yes, that is how I had understood the partitioning to work, and yes, I knew that I would have to select full partitions of source data.

By the way: I believe that for many integer-partitioned tables a simpler merge strategy would also be sufficient. I think a common use case is a table with an auto-increment integer id field and rows themselves being immutable / never updated. At least that is the case for the table I have been using to try out insert_overwrite.

For these cases, a simple "append" strategy would be sufficient. I.e. a strategy where one asserts that the new data and the existing data have no overlap.

The merge statement would become much simpler, as the entire "when not matched by source" block could be left out. It would also be easier to select the source data, since one could simply select everything above the current max value, without worrying about partition sizes and modulos.
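
As a sketch (hypothetical model and column names), such an append-only selection would boil down to a filter like:

    select * from {{ ref('events') }}
    {% if is_incremental() %}
    -- assumes existing rows are immutable, so only strictly newer ids are selected
    where id > (select max(id) from {{ this }})
    {% endif %}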

judahrand commented 2 years ago

(Quoting @haukeduden's earlier suggestion to compute a representative value per partition via a get_partition filter macro.)

@haukeduden I'm not sure the approach suggested here would work as functions of the partition key don't seem to be supported: https://cloud.google.com/bigquery/docs/querying-partitioned-tables#query_an_integer-range_partitioned_table

I'm trying to solve a related problem and struggling! Any thoughts on this problem would be greatly appreciated.

I'm thinking liberal use of dynamic SQL may be required. This would allow you to create the t.PARTFIELD BETWEEN x_1 AND y_1 OR ... OR t.PARTFIELD BETWEEN x_n AND y_n predicate dynamically and embed it in the MERGE.
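
A rough sketch of that idea in BigQuery scripting (entirely hypothetical table and column names, interval = 1000) might look like:

    declare partition_predicate string;

    -- build one BETWEEN clause per affected partition and OR them together
    set partition_predicate = (
        select string_agg(
                   format('DBT_INTERNAL_DEST.partkey between %d and %d',
                          part_start, part_start + 999),
                   ' or ')
        from (
            select distinct partkey - mod(partkey, 1000) as part_start
            from `project.dataset.my_model__dbt_tmp`
        )
    );

    -- embed the generated predicate in the merge via dynamic SQL
    execute immediate format('''
        merge `project.dataset.my_model` as DBT_INTERNAL_DEST
        using `project.dataset.my_model__dbt_tmp` as DBT_INTERNAL_SOURCE
        on false
        when not matched by source and (%s) then delete
        when not matched then insert row
    ''', partition_predicate);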

martinburch commented 2 years ago

Hi, I think I'm also affected by this problem. When I set incremental_strategy to insert_overwrite on my integer-partitioned model, I get this error: Script expression exceeded evaluation limit of 1048576 bytes but when I use the default strategy of merge, the run completes without errors.

github-actions[bot] commented 1 year ago

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.

judahrand commented 1 year ago

I think this is still an issue?

haukeduden commented 1 year ago

Yes, this is not solved yet. If a proper solution is too difficult, then I believe DBT should simply yield an error and declare insert_overwrite incompatible with integer-partitioned tables. At least then one does not risk running into huge BigQuery costs or wasting time on cryptic error messages.

elyobo commented 1 year ago

Just ran into this myself. My use case is not append only, it's a series of updates to entities that have a unique integer ID and efficiently updating just the current state for them.

It seems like the (very similar) approach by @jtcohen6 in https://github.com/dbt-labs/dbt-core/pull/1971 that was ultimately abandoned in favour of https://github.com/dbt-labs/dbt-core/pull/2140 would still be viable here - a merge that uses statically calculated min and max values on the partition field in the MERGE ON condition would still potentially offer substantial improvements, depending on how often the older IDs get updated.

For some of my data it's almost append only and so the benefits would be large (they'd be small ranges almost all the time), for some less so.

Perhaps https://github.com/dbt-labs/dbt-core/pull/1971 is worth resurrecting as something that would at least offer a viable path?

Also trying to poke around at the old code and figure out how to define my own materialisation to make this happen for myself at least :sweat:

elyobo commented 1 year ago

I haven't opened a PR over here because I really have very little idea what I'm doing, but it looks like the insert_overwrite implementation could be tweaked a little to let it do the #1971 behaviour as well - https://github.com/raywhite/dbt-bigquery/pull/1. Feels slightly wrong to be tweaking the insert_overwrite strategy to optionally no longer be an insert overwrite strategy, but it's much closer to what we want already than the merge strategy.

elyobo commented 1 year ago

Here's a version that you can drop in and enable on a model by setting match_strategy to anything except exact in the config block. First time I've dug into doing anything like this, so feedback is very welcome.

{# Note that this is implemented in the partition_by object in recent dbt-bigquery #}
{% macro _render_wrapped(partition_by, alias=none) -%}
  {%- if partition_by.data_type == 'int64' -%}
    {{ partition_by.render(alias) }}
  {%- else -%}
    {{ partition_by.data_type }}({{ partition_by.render(alias) }})
  {%- endif -%}
{%- endmacro %}

{% macro bigquery__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}
  {% if config.get('match_strategy', 'exact') == 'exact' %}
    {# Standard merge strategy #}
    {{ default__get_merge_sql(target, source, unique_key, dest_columns, predicates) }}
  {% else %}
    {# Use predicates merge strategy #}
    {%- set tmp_relation = make_temp_relation(this) %}
    {%- set raw_partition_by = config.get('partition_by', none) -%}
    {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
    {%- set tmp_relation_exists = tmp_relation.render() in source -%}
    {%- set predicates = [] if predicates is none else predicates -%}

    declare _dbt_min_partition_value {{ partition_by.data_type }};
    declare _dbt_max_partition_value {{ partition_by.data_type }};

    {# have we already created the temp table to check for schema changes? #}
    {% if not tmp_relation_exists %}
      {{ declare_dbt_max_partition(this, partition_by, sql) }}

     -- 1. create a temp table with model data
      {# TODO support ingestion time partitioning but IDK how to call BQ adapter bq_create_table_as #}
      {# bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql, 'sql') #}
      {{ bigquery__create_table_as(True, tmp_relation, sql) }}

      {# Disable the sql header, as if there was one it was output to make the temp table; DOES NOT WORK. #}
      {% call set_sql_header(config) %}
        -- header already rendered
      {% endcall %}

      {%- set source -%}
        (
        select
        {% if partition_by.time_ingestion_partitioning -%}
        _PARTITIONTIME,
        {%- endif -%}
        * from {{ tmp_relation }}
        )
      {% endset %}

    {% else %}
      -- 1. temp table already exists, we used it to check for schema changes
    {% endif %}

    -- 2. define partition ranges to update
    set (_dbt_min_partition_value, _dbt_max_partition_value) = (
        select as struct
            min({{ _render_wrapped(partition_by) }}),
            max({{ _render_wrapped(partition_by) }})
        from {{ tmp_relation }}
    );

    -- 3. run the merge statement
    {% set predicate -%}
      {{ _render_wrapped(partition_by, 'DBT_INTERNAL_DEST') }}
        between _dbt_min_partition_value and _dbt_max_partition_value
    {%- endset %}
    {% do predicates.append(predicate) %}
    {{ default__get_merge_sql(target, source, unique_key, dest_columns, predicates) }};

    -- 4. clean up the temp table
    drop table if exists {{ tmp_relation }};
  {% endif %}

{% endmacro %}

:rotating_light: one issue is that sql headers are output twice when building the temp table: default__get_merge_sql doesn't have a way to turn the header off, and I can't seem to override it by setting it again before calling default__get_merge_sql.

elyobo commented 1 year ago

Anecdata only, but this approach reduces one incremental build of mine on a sample run from 3.4TB to 431.4GB, huge win.

Fleid commented 1 year ago

Re-opening that one following @elyobo's PR (#371).

Trying to sum-up the entire situation before diving in:

Those are the options currently available to one exploring that space.

Here we are talking about an optimization of the specific case of insert_overwrite + integer range + dynamic, regardless of copy_partitions.

Currently when generating the list of partitions to process, dbt generates the distinct list of values. Optimally, what we would need instead is the distinct list of partitions (see above for why). This is what @jtcohen6 mentioned up above almost 3 years ago (omg). I'm not sure why we abandoned that plan?

In the meantime, @elyobo you offer instead to pick the min and max values from that list, and process every partition in between - even those that don't need to be. Not as optimal as targeting specific partitions, but better than the current implementation in some scenarios. Now we would need a knob to turn that on/off, since the behavior could make things worse in some situations.

Did you explore the original idea of generating the distinct list of partitions? I'd rather we make the thing better all the time, and not have to surface another setting for a situation that is already quite complex, in my opinion.

@github-christophe-oudar I know you're knowledgeable about that area. Any wisdom to share? ;)

elyobo commented 1 year ago

Thanks @Fleid, that sounds like a pretty accurate summary.

I'm not sure why we abandoned that plan?

The approach that was chosen instead seems to be better where it's viable - the abandoned approach in https://github.com/dbt-labs/dbt-core/pull/1971 just used min and max like I brought back. I don't think that the corner case here (the IDs being so many that storing them exceeds the available memory limit in BQ scripting) was considered - I don't recall seeing it anywhere.

Now we would need a knob to turn that on/off since the behavior could makes things worst in some situations.

The food-for-thought-probably-throw-away implementation in that PR includes such a toggle.

Did you explore the original idea of generating the distinct list of partitions?

No, I didn't - honestly I didn't even think of it, I just brought back the min/max as a simple solution that has (through the workaround in https://github.com/dbt-labs/dbt-bigquery/issues/16#issuecomment-1296028981) been a decent money saver for us. I guess there's somewhere in the BQ information schema that exposes this information, and we could use that to calculate the affected partitions from the temp table? If we had e.g. value 1 in an integer-partitioned field and could calculate that it fell into the partition from 1 to 1000, we could translate that to a BETWEEN 1 AND 1000 clause, repeat that for all unique partitions, and OR them all together.

Fleid commented 1 year ago

Ok so we are solving for 2 things here:

  1. Overall performance: getting the exact list of partitions to be processed seems like a no-brainer, but that's already effectively the case when we use a list of values instead - the engine will sort this out for us. But sometimes that list is too long...
  2. ... so we need to get around the limitation on that list size, which is encountered for the list of values but could just as well be encountered for a list of partitions. Here an alternative solution is to get an interval instead (min to max for integer-based partitioning), trading overall performance for getting the job done instead of crashing.

I buy that!

What I would like then, is an additional flag in the partition_by bag (like done for copy_partitions in 1.4) that signals the difference in behavior (default is current behavior) - below with replace_partitions:

{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={
      "field": "user_id",
      "data_type": "int64",
      "range": {
        "start": 0,
        "end": 100,
        "interval": 10
      },
      "replace_partitions": "min_max"
    }
)}}

select
  user_id,
  event_name,
  created_at

from {{ ref('events') }}

I'm making it an enumeration rather than a boolean, as I'm expecting more strategies to appear (current would be list_values, an alternative could be distinct_partitions).

If that sounds not too far off, the thing that is still an unknown for me is whether it should be restricted to integer-range partitioned tables only, or whether it's applicable to the time-based partitioning schemes too. My guess is that it is - and from what I see in the PR, you think so too.

Also, what we would need is a check that replace_partitions is used with insert_overwrite only. Something similar to this.
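
A minimal sketch of such a check (hypothetical option name, placed wherever the incremental config is validated) might be:

    {# hypothetical validation: replace_partitions only makes sense with insert_overwrite #}
    {% if raw_partition_by.get('replace_partitions') is not none
          and config.get('incremental_strategy') != 'insert_overwrite' %}
        {% do exceptions.raise_compiler_error(
            "'replace_partitions' is only supported with the 'insert_overwrite' incremental strategy"
        ) %}
    {% endif %}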

Fleid commented 1 year ago

Note that this is absolutely not a todo list for you, but a design exercise with you on what it should look like in terms of ergonomics ;) Once we know that, you can dig in, or we can take that on (not sure when though...).

elyobo commented 1 year ago

That sounds pretty good; enum instead of boolean is sensible.

The min/max approach would also work with date partitioning but is unlikely to be a good choice - sequential integer IDs can easily exceed the BQ scripting memory limits, but the number of unique dates in a set of data is likely to be far smaller and to fit within the limits. I wouldn't prevent such a choice, but I would definitely not default to it and would mention this in the docs.

My head is totally out of this space at the moment so will take me some time to dig back in, but would be happy to give it a go once I get some time to wrap my head around it again.

haukeduden commented 1 year ago

Just a side note: BigQuery has a maximum limit of 10,000 partitions. So I do not think that a partition list could cause a crash in the BigQuery case (I don't know about other databases, though). Also, in my experience you normally just replace a few chosen partitions with insert_overwrite, so the list is likely very short.

Having said that: I like the min/max idea better. It is more generic and should work under all circumstances, even edge cases, and with all backends. But if that does not work out for some reason, then I think listing the partitions would also be fine.

elyobo commented 1 year ago

I think the limit is 4k and yeah, it should be fine. I think there's some work involved to convert the set of integer values into a set of parameters that effectively prune the partitions - each partition has many integers in its range, so we need to do something like a min and max per affected partition.

The set of mins and maxes would therefore be two integers per partition, 8k max values, which still seems fine - just harder to figure out how to do this (and I can't see where to get the relevant partitioning parameters).

Fleid commented 1 year ago

@elyobo should I close the PR you have open for now, since we're switching strategy? Or do you want me to leave it open for now?

elyobo commented 1 year ago

@Fleid happy to close it and have done so.

tmatalla-wagner commented 1 year ago

I recently refactored many incremental models I maintain to make use of the insert_overwrite strategy to reduce query cost, and this appeared to work out pretty well. However, when dealing with integer-partitioned tables I ran into similar (or identical) limitations to those discussed in this issue. The "min_max" strategy discussed here deviates from my approach, so I would like to add something to the discussion.

I started by looking at the compiled SQL and found:

      -- 2. define partitions to update
      set (dbt_partitions_for_replacement) = (
          select as struct
              array_agg(distinct **partitioned_by_column**)
          from `**table**__dbt_tmp`
      );

with

    when not matched by source
         and DBT_INTERNAL_DEST.**partitioned_by_column** in unnest(dbt_partitions_for_replacement)
        then delete

As already pointed out in this issue, the array in dbt_partitions_for_replacement does not refer directly to the partitions of the table; rather, all of its elements are values within the partitions that are targeted for an update. Something I found interesting is that if the array becomes too long, the query cost is as large as if the table had no partitioning at all. I assume that the BigQuery query optimizer prefers speed over cost at a certain point. Anyway, I changed the code to the following to achieve the cost reduction while keeping the functionality I expected from the insert_overwrite method:

      -- 2. define partitions to update
      set (dbt_partitions_for_replacement) = (
          select as struct
              array_agg(distinct (**partitioned_by_column** - mod(**partitioned_by_column**, **partition_interval**)))
          from `**table**__dbt_tmp`
      );

and


    when not matched by source
         and ( false
         or DBT_INTERNAL_DEST.**partitioned_by_column** between dbt_partitions_for_replacement[offset(0)] and (dbt_partitions_for_replacement[offset(0)] + **partition_interval** - 1)
         or DBT_INTERNAL_DEST.**partitioned_by_column** between dbt_partitions_for_replacement[offset(1)] and (dbt_partitions_for_replacement[offset(1)] + **partition_interval** - 1)
         )
        then delete

Note that I had two partitions to replace in my example.

This code drops the entire partition and writes it again, which is the behavior I assumed from reading the documentation, i.e. the line '[...] because it operates on partitions of data rather than individual rows. [...]'. Using a replace_partitions flag sounds fine to me as well.

After proposing my view on the topic: @elyobo, @Fleid, is someone working on that issue at the moment? If a change is in the pipeline I would lean back and wait for an update. If not, I could try to follow the process to contribute to dbt myself.

PS: dbt is a great piece of software :-)

Fleid commented 1 year ago

@dbeatty10 is this something on your radar for incremental materialization? Yes there is a little bit of history to catch-up in this thread... ;)

elyobo commented 1 year ago

I hadn't noticed that query cost increase @tmatalla-wagner, what sort of size arrays are you dealing with to hit that? Or perhaps I did, it was just still drastically cheaper than it had been anyway so it seemed like a win :sweat_smile:

This isn't something I've had time to look at and it would take me a fair bit of work just to understand enough to get started tbh, I don't know the internals of dbt well at all and even figuring out :point_up: that hacky stuff above took a loooong time :D

tmatalla-wagner commented 1 year ago

@elyobo: the array length of dbt_partitions_for_replacement was around 30k.

@Fleid, @dbeatty10: I will wait for your response whether the dbt core team is working on that issue. If not, I would try to open a PR addressing the issue while (hopefully) following your desired process for contributing :-)

dbeatty10 commented 1 year ago

@tmatalla-wagner we don't have any relevant changes in the pipeline, and we'd welcome your contributions towards this effort!

Our contributing guide has some helpful information about the process of opening a PR and getting it reviewed. Please reach out if you need any help!

carlos-raylo commented 1 month ago

Hi all, just wanted to see if there had been any progress on this issue? Running into it myself now, and it would be really useful to know the current best strategy to overcome this. Thanks!

tmatalla-wagner commented 1 month ago


Hi @carlos-raylo, unfortunately I haven't worked on a solution within dbt. A simple workaround is to set up your incremental model so that the partition_by column itself already contains the transformation I suggested earlier (link).

This would look somewhat like this:

{% set _stepsize = 20 %}

{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'insert_overwrite',
        partition_by = {
            'field': 'actual_used_partition_by_column',
            'data_type': 'int64',
            'range': {
                'start': 0,
                'end': 100000,
                'interval': _stepsize,
            }
        },
    )
}}

SELECT
    *,
    _partition_by_column - MOD(_partition_by_column, {{ _stepsize }}) AS actual_used_partition_by_column
FROM {{ ref('source') }}

carlos-raylo commented 1 month ago

@tmatalla-wagner Thanks very much for this suggestion, will see if it helps!

elyobo commented 1 month ago

@carlos-raylo we're still running this one https://github.com/dbt-labs/dbt-bigquery/issues/16#issuecomment-1296028981; works well except that models using the strategy can't put things into the sql header because they end up being output twice IIRC.