databricks / dbt-databricks

A dbt adapter for Databricks.
https://databricks.com
Apache License 2.0

Advanced MERGE incremental settings #245

Closed mi-volodin closed 1 month ago

mi-volodin commented 1 year ago

Describe the feature

Currently, merge conditions are not supported in the merge incremental strategy.

In Databricks terminology, merge conditions are the matched_condition and not_matched_condition clauses in the MERGE definition.

Taken from the Databricks docs:

MERGE INTO target_table_name [target_alias]
   USING source_table_reference [source_alias]
   ON merge_condition
   { WHEN MATCHED [ AND matched_condition ] THEN matched_action |
     WHEN NOT MATCHED [ AND not_matched_condition ] THEN not_matched_action } [...]

As of now, the full power of the MERGE statement is not supported.

Describe alternatives you've considered

Writing a customized adapter with its own merge macro to perform such operations, adding the conditions to the model configuration.

Additional context

Databricks is increasingly used to support a variety of data operations that previously relied on analytical MPP systems: specifically, preparing DWH-like normalized models for long-term entity-oriented storage and creating denormalized data marts.

One of the drawbacks of merge is that the update operation is essentially DELETE + INSERT, so it incurs additional cost for every record you update. It is therefore a widely used technique to minimize such updates with additional conditions that apply only to already observed records, and thus cannot be expressed as merge predicates (in dbt terminology).

Moreover, there are business cases that require it. For example, I track an object and its properties as an entry of the form <o, P>. Once I have observed a specific instance of such a record (<1, A>), I may want to skip all subsequent updates as long as the properties do not change, and only perform the update when I see <1, B> (for example).
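
For illustration, a hedged sketch of the kind of MERGE this feature would enable (the table and column names below are made up for the example):

```sql
-- Hypothetical sketch: only update already-observed objects when their
-- properties actually changed, instead of rewriting every matched row.
MERGE INTO object_state AS tgt
USING object_updates AS src
  ON tgt.object_id = src.object_id                    -- merge predicate (dbt unique_key)
WHEN MATCHED AND src.properties <> tgt.properties     -- extra matched condition
  THEN UPDATE SET tgt.properties = src.properties
WHEN NOT MATCHED
  THEN INSERT (object_id, properties) VALUES (src.object_id, src.properties);
```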

Who will this benefit?

Anyone who uses MERGE as a strategy for model updates and requires additional business checks for the records before making updates / inserts.

Are you interested in contributing this feature?

Probably, but given the variety of possible solutions I would appreciate discussing the solution design with the maintainers first.

adrianabreu commented 1 year ago

Any updates on this? Seems like an interesting feature.

mi-volodin commented 1 year ago

No updates. Just to note that there is a workaround that currently might work: with dbt you have to manually rebuild the whole update set in your model. That makes the model code unbearably large, but it works.

For me this just highlights the cases where the dbt modelling approach is not nice. Also, I think null-aware joins are missing as a feature, since MERGE is frequently used for denormalized "cubes" where missing dimensions are allowed to be NULL. So using <=> instead of = in merge predicates is also required, and I don't know a workaround for this.
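
To illustrate the null-aware point, a hedged sketch assuming a nullable dimension column (names are made up):

```sql
-- With a plain '=', rows whose dimension is NULL never match, so they are
-- re-inserted instead of updated; '<=>' (null-safe equality) matches NULL to NULL.
MERGE INTO sales_cube AS tgt
USING sales_increment AS src
  ON  tgt.product_id <=> src.product_id
  AND tgt.region     <=> src.region
WHEN MATCHED THEN UPDATE SET tgt.amount = src.amount
WHEN NOT MATCHED THEN INSERT (product_id, region, amount)
  VALUES (src.product_id, src.region, src.amount);
```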

github-actions[bot] commented 1 year ago

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue.

mi-volodin commented 5 months ago

Okay, now I feel the urge to give it a try and come up with my own solution.

What I generally propose as the capabilities:

I see there's related context in #645. @IvanDerdicDer are you actually working on it? Do you see any potential conflicts with what is proposed above?

Also, @benc-db, WDYT about creating an umbrella issue that highlights the gap between Databricks MERGE capabilities and the dbt implementation, so we can track progress?

IvanDerdicDer commented 5 months ago

Hey,

Still not working on it; college got priority. I was thinking mostly the same, but just for the matched clause; the rest of it sounds good.

I did a proof of concept at work when we were developing a delta data delivery method, and we tried to create a custom merge connector. That didn't work because there is no support for custom connectors. In short, I copied the merge_update_columns function and modified it to return an update condition, i.e. at least one column not equal. I haven't tested it because custom incremental strategies don't work, but the generated SQL looked good.

I believe the same approach can be used for everything you described. Also, I believe you don't have to worry about string injection; none of the functions that take merge_update_columns or merge_exclude_columns take that into account. Moreover, most dbt code is generally written by hand rather than auto-generated by an API or similar.

mi-volodin commented 5 months ago

Ok, I will also wait for @benc-db's response.

I also see a potential issue with the matched conditions, as they have to be explicit and therefore must use aliases.

And this seems to create an unnecessary coupling with the aliases used in the generated code.

So, if I understand correctly, one would now have to write

DBT_INTERNAL_DEST.A > DBT_INTERNAL_SOURCE.A

I know that it is possible to work around this with specific placeholders that we can automatically transcribe, for instance allowing the user to write dest.A > source.A... But I am not sure that approach is solid.

benc-db commented 5 months ago

@mi-volodin at the moment I don't have capacity myself, but I'm happy to help you develop a solution. I would recommend after your initial investigation that you write your proposed approach here (what files you plan on modifying, and how), and I can give you some initial feedback before the PR. Once you put a PR out, I can pull/push to get our integration tests to run on it.

IvanDerdicDer commented 5 months ago

I don't see a need for transcribing; DBT_INTERNAL_SOURCE/DEST seem fine to me. Plus, they would be auto-generated, at least that is what I had in mind. You would pass a list of columns that need to be different for the update to occur. To achieve that, I modified the get_merge_update_set function, replacing '=' with '<>' and ',' with 'AND'. I believe that works for the vast majority of upsert/merge operations.
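
As a rough illustration of that substitution (columns a and b are hypothetical; this is a sketch of the described transformation, not actual macro output):

```sql
-- get_merge_update_set normally emits an assignment list such as:
--   DBT_INTERNAL_DEST.a = DBT_INTERNAL_SOURCE.a,
--   DBT_INTERNAL_DEST.b = DBT_INTERNAL_SOURCE.b
-- Swapping '=' for '<>' and ',' for 'AND' turns it into a matched condition:
WHEN MATCHED
  AND DBT_INTERNAL_DEST.a <> DBT_INTERNAL_SOURCE.a
  AND DBT_INTERNAL_DEST.b <> DBT_INTERNAL_SOURCE.b
THEN UPDATE SET ...
```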

From what I gather, you want to let the user write a SQL string as a condition. I don't think that is necessary; a list of columns should suffice. But if you decide to go the SQL string route, then, the same as for the incremental predicates, the user should write DBT_INTERNAL_SOURCE/DEST.

mi-volodin commented 5 months ago

@IvanDerdicDer the matched conditions could be an arbitrary predicate, very complex in the general case. So perhaps I am missing something, but I see no way to auto-generate it.

For example, if I want something like this

when matched 
    and SRC.change_ts > TGT.change_ts
    and hash(SRC.a, SRC.b) <> hash(TGT.a, TGT.b)

I will have to either use DBT_INTERNAL_SOURCE here as an alias (which I don't think is smart), or introduce special hardcoded aliases like the ones I used above, which would then be mapped to DBT_INTERNAL_SOURCE/DEST.

Yes, as an alternative it is possible to limit the space of predicates that can be used, and then use generators.

@benc-db got it. Will get back.

mi-volodin commented 5 months ago

@benc-db I am starting with a higher-level vision and will be glad to specify more after the discussion.

Features logical descriptions

In general I think it makes sense to extend the functionality with the following features:

Ideally it would be great to also add support for the WHEN NOT MATCHED BY SOURCE clause, but let's keep it out of scope due to complexity. Still, I would keep it in mind to narrow the solution space for better compatibility.

The matched_conditions and not_matched_conditions

Both of the features will expect the following:

An important note: for now I don't see any other way but to allow explicit predicates, where the user uses source and target aliases to define the logic. The proposed solution is to allow specific aliases like s./t. or src./tgt. (maybe several) to describe the logic, and then transcode them to the "DBT_INTERNAL_SOURCE" placeholder afterwards.

If we keep this idea, it will require one extra macro in strategies.sql.

Also, I don't know if the DBT_INTERNAL_* prefix is required for anything specific; otherwise we could just replace it with SRC/TGT and allow its explicit usage.
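
As a sketch of how this could surface in a model file (the matched_conditions / not_matched_conditions keys are the proposal under discussion, not an existing dbt-databricks option; the model name is made up):

```sql
-- Sketch only: the two condition keys below are proposed in this issue,
-- not an existing dbt-databricks config option.
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='id',
    matched_conditions='src.updated_at_ts > tgt.updated_at_ts or tgt.updated_at_ts is NULL',
    not_matched_conditions='src.some_attribute > 0'
) }}

select * from {{ ref('my_staging_model') }}
```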

The skip_ flags

First of all, I understand that this is probably not very elegant, as ideally we should expect the user to explicitly ask for a certain step. Performing both insert and update by default is probably the dbt standard (though not the SQL style).

So if we later go for WHEN NOT MATCHED BY SOURCE, the implementation will have to handle the insert and update blocks differently.

Nevertheless, I would be glad to have that logic implemented, so we can avoid ugly dbt hacks with a left anti join ("if target exists") to avoid updates.

From the implementation perspective, I currently see that it can be done at the databricks__get_merge_sql macro level and will require additional keys in the config.

Summary of changes

Questions to clarify

benc-db commented 5 months ago

Also, I don't know if the DBT_INTERNAL_* prefix is required for anything specific

To the best of my knowledge, these are just aliases assigned to help the readability of the generated SQL. You could use a different placeholder, it will just need to be documented.

I am not sure if we need to care about security... The example with matched_conditions might allow string injection. There are cases where it is dangerous, but they are superseded anyway by the ability to explicitly put any code in the model. Thus, I ignore that risk.

Agreed, safe to ignore as we already allow users to write arbitrary SQL in their dbt project.

There's an options key in the config. It is not currently used in merge, and I am not sure whether I actually need to move certain config keys into the options dict.

I believe this is for the OPTIONS clause on table creation, which is nearly the same as TBLPROPERTIES but, I think, older and Hive-related; it should not be reused.

Any new config that is not defined by dbt-core can be read from the model.config.extra dict, so you don't strictly need to modify DatabricksConfig either, but it's probably a good idea for visibility's sake. I believe adding them to DatabricksConfig makes them available in config, rather than config.extra.

Could you give a couple of examples of the config and what the translated SQL should look like?

mi-volodin commented 5 months ago

Any new config that is not defined by dbt-core can be read from the model.config.extra dict, so you don't strictly need to modify DatabricksConfig either, but it's probably a good idea for visibility's sake. I believe adding them to DatabricksConfig makes them available in config, rather than config.extra.

Then it probably makes sense to prototype first with config.extra and, if we don't receive any objections in review, extend the config. For users it will make no difference, if I understand correctly.

Could you give a couple of examples of the config and what the translated SQL should look like?

Yeah, let me prepare a few examples.

mi-volodin commented 5 months ago

@benc-db I generated a couple of examples. I will attach the file to save space, but here are some of them.

Example with step skip.

/*
    skip_matched_step: True,
    skip_not_matched_step: False,
    matched_conditions: 'None',  
    not_matched_conditions: 'None',
*/

merge into 
    TABLE1 as tgt
using 
    TABLE2 as src
on 
    predicates ...
when not matched
then insert >>get_merge_insert(on_schema_change, source_columns) <<

Example with matched conditions

/*
    skip_matched_step: False,
    skip_not_matched_step: False,
    matched_conditions: 'src.updated_at_ts > tgt.updated_at_ts or tgt.updated_at_ts is NULL',  
    not_matched_conditions: 'some_attribute > 0',
*/

merge into 
    TABLE1 as tgt
using 
    TABLE2 as src
on 
    predicates ...
when matched
    and src.updated_at_ts > tgt.updated_at_ts or tgt.updated_at_ts is NULL
then update set 
     >> get_merge_update_set(update_columns, on_schema_change, source_columns)<<
when not matched
    and some_attribute > 0
then insert >>get_merge_insert(on_schema_change, source_columns) <<

This is how it works right now

/*
    skip_matched_step: False,
    skip_not_matched_step: False,
    matched_conditions: 'None',  
    not_matched_conditions: 'None',
*/

merge into 
    TABLE1 as tgt
using 
    TABLE2 as src
on 
    predicates ...
when matched
then update set 
     >> get_merge_update_set(update_columns, on_schema_change, source_columns)<<
when not matched
then insert >>get_merge_insert(on_schema_change, source_columns) <<

output.txt

mi-volodin commented 5 months ago

One of the problems I don't know how to handle elegantly is these aliases. In my example I used src and tgt to distinguish between the source and destination, but what I initially thought I would do was something like this:

{{ conditions_str | regexp_replace('(^| )s[\w]+\.', 'DBT_INTERNAL_SOURCE.', ...flags...) }}

i.e. we allow the user to write any alias like source., s., Src., etc., and replace it.

But it appears that for this I would have to register a custom Jinja filter, which I would prefer to avoid to keep complexity low.

benc-db commented 5 months ago

But it appears that for this I would have to register a custom Jinja filter, which I would prefer to avoid to keep complexity low.

Anything that is not just formatting I would put in the Adapter class and write it/unit test it in Python. I hate Jinja personally :P. I wish we had better mechanisms for exposing logic than always throwing it into the adapter class, but using Python will make testing/maintenance simpler in the long run. Also, if you look at tests/unit/macros, I've tried to make it less painful to test Jinja macros, but it's still a bigger lift than doing the same in Python.

benc-db commented 5 months ago

I think you're in a good place to do a PR. I think I will understand the nuances better once I see some tests :).

mi-volodin commented 5 months ago

Anything that is not just formatting I would put in the Adapter class and write it/unit test it in Python.

Gosh... it's my Friday evening dumbness. Why on earth didn't I think about a Python-side implementation 🤦🏻‍♂️ I also hate Jinja.

Ok, I will need some time to implement a draft and go through all the contribution preparation steps. And, of course, to get the tests passing 😄

benc-db commented 5 months ago

@mi-volodin take your time. This seems like a great addition for 1.9, which I'm thinking will land sometime in July. I appreciate your effort/contribution!

mi-volodin commented 4 months ago

@benc-db I ran the functional tests on an all-purpose cluster (the only option I have right now), and 6 tests in total are failing.


No code changes have been made yet. Since this is likely not OK, I wanted to ask for a quick tip from an experienced maintainer; perhaps I missed some requirement for the DBR runtime, for example.

I will dig into it anyway, but hopefully you can help me save some time.

LOG

```bash
FAILED tests/functional/adapter/incremental/test_incremental_on_schema_change.py::TestIncrementalOnSchemaChange::test_run_incremental_ignore - dbt_common.exceptions.base.DbtRuntimeError: Runtime Error
FAILED tests/functional/adapter/basic/test_snapshot_check_cols.py::TestSnapshotCheckCols::test_snapshot_check_cols - AssertionError: dbt exit state did not match expected
FAILED tests/functional/adapter/basic/test_snapshot_timestamp.py::TestSnapshotTimestamp::test_snapshot_timestamp - AssertionError: dbt exit state did not match expected
ERROR tests/functional/adapter/persist_docs/test_persist_docs.py::TestPersistDocsWithSeeds::test_reseeding_with_persist_docs - AssertionError: dbt exit state did not match expected
ERROR tests/functional/adapter/simple_snapshot/test_snapshot.py::TestSnapshot::test_deletes_are_captured_by_snapshot - AssertionError: dbt exit state did not match expected
ERROR tests/functional/adapter/simple_snapshot/test_snapshot.py::TestSnapshot::test_revives_are_captured_by_snapshot - AssertionError: dbt exit state did not match expected
```

benc-db commented 4 months ago

@mi-volodin if you want, email me your cluster config and we can compare notes: ben.cassell@databricks.com. Also, good call running all of the integration tests before the code change, because otherwise we'd be chasing dead ends :)

kmarq commented 4 months ago

@mi-volodin will you be adding support for the delete option of the merge statement as well? I know that's something we were just looking to do but it looks like you are well ahead of us with your ongoing changes.

mi-volodin commented 4 months ago

@kmarq Yes, I was thinking about this as the next step. Also, there's a request for SCHEMA EVOLUTION support in #707, which also looks relatively straightforward.

But I will only commit to it once I get the first changes passing the tests.

@benc-db 🙏 sent you an e-mail.

mi-volodin commented 4 months ago

Short update. Tests are still failing, but I have studied the logs and found the following reasons:

  1. One test is failing because it tries to store data in an S3 bucket (which does not exist). I think this is fine, because it is not supposed to work in my case. Probably that test should not be included in my test set (general-purpose cluster).
  2. The other 5 tests are failing because of the same error: a duplicated reference to dbt_unique_key. My hunch is that it is caused by this piece of code that the snapshot macro generates:
    snapshotted_data as (

        select *,
            id as dbt_unique_key

        from `hive_metastore`.`snapshots`.`target_snap`
        where dbt_valid_to is null

    ),

And the reason for the failure is that there is already a dbt_unique_key column in target_snap, so the snapshotted_data CTE ends up with exactly two dbt_unique_key attributes.

Databricks allows this (at least in Runtimes < 15)


@benc-db I need advice on whether I should deal with this. It also seems like an interesting problem, but it might be related to some of my local policies, so if the test passes during pull request review, I can just ignore that particular failure in the context of my local dev tests.

UPD: I asked support whether it is fine that no error is returned in the case above (select * works in my snippet).

benc-db commented 4 months ago

One test is failing because it tries to store data in an S3 bucket (which does not exist). I think this is fine, because it is not supposed to work in my case. Probably that test should not be included in my test set (general-purpose cluster).

This is almost certainly fine. External location tests should happen with all types of compute, I believe, but they require setup outside of Databricks.

kmarq commented 4 months ago

@mi-volodin I was wondering if you have any sense of whether this is near completion? Is there anything I could help with in testing?

mi-volodin commented 4 months ago

@kmarq Sorry, I actually haven't had much time to work on this. And since it is my first PR in this repo, getting it to the finish line even with minor changes is tricky.

Let's collaborate! I think we can focus first on adding the flags and covering them with tests. Then, once I have finally become acquainted with the whole process, I will add the more challenging change with matched conditions.

WDYT?

p.s. @benc-db I haven't found it in the rules: is it okay to open a WIP draft PR here, or is it discouraged?

benc-db commented 4 months ago

@mi-volodin absolutely open a draft :)