apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Does spark.sql("MERGE INTO") support the schema evolution write option? #8502

Open jhchee opened 1 year ago

jhchee commented 1 year ago

Describe the problem you faced

I have created a table with two columns, userId and updatedAt. Now I'm passing a new column, nested, in the MERGE INTO command, but I get an exception.

spark.sql("" +
                "MERGE INTO target USING source ON target.userId = source.userId " +
                "WHEN MATCHED THEN UPDATE SET target.nested = struct(source.colA), target.updatedAt = source.updatedAt " +
                "WHEN NOT MATCHED THEN INSERT (userId, nested, updatedAt) " +
                "VALUES (source.userId, struct(source.colA), source.updatedAt)" +
                "")

Error

Cannot resolve 'target.nested`

To Reproduce

Steps to reproduce the behavior:

  1. Run a MERGE INTO command that specifies a new column.
  2. Setting .config("hoodie.schema.on.read.enable", "true") doesn't help.

Expected behavior

The schema should evolve and detect that this is a new column.

ad1happy2go commented 1 year ago

@jhchee The Spark SQL parser doesn't support this, so I'm not sure we can do anything on our end. All configs only come into play during execution of the SQL.

You can run ALTER TABLE first to add the column before calling the merge.
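
A rough sketch of that workaround, reusing the table and column names from the original report. It has not been run against Hudi here. The type of colA is not given in the issue, so string is an assumption. The class name AlterThenMerge and the run helper are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class AlterThenMerge {
    // Assumption: colA is a string. Adjust the struct type to match the real source schema.
    static final String ADD_COLUMN =
            "ALTER TABLE target ADD COLUMNS (nested struct<colA: string>)";

    // Same MERGE INTO statement as in the report, unchanged.
    static final String MERGE =
            "MERGE INTO target USING source ON target.userId = source.userId " +
            "WHEN MATCHED THEN UPDATE SET target.nested = struct(source.colA), target.updatedAt = source.updatedAt " +
            "WHEN NOT MATCHED THEN INSERT (userId, nested, updatedAt) " +
            "VALUES (source.userId, struct(source.colA), source.updatedAt)";

    // Issues the statements in order: add the column first, then merge.
    // In a Spark job the executor would be something like: sql -> spark.sql(sql)
    static void run(Consumer<String> executor) {
        executor.accept(ADD_COLUMN);
        executor.accept(MERGE);
    }

    public static void main(String[] args) {
        // Without a Spark session at hand, just collect and print the statements.
        List<String> issued = new ArrayList<>();
        run(issued::add);
        issued.forEach(System.out::println);
    }
}
```

With a live session, calling AlterThenMerge.run(sql -> spark.sql(sql)) would add the nested column to the target table before the merge runs, so target.nested should already exist by the time the MERGE INTO statement is analyzed.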

kazdy commented 1 year ago

@ad1happy2go
I'm not sure this is really blocked by the Spark SQL parser. For example, Delta Lake supports schema evolution in MERGE INTO (for partial updates as well as for update and insert): https://docs.delta.io/latest/delta-update.html#-merge-schema-evolution

It would be great to have something similar in Hudi. Currently, Hudi uses the target table schema during MERGE INTO (and, for example, drops incoming columns if the source schema is wider).

ad1happy2go commented 1 year ago

@kazdy @jhchee You are correct, this should be supported for MERGE INTO. I confirmed that master doesn't support it either. Attaching sample code that should work.

create table test_insert3 (
    id int,
    name string,
    updated_at timestamp
) using hudi
options (
    type = 'cow',
    primaryKey = 'id',
    preCombineField = 'updated_at'
) location 'file:///tmp/test_insert3';

merge into test_insert3 as target
using (
    select 1 as id, 'c' as name, 1 as new_col, current_timestamp as updated_at
    union select 1 as id, 'd' as name, 1 as new_col, current_timestamp as updated_at
    union select 1 as id, 'e' as name, 1 as new_col, current_timestamp as updated_at
) source
on target.id = source.id
when matched then update set target.new_col = source.new_col
when not matched then insert *;

Created a JIRA to track this: https://issues.apache.org/jira/browse/HUDI-6483

Feel free to contribute.