delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0
7.57k stars 1.7k forks source link

NPE with schema evolution on non-nullable nested struct #796

Open Kimahriman opened 3 years ago

Kimahriman commented 3 years ago

I have hit this issue twice now in production and finally figured out a reproduction. Basically if you schema evolve a non-nullable struct field to add a new nested field, you will get an NPE when trying to read that field:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()

df1 = spark.range(1).withColumn('nested', F.struct(F.lit('a').alias('field1')))
# Pre-create the table to preserve non-nullability
DeltaTable.create(spark).location('file:///tmp/test-table').addColumns(df1.schema).execute()
df1.write.format('delta').mode('append').save('file:///tmp/test-table')

df2 = spark.range(1).withColumn('nested', F.struct(F.lit('a').alias('field1'), F.lit('b').alias('field2')))
df2.write.format('delta').mode('append').option('mergeSchema', 'true').save('file:///tmp/test-table')

# These work
spark.read.schema(df2.schema).parquet('file:///tmp/test-table/*.parquet').select('nested.field2').show()
spark.read.format('delta').load('file:///tmp/test-table').select('nested.*').show()

# This throws NPE
spark.read.format('delta').load('file:///tmp/test-table').select('nested.field2').show()

Error:

java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:96)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Interestingly, the first read using parquet on the files directly works fine, or reading as delta with multiple fields, but reading as delta and only selecting that column throws the NPE. Haven't dug too much into why yet, but this would suggest it's a delta issue vs a spark issue, even though the stack trace has nothing delta related? It also only happens when the struct is non-nullable.

Kimahriman commented 3 years ago

Only difference I can see in the codegen between parquet and delta is it looks like the parquet read adds a null check on nested despite the read schema being non-nullable. I guess that can make sense of why delta has the issue. If nested is non-nullable and you skip the null check when processing it, but it's the only field you selected (and hence schema pruned to), the parquet reader might just give you a null value struct instead of a struct with a null value?