delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, along with APIs for multiple programming languages
https://delta.io
Apache License 2.0

[BUG] UnsupportedOperationException when reading a table with deletion vectors #2246

Closed: zerodarkzone closed this issue 9 months ago

zerodarkzone commented 11 months ago

Bug

Which Delta project/connector is this regarding?

Describe the problem

I have a Delta table, written by Databricks (Databricks-Runtime/13.3.x-photon-scala2.12), with deletion vectors and the following table features enabled: [screenshot of enabled table features]

When I try to read it using PySpark 3.4.1 with Delta Lake 2.4.0, which I believe is compliant with the requested reader protocol version, I get a java.lang.UnsupportedOperationException error.

Steps to reproduce

  1. Write a table with deletion vectors enabled using Databricks-Runtime/13.3.x-photon-scala2.12
  2. Try to read that table using the open-source delta-lake package version 2.4.0 (see the read-side sketch below)
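
A minimal read-side sketch of step 2 in PySpark, assuming Spark 3.4.1, the Delta 2.4.0 package, and a hypothetical table path /tmp/dv_table written as in step 1:

    from pyspark.sql import SparkSession

    # Spark 3.4.1 session with the open-source Delta Lake 2.4.0 package.
    spark = (
        SparkSession.builder
        .appName("dv-read-repro")  # placeholder app name
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
    )

    # /tmp/dv_table stands in for the table written on Databricks with deletion
    # vectors enabled; show() is where the exception surfaces.
    spark.read.format("delta").load("/tmp/dv_table").show()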

Observed results

When running a simple show() command, it fails with the following error:

23/10/27 15:15:57 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 356)
java.lang.UnsupportedOperationException
    at org.apache.spark.sql.vectorized.ColumnarBatchRow.update(ColumnarBatchRow.java:193)
    at org.apache.spark.sql.catalyst.InternalRow.setLong(InternalRow.scala:50)
    at org.apache.spark.sql.delta.DeltaParquetFileFormat.$anonfun$iteratorWithAdditionalMetadataColumns$13(DeltaParquetFileFormat.scala:268)
    at org.apache.spark.sql.delta.DeltaParquetFileFormat.$anonfun$iteratorWithAdditionalMetadataColumns$13$adapted(DeltaParquetFileFormat.scala:266)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.sql.delta.DeltaParquetFileFormat.$anonfun$iteratorWithAdditionalMetadataColumns$8(DeltaParquetFileFormat.scala:266)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator$$anon$1.next(RecordReaderIterator.scala:62)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:211)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Expected results

It should show the contents of the table.

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

vkorukanti commented 11 months ago

It seems like the API InternalRow.setLong() doesn't work in certain cases. Do you have any settings related to the Parquet reader when reading from Delta 2.4? A small repro case would help us debug the issue.

zerodarkzone commented 11 months ago

Hi, I'm going to try to set up a small repro to replicate the issue. For now, these are some of the Parquet-related configurations we are using:

spark.sql.legacy.timeParserPolicy: LEGACY
spark.sql.parquet.int96RebaseModeInRead: CORRECTED
spark.sql.parquet.int96RebaseModeInWrite: CORRECTED
spark.sql.parquet.datetimeRebaseModeInWrite: CORRECTED
spark.sql.parquet.datetimeRebaseModeInRead: CORRECTED
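
For reference, a sketch of how settings like these are typically applied at runtime, assuming an existing SparkSession named spark (values copied from the list above):

    # Apply the time-parser and Parquet rebase settings listed above.
    parquet_conf = {
        "spark.sql.legacy.timeParserPolicy": "LEGACY",
        "spark.sql.parquet.int96RebaseModeInRead": "CORRECTED",
        "spark.sql.parquet.int96RebaseModeInWrite": "CORRECTED",
        "spark.sql.parquet.datetimeRebaseModeInWrite": "CORRECTED",
        "spark.sql.parquet.datetimeRebaseModeInRead": "CORRECTED",
    }
    for key, value in parquet_conf.items():
        spark.conf.set(key, value)
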
vkorukanti commented 11 months ago

@zerodarkzone do you have spark.sql.parquet.enableVectorizedReader disabled? Also, what are the data types of columns in your table?

zerodarkzone commented 11 months ago

I don't have it disabled. We have integers, strings, timestamps, dates, some decimals, and two columns in particular that are arrays of structs containing decimals and strings.

vkorukanti commented 9 months ago

@andreaschat-db was able to repro this issue. It happens for wide tables with more than 100 columns. When reading more than 100 columns, Spark's code generator decides (1, 2) not to use codegen. Without codegen, Spark sets options to get rows instead of columnar batches from the Parquet reader, so the vectorized Parquet reader returns a row abstraction (ColumnarBatchRow) over the columns in each columnar batch. That row abstraction doesn't allow its contents to be modified, which is why the UnsupportedOperationException above is thrown when DeltaParquetFileFormat tries to populate its additional metadata columns.
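
A quick diagnostic sketch of this condition, assuming an existing SparkSession named spark and a placeholder table path: compare the table's column count against spark.sql.codegen.maxFields (default 100):

    # Check whether the table is wide enough to hit the non-codegen path.
    df = spark.read.format("delta").load("/tmp/dv_table")  # placeholder path
    max_fields = int(spark.conf.get("spark.sql.codegen.maxFields", "100"))
    print(f"{len(df.columns)} columns vs codegen threshold {max_fields}")
    # With more columns than the threshold, whole-stage codegen is skipped and
    # the row-based path described above is taken.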

I will be posting a fix shortly. In the meantime, a couple of workarounds (sketched below):

  1. Disable the vectorized Parquet reader: spark.sql.parquet.enableVectorizedReader=false
  2. Raise the codegen field threshold above the number of columns in your table: spark.sql.codegen.maxFields (default value is 100)
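
A sketch of applying either workaround before reading, assuming an existing SparkSession named spark and a placeholder table path:

    # Workaround 1: fall back to the non-vectorized Parquet reader.
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

    # Workaround 2 (alternative): raise the codegen field threshold above the
    # table's column count; 200 is an arbitrary example value.
    spark.conf.set("spark.sql.codegen.maxFields", "200")

    spark.read.format("delta").load("/tmp/dv_table").show()  # placeholder path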