delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and with APIs for multiple languages
https://delta.io
Apache License 2.0

[BUG][Spark] A merge executed with a generated column requires the source to have the generated column #3318

Open tigerhawkvok opened 4 months ago

tigerhawkvok commented 4 months ago

Consider this code merging data with a generated column:

```python
# Databricks notebook source
tableName = "TARGET_SCHEMA.generatedTableTest"

# COMMAND ----------

spark.sql(f"DROP TABLE IF EXISTS {tableName}")

# COMMAND ----------

import warnings
from pyspark import pandas as ps
from pyspark.pandas.utils import PandasAPIOnSparkAdviceWarning
warnings.simplefilter("ignore", category=PandasAPIOnSparkAdviceWarning)

# COMMAND ----------

df = ps.DataFrame({"foo": [1, 2, 3, 4, 5], "bar": [6, 7, 8, 9, 0]})
df.display()

# COMMAND ----------

from delta.tables import DeltaTable
from pyspark.sql.types import LongType

dTableBuilder = DeltaTable.create(spark).tableName(tableName)
dTableBuilder.addColumns(df.to_spark().schema)
dTableBuilder.addColumn("baz", LongType(), generatedAlwaysAs="foo + bar")
dTable = dTableBuilder.execute()

# COMMAND ----------

mergeBuilder = (
    dTable.merge(df.to_spark(), condition="1 = 1")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
)

# COMMAND ----------

try:
    mergeBuilder.execute()
except Exception as e:
    print("***We raised an error! As of 20240627 this will say 'baz' is missing***\n\n")
    print(e)
```

Observed results

The merge fails because it cannot resolve the generated column. The error will be (or will be close to):

[DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve baz in UPDATE clause given columns foo, bar.

Expected results

The generated column is, well, generated from the inputs and as such is unnecessary to specify.
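By comparison, a plain append that omits `baz` already behaves this way: Delta derives the column from its generation expression. A minimal sketch, assuming the `df` and `tableName` from the repro above (the helper name is hypothetical):

```python
def append_without_generated(spark_df, table_name: str) -> None:
    """Append rows that lack the generated column; Delta Lake fills
    'baz' in from its generation expression "foo + bar"."""
    spark_df.write.format("delta").mode("append").saveAsTable(table_name)

# Usage (in the notebook above):
# append_without_generated(df.to_spark(), tableName)
```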

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

tigerhawkvok commented 4 months ago

You can work around this by enumerating every non-generated column in the "All" functions, but that kind of misses the point of both those functions and generated columns IMO. If a column is missing from the source and the target column is generated, it should be skipped during validation.
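For reference, a minimal sketch of that workaround, assuming the `dTable` and `df` from the repro above (the helper names here are hypothetical): every non-generated column is listed explicitly in the clause maps, so `baz` never appears in the UPDATE/INSERT clauses and Delta computes it from `foo + bar`.

```python
from typing import Dict, List

def build_assignment_map(source_cols: List[str], alias: str = "source") -> Dict[str, str]:
    """Map each non-generated target column to the matching source column."""
    return {c: f"{alias}.{c}" for c in source_cols}

def merge_without_generated(dTable, source_df, source_cols):
    """Run the merge with explicit column maps instead of the *All() variants,
    leaving the generated column 'baz' for Delta to compute."""
    assignments = build_assignment_map(source_cols)
    return (
        dTable.alias("target")
        .merge(source_df.alias("source"), "1 = 1")
        .whenMatchedUpdate(set=assignments)
        .whenNotMatchedInsert(values=assignments)
        .execute()
    )

# Usage (in the notebook above):
# merge_without_generated(dTable, df.to_spark(), ["foo", "bar"])
```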

I think (not actually knowing Scala) that

https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/deltaMerge.scala#L145

and

https://github.com/delta-io/delta/blob/b7da7f40a3955d3af3f279f7d18483e686d8d286/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala#L124

could skip the for-each assertion in those cases.