delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

Delta merge doesn't update schema (automatic schema evolution enabled) #553

Open louisdubaere opened 3 years ago

louisdubaere commented 3 years ago

Hi,

I am having problems with automatic schema evolution for merges with Delta tables.

I have a Delta table with around 330 columns in my data lake (the target table), and I want to upsert some new records into it. The source DataFrame has some extra columns that aren't present in the target Delta table. I use the following code for the merge in Databricks:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true")
from delta.tables import *
deltaTarget = DeltaTable.forPath(spark, pathInDataLake)
deltaTarget.alias('target').merge(df.alias('source'), mergeStatement).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

While the documentation on Automatic schema evolution indicates that the schema will be changed when using .whenMatchedUpdateAll() and .whenNotMatchedInsertAll(), this piece of code gives the following error:

AnalysisException: cannot resolve new_column in UPDATE clause given columns [list of columns in the target table].

I have the impression that I had this issue in the past but was able to solve it then with the spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true") setting.

Am I missing something to make the automatic schema evolution work?

jose-torres commented 3 years ago

Looks like you have a trailing space at the end of "spark.databricks.delta.schema.autoMerge.enabled ". Can you check if it works after removing that? (It's unfortunate that there's no Spark handle to catch this and ensure that only valid confs are specified.)
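
In the meantime, a minimal user-side guard (a hypothetical helper, not a Spark or Delta API) can strip whitespace and round-trip the value to catch this class of typo:

def set_conf_checked(spark, key, value):
    # Leading/trailing whitespace would silently create a different conf key.
    key = key.strip()
    spark.conf.set(key, value)
    assert spark.conf.get(key) == value, f"conf {key!r} was not applied"

set_conf_checked(spark, "spark.databricks.delta.schema.autoMerge.enabled", "true")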

louisdubaere commented 3 years ago

Thanks for your comment. You are right, but unfortunately removing the space doesn't solve the problem. The autoMerge option was also enabled in the Spark config of the Databricks cluster, so I think it was enabled either way; it still gives the same error.

Dom-M-C commented 3 years ago

I've come across the same issue when merging a DataFrame with 4 new fields into a Delta table with >900 fields.

gaco commented 2 years ago

I had the same problem. Use spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True) (the Python boolean) rather than spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true") (the string), and it should work.

roenciso commented 2 years ago

We are having this same issue for our pipelines running on Azure Data Factory. Is there an ETA for the fix? TIA!

thonsinger-rseg commented 2 years ago

Schema autoMerge, using the same merge on the same Delta table, worked in Scala but not in Python. Is there an ETA for the fix? Thanks.

zsxwing commented 2 years ago

@thonsinger-rseg Do you have a reproduction? I tried the following code and it worked for me:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")
from delta.tables import *
path = "/tmp/mergetest"
df = spark.range(20).selectExpr("id", "id as id2")
spark.range(10).write.format("delta").save(path)
target = DeltaTable.forPath(spark, path)
target.alias('target').merge(df.alias('source'), "target.id = source.id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
DeltaTable.forPath(spark, path).toDF().show()

thonsinger-rseg commented 2 years ago

@zsxwing I have a whenMatched() condition that comes before my .whenMatchedUpdateAll() and .whenNotMatchedInsertAll(); that's the only real difference. The exact same logic on the exact same Delta table worked in Scala, but not Python.

(silver_table.alias("t")
    .merge(bronze_cdc_df.alias("s"), merge_match)
    .whenMatchedUpdate("s.DeletedDate IS NOT NULL", soft_deleted_cols_map)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
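
For anyone trying to reproduce this, a self-contained sketch with that clause shape (the path, merge key, and column names here are illustrative, not from the original report):

from delta.tables import DeltaTable

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

path = "/tmp/mergetest_conditional"  # illustrative path
# Target table: id plus a nullable DeletedDate column.
spark.range(10).selectExpr("id", "CAST(NULL AS timestamp) AS DeletedDate").write.format("delta").save(path)
# Source: one extra column (newField) not present in the target.
source = spark.range(20).selectExpr("id", "CAST(NULL AS timestamp) AS DeletedDate", "id AS newField")

target = DeltaTable.forPath(spark, path)
(target.alias("t")
    .merge(source.alias("s"), "t.id = s.id")
    # Conditional update clause before the *All() clauses, as in the comment above.
    .whenMatchedUpdate("s.DeletedDate IS NOT NULL", {"DeletedDate": "s.DeletedDate"})
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())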

AlexWRZ01 commented 2 years ago

I have the same issue with a table of about 20 columns. I am merging a DataFrame that has 1 additional column and fewer records. The new column contains integers and has no null values.

In Scala, this works with no problem.

In Python it fails, on an error similar to:

AnalysisException: cannot resolve 'struct(ID, oldField, newField)' due to data type mismatch: cannot cast struct<ID:int, oldField:int, newField:void> to struct<ID:int, oldField:int, newField:int>;

It seems like the schema is being evolved with a void type for the new field, and then the merge fails due to the data type mismatch.

zsxwing's example works, but a real table in our warehouse does not.

AlexWRZ01 commented 2 years ago

There is a workaround for this. Do an empty dataframe append with schema merge before doing the delta merge:

df.limit(0).write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(tableName)

Then perform the normal merge using DeltaTable, but don't enable spark.databricks.delta.schema.autoMerge.enabled.

For some reason mergeSchema on append works, but Delta's autoMerge does not.
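
Spelled out end to end, the workaround looks roughly like this (the table name and merge condition are illustrative):

from delta.tables import DeltaTable

table_name = "my_db.my_table"  # illustrative

# Step 1: zero-row append with mergeSchema to evolve the table schema.
(df.limit(0)
    .write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable(table_name))

# Step 2: the usual merge, with autoMerge left disabled.
target = DeltaTable.forName(spark, table_name)
(target.alias("t")
    .merge(df.alias("s"), "t.id = s.id")  # illustrative merge condition
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())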

zsxwing commented 2 years ago

@AlexWRZ01 could you provide the table schema and the merged DataFrame schema if possible so that we can try to create a reproduction? You can just call schema.json() on a DataFrame to get the schema, e.g. spark.range(10).schema.json().

sdaberdaku commented 1 year ago

I am having a similar problem when I try to use multi-threading to speed up the merge operations of different, independent Delta tables with PySpark. When submitting multiple merge operations from parallel threads to Spark, the new columns are not added to the resulting Delta tables. I am using the same SparkSession object across all threads (which should be thread-safe).
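
The pattern being described is roughly the following (a minimal sketch; the jobs list of (path, DataFrame) pairs and the merge key are illustrative):

from concurrent.futures import ThreadPoolExecutor
from delta.tables import DeltaTable

def merge_one(path, source_df):
    # Each thread merges into its own, independent Delta table,
    # all sharing the one SparkSession.
    (DeltaTable.forPath(spark, path).alias("t")
        .merge(source_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

# jobs: list of (path, DataFrame) pairs, one per independent table.
with ThreadPoolExecutor(max_workers=4) as pool:
    for future in [pool.submit(merge_one, p, d) for p, d in jobs]:
        future.result()  # surface any exception raised in a worker thread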

zsxwing commented 1 year ago

@sdaberdaku did you hit the same error? Do you have a quick reproduction we can try?

doggerz commented 1 year ago

@zsxwing I have encountered a similar issue while using Delta with Java Spark. I managed to find a workaround by configuring the following properties in the spark-defaults file:

"spark.databricks.delta.schema.autoMerge.enabled": "True",
"spark.databricks.delta.schema.autoMerge.enabledOnWrite": "True"

It's worth noting that I observed Spark being case-sensitive when specifying these properties: if I switch them back to lowercase or use "true" instead of "True", the problem resurfaces, so the capitalization matters here.

Furthermore, this issue seems to be specific to AWS EMR, in my case emr-6.9.0 with Delta 2.1.0, as I haven't encountered it in other environments. This workaround may therefore apply specifically to AWS EMR setups.

I hope this explanation helps in debugging.

zeotuan commented 7 months ago

I am encountering this issue using Scala on a standalone cluster with client deployment mode. Going to test whether

"spark.databricks.delta.schema.autoMerge.enabled": "True"

instead of

"spark.databricks.delta.schema.autoMerge.enabled": "true"

fixes the issue.

matheus-rossi commented 6 months ago

> I am having a similar problem when I try to use multi-threading to speed up the merge operations of different, independent Delta tables with PySpark. When submitting multiple merge operations from parallel threads to Spark, the new columns are not added to the resulting Delta tables. I am using the same SparkSession object across all threads (which should be thread-safe).

Having the same issue here. Did you solve the problem, @sdaberdaku?

oliver-was-here commented 4 months ago

@zsxwing I'm experiencing the same issue as those above.

I'm using PySpark on a cluster running the 12.2 LTS runtime.

I tried setting the value in the cluster's Spark config without success (tried both true and True). [screenshot of the Spark config omitted]

Similarly, I tried setting it explicitly in the script like: