delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[BUG][Spark] INSERT INTO struct evolution in map/arrays breaks when a column is renamed #3227

Open johanl-db opened 5 months ago

johanl-db commented 5 months ago

Bug

Describe the problem

Schema evolution in INSERT doesn't always work when a new field is added to a struct nested within an array or map: if another column is also renamed, the operation fails when it should succeed.

Steps to reproduce

For example, with a map, in Python, renaming the column key to renamed_key and adding a field comment to the struct inside the map:

(
  spark.createDataFrame([[1, {"event": (1, 1)}]], "key int, metrics map<string, struct<id: int, value: int>>")
    .write
    .option("overwriteSchema", "true")
    .format("delta")
    .saveAsTable("insert_map_schema_evolution")
)
(
  spark.createDataFrame([[1, {"event": (1, 1, "deprecated")}]], "renamed_key int, metrics map<string, struct<id: int, value: int, comment: string>>")
    .write
    .mode("append")
    .option("mergeSchema", "true")
    .format("delta")
    .insertInto("insert_map_schema_evolution")
)
[[DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION](https://docs.databricks.com/error-messages/error-classes.html#datatype_mismatch.cast_without_suggestion)] Cannot resolve "metrics" due to data type mismatch: cannot cast "MAP<STRING, STRUCT<id: INT, value: INT, comment: STRING>>" to "MAP<STRING, STRUCT<id: INT, value: INT>>". 

Note that the struct inside the map isn't evolved to add the new field. If the unrelated column isn't renamed, the same operation works as expected:

(
  spark.createDataFrame([[1, {"event": (1, 1)}]], "key int, metrics map<string, struct<id: int, value: int>>")
    .write
    .option("overwriteSchema", "true")
    .format("delta")
    .saveAsTable("insert_map_schema_evolution")
)
(
  spark.createDataFrame([[1, {"event": (1, 1, "deprecated")}]], "key int, metrics map<string, struct<id: int, value: int, comment: string>>")
    .write
    .mode("append")
    .option("mergeSchema", "true")
    .format("delta")
    .insertInto("insert_map_schema_evolution")
)

Observed results

The operation fails.

Expected results

The operation succeeds: the table schema is changed to key int, metrics map<string, struct<id: int, value: int, comment: string>> and the data is inserted. The top-level column keeps the name key, since the renamed source column is matched by position.

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

Richard-code-gig commented 2 months ago

Hello @johanl-db, could you please assign this issue to me? I am confident I can help resolve it and would appreciate any guidance as needed.

johanl-db commented 2 months ago

@Richard-code-gig thanks for volunteering for this. I would start by adding a test in e.g. deltaInsertIntoSuite to cover the problematic case and work from there.

I believe this only impacts INSERT by position: