mpavillet opened this issue 3 years ago
@mpavillet This is a really interesting issue. Thanks for the thoughtful & thorough write-up!
In some sense, you're actually in a better position than most folks using snapshots. Those folks need to use snapshots because they don't have access to a full roster of historical values; you have them in spades. For me, this raises a question: which is the more important feature of snapshots, the change-detection logic (the timestamp and check_cols strategies), or the guarantee that source data is captured before it disappears?
I think it's the latter. The logic underpinning the timestamp and check_cols strategies is cool and all, but a lot of the snapshot-specific functionality really exists to make them rock-solid reliable: every time a snapshot runs, it either does its thing, or historical data could be gone for good.
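For context, a minimal snapshot using the timestamp strategy looks something like this (the snapshot, schema, and source names here are just placeholders):

{% snapshot orders_snapshot %}
    {{ config(
        target_schema='snapshots',
        unique_key='id',
        strategy='timestamp',
        updated_at='updated_at'
    ) }}
    -- the check strategy would instead use strategy='check' plus check_cols=['status', 'email']
    select * from {{ source('prod', 'orders') }}
{% endsnapshot %}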
If you have guaranteed access to all historical values, with a full table copy from every single day, and you want to summarize those values into slowly changing dimensions—you can write the SQL for it, and wrap that SQL in an incremental model. That SQL is not trivial, but (I think) it's also less complicated than you'd expect.
Plus, that incremental model can include logic to grab only the newest days of data, determined dynamically from a bookmark of the model's own last run, so there's no need to manually reprocess day by day. If at any time you need to change the logic, you can --full-refresh
without fear of losing historical data; it just costs some extra time and warehouse credits to scan over every date cluster in your data lake.
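For example, a rebuild from scratch is a single command (the model name is a placeholder):

dbt run --full-refresh --models my_scd_model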
Here's some (untested) code I'm imagining for that model:
{{ config(
materialized = 'incremental',
unique_key = 'scd_id',
merge_update_columns = ['valid_to']
) }}
with all_or_new as (
select * from {{ source('my_date_clustered', 'source_table') }}
{% if is_incremental() %}
-- date_day is the daily load date the source table is clustered by.
-- get the latest day of data from when this model was last run;
-- always reprocess that day and the day before (just to be safe),
-- plus anything newer, of course
where date_day >= (
  select {{ dbt_utils.dateadd(datepart='day', interval=-1, from_date_or_timestamp='max(date_day)') }}
  from {{ this }}
)
{% endif %}
),
hashed as (
select *,
{{ dbt_utils.surrogate_key(['id', 'updated_at']) }} as scd_id
from all_or_new
),
windowed as (
select *,
min(date_day) over (partition by scd_id) as valid_from,
-- valid_to should be null if this scd_id appears in the most recent day of data
-- (i.e. the record is still valid)
nullif(
max(date_day) over (partition by scd_id),
max(date_day) over ()
) as valid_to
from hashed
)
select * from windowed
-- deduplicate: only take the latest day of data for each scd_id
qualify row_number() over (partition by scd_id order by date_day desc) = 1
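To make the shape of the output concrete, here's a tiny made-up example. Say the daily full copies contain one id whose status changes once (dates and columns are invented):

date_day   | id | status  | updated_at
2021-09-01 | 1  | placed  | 2021-09-01
2021-09-02 | 1  | shipped | 2021-09-02
2021-09-03 | 1  | shipped | 2021-09-02

The model above would collapse that into two slowly changing dimension rows (other columns omitted; valid_to is the last day a version was observed, and null means it's still current):

scd_id              | id | status  | valid_from | valid_to
hash(1, 2021-09-01) | 1  | placed  | 2021-09-01 | 2021-09-01
hash(1, 2021-09-02) | 1  | shipped | 2021-09-02 | null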
There are a few significant things to call out here:
- We calculate an scd_id to use as the unique_key. That's what Snowflake will use to merge on, and to update previous records with new values. If updated_at changes, the scd_id changes and generates a new record in the resulting table. When valid_to changes from today to yesterday, because that value of updated_at no longer appears in today's data, the merge will update the associated scd_id with the new (not-null) valid_to.
- The scd_id here is a hash of (id, updated_at), since that's what your example above includes. We could equally calculate the scd_id as a hash of specific column values instead.
- The merge_update_columns config (docs) is new in dbt v0.20, and it enables you to overwrite only a subset of columns for existing rows during incremental runs, instead of the entire row. This makes our job here a lot easier: when the latest version of a record keeps appearing day after day, we don't need to bend over backwards to keep grabbing its correct valid_from, since we only care about updating valid_to. (See the merge sketch just below.)
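To make that last point concrete: with unique_key = 'scd_id' and merge_update_columns = ['valid_to'], the merge that dbt runs on incremental builds boils down to roughly this (paraphrased, not the exact statement dbt generates; the relation names are illustrative):

merge into analytics.my_scd_model as DBT_INTERNAL_DEST
using analytics.my_scd_model__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.scd_id = DBT_INTERNAL_DEST.scd_id
-- existing scd_ids: only valid_to is overwritten, so valid_from is never touched
when matched then update set
    DBT_INTERNAL_DEST.valid_to = DBT_INTERNAL_SOURCE.valid_to
-- brand-new scd_ids (a new updated_at for an id): the whole row is inserted
when not matched then insert (scd_id, id, updated_at, date_day, valid_from, valid_to)
    values (DBT_INTERNAL_SOURCE.scd_id, DBT_INTERNAL_SOURCE.id, DBT_INTERNAL_SOURCE.updated_at,
            DBT_INTERNAL_SOURCE.date_day, DBT_INTERNAL_SOURCE.valid_from, DBT_INTERNAL_SOURCE.valid_to);

So both halves of the scd2 update are covered: a new version is inserted as its own row, and the superseded version just has its valid_to closed out.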
Curious to hear what you think! Does that logic feel unwieldy? Have I missed a few thorny details of your specific use case?
This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.
Although we are closing this issue as stale, it's not gone forever. Issues can be reopened if there is renewed community interest; add a comment to notify the maintainers.
Can you revise your logic, @jtcohen6? I don't think it's correct.
We need to update the old row (its valid_to) and insert the new one, and I don't see that logic here. You may be updating the old one, but what about inserting the new one? Also, the way you create the surrogate key, {{ dbt_utils.surrogate_key(['id', 'updated_at']) }}, is difficult to understand. In a change log, every record has its own scd_id, since each change is uniquely identified by id and updated_at. So why do we need to partition by this scd_id if there is only one record for it?
On the other hand, the windowed CTE selects from itself; it should be select * from hashed.
And what is date_day in your query?
Is this the same issue as the one described here?
Describe the feature
A dbt snapshot feature to handle scd2 changes when a given id exists several times (data is appended to the source table, as opposed to overwritten). Currently, dbt snapshots assume that each id appears only once in the source at snapshot time, i.e. that changed rows are overwritten in place rather than appended.
Describe alternatives you've considered
The way our data lake works: dbt reads from the data lake and writes to our data warehouse, which are two distinct databases in Snowflake. We get a FULL copy of the prod table every day, clustered by date, so we have a complete table for each day, and the delta between day and day-1 is what we need for scd2. The only way I've made it work so far is to have snapshots pull from the latest version of the table in prod. Problem: if we lose a day or two, we need to re-run for the missing days, which I've made work by passing a variable, e.g. dbt snapshot --vars '{ref_date: <my date>}'. Sadly, dbt snapshots don't allow iterating over an array of dates.
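Roughly, the current workaround looks like this (snapshot, source, and column names are simplified):

{% snapshot prod_orders_snapshot %}
    {{ config(
        target_schema='snapshots',
        unique_key='id',
        strategy='timestamp',
        updated_at='updated_at'
    ) }}
    select * from {{ source('data_lake', 'orders') }}
    {% if var('ref_date', none) %}
    -- backfill mode: read the daily copy passed in via --vars
    where date_day = '{{ var("ref_date") }}'
    {% else %}
    -- normal mode: read the most recent daily copy
    where date_day = (select max(date_day) from {{ source('data_lake', 'orders') }})
    {% endif %}
{% endsnapshot %}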
Additional context
It's not database specific, but Snowflake does surface it via the ERROR_ON_NONDETERMINISTIC_MERGE parameter: if an id exists several times in the source, the snapshot merge throws a nondeterministic-merge error. When the parameter is set to false, Snowflake doesn't throw an error, but it still does not handle the scd2 changes properly (see dates below).
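A stripped-down illustration of the Snowflake behavior (table and values are made up):

-- With ERROR_ON_NONDETERMINISTIC_MERGE = true (the default), this merge fails,
-- because the single target row matches two source rows.
create or replace temporary table dim_example (id int, email varchar);
insert into dim_example values (1, 'old@example.com');

merge into dim_example as tgt
using (
    select 1 as id, 'old@example.com' as email
    union all
    select 1 as id, 'new@example.com' as email
) as src
on tgt.id = src.id
when matched then update set tgt.email = src.email;

-- alter session set ERROR_ON_NONDETERMINISTIC_MERGE = false;
-- makes the merge "succeed", but Snowflake just applies one of the source rows
-- arbitrarily, which is why the scd2 history comes out wrong instead of failing loudly.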
Who will this benefit?
Anyone who wishes to have scd2 snapshots derived from append-only tables, or to create an scd2 table off of an incremental table where an id can exist several times.
Are you interested in contributing this feature?
Yes. I am happy to look into the SQL, figure out what needs to be done, and update the macro.