delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, with APIs for several languages
https://delta.io
Apache License 2.0

[BUG] Merge Fails with ambiguous errors on some types using python API #1329

Open PadenZach opened 2 years ago

PadenZach commented 2 years ago

Bug

Describe the problem

When merging with a condition via the Python API, a pyspark.sql.utils.AnalysisException: Failed to resolve is sometimes raised.

Steps to reproduce

from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Assumes an active SparkSession named spark with Delta Lake configured.
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6), (7, 8, 9)]).select(
    "*", F.create_map("_1", "_2").alias("mapCol")
)
df.write.format("delta").save("/tmp/delta/test_table")

dt = DeltaTable.forPath(spark, "/tmp/delta/test_table")
dt.alias("old").merge(
    df.alias("new"),
    condition=F.expr("old._1 == new._1"),
).whenMatchedUpdate(
    condition=F.expr("old.mapCol != new.mapCol"),  # This is where the issue is.
    set={"mapCol": F.col("new.mapCol")},
).whenNotMatchedInsertAll().execute()

Observed results

Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "<snip>io.delta_delta-core_2.12-2.0.0.jar/delta/tables.py", line 897, in execute
  File "/usr/lib/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Failed to resolve
;
'DeltaMergeInto (_1#1150L = _1#785L), [Update [condition: NOT (mapCol#1153 = mapCol#791), actions: [`mapCol` = mapCol#791]]], [Insert [actions: [`_1` = _1#785L, `_2` = _2#786L, `_3` = _3#787L, `mapCol` = mapCol#791]]], false, StructType(StructField(_1,LongType,true), StructField(_2,LongType,true), StructField(_3,LongType,true), StructField(mapCol,MapType(LongType,LongType,true),true))
:- SubqueryAlias old
:  +- Relation [_1#1150L,_2#1151L,_3#1152L,mapCol#1153] parquet
+- SubqueryAlias new
   +- Project [_1#785L, _2#786L, _3#787L, map(_1#785L, _2#786L) AS mapCol#791]
      +- LogicalRDD [_1#785L, _2#786L, _3#787L], false

Expected results

Ideally, it would "just work". However, since I believe this is an issue with the type I was comparing, propagating the actual error would be the more realistic fix :)

Further details

The type this happened with was a column of MapType. When trying to compare the column against itself, I got the following error:

pyspark.sql.utils.AnalysisException: cannot resolve '(spark_catalog.my_db.my_table.data = spark_catalog.my_db.my_table.data)' due to data type mismatch: EqualTo does not support ordering on type map<string,double>; line 1 pos 0;
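For what it's worth, the same underlying error can be reproduced outside of merge by comparing a map column directly. A minimal sketch, assuming an active SparkSession named spark:

from pyspark.sql import functions as F

# Comparing a MapType column with == surfaces the real error at analysis
# time, because Spark's EqualTo does not support ordering on map types.
df = spark.createDataFrame([(1, 2)]).select(F.create_map("_1", "_2").alias("m"))
df.select((F.col("m") == F.col("m")).alias("eq"))  # raises the AnalysisException above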

I'd strongly prefer to have some of this information in the error reported by Delta Lake, rather than just "Failed to resolve".
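A possible workaround (an untested sketch, not an official recommendation): canonicalize each map into a sorted array of entries before comparing, since Spark can compare arrays of structs even though it cannot compare maps.

from pyspark.sql import functions as F

# Maps are not comparable with == / !=, but arrays of structs are, so
# convert each map to a deterministically sorted entry array first.
map_changed = (
    F.sort_array(F.map_entries(F.col("old.mapCol")))
    != F.sort_array(F.map_entries(F.col("new.mapCol")))
)

dt.alias("old").merge(
    df.alias("new"),
    condition=F.expr("old._1 == new._1"),
).whenMatchedUpdate(
    condition=map_changed,
    set={"mapCol": F.col("new.mapCol")},
).whenNotMatchedInsertAll().execute()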

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

(I'm not well versed in the Scala/Java side of the project -- if this is something that could be resolved by editing the Python bindings, I may be able to help out.)

nkarpov commented 2 years ago

Thanks @PadenZach. Acknowledging that the repro is successful and this is a legitimate issue. The command should not swallow the specific error, in this case that map is not a comparable type.

We'll leave this open for now; there is no corruption since an error is still thrown. We'll prioritize it accordingly to improve usability. Thank you!