dbt-labs / dbt-core

dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.

allow start-end-time-range specification for BigQuery insert_overwrite #2396

Closed hui-zheng closed 2 years ago

hui-zheng commented 4 years ago

Describe the feature

Problem

Currently, the BigQuery insert_overwrite strategy uses a list of partitions for partition replacement. In static mode, it requires a config such as partitions = partitions_to_replace; in dynamic mode, it calculates the list of partitions from the inserted temp table.
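
For reference, a minimal sketch of the current static configuration, following the pattern in the dbt docs; the model and column names (session_start) are illustrative:

    {% set partitions_to_replace = [
      'timestamp(current_date)',
      'timestamp(date_sub(current_date, interval 1 day))'
    ] %}

    {{
      config(
        materialized = 'incremental',
        incremental_strategy = 'insert_overwrite',
        partition_by = {'field': 'session_start', 'data_type': 'timestamp'},
        partitions = partitions_to_replace
      )
    }}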

There is a limitation in some situations:

1. We may only want to run the incremental for less than 24 hours of data. For example, we do hourly runs that should only update the data from the last 2 hours.
2. Another common insert_overwrite use case is re-processing historical data (e.g. due to a source-data change or a model-logic change) where the re-processing range does not fall precisely on full UTC-day boundaries. For example, I only want to process data from 2020-04-01 00:00:00 PST to 2020-05-01 00:00:00 PST, in the "America/Los_Angeles" timezone, not in UTC.

I understand that partitions have to be at the day level, but I don't want data replacement to be limited to the day level. I would like the ability to specify a range bounded by arbitrary timestamps.

I also understand that when upserting 6 hours of data, BigQuery still scans the whole day partition; for optimizing on-demand cost (bytes scanned), the day partition is the atomic unit.

However, insert_overwrite is about more than cost optimization; it first needs to fulfill the business requirements. It is important that it can replace exactly the given time range of data that needs to be replaced, without touching data outside that range. In my use case 1) above, that means I am fine with some over-scanning (scanning a full day of data), but I only want to replace the last 2 hours of data and must not touch or change anything outside those 2 hours. That is an important business requirement.

Proposed solution

Enhance the static insert_overwrite strategy to accept a timestamp-range-based replacement range, in addition to a list of UTC dates.

Something like the below in a static insert_overwrite config:

  config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {'field': 'session_start', 'data_type': 'timestamp'},
    partitions = {'start': start_timestamp, 'end': end_timestamp}
  )
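
If this were supported, the range bounds could be supplied by the orchestrator through variables. A hypothetical sketch (the start/end form of partitions is the proposed feature, not something dbt supports today; start_ts and end_ts are made-up variable names):

    {# hypothetical: 'start'/'end' range config does not exist yet; start_ts/end_ts would be passed via --vars by the scheduler #}
    config(
      materialized = 'incremental',
      incremental_strategy = 'insert_overwrite',
      partition_by = {'field': 'session_start', 'data_type': 'timestamp'},
      partitions = {'start': var('start_ts'), 'end': var('end_ts')}
    )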

Additional context

This is BigQuery-specific.

Who will this benefit?

It would benefit users of BigQuery and dbt whose incremental models are executed over a timestamp range provided by some external process, such as a scheduler.

Appendix 1: our full use case and context

We use a scheduler (Airflow/Prefect/Dagster) to run dbt incremental models with a specifically defined time range for every dbt run (not the incremental-on-max(stamp) approach). We run a lot of time-series data models: every hour, our scheduler triggers a dbt run that performs an incremental update for data in the defined time range (e.g. the past 2 hours). We also often do re-processing and backfills on those models, where we do a dbt incremental run for data only in a defined time range (e.g. Feb 2020, or the year 2019). We want to be very precise about the time range of data to update and want the caller to provide that information to the dbt run. The > this.max(stamp) approach is not sufficient for us.

Below are the requirements for these time-range-defined incremental models

jtcohen6 commented 4 years ago

Thanks for the issue. If I'm understanding you right, what you're asking for is a different way of configuring the static insert_overwrite incremental strategy using `partitions`.

Current

You supply partitions = ['2020-05-01', '2020-05-02'], and dbt generates a merge statement that looks like

merge into my_dataset.my_incr_model as DBT_INTERNAL_DEST
        using ( ... sql ...) as DBT_INTERNAL_SOURCE
        on FALSE

    when not matched by source
        and DBT_INTERNAL_DEST.session_start in ('2020-05-01', '2020-05-02')
        then delete

    when not matched then insert
        ( ... columns ... )
    values
        ( ... columns ... )

Desired

You could specify partitions = {'start': '2020-04-01 00:00:00 PST', 'end': '2020-05-01 00:00:00 PST'} and have dbt generate a merge statement that looks like

merge into my_dataset.my_incr_model as DBT_INTERNAL_DEST
        using ( ... sql ...) as DBT_INTERNAL_SOURCE
        on FALSE

    when not matched by source
        and DBT_INTERNAL_DEST.session_start between '2020-04-01 00:00:00 PST' and '2020-05-01 00:00:00 PST'
        then delete

    when not matched then insert
        ( ... columns ... )
    values
        ( ... columns ... )

I think this specific use case is something you could accomplish today, in your own project, by overriding the macro bq_insert_overwrite and changing these lines: https://github.com/fishtown-analytics/dbt/blob/7c916e9bdb78f36f49b0258ede0cddc1edab8a9c/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql#L25-L29 E.g., to

      {% set predicate -%}
          {{ partition_by.render(alias='DBT_INTERNAL_DEST') }} between
              {{ partitions[0] }} and {{ partitions[1] }}
      {%- endset %}
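
With that override in place, the model config would need to pass the two range bounds as literal SQL expressions, since partitions[0] and partitions[1] are rendered verbatim into the predicate. A sketch, assuming a timestamp-partitioned session_start column:

      config(
        materialized = 'incremental',
        incremental_strategy = 'insert_overwrite',
        partition_by = {'field': 'session_start', 'data_type': 'timestamp'},
        partitions = ["timestamp('2020-04-01 00:00:00', 'America/Los_Angeles')",
                      "timestamp('2020-05-01 00:00:00', 'America/Los_Angeles')"]
      )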

General case

Here's what I'm trying to square:

To my mind, the more coherent and compelling resolution here would be to use a different incremental strategy, halfway between merge and insert_overwrite, which both prunes partitions (for performance) and upserts on a unique key (so as not to drop + replace all preexisting data in the partition).
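
A rough sketch of the merge such a hybrid strategy might generate, assuming a configured unique key (unique_id here) plus explicit range bounds; this is not an existing dbt strategy, just an illustration:

    merge into my_dataset.my_incr_model as DBT_INTERNAL_DEST
            using ( ... sql ...) as DBT_INTERNAL_SOURCE
            on DBT_INTERNAL_SOURCE.unique_id = DBT_INTERNAL_DEST.unique_id
                and DBT_INTERNAL_DEST.session_start between '2020-04-01 00:00:00 PST' and '2020-05-01 00:00:00 PST'

        when matched then update set
            ( ... column assignments ... )

        when not matched then insert
            ( ... columns ... )
        values
            ( ... columns ... )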

I'd be curious to hear what other BQers think! In the long run, I'm especially interested in building out a more extensible framework for users to write and use their own incremental strategies without needing to copy-paste override all the materialization code (#2366).

hui-zheng commented 4 years ago

@jtcohen6

Yes, you are right about my desired approach, and thank you for the suggested override; that's what we are doing now to override the existing behavior.

For the general case, I would like to suggest that there is a genuine need for time-range-based partition insert_overwrite. To answer your two points:

jtcohen6 commented 4 years ago

@hui-zheng I think BigQuery may have beaten us both to the punch here: hourly partitioning is now in beta.
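
In later dbt-bigquery versions this is exposed through a granularity key on partition_by; a minimal sketch, assuming a dbt version that supports it:

    config(
      materialized = 'incremental',
      incremental_strategy = 'insert_overwrite',
      partition_by = {'field': 'session_start', 'data_type': 'timestamp', 'granularity': 'hour'}
    )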

hui-zheng commented 4 years ago

indeed!

hui-zheng commented 4 years ago

@jtcohen6 Hi hope all is well.

Just want to continue the discussion here. I don't think BigQuery hourly partitioning is the final solution: hourly partitioning is not ideal and should not be used for long historical time ranges, such as data spanning years (BigQuery caps a table at 4,000 partitions, so hourly partitions only cover roughly five to six months).

The fundamental limitation of the existing insert_overwrite macro is that it assumes the result to be inserted always contains complete days of data in UTC. If the result contains any partial day of data, insert_overwrite will leave gaps: the whole day partition is replaced, but only the partial day's rows are re-inserted. For example, if a run covers only 2020-05-01 12:00 to 2020-05-02 00:00 UTC, the full 2020-05-01 partition is deleted but only the afternoon's rows are written back.

github-actions[bot] commented 2 years ago

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.

github-actions[bot] commented 2 years ago

Although we are closing this issue as stale, it's not gone forever. Issues can be reopened if there is renewed community interest; add a comment to notify the maintainers.

jpburbank commented 2 months ago

@hui-zheng do you remember what you ended up doing in this situation? I'm in a similar situation 2 years later.