Datavault-UK / automate-dv

A free-to-use dbt package for creating and loading Data Vault 2.0 compliant Data Warehouses (powered by dbt, an open-source data engineering tool and a registered trademark of dbt Labs)
https://www.automate-dv.com
Apache License 2.0

[FEATURE] Ability to control ORDER BY in FIRST_RECORD_IN_SET/UNIQUE_SOURCE_RECORDS on SAT macro compilation #213

Closed: erikwilkins-doosan closed this issue 1 year ago

erikwilkins-doosan commented 1 year ago

Is your feature request related to a problem? Please describe. Having "ORDER BY sd.load_datetime ASC" as the default ORDER BY in the SAT macro covers most cases, but when load_datetime alone is not a complete ORDER BY list it would be ideal if additional user-provided fields could be used to control the ORDER BY instead. This could be similar to the AUTOMATE_DV_RANK creation:

first_record_in_set AS (
    SELECT
        sd.hash_key, sd.hash_diff, sd.effective_from_datetime, sd.load_datetime, sd.record_source,
        RANK() OVER (
            PARTITION BY sd.hash_key
            ORDER BY sd.load_datetime ASC
        ) AS asc_rank
    FROM source_data AS sd
    QUALIFY asc_rank = 1
)

unique_source_records AS (
    SELECT DISTINCT
        sd.hash_key, sd.hash_diff, sd.effective_from_datetime, sd.load_datetime, sd.record_source
    FROM source_data AS sd
    QUALIFY sd.hash_diff != LAG(sd.hash_diff) OVER (
        PARTITION BY sd.hash_key
        ORDER BY sd.load_datetime ASC)
)

Describe the solution you'd like An optional parameter/method for the SAT macro that allows the ORDER BY to be overridden.
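For illustration only (this is not an existing AutomateDV option): assuming a hypothetical order_by override set to a single user-provided change-sequence column named change_seq, the compiled CTE might look something like this:

first_record_in_set AS (
    SELECT
        sd.hash_key, sd.hash_diff, sd.effective_from_datetime, sd.load_datetime, sd.record_source,
        RANK() OVER (
            PARTITION BY sd.hash_key
            -- hypothetical user-supplied ordering replacing the default sd.load_datetime ASC
            ORDER BY sd.change_seq ASC
        ) AS asc_rank
    FROM source_data AS sd
    QUALIFY asc_rank = 1
)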

Describe alternatives you've considered Tried adjusting the LOAD_DATETIME in the DERIVED_COLUMNS of the STG macro by adding a ROW_NUMBER to it via TIMESTAMPADD, but the ROW_NUMBER would need to be partitioned by the HASH_KEY from the HASHED_COLUMNS section, which comes after the DERIVED_COLUMNS. It would therefore have to be partitioned by the same fields used to build the HASH_KEY, which has its own risk.
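A rough sketch of that workaround (Snowflake syntax, with hypothetical names: customer_id standing in for the business-key columns behind HASH_KEY, change_seq for a CDC ordering column, and a literal standing in for the static per-run LOAD_DATETIME):

SELECT
    TIMESTAMPADD(
        'second',
        ROW_NUMBER() OVER (
            PARTITION BY customer_id      -- must repeat the business-key columns behind HASH_KEY
            ORDER BY change_seq ASC       -- hypothetical CDC ordering column
        ) - 1,
        '2024-01-01 00:00:00'::TIMESTAMP  -- stands in for the static run_started_at value
    ) AS load_datetime
FROM source_data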

Additional context N/A

DVAlexHiggs commented 1 year ago

Hi. Thank you for this suggestion.

I'd like to understand your use case for why you need this feature in a bit more detail as this is non-standard and will most likely break the satellite loading pattern.

I'm inclined to say either you are misusing the satellite or you have data quality issues which shouldn't be solved in your satellite, but downstream as a business rule in your business vault.

Also, when you say "missing", what do you mean? This LOAD_DATETIME is not derived from the source data. Please read our documentation on what the LOAD_DATETIME should be.

erikwilkins-doosan commented 1 year ago

@DVAlexHiggs , (1) The documentation mentioned here recommends using dbt's run_started_at variable as the value for your derived column. If the LOAD_DATETIME is then a static value for the model execution (which is what we are doing), it won't be able to properly order the records when there are multiple versions of a single hash_key to be loaded, like what we are capturing from our Oracle CDC replication process. (2) Where are you seeing "missing"?

DVAlexHiggs commented 1 year ago

@DVAlexHiggs , The documentation mentioned here recommends using dbt's run_started_at variable as the value for your derived column. If the LOAD_DATETIME is then a static value for the model execution (which is what we are doing), it won't be able to properly order the records when there are multiple versions of a single hash_key to be loaded.

That's not how it works, generally speaking. Your stage must have a different LOAD_DATETIME per batch.

We advise using run_started_at on the assumption that users will run dbt on a schedule or once per batch. The ORDER BY logic is standard and necessary if you have intra-day records (multiple LDTS for one day) in a single batch. We also use it to grab a single instance of a record per key for each sub/micro batch within a single batch being processed by the satellite.
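As a small, self-contained illustration of that ordering logic (made-up rows, Snowflake syntax), a batch containing intra-day changes collapses to one record per key, the earliest in the batch:

WITH source_data (hash_key, hash_diff, load_datetime) AS (
    SELECT * FROM VALUES
        ('K1', 'D1', '2023-01-01 08:00:00'::TIMESTAMP),
        ('K1', 'D2', '2023-01-01 12:00:00'::TIMESTAMP),  -- later intra-day change for the same key
        ('K2', 'D3', '2023-01-01 09:00:00'::TIMESTAMP)
)
SELECT sd.*
FROM source_data AS sd
-- keeps ('K1', 'D1') and ('K2', 'D3'): one record per key, ordered by load_datetime
QUALIFY RANK() OVER (PARTITION BY sd.hash_key ORDER BY sd.load_datetime ASC) = 1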

If your source data is being loaded periodically into your warehouse by an external orchestration tool, then you should be using the timestamp of each batch it loads.

If using the apply_source_filter flag then this shouldn't be a problem.

I was referring to "not complete" when I said missing. Sorry for the confusion.

Please also see the extensively documented Satellite loading section of our best practises for how to correctly load Satellites according to the standards and AutomateDV assumptions. Let me know if this needs further clarification and I'd be more than happy to update the docs!

erikwilkins-doosan commented 1 year ago

@DVAlexHiggs , let me try taking a step back and better explaining our current process.

We are streaming data from an Oracle system on a near real-time basis. We are running hourly dbt executions to load data from Snowflake streams on these replicated Oracle tables into Data Vault tables. If an Oracle record is updated 3 times during an hour, there will be 3 records on the Snowflake stream. Based on the Oracle docs, the best way to order the records is by the System Change Number (SCN) and the Record Set ID (RS_ID) that come across with the CDC'd changes. In the current STG macro we have the following config being used to populate AUTOMATE_DV_RANK, which is then used by the SAT macro and the VAULT_INSERT_BY_RANK materialization to cycle through the records in the proper order:

ranked_columns:
  automate_dv_rank:
    partition_by: "hash_key"
    order_by:
      - "scn"
      - "rs_id"

With the System Change Number being numeric rather than a timestamp, it can't be used as the LOAD_DATETIME, which goes back to the ask to allow overriding the default sd.load_datetime in the first_record_in_set and unique_source_records CTEs.
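For context, that ranked_columns config produces a ranking column roughly along these lines (approximate; the exact SQL AutomateDV generates may differ by version):

RANK() OVER (
    PARTITION BY hash_key
    ORDER BY scn, rs_id
) AS automate_dv_rank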

DVAlexHiggs commented 1 year ago

OK, I understand better now. My argument is that we shouldn't be changing the satellite loading pattern, as it is standard and will work in 99.999...% of situations. This should really be solved in staging, which is why we have things like the ranked_columns config.

Now, having said this, we are considering deprecating the vault_insert_by_x materialisations as they are mostly redundant now, since the satellite changes in AutomateDV 0.10.x.

My recommendation would be to process this CDC data in a pre-staging area for your specific use case and then feed your primed stage (AutomateDV models using stage()) the values as necessary, having already solved the ordering there.
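One way to read that recommendation, essentially moving the earlier TIMESTAMPADD/ROW_NUMBER idea out of the stage and into a dedicated pre-staging model, is sketched below with hypothetical names (a stream_customer source model, a customer_id business key, scn/rs_id from the CDC feed, and a per-batch batch_load_datetime):

-- Sketch only: turn the SCN/RS_ID ordering into distinct, strictly increasing load timestamps
-- per key, then feed this model into the AutomateDV stage() as its source.
SELECT
    src.*,
    TIMESTAMPADD(
        'millisecond',
        ROW_NUMBER() OVER (
            PARTITION BY src.customer_id            -- hypothetical business key
            ORDER BY src.scn ASC, src.rs_id ASC     -- Oracle CDC ordering
        ) - 1,
        src.batch_load_datetime                     -- hypothetical per-batch load timestamp
    ) AS load_datetime
FROM {{ ref('stream_customer') }} AS src            -- hypothetical replicated source model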

A future improvement to AutomateDV will be having native support for Snowflake streams, but this isn't helpful for solving this right now.

erikwilkins-doosan commented 1 year ago

@DVAlexHiggs , will look into pre-processing these records. Closing this FEATURE request for now and hopefully for good.