delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, along with APIs for several languages
https://delta.io
Apache License 2.0

[BUG] Unstable error messages when merging null values on NOT NULL columns #1279

Open pedrosmv opened 2 years ago

pedrosmv commented 2 years ago

Bug

Describe the problem

We have a merge operation using Delta + PySpark that deals with CDC data, mostly inserts and updates. In our testing we found that the behavior when dealing with null values on NOT NULL columns is erratic: depending on which merge clauses are used, we get different errors.

Steps to reproduce

Spark setup:

import pyspark
from delta import configure_spark_with_delta_pip

def spark_loader():
    builder = pyspark.sql.SparkSession.builder.appName("testing-schema-evo") \
        .master("local[4]") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.databricks.delta.schema.autoMerge.enabled", "true")

    return configure_spark_with_delta_pip(builder).getOrCreate()
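
The spark handle used in the remaining steps is assumed to come from this helper:

spark = spark_loader()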

Create the table:

spark.sql(f"""
CREATE TABLE IF NOT EXISTS delta.`testing-schema-evo` (
name STRING NOT NULL,
age STRING NOT NULL
) USING DELTA
""")

Load data into the table:

from pyspark.sql import Row

test_data = [
    {"name": "charly", "age": "sixteen"},
    {"name": "fabien", "age": "one"},
    {"name": "sam", "age": "two"},
    {"name": "sam", "age": "three"},
]

# Create the dataframe
test_data_df = spark.createDataFrame(Row(**data_row) for data_row in test_data)

from delta.tables import DeltaTable

# Then we merge using the first df:
condition = "current.name = new.name"
deltaTable = DeltaTable.forPath(spark, "delta-tables/test_table")
deltaTable.alias("current") \
    .merge(test_data_df.alias("new"), condition) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
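
At this point the table should contain the four initial rows, which can be verified with (again, an assumed sanity check rather than part of the repro):

spark.read.format("delta").load("delta-tables/test_table").show()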

After loading the initial data, we run the merge again, trying to load data with null values:

test_data_with_null = [
    {"name": "joson", "age": "eleven"},
    {"name": "icarson", "age": None},
    {"name": "sam", "age": None},
]

test_data_with_null_df = spark.createDataFrame(Row(**data_row) for data_row in test_data_with_null)

condition = "current.name = new.name"
deltaTable = DeltaTable.forPath(spark, "delta-tables/test_table")
deltaTable.alias("current") \
    .merge(test_data_with_null_df.alias("new"), condition) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

Observed results

Running the test with the complete merge statement (both whenMatchedUpdateAll() and whenNotMatchedInsertAll()), the result is an opaque NullPointerException instead of a constraint violation:

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:218)
    ... 53 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:500)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:321)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
createexternalrow(input[0, string, false].toString, input[1, string, false].toString, StructField(name,StringType,false), StructField(age,StringType,false))
    at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1047)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand$JoinedRowProcessor.$anonfun$processPartition$8(MergeIntoCommand.scala:821)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:304)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
    ... 9 more
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:181)
    ... 20 more

When the merge has only the whenNotMatchedInsertAll() clause, the result is the expected one:

deltaTable.alias("current") \
    .merge(test_data_with_null_df.alias("new"), condition) \
    .whenNotMatchedInsertAll() \
    .execute()
py4j.protocol.Py4JJavaError: An error occurred while calling o98.execute.
: org.apache.spark.sql.delta.schema.InvariantViolationException: NOT NULL constraint violated for column: age.
    at org.apache.spark.sql.delta.schema.InvariantViolationException$.apply(InvariantViolationException.scala:49)
    at org.apache.spark.sql.delta.schema.InvariantViolationException.apply(InvariantViolationException.scala)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.delta.constraints.DeltaInvariantCheckerExec.$anonfun$doExecute$3(DeltaInvariantCheckerExec.scala:87)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:304)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Expected results

Our expected result was an InvariantViolationException in both cases.
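
Until the error is stabilized, a possible workaround (our own sketch, not an official recommendation; the hard-coded column list is specific to this table) is to validate the source DataFrame before merging, so nulls aimed at NOT NULL target columns fail fast with a clear message:

from pyspark.sql import functions as F

# Hypothetical pre-merge guard: count nulls per NOT NULL target column
not_null_columns = ["name", "age"]
null_counts = test_data_with_null_df.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in not_null_columns]
).first()
violations = {c: null_counts[c] for c in not_null_columns if null_counts[c] > 0}
if violations:
    raise ValueError(f"Null values found in NOT NULL columns: {violations}")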

Further details

Environment information

nkarpov commented 2 years ago

Hi @pedrosmv - thank you for reporting this. Confirming we can reproduce it from your steps. Since an error is still thrown and no data corruption occurs, we'll keep this open for now in case anyone would like to contribute, and prioritize accordingly otherwise. Thanks!