Open sai3563 opened 6 months ago
In your offered change, you would like to create an empty tmp_table as you assume it's only used for the schema change, but it's actually also used to know which partitions to restrict the actual MERGE statement to:
set (dbt_partitions_for_replacement) = (
select as struct
-- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
array_agg(distinct datetime_trunc(createddate, day) IGNORE NULLS)
from `dummy_project`.`target_dataset`.`the_table__dbt_tmp`
);
If you skip that, then your merge statement cost might skyrocket because it won't "prune" the partitions that are not eligible for merge.
You're better off using the `copy_partitions` option in the `insert_overwrite` strategy, as it won't bill you for the merge either.
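For reference, that option is set inside the model's `partition_by` config in dbt-bigquery. A minimal sketch, reusing the table and column names from this example (the `source()` reference is hypothetical):

-- hypothetical model file, e.g. models/the_table.sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
        'field': 'createddate',
        'data_type': 'datetime',
        'granularity': 'day',
        'copy_partitions': true
    }
) }}

select *
from {{ source('source_dataset', 'the_table') }}

With `copy_partitions` enabled, dbt replaces the MERGE with BigQuery partition copy jobs, which is why the merge itself isn't billed.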
@github-christophe-oudar I went through your comment as well as the dbt query when `copy_partitions` is set to `True`.
2 things I would like to point out:
1) I admit, I had overlooked the temp table being used in the `dbt_partitions_for_replacement` part to find modified partitions.
But with a slight modification of the query, we can make this work while keeping the cost-saving benefit. This involves modifying `dbt_partitions_for_replacement` to use the original query itself instead of the temp table. Since we are selecting only the `CreatedDate` column in the outer query, with pruning in the `WHERE` condition, the cost would be roughly the same as selecting `CreatedDate` from the temp table. Below is the full modified query:
create or replace table `dummy_project`.`target_dataset`.`the_table__dbt_tmp`
partition by datetime_trunc(createddate, day)
OPTIONS(
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
SELECT
*
FROM
`dummy_project`.`source_dataset`.`the_table`
-- this filter will only be applied on an incremental run
WHERE date(CreatedDate) in ('2023-02-01', '2023-02-02')
and UpdatedDate >= '2023-02-20 19:00:01.873000'
) LIMIT 0; -- 0 MB processed here
-- generated script to merge partitions into `dummy_project`.`target_dataset`.`the_table`
declare dbt_partitions_for_replacement array<datetime>;
-- 1. temp table already exists, we used it to check for schema changes
-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
-- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
array_agg(distinct datetime_trunc(createddate, day) IGNORE NULLS)
from (
SELECT
*
FROM
`dummy_project`.`source_dataset`.`the_table`
-- this filter will only be applied on an incremental run
WHERE date(CreatedDate) in ('2023-02-01', '2023-02-02')
and UpdatedDate >= '2023-02-20 19:00:01.873000'
) -- full query being re-used here
);
-- 3. run the merge statement
merge into `dummy_project`.`target_dataset`.`the_table` as DBT_INTERNAL_DEST
using (
SELECT
*
FROM
`dummy_project`.`source_dataset`.`the_table`
-- this filter will only be applied on an incremental run
WHERE date(CreatedDate) in ('2023-02-01', '2023-02-02')
and UpdatedDate >= '2023-02-20 19:00:01.873000'
) as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source
and datetime_trunc(DBT_INTERNAL_DEST.createddate, day) in unnest(dbt_partitions_for_replacement)
then delete
when not matched then insert
(`Column 1`, `Column 2`, `Column 3`, `CreatedDate`, `UpdatedDate`)
values
(`Column 1`, `Column 2`, `Column 3`, `CreatedDate`, `UpdatedDate`)
;
-- 4. clean up the temp table
drop table if exists `dummy_project`.`target_dataset`.`the_table__dbt_tmp`
2) The `copy_partitions` method, as you said, avoids the data scanning of the merge process. So in that case, it is necessary to create the temp table with data in it, as the partitions of the temp table are used later to replace partitions in the main table.
So I see 2 ways to move forward:
1) We apply the logic I've provided only when `incremental_strategy` is `insert_overwrite` and `copy_partitions` is `False`. We can also apply this logic when `incremental_strategy` is `merge`, in which case `copy_partitions` cannot be set to `True`. In both cases, we will see some cost savings compared to the existing setup.
2) We leave the code as it is, without any modification.
Personally, I do see some benefit in the modified logic, but I'll leave it to you to make the final call. No pressure. 🙂
> Since we are selecting only the `CreatedDate` column in the outer query, with pruning in the `WHERE` condition, the cost would be roughly the same as selecting `CreatedDate` from the temp table
I agree that it would work for a simple query, but in some cases that include joins, window functions and/or aggregations, I don't think it will cost that much less. Let's assume we have a query (on a column with a date & a number) that costs 1 GB for the full query, 500 MB for the partition selection, and 1 MB of output (e.g. a GROUP BY date and a SUM). In that case, the current solution would mean 1 GB (original query) + 1 MB (reading the temp table) + 100 MB (the merge), whereas your approach would use 1 GB (original query) + 500 MB (query "optimized" for the selected column) + 100 MB (the merge), which would be higher. My point is that neither of those approaches is always better.
> So let's say the temporary table query processes 5 GB of data and produces a temp table of 500 MB. Since the temp table is selected again during the merge process, the total data processed would be 5 GB + 500 MB = 5.5 GB
I think that statement assumes that the destination table is empty, as a MERGE statement on a non-empty table is not free either.
I understand there are cases where it could be better, but there are also cases where it could be worse, and I don't know which ones are the most common.
Also, I'm not the one making the call in the end 🙃 So I'll let the dbt Labs team figure out if it's worth it.
You might also look into adding another option that would let the end user customize the behavior to fit their needs, but making it the default could be felt as a regression for those who have models such as my example.
Finally, you're also assuming most dbt users are on "per-query pricing" and not "slot-time pricing", which might also significantly change the actual outcome of that change.
@github-christophe-oudar
> I agree that it would work for a simple query, but in some cases that include joins, window functions and/or aggregations, I don't think it will cost that much less.
In some scenarios where window functions are involved, or perhaps some complex left joins, I agree that finding `dbt_partitions_for_replacement` with my logic could cost more compared to the existing query for that, but the additional cost would only be for one column, i.e. `CreatedDate`. We are completely saving the cost of the temp table, where all columns are selected, in the previous step. Depending on the data scanned in the source table and the data generated in the target, the cost consumed might swing either way, so there is some uncertainty here.
But one thing that's for sure is that, at least for the static `insert_overwrite` and `merge` methods of `incremental_strategy`, my logic would save costs in 100% of on-demand scenarios, as finding date partitions isn't involved in that process.
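For context, the static variant I'm referring to is the one where the partitions to replace are passed explicitly via the `partitions` config, so no query is needed to discover them. A rough sketch of that pattern, adapted from the dbt docs example (column names reused from above, literal dates purely illustrative):

{% set partitions_to_replace = ["datetime('2023-02-01')", "datetime('2023-02-02')"] %}

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {'field': 'createddate', 'data_type': 'datetime', 'granularity': 'day'},
    partitions = partitions_to_replace
) }}

select *
from {{ source('source_dataset', 'the_table') }}
{% if is_incremental() %}
-- only rebuild the listed partitions on incremental runs
where datetime_trunc(createddate, day) in ({{ partitions_to_replace | join(', ') }})
{% endif %}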
> I think that statement assumes that the destination table is empty, as a MERGE statement on a non-empty table is not free either.
I had simplified things in the example. The destination table being empty or non-empty would not change the cost incurred in either the existing logic or my proposed logic, as we aren't modifying how data is merged into the destination. The cost incurred as far as the destination table is concerned would be the same in both cases.
> Finally, you're also assuming most dbt users are on "per-query pricing" and not "slot-time pricing", which might also significantly change the actual outcome of that change.
In slot-time pricing, the charges are fixed. Based on the compute, query time might increase or decrease. Based on my understanding (not 100% sure, but this is roughly what I've seen), more data scanned == more compute, so if we manage to reduce the data scanned, compute time should also reduce in most cases, as there is less data to process. The cost would remain the same regardless.
Anyway, I understand that optimizing cost at the expense of everything else (in this case, increased code complexity) might not always be preferred, so I'm perfectly fine if this is marked as `not planned`. 🙂
Is this your first time submitting a feature request?
Describe the feature
Currently, in static as well as dynamic `insert_overwrite`, when `on_schema_change` is set to anything other than `ignore`, a temporary table is created, and then that temporary table is used in the merge statement.
E.g. (dynamic `insert_overwrite`):
In the current setup, the query for the temporary table is run for schema comparison, and then the temporary table itself is used later in the merge.
So let's say the temporary table query processes 5 GB of data and produces a temp table of 500 MB. Since the temp table is selected again during the merge process, the total data processed would be 5 GB + 500 MB = 5.5 GB.
Describe alternatives you've considered
My idea is to create a temp table, but with LIMIT 0, so the data processed for schema comparison is always 0 MB. Later, the query used for temporary table creation can be used directly within the merge statement; this way, the additional cost of selecting the temp table again isn't involved. So in the above example, the data processed would only be 5 GB.
Sample query
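A condensed sketch of the proposed flow, matching the full modified query shown earlier in this thread:

-- schema check only: LIMIT 0 keeps the bytes processed at ~0 MB
create or replace table `dummy_project`.`target_dataset`.`the_table__dbt_tmp`
partition by datetime_trunc(createddate, day)
as (
    select * from `dummy_project`.`source_dataset`.`the_table`
    where date(CreatedDate) in ('2023-02-01', '2023-02-02')
    and UpdatedDate >= '2023-02-20 19:00:01.873000'
) LIMIT 0;

-- the merge then reuses the model's select directly as DBT_INTERNAL_SOURCE
-- (see the full statement in the comment above) instead of re-reading the temp table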
Depending on how big the temporary table is, the cost savings will vary.
Who will this benefit?
Everyone using insert_overwrite would see lower costs in BigQuery
Are you interested in contributing this feature?
Yes
Anything else?
No response