dbt-labs / dbt-core

dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.
https://getdbt.com
Apache License 2.0
9.77k stars 1.62k forks source link

Incremental models should handle column additions/deletions #1132

Closed drewbanin closed 3 years ago

drewbanin commented 5 years ago

Feature

Feature description

dbt should add or remove columns in the destination table for incremental models to make the target schema match the schema produced by the model SQL. dbt already has some existing logic that's responsible for "expanding" column types to make model SQL and the target table share a compatible schema. dbt should use a similar mechanism to add new columns and delete extraneous columns as required to make the target schema and the model SQL identical.

Ideas:

Caveats:

Who will this benefit?

users of incremental models

elexisvenator commented 5 years ago

Well.. this will open a can of worms.

Personally I am against any implicit smartness in this area by dbt. I feel this will only lead to confusion about why data has unexpected values.

Some other considerations:

bobzoller commented 5 years ago

dbt handling schema changes automatically is akin to how fivetran/stitchdata ingestors behave. the upside is they don't stop ingesting, the downside is you inevitably end up chasing down issues and fixing things retroactively.

I think I'd prefer being forced to deal with the discrepancy when it happens (eg dbt should throw an error). My only issue with the current behavior is there seemed to be some cases where it would fail silently and others where it would throw an error.

Perhaps there's still value so long as you have to opt-in to this behavior?

clrcrl commented 5 years ago

I think it's undesirable that there is currently no warning whatsoever, however I tend to agree with @elexisvenator that implicit smartness may not be the way to go.

A happy medium might be to have dbt warn you, and provide migration statements that need to be run. Something along the lines of:

The incremental model "analytics"."events" contains column(s) that do not exist
in the target table. These columns WERE NOT appended to the target table.
To append these columns:
- If backfilling this data is important, run a full-refresh on this table, or,
- If backfilling this data is not important, and it is desirable to preserve the
existing target table, manually update the target table to include the new
column(s), by running the following SQL:
    alter table "analytics"."events"
        add column "my_new_column" int default null

^ even that doesn't feel great though, but just an idea

adrianisk commented 5 years ago

I'd definitely be in favor of having this as opt-in behavior. What @clrcrl wrote up sounds great when you're not opting-in, but if dbt knows what needs to be done, and is capable of doing it, why not provide that option? The alternative is everyone who wants this ends up building it themselves in whatever tool they're using to schedule dbt (we've actually already built something like this in an Airflow operator).

clrcrl commented 5 years ago

Oh so maybe like:

in the target table. These columns WERE NOT appended to the target table:
- my_new_integer (int)
- my_new_varchar (int)

To append these columns:
- If backfilling this data is important, run a full-refresh on this table by running:
dbt run --models events --full-refresh
- If backfilling this data is not important, and it is desirable to preserve the
existing target table, run:
dbt run --models events --expand-target-cols
adrianisk commented 5 years ago

Or maybe add it as an option in the model configuration when the model is incremental? That way model owners could decided on an individual basis what the behavior should be when the schema changes. Default to do nothing, but give the option to add/remove columns automatically and also the option to do a full refresh when doing so. Putting it in the model config would also avoid adding yet another command line flag which I'm always in favor of

markgolazeski commented 5 years ago

Proposal

Problem

Right now, we (Convoy) use Airflow workflows to run DBT jobs that transform data across several different Snowflake schemas. It’s fairly hands off, with little need for any users to manually run DBT commands. The current, painful, exception to this is when there is a schema change (mostly columns being added/removed), and a manual DBT command is needed to kick off a full refresh. Since this is predominantly the only DBT command that our users run, it is often missed as schema changes are rolled out, causing workflow failures until the command is run and the data is populated. If either DBT or the Airflow workflow were able to handle the logic of the schema change and full refresh, we could have a more reliable workflow.

It’s also worth noting that right now we don't have a use case where we add columns and do not want to do a backfill of the data (i.e. we never want to add columns that have null historical data).

Solution

Based off some brief brainstorming conversations with drewbanin and andriank-convoy, I think there are two potential DBT code changes that can help solve our problem.

Both would require the addition of a new model config that allows incremental materialization users to opt-in to the new functionality:

{{ config(materialized="incremental", on_schema_change=<VALUES>) }}

Where valid VALUES are defined in one of two ways:

Option 1

Add support for ignore (default), add, add_remove, add_refresh, and add_remove_refresh values. The ignore configuration value is the default and maintains current behavior. Values ending in refresh will kick off a full refresh when passed in, while values not ending in refresh will simply make the column change. The add and add_refresh values mean these actions will only be taken when a column is added to the schema, while the add_remove and add_remove_refresh values mean the actions will be taken when a column is added or removed.

Option 1 has the benefit of allowing users to choose between doing a full refresh or only changing the schema going forward, and also allows DBT to be the one system for handing all schema changes. However, this option requires three sets of work: Implementing the logic for column addition, implementing the logic for column deletion, and implementing the logic for kicking off the full data refresh from an incremental materialization. The schema changes would need to be implemented for all supported databases which increases the implementation cost, and right now the full refresh logic is only kicked off at the beginning of the DBT code, so some inversion of logic might(?) be necessary.

Option 2 (Recommended)

Add support for ignore (default) and fail values. The ignore configuration value is the default and maintains current behavior. The fail value, on detection of a schema change, will cause DBT to return a unique exit code and error message indicating there is a schema change and the updated model will not be processed because of this configuration option.

Option 2 allows users to opt for DBT to fail fast instead of using DBT to perform the required schema change. The user is allowed to make the decision about what to do immediately, instead of waiting for later data failures. Users will be able to differentiate requests that are using an incompatible schema from requests that have other errors, such as from invalid SQL syntax. This enables workflows to automatically kick off a full-refresh when they encounter the schema changing use case. One risk is that this option will need a unique error code to be allocated and a clear error message to prevent user confusion. This option also may require work across all supported databases, and would not support the case when users want to add a new column but not include historical data.

Next steps

To unblock our use case, I'm hoping to:

  1. Add a new model configuration on_schema_changes.
  2. Add support for the ignore (do nothing) and fail (exit with a specific non-zero error code) values.

This approach is smaller scope than this full issue, but I think this approach would also lay scaffolding for future values as discussed in Option 1. They could be added as new values for the on_schema_change configuration to support more addition/removal of columns within DBT.

I'd appreciate any feedback or comments on these approaches and how they might fit into DBT development. Thanks!

drewbanin commented 5 years ago

Thanks for the really thoughtful writeup @markgolazeski! I hadn't considered failing here as a viable answer to this question, but it definitely makes sense (and should hopefully be easy to implement).

One thing to be aware of here is that full refreshes account for two types of changes:

While the comments above all provide examples of how to account for schema changes, I'd also want to consider what we should do (if anything) about logic changes.

Schema changes

I'd be interesting in supporting the following on_schema_changes values:

I think the add_refresh and add_remove_refresh options outlined above don't exactly map to code. If dbt does a "refresh", it's just going to drop the table and recreate it, automatically making the table schema congruent to the schema defined in the model code.

I do think there's merit to adding columns (backfilling them with nulls), but that doesn't sound like a use case that's present for your team at present. Of these, ignore and fail are certainly going to be the most straightforward to implement!

One last thing to say here: Column types sometimes change in reconcilable ways. Imagine that the first time you run an incremental model, the longest last_name column value you see is 16 characters long. Then imagine that a subsequent run processes a new last_name which is 32 characters long. At this point, the schema for that resulting last_name column has changed from varchar(16) to varchar(32). dbt currently accounts for changes like this, and we'd have to make some decision about whether or not these constitute "schema changes".

Logic changes

Schema changes are easy to wrap your head around, but I think that if we revisit full refreshes, we'll also need to account for logic changes too.

You can imagine a model with logic like:

select sum(order_price) as revenue

which is later changed to:

select sum(order_price - order_tax) as revenue

While the schema isn't changing here, this logical SQL change will take effect for all new records processed by the incremental model, but historical data will be incorrect! A full refresh ensures that "old" and "new" records are both processed with the same logic.

Just in the interest of brainstorming, I've always imagined that we'd handle cases like this by MD5ing the model code and inserting it as a column into the incremental model. The incremental materialization could have some mechanism to compare the MD5 that was used to process a record with the MD5 of the model that's running. If these two MD5s don't match up, then we'll know that some records were processed with a previous version of the incremental logic.

This obviously isn't perfect (adding a single space to the end of model will look like a more fundamental logic update...) but I'm not sure of any alternate methods for solving this problem. In practice, this would look something like:

Table: analytics.orders

| id | dbt_model_md5 | | 1 | abc123 | | 2 | abc123 |

We can run a query like:

select count(*) from analytics.orders where dbt_model_md5 != '{{ md5_of_current_model }}'

If the resulting count is > 0, then we know that some sort of model change has occurred since the last time dbt ran.

So, some questions for you @markgolazeski (and anyone else following along):

markgolazeski commented 5 years ago

Thanks for the great response @drewbanin! I agree with your thoughts around the on_schema_change values. My thoughts on your questions:

adrianisk commented 5 years ago

I agree that handling logic changes would be great, but I think that's a separate (and much more complicated) issue.

I like the idea of adding an on_schema_change per-model config option, and implementing the first option other than ignore: fail & return a specific error code. It's pretty simple, solves our immediate problem, and I think it establishes a good pattern fo handling things like this going forward (both for additional on_schema_change options, and potentially for on_logic_change options).

drewbanin commented 5 years ago

Got it, I agree that on_schema_change is probably the right place to start here.

In thinking through this a little further, I'm not sure that setting a custom exit code is going to work for us here. Imagine that we assign error code 7 to this type schema migration error, but we also assign error code 8 to some other type of error. What should the return code be if both of these errors occur in the same run?

I think dbt does too many things in an invocation for error codes to be a sensible way of communicating specific failures back to the caller. Instead, I think we might be better served by adding additional information to the run_results.json file for incremental models that fail with this type of error. In the future, dbt's API will likely return results using this same contract too.

markgolazeski commented 5 years ago

Yeah, I can totally see the limitations of using exit code because of DBT's nature of running multiple transactions. I can support making this information part of the run_results.json, with consumers looking at that to make followup decisions.

My initial thought is to make it the error that gets returned, but my concern would be that matching against the specific error message seems like a more error-prone strategy.

Do you consider the node object within the run_results.json completely stateless (i.e. it should only reflect the input of the run as opposed to the output)? If so, then I think the most reasonable options are to either change the error result string into an object with a code and message (although this would be backwards incompatible), or add a new property on the result to reflect the code and convey the same information. Maybe something like code, result_code, or error_code?

I think adding a new field is probably the least intrusive, but I'm not sure if you have thoughts around what you'd want the future API to look like, and whether moving towards that is worth doing now.

bashyroger commented 4 years ago

Some input on what I think is doable / copied from my slack input: Scenario's that should be doable:

1) Removing a attribute. Only requires comparing the inbound query and target model schema, deleting attributes from target when it does not exist in inbound query

2) Adding a new attribute This comes down to adding the new attribute without backfilling the new attribute in existing rows (or backfill it with a default per data type), but only when the incremental load SCD strategy is of type- 0; _only add data to and incremental model based on a new matching key does not exist in target (https://en.wikipedia.org/wiki/Slowly_changing_dimension)_

3) Data type change Attribute schema data type change where the schema becomes more loose; IE from int to string

Trying anything more complex then this (IE, and SCD-1 or 2 type comparison where you need to start comparing the values of all model attributes to prevent dups) will become inherently more complex to do... IMO changing an existing column now also falls in this category: how to separate that from removing an existing column and adding a new one at the same time?

Further, IMO we should not focus on supporting logic changes here, if you do that and you want to backfill that logic, you should make the conscious decision of still doing a full-refresh.

In addition, if you want to change your logic, but still be able to do an deterministic full refresh to an incremental model without changing past data, one should make business rules time sensitive. Taking the logic change example change from @drewbanin, making that time sensitive would boil down to writing it like this

SELECT case when REPORTING_DATE_COLUMN >DATE_OF_RULE_CHANGE then sum(order_price - order_tax) else sum(order_price) END as revenue

discarding the fact that I probably would be against this / solve this by introducing a new measure with a different name...

AndrewCathcart commented 3 years ago

Any movement on this? This is the last thing we're missing explicit handling for in our CICD pipeline.

PatrickCuba-zz commented 3 years ago

Hello, I think schema evolution has merit within some rules (in my mind this is only applicable to incremental and snapshots);

  1. A new column
    • Business is notified, business approves or declines adding the new column, column was sent in error.
  2. Existing column is amended, that depends
    • Same type but column length changed - easy to change and does not cause a failure or problems for the existing data in the column (column length increase only!).
    • Different type - extremely rare but possible, what happens to the data in the column already? Must be managed by business, existing or new column is renamed but not deleted
  3. Column is deprecated - do not drop the column, retain the column but populate it with null; no issue here.

Of course all of these are optional and should be and should not be the default behaviour, only if an enterprise "opts in".

The merits for having this option are:

Vojtech-Svoboda commented 3 years ago

Hello, this issue has been opened for 2 yrs now so I suppose it is dead, right?

jtcohen6 commented 3 years ago

@Vojtech-Svoboda This issue is very much alive. It's just not something we can prioritize right now, since we're focused on the stability and consistency improvements that we need to make before we can release dbt v1.0.0.

I'm hopeful that we can give materializations some love next year, not just to refactor some code (which has grown quite gnarly), but also to add long-lived-and-popular features such as this one. In the meantime, the detailed proposals are seriously helpful in grounding that future work.

fabrice-etanchaud commented 3 years ago

Hi ! Recently I had to reopen "building a scalable data warehouse with datavault 2.0" from Linstedt & Olschimke, and found a very clever solution for data changes detection even in case of structural changes, leading to a insert only strategy. I would like to add it to the discussion, in echo to @PatrickCuba 's post.

I used it to write a materialization for persistent staging area tables.

Provided that you are tracking change with a hash key of the columns (in table physical order) - something like the macro at the end of this post - you don't have to recompute existing rows' hash key under these conditions :

{%- macro hash_diff(colnames) -%}
    md5(
    {%- if colnames | length > 0 -%}
        {%- set strings = [] -%}
        {%- for colname in colnames %}
            {%- do strings.append("upper(trim(cast(" ~ colname ~ " as text)))") -%}
            {%- if not loop.last -%}
                {%- set next_strings = [] -%}
                {%- for next_string in colnames[loop.index:] -%}
                    {%- do next_strings.append("cast(" ~ next_string ~ " as text)") -%}
                {%- endfor -%}
                {%- do strings.append("case when " ~ batch_function_call('coalesce', next_strings, 100) ~ " is not null then ';' end") -%}
            {%- endif -%}
        {%- endfor -%}
        {{ batch_function_call('concat', strings, 100) }}
    {%- else -%}
        ''
    {%- endif -%}
    )::uuid
{%- endmacro %}
jtcohen6 commented 3 years ago

Thanks to the amazing work of @matt-winkler over the past few months, this will be coming in dbt v0.21 :)

Docs forthcoming. Check out #3387 for details.