dbt-labs / dbt-core

dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.
https://getdbt.com
Apache License 2.0

[EPIC] Incremental Model Improvements - Microbatch #10624

Open QMalcolm opened 2 months ago

QMalcolm commented 2 months ago

Incremental models in dbt are a materialization strategy designed to update your data warehouse tables efficiently by transforming and loading only the data that is new or has changed since the last run. Instead of processing your entire dataset every time, incremental models append or update only the new rows, significantly reducing the time and resources required for your data transformations.

Even with all the benefits of incremental models as they exist today, there are limitations with this approach, such as:

In this project we're aiming to make incremental models easier to implement and more efficient to run.
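To make the incremental idea described above concrete, here is a minimal in-memory sketch in Python. It is not how dbt implements incremental models (dbt does this in SQL inside the warehouse); the function name `incremental_run` and the columns `id`, `updated_at`, and `status` are hypothetical and chosen only for illustration.

```python
from datetime import datetime

def incremental_run(target, source, key="id", ts="updated_at"):
    """Upsert into `target` only the source rows newer than what it already holds."""
    # High-water mark: the latest timestamp already loaded (None on the first run).
    high_water = max((row[ts] for row in target.values()), default=None)
    new_rows = [r for r in source if high_water is None or r[ts] > high_water]
    for row in new_rows:
        target[row[key]] = row  # insert new keys, overwrite changed ones
    return len(new_rows)

# Hypothetical source data, for illustration only.
source = [
    {"id": 1, "updated_at": datetime(2024, 9, 1), "status": "placed"},
    {"id": 2, "updated_at": datetime(2024, 9, 2), "status": "placed"},
]
target = {}
first = incremental_run(target, source)  # first run: everything is new

# Later, one existing row changes and one new row arrives.
source.append({"id": 1, "updated_at": datetime(2024, 9, 3), "status": "shipped"})
source.append({"id": 3, "updated_at": datetime(2024, 9, 3), "status": "placed"})
second = incremental_run(target, source)  # only the two newer rows are processed
```

The second run touches two rows rather than all four, which is the saving incremental models aim for.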

### P0s - Core Framework
- [ ] https://github.com/dbt-labs/dbt-core/issues/9490
- [ ] https://github.com/dbt-labs/dbt-adapters/issues/294
- [ ] https://github.com/dbt-labs/dbt-core/issues/10635
- [ ] https://github.com/dbt-labs/dbt-core/issues/10637
- [ ] https://github.com/dbt-labs/dbt-core/issues/10638
- [ ] https://github.com/dbt-labs/dbt-core/issues/10636
- [ ] https://github.com/dbt-labs/dbt-core/issues/10662
- [ ] https://github.com/dbt-labs/dbt-core/issues/10639
- [ ] https://github.com/dbt-labs/dbt-adapters/issues/302
- [ ] https://github.com/dbt-labs/dbt-core/issues/10700
- [ ] https://github.com/dbt-labs/dbt-core/issues/10713
- [ ] https://github.com/dbt-labs/dbt-snowflake/issues/1182
- [ ] https://github.com/dbt-labs/dbt-core/issues/10729
- [ ] https://github.com/dbt-labs/dbt-core/issues/10715
- [ ] https://github.com/dbt-labs/dbt-core/issues/10701
- [ ] https://github.com/dbt-labs/dbt-core/issues/10709
- [ ] https://github.com/dbt-labs/iglu.getdbt.com/issues/28
- [ ] https://github.com/dbt-labs/dbt-core/issues/10714
- [ ] https://github.com/dbt-labs/dbt-core/issues/10761
- [ ] https://github.com/dbt-labs/dbt-core/issues/10785
- [ ] https://github.com/dbt-labs/dbt-core/issues/10800
- [ ] https://github.com/dbt-labs/dbt-core/issues/10798

### P0s - Adapters
- [ ] https://github.com/dbt-labs/dbt-postgres/issues/149
- [ ] https://github.com/dbt-labs/dbt-snowflake/issues/1182
- [ ] https://github.com/dbt-labs/dbt-spark/issues/1109
- [ ] https://github.com/dbt-labs/dbt-bigquery/issues/1354
- [ ] https://github.com/dbt-labs/dbt-core/issues/10815
- [ ] [dbt-athena] Microbatch Strategy

### Beta bugs
- [ ] https://github.com/dbt-labs/dbt-core/issues/10867
- [ ] https://github.com/dbt-labs/dbt-adapters/issues/330
- [ ] https://github.com/dbt-labs/dbt-core/issues/10862
- [ ] https://github.com/dbt-labs/dbt-core/issues/10868
- [ ] https://github.com/dbt-labs/dbt-bigquery/issues/1376
- [ ] https://github.com/dbt-labs/dbt-core/issues/10926
- [ ] https://github.com/dbt-labs/dbt-core/issues/10927
- [ ] https://github.com/dbt-labs/dbt-snowflake/issues/1228
- [ ] https://github.com/dbt-labs/dbt-core/issues/10840

### P1s
- [ ] cold_storage configuration - I can limit how much old data we need to maintain as active or readily available
- [ ] https://github.com/dbt-labs/dbt-core/issues/10786
- [ ] https://github.com/dbt-labs/dbt-core/issues/10819
- [ ] https://github.com/dbt-labs/dbt-core/issues/10874
- [ ] https://github.com/dbt-labs/dbt-bigquery/issues/1355
- [ ] https://github.com/dbt-labs/dbt-core/issues/10855
- [ ] https://github.com/dbt-labs/dbt-core/issues/10853

### P2s
- [ ] https://github.com/dbt-labs/dbt-core/issues/10702

MaartenN1234 commented 1 month ago

Just for my understanding: is it right that this issue seeks to address technical (performance/load) issues in models that take just one single ref as source (or, if a model has other sources as well, we assume them to be stale)?

I am looking for ways to support incremental processing of multi-table join models (e.g. https://discourse.getdbt.com/t/template-for-complex-incremental-models/10054, but I've seen many more similar help requests on community forums). To be sure: such features will not be in scope, right?

QMalcolm commented 1 month ago

> Just for my understanding: is it right that this issue seeks to address technical (performance/load) issues in models that take just one single ref as source (or, if a model has other sources as well, we assume them to be stale)?
>
> I am looking for ways to support incremental processing of multi-table join models (e.g. https://discourse.getdbt.com/t/template-for-complex-incremental-models/10054, but I've seen many more similar help requests on community forums). To be sure: such features will not be in scope, right?

@MaartenN1234 I'm not sure that I fully understand the question being asked. For my clarity, is the question whether this new functionality will support more than one input to an incremental model? If so, the answer is yes!

For example, say we turn the jaffle-shop customers model into an incremental microbatch model. It'd look like the following:

{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='created_at', batch_size='day') }}

with

customers as (
    select * from {{ ref('stg_customers') }}
),

orders as (
    select * from {{ ref('orders') }}
),

customer_orders_summary as (
    select
        orders.customer_id,
        count(distinct orders.order_id) as count_lifetime_orders,
        count(distinct orders.order_id) > 1 as is_repeat_buyer,
        min(orders.ordered_at) as first_ordered_at,
        max(orders.ordered_at) as last_ordered_at,
        sum(orders.subtotal) as lifetime_spend_pretax,
        sum(orders.tax_paid) as lifetime_tax_paid,
        sum(orders.order_total) as lifetime_spend
    from orders
    group by 1
),

joined as (
    select
        customers.*,
        customer_orders_summary.count_lifetime_orders,
        customer_orders_summary.first_ordered_at,
        customer_orders_summary.last_ordered_at,
        customer_orders_summary.lifetime_spend_pretax,
        customer_orders_summary.lifetime_tax_paid,
        customer_orders_summary.lifetime_spend,
        case
            when customer_orders_summary.is_repeat_buyer then 'returning'
            else 'new'
        end as customer_type
    from customers

    left join customer_orders_summary
        on customers.customer_id = customer_orders_summary.customer_id
)

select * from joined

If the models orders and stg_customers both have an event_time defined (they don't need to be incremental themselves), then they will automatically be filtered and batched by the generated event time filters.
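As a rough illustration of what "filtered and batched by the generated event time filters" could mean, the Python sketch below splits a time range into day-sized batches and renders one filter per upstream model per batch. The helper names, the half-open window, the per-model `event_time` columns (`created_at` for `stg_customers`, `ordered_at` for `orders`), and the rendered SQL text are all assumptions made for illustration, not the SQL dbt actually generates.

```python
from datetime import datetime, timedelta, timezone

def daily_batches(start, end):
    """Yield (batch_start, batch_end) day windows covering [start, end)."""
    # Truncate to the start of the day so every batch is aligned to midnight.
    cursor = start.replace(hour=0, minute=0, second=0, microsecond=0)
    while cursor < end:
        yield cursor, cursor + timedelta(days=1)
        cursor += timedelta(days=1)

def event_time_filter(relation, event_time_column, batch_start, batch_end):
    """Render a half-open event-time filter for one upstream relation and one batch."""
    return (
        f"select * from {relation} "
        f"where {event_time_column} >= '{batch_start.isoformat()}' "
        f"and {event_time_column} < '{batch_end.isoformat()}'"
    )

# Assumed event_time column for each upstream model (hypothetical).
upstream = {"stg_customers": "created_at", "orders": "ordered_at"}

start = datetime(2024, 9, 1, 15, 30, tzinfo=timezone.utc)
end = datetime(2024, 9, 3, 2, 0, tzinfo=timezone.utc)
batches = list(daily_batches(start, end))

for batch_start, batch_end in batches:
    for relation, column in upstream.items():
        print(event_time_filter(relation, column, batch_start, batch_end))
```

Each batch applies the same window to every upstream model that declares an `event_time`, so the model's query only sees the rows for that one day.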

MaartenN1234 commented 1 month ago

The critical requirement for me is that matching rows (on the join condition) in the two sources are not necessarily created in the same batch. So when the filter is applied to the sources independently:

    select * from {{ ref('stg_customers') }} where event_time > last_processed_event_time

and

    select * from {{ ref('orders') }} where event_time > last_processed_event_time

the results will be wrong. For example, if we load one more order, we lose all previous orders from the aggregate; and when the customer data is updated while there are no new orders to process for that customer, the update will not be propagated.

To get it right, the filters should look something like this:

    select * from {{ ref('stg_customers') }}
    where event_time > last_processed_event_time
       or customer_id in (
            select customer_id from {{ ref('orders') }}
            where event_time > last_processed_event_time
       )

and

    select * from {{ ref('orders') }}
    where customer_id in (
        select customer_id from {{ ref('stg_customers') }}
        where event_time > last_processed_event_time
        union all
        select customer_id from {{ ref('orders') }}
        where event_time > last_processed_event_time
    )

So one needs to incorporate the join clause and the aggregation into the change detection.
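A small runnable sketch of the difference, using Python's built-in `sqlite3` in place of a warehouse. The table contents, the integer `event_time` values, and the `LAST_PROCESSED` mark are made up for illustration. The join-aware query collects the changed `customer_id` values in a shared CTE, which is just one way to express the filters described above and not something dbt is known to generate.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    create table stg_customers (customer_id integer, name text, event_time integer);
    create table orders (order_id integer, customer_id integer, order_total real, event_time integer);
    -- Customer 1 was updated at t=3; customer 2 is unchanged since t=1.
    insert into stg_customers values (1, 'Ada', 3), (2, 'Bo', 1);
    -- Each customer has one old order (t=1) and one new order (t=3).
    insert into orders values (10, 1, 5.0, 1), (11, 1, 7.0, 3), (12, 2, 4.0, 1), (13, 2, 6.0, 3);
""")
LAST_PROCESSED = 2  # hypothetical high-water mark from the previous run

# Each source filtered independently on its own event_time.
INDEPENDENT = """
    with customers as (select * from stg_customers where event_time > :t),
         new_orders as (select * from orders where event_time > :t)
    select c.customer_id, sum(o.order_total) as lifetime_spend
    from customers c left join new_orders o on c.customer_id = o.customer_id
    group by c.customer_id
    order by c.customer_id
"""

# Change detection that follows the join key across both sources.
JOIN_AWARE = """
    with changed as (
        select customer_id from stg_customers where event_time > :t
        union
        select customer_id from orders where event_time > :t
    ),
    customers as (
        select * from stg_customers where customer_id in (select customer_id from changed)
    ),
    affected_orders as (
        select * from orders where customer_id in (select customer_id from changed)
    )
    select c.customer_id, sum(o.order_total) as lifetime_spend
    from customers c left join affected_orders o on c.customer_id = o.customer_id
    group by c.customer_id
    order by c.customer_id
"""

independent = con.execute(INDEPENDENT, {"t": LAST_PROCESSED}).fetchall()
join_aware = con.execute(JOIN_AWARE, {"t": LAST_PROCESSED}).fetchall()
print(independent)
print(join_aware)
```

With independent filters, customer 1's lifetime spend only counts the new order, and customer 2 is not emitted at all even though a new order arrived. The join-aware version recomputes both customers over their full order history.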

QMalcolm commented 1 month ago

Sorry for accidentally closing this as completed last week. As penance, here is a photo of my cat Misu. He is very excited about microbatch models.

[Image: IMG_4238]