dbt-labs / dbt-bigquery

dbt-bigquery contains all of the code required to make dbt operate on a BigQuery database.
https://github.com/dbt-labs/dbt-bigquery
Apache License 2.0

[Bug] incremental run with __dbt_tmp table does not log the real bytes_billed in run_results.json #1230

Closed: matthias-fbi closed this issue 6 months ago

matthias-fbi commented 6 months ago

Is this a new bug in dbt-core?

Current Behavior

I have a model A that uses a big table of web events with many thousands of new rows per minute. I'm using an incremental config like:

{{
    config(
        materialized="incremental",
        on_schema_change="append_new_columns",
        partition_by={
            "field": "derived_tstamp",
            "data_type": "timestamp",
            "granularity": "day"
        },
        partition_expiration_days = 1
    )
}}

with
    events as (
        select
            derived_tstamp
          , event_id
          , event_name
        from
            {{ source('atomic', 'events') }}
        where
              timestamp_trunc(derived_tstamp, day) = timestamp(current_date())
          {% if is_incremental() %}
          -- this filter will only be applied on an incremental run
          -- (uses >= to include records whose timestamp occurred since the last run of this model)
          and derived_tstamp >= (select coalesce(max(derived_tstamp), timestamp(current_date())) from {{ this }} where timestamp_trunc(derived_tstamp, day) = timestamp(current_date()))
          {% endif %}
          -- event_ids not guaranteed to be unique --> dedup needed
          qualify row_number() over (partition by event_id order by derived_tstamp) = 1
    )

select * from events

I have another model B that aggregates the data from model A. For business reasons, I need to run this model every 5 minutes for the current day for near-realtime KPI tracking.

select
    extract(date from derived_tstamp) as date
  , count(case when event_name = "page_view" then 1 end) as page_view_count
  , count(case when event_name = "page_ping" then 1 end) as page_ping_count
from
    {{ ref('model_A') }}
group by
    all

I run this project with

dbt run --select +model_B

The target/run_results.json does not contain all executions and their bytes_billed. I get the information for model_A and model_B, but the MERGE statement into/from the __dbt_tmp table is NOT included.

According to the BigQuery logs, each run should produce the following bytes_billed in run_results.json:

- model_A: ~20 Megabytes
- model_B: ~100 Megabytes
- MERGE: ~1.29 Gigabytes

but I only have:

- model_A: ~20 Megabytes
- model_B: ~100 Megabytes

The statement that produces the most bytes_billed is exactly the one missing from run_results.json:

create or replace table `rumblebdp-premium`.`dbt_matthias`.`stg_events__dbt_tmp`
...
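
For completeness, a minimal sketch of how the bytes_billed numbers above can be tallied from target/run_results.json (it relies only on the adapter_response fields visible in the log output further down):

import json

# Sketch only: sum up bytes_billed per node from run_results.json.
# There is exactly one adapter_response per model, so the __dbt_tmp job
# (the biggest one) never appears in this tally.
with open("target/run_results.json") as f:
    run_results = json.load(f)

total_bytes_billed = 0
for result in run_results["results"]:
    adapter_response = result.get("adapter_response", {})
    bytes_billed = adapter_response.get("bytes_billed", 0)
    total_bytes_billed += bytes_billed
    print(f'{result["unique_id"]}: {adapter_response.get("code")} -> {bytes_billed:,} bytes billed')

print(f"total: {total_bytes_billed:,} bytes billed")

With the run shown below this prints only the two model entries; nothing for stg_events__dbt_tmp.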

Expected Behavior

The resulting target/run_results.json file contains all executions and their bytes_billed values, including the executions of model_A and model_B as well as everything the "incremental" config does in the background with the MERGE and the __dbt_tmp table.

Steps To Reproduce

  1. Have a source table in BigQuery
  2. Build a model_A with an incremental config
  3. Build a second model_B that references model_A
  4. Materialize model_B as a table
  5. dbt run --select +model_B
  6. Compare run_results.json with the BigQuery query logs (for example with the query sketched below)
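
For step 6, a minimal sketch of how the comparison against BigQuery's job history can be done; the project id, the EU region qualifier, and the one-hour window are assumptions based on the run above, not something dbt provides:

from google.cloud import bigquery

# Sketch only: list the BigQuery jobs from the last hour that write to a
# __dbt_tmp table or run a MERGE, together with what they actually billed.
# Project id and EU location are assumptions taken from the (redacted) run above.
client = bigquery.Client(project="somecompanybdp-premium")

sql = """
select
    creation_time
  , job_id
  , statement_type
  , destination_table.table_id as destination_table
  , total_bytes_billed
from
    `region-eu`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
where
      creation_time > timestamp_sub(current_timestamp(), interval 1 hour)
  and (ends_with(destination_table.table_id, '__dbt_tmp') or statement_type = 'MERGE')
order by
    creation_time desc
"""

for row in client.query(sql, location="EU").result():
    print(row.job_id, row.statement_type, row.destination_table,
          f"{row.total_bytes_billed or 0:,} bytes billed")

Both the create or replace table ... __dbt_tmp job and the MERGE show up here with their total_bytes_billed, which is exactly what run_results.json is missing.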

Relevant log output

{
    "metadata": {
        "dbt_schema_version": "https://schemas.getdbt.com/dbt/run-results/v5.json",
        "dbt_version": "1.7.13",
        "generated_at": "2024-05-08T14:01:58.167241Z",
        "invocation_id": "947ca4de-e64d-48fc-a1f5-00dc2569fd3a",
        "env": {
            "DBT_CLOUD_PROJECT_ID": "263276",
            "DBT_CLOUD_RUN_ID": "281590108",
            "DBT_CLOUD_JOB_ID": "550253",
            "DBT_CLOUD_RUN_REASON": "scheduled",
            "DBT_CLOUD_RUN_REASON_CATEGORY": "scheduled",
            "DBT_CLOUD_RUN_TRIGGER_CATEGORY": "RUN_REASON_CATEGORY_SCHEDULED",
            "DBT_CLOUD_ENVIRONMENT_ID": "230967",
            "DBT_CLOUD_ACCOUNT_ID": "42862"
        }
    },
    "results": [
        {
            "status": "success",
            "timing": [
                {
                    "name": "compile",
                    "started_at": "2024-05-08T14:01:33.932621Z",
                    "completed_at": "2024-05-08T14:01:33.939517Z"
                },
                {
                    "name": "execute",
                    "started_at": "2024-05-08T14:01:33.941235Z",
                    "completed_at": "2024-05-08T14:01:47.666766Z"
                }
            ],
            "thread_id": "Thread-1 (worker)",
            "execution_time": 13.737401247024536,
            "adapter_response": {
                "_message": "MERGE (109.9k rows, 10.8 MiB processed)",
                "code": "MERGE",
                "rows_affected": 109936,
                "bytes_processed": 11325621,
                "bytes_billed": 20971520,
                "location": "EU",
                "project_id": "somecompanybdp-premium",
                "job_id": "dff0ff13-d13c-4444-9fef-087cd781e7af",
                "slot_ms": 2424
            },
            "message": "MERGE (109.9k rows, 10.8 MiB processed)",
            "failures": null,
            "unique_id": "model.dbt_bdp_premium.stg_events",
            "compiled": true,
            "compiled_code": "-- fmt: off\n\n\n\nwith\n    events as (\n        select\n            event_id\n          , event_name\n          , contexts_com_snowplowanalytics_snowplow_web_page_1_0_0[SAFE_OFFSET(0)].id as page_view_id\n          , regexp_extract(page_urlpath, r\"-w(\\d+)-\")                                 as article_id\n          , derived_tstamp\n        from\n            `somecompanybdp-premium`.`atomic`.`events`\n        where\n              timestamp_trunc(derived_tstamp, day)                                      = timestamp(current_date())\n          and event_name                                                                in (\"page_view\", \"page_ping\")\n          and regexp_extract(page_urlpath, r\"-w(\\d+)-\")                                 is not null\n          and contexts_com_snowplowanalytics_snowplow_web_page_1_0_0[SAFE_OFFSET(0)].id is not null -- filters ghost pings\n          \n          -- this filter will only be applied on an incremental run\n          -- (uses >= to include records whose timestamp occurred since the last run of this model)\n          and derived_tstamp >= (select coalesce(max(derived_tstamp), timestamp(current_date())) from `somecompanybdp-premium`.`bdp_premium`.`stg_events` where timestamp_trunc(derived_tstamp, day) = timestamp(current_date()))\n          \n          -- event_ids not guaranteed to be unique --> dedup needed (same logic as in NOZ which got it from: base/scratch/bigquery/snowplow_web_base_events_this_run.sql)\n          qualify row_number() over (partition by event_id order by derived_tstamp) = 1\n    )\n\n\nselect * from events",
            "relation_name": "`somecompanybdp-premium`.`bdp_premium`.`stg_events`"
        },
        {
            "status": "success",
            "timing": [
                {
                    "name": "compile",
                    "started_at": "2024-05-08T14:01:47.673694Z",
                    "completed_at": "2024-05-08T14:01:47.677819Z"
                },
                {
                    "name": "execute",
                    "started_at": "2024-05-08T14:01:47.678873Z",
                    "completed_at": "2024-05-08T14:01:58.145991Z"
                }
            ],
            "thread_id": "Thread-3 (worker)",
            "execution_time": 10.474908113479614,
            "adapter_response": {
                "_message": "CREATE TABLE (16.4k rows, 117.7 MiB processed)",
                "code": "CREATE TABLE",
                "rows_affected": 16366,
                "bytes_processed": 123368594,
                "bytes_billed": 123731968,
                "location": "EU",
                "project_id": "somecompanybdp-premium",
                "job_id": "1078b873-3c2a-4699-9f83-04849c19d100",
                "slot_ms": 30085
            },
            "message": "CREATE TABLE (16.4k rows, 117.7 MiB processed)",
            "failures": null,
            "unique_id": "model.dbt_bdp_premium.int_article_kpis",
            "compiled": true,
            "compiled_code": "-- fmt: off \nwith\n  events as (\n    select\n        *\n    from\n        `somecompanybdp-premium`.`bdp_premium`.`stg_events`\n    where\n        timestamp_trunc(derived_tstamp, day) = timestamp(current_date())\n)\n\n\n, article_metadata as (\n    select\n        events.article_id\n      --, max(content_article_metadata.article_title)    as article_title\n      --, max(content_article_metadata.article_newsroom) as article_newsroom\n      , extract(date from events.derived_tstamp)       as date \n    from\n        events\n    --left join\n    --    `analytics-304220.content.article_metadata` as content_article_metadata\n    --    on events.article_id = content_article_metadata.article_id\n    group by\n        all\n)\n\n\n, article_clicks as (\n    select\n        article_id\n      , extract(date from derived_tstamp) as date\n      , count(distinct event_id)          as clicks\n    from\n        events\n    where\n        event_name = \"page_view\"\n    group by\n        all\n)\n\n\n, article_engagement_per_page_view_id as (\n      select\n          article_id\n        , page_view_id\n        , extract(date from derived_tstamp) as date\n        /*\n          aggregate pings:\n          - divides epoch tstamps by snowplow__heartbeat to get distinct intervals\n          - floor rounds to nearest integer - duplicates all evaluate to the same number\n          - count(distinct) counts duplicates only once\n          - adding snowplow__min_visit_length accounts for the page view event itself\n        */\n        , 10 * (count(distinct(floor(unix_seconds(derived_tstamp) / 10))) - 1) + 5 as engaged_time_in_sec\n      from\n          events\n      where\n          event_name = 'page_ping'\n      group by\n          all\n)\n\n\n, article_engagement as (\n      select\n          *\n      from\n          (\n              select\n                  article_id\n                , date\n                , sum(engaged_time_in_sec) as engaged_time_in_sec\n              from\n                  article_engagement_per_page_view_id\n              group by\n                  all\n          )\n)\n\n\n, article_kpis as (\n      select\n          article_metadata.date\n        , article_metadata.article_id\n        --, article_metadata.article_newsroom\n        --, article_metadata.article_title\n        , article_clicks.clicks                                                as clicks\n        , cast(round(article_engagement.engaged_time_in_sec / 60, 2) as int64) as engaged_time_in_min\n      from\n          article_metadata\n      left join\n          article_clicks\n          on  article_metadata.article_id = article_clicks.article_id\n      left join\n          article_engagement\n          on  article_metadata.article_id = article_engagement.article_id\n)\n\n\nselect\n    *\nfrom\n    article_kpis\norder by\n    engaged_time_in_min desc\n  , article_id",
            "relation_name": "`somecompanybdp-premium`.`bdp_premium`.`int_article_kpis`"
        }
    ],
    "elapsed_time": 34.74582481384277,
    "args": {
        "defer": false,
        "warn_error_options": {
            "include": [],
            "exclude": []
        },
        "macro_debugging": false,
        "cache_selected_only": false,
        "project_dir": "/tmp/jobs/281596108/target",
        "printer_width": 80,
        "log_path": "/tmp/jobs/281596108/target/logs",
        "log_file_max_bytes": 10485760,
        "vars": {},
        "indirect_selection": "eager",
        "favor_state": false,
        "version_check": true,
        "quiet": false,
        "partial_parse": true,
        "log_level_file": "debug",
        "log_format_file": "json",
        "populate_cache": true,
        "strict_mode": false,
        "debug": true,
        "invocation_command": "dbt --log-format json --debug run --select +int_article_kpis --target bigquery --profile user --profiles-dir /tmp/jobs/281596108/.dbt --project-dir /tmp/jobs/281596108/target",
        "static_parser": true,
        "profile": "user",
        "introspect": true,
        "use_colors": true,
        "target": "bigquery",
        "log_format": "json",
        "write_json": true,
        "print": true,
        "send_anonymous_usage_stats": true,
        "show_resource_report": false,
        "use_colors_file": true,
        "profiles_dir": "/tmp/jobs/281596108/.dbt",
        "log_level": "info",
        "enable_legacy_logger": false,
        "select": [
            "+int_article_kpis"
        ],
        "which": "run",
        "exclude": [],
        "partial_parse_file_diff": true
    }
}

Environment

- OS: Windows 11
- Python: 3.12.3
- dbt-core (local): 1.7.14
- dbt-bigquery (local): 1.7.8
- dbt-core (dbt Cloud): 1.7.13
- dbt-bigquery (dbt Cloud): 1.7.7

Which database adapter are you using with dbt?

bigquery

Additional Context

No response

dbeatty10 commented 6 months ago

@matthias-fbi thanks for reporting this along with such a detailed write-up 🤩

This looks to me like the same thing reported in https://github.com/dbt-labs/dbt-bigquery/issues/602, so I'm going to close this one as a duplicate to consolidate the discussion there.