
Spark vectorized read of Parquet produces incorrect result for a decimal column #11221

Open wypoon opened 4 days ago

wypoon commented 4 days ago

Apache Iceberg version

main (development)

Query engine

Spark

Please describe the bug 🐞

The bug is present in Iceberg 1.2 and later (and is in main). A customer uses Impala to write Parquet data into an Iceberg table. We have a sample Parquet file written by Impala. It contains a single decimal(38, 0) column.

$ parquet-tools meta impala_test_data.parq
file:        file:/home/systest/impala_test_data.parq 
creator:     impala version 4.0.0.2024.0.18.1-1 (build 1fe5a71a0498831cf13e5b30ca5e431d69da58bd) 

file schema: schema 
--------------------------------------------------------------------------------
id:          OPTIONAL FIXED_LEN_BYTE_ARRAY L:DECIMAL(38,0) R:0 D:1

row group 1: RC:435942 TS:2755159 OFFSET:4 
--------------------------------------------------------------------------------
id:           FIXED_LEN_BYTE_ARRAY SNAPPY DO:4 FPO:243092 SZ:2755159/7058047/2.56 VC:435942 ENC:PLAIN_DICTIONARY,PLAIN,RLE ST:[min: 224798, max: 5431555, num_nulls: 0]

Spark is able to read the Parquet file correctly (note this is Spark's own Parquet read path, not Spark Iceberg support):

scala> val df = spark.read.parquet("/user/systest/decimal_test/impala_test_data.parq")
df: org.apache.spark.sql.DataFrame = [id: decimal(38,0)]                        

scala> df.count()
res0: Long = 435942                                                             

scala> df.show()
+-------+                                                                       
|     id|
+-------+
|3025050|
|1401270|
| 505425|
| 479647|
|5061822|
|4170450|
|3307794|
| 683409|
|3205921|
|3261299|
|1596856|
|5260644|
|4865400|
|4737157|
|4808919|
|4032370|
|5183774|
|4119261|
|1911171|
| 782928|
+-------+
only showing top 20 rows

However, when we create an Iceberg table, add the file to it (using the Spark add_files procedure), and then query the table, we get

scala> spark.sql("select * from test_iceberg").show(80, false)
+-------+
|id     |
+-------+
|3025050|
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|1401270|
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|505425 |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|479647 |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|5061822|
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
|0      |
+-------+
only showing top 80 rows

As we can see, the correct values do appear, but each one is followed by 15 zeros. This is using the vectorized Parquet read path, since the table property read.parquet.vectorization.enabled is true by default. When I set that property to false on the table and query it again, the results are correct, so the bug is in the vectorized read path.
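
For reference, the reproduction steps were roughly the following (a sketch in spark-shell; the catalog and database names are placeholders, and I am assuming the session catalog is configured as an Iceberg catalog):

// Create an Iceberg table with the same schema and register the existing
// Impala-written Parquet file using the add_files procedure.
spark.sql("CREATE TABLE test_iceberg (id decimal(38,0)) USING iceberg")
spark.sql(
  """CALL spark_catalog.system.add_files(
    |  table => 'default.test_iceberg',
    |  source_table => '`parquet`.`/user/systest/decimal_test/impala_test_data.parq`')""".stripMargin)

// Workaround: disabling vectorized Parquet reads at the table level makes the
// query above return correct results.
spark.sql(
  "ALTER TABLE test_iceberg SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='false')")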

Note that when I write the DataFrame obtained by reading the original Parquet file back out into another Iceberg table (with the same schema), the file written by Spark uses a different encoding:

$ parquet-tools meta spark_iceberg_data.parquet
file:        file:/home/systest/spark_iceberg_data.parquet 
creator:     parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba) 
extra:       iceberg.schema = {"type":"struct","schema-id":0,"fields":[{"id":1,"name":"id","required":false,"type":"decimal(38, 0)"}]} 

file schema: table 
--------------------------------------------------------------------------------
id:          OPTIONAL FIXED_LEN_BYTE_ARRAY L:DECIMAL(38,0) R:0 D:1

row group 1: RC:435942 TS:6975886 OFFSET:4 
--------------------------------------------------------------------------------
id:           FIXED_LEN_BYTE_ARRAY SNAPPY DO:0 FPO:4 SZ:2685449/6975886/2.60 VC:435942 ENC:PLAIN,RLE,BIT_PACKED ST:[min: 224798, max: 5431555, num_nulls: 0]

Querying this other table (with Parquet written by Spark) works fine, even using vectorized read.
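
For completeness, writing out the second table was also done in spark-shell, roughly as follows (a sketch; the table name test_iceberg2 is a placeholder):

// Read the Impala-written file with Spark's native Parquet reader and write it
// back out as a new Iceberg table with the same schema.
val df = spark.read.parquet("/user/systest/decimal_test/impala_test_data.parq")
df.writeTo("test_iceberg2").using("iceberg").create()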

Willingness to contribute

wypoon commented 4 days ago

In Iceberg 1.1, a different bug occurs when reading the Iceberg table; the read fails altogether due to:

ERROR org.apache.iceberg.spark.source.BaseReader - Error reading file(s): file:/Users/wypoon/tmp/downloads/impala_test_data.parq
java.lang.IndexOutOfBoundsException: index: 1016, length: 1 (expected: range(0, 1016))
    at org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:318)
    at org.apache.arrow.memory.ArrowBuf.chk(ArrowBuf.java:305)
    at org.apache.arrow.memory.ArrowBuf.getByte(ArrowBuf.java:507)
    at org.apache.arrow.vector.BitVectorHelper.setBit(BitVectorHelper.java:85)
    at org.apache.arrow.vector.DecimalVector.setBigEndian(DecimalVector.java:216)
    at org.apache.iceberg.arrow.vectorized.parquet.DecimalVectorUtil.setBigEndian(DecimalVectorUtil.java:31)
    at org.apache.iceberg.arrow.vectorized.parquet.VectorizedDictionaryEncodedParquetValuesReader$FixedLengthDecimalDictEncodedReader.nextVal(VectorizedDictionaryEncodedParquetValuesReader.java:146)
    at org.apache.iceberg.arrow.vectorized.parquet.VectorizedDictionaryEncodedParquetValuesReader$BaseDictEncodedReader.nextBatch(VectorizedDictionaryEncodedParquetValuesReader.java:67)
    at org.apache.iceberg.arrow.vectorized.parquet.VectorizedParquetDefinitionLevelReader$FixedLengthDecimalReader.nextDictEncodedVal(VectorizedParquetDefinitionLevelReader.java:513)
    at org.apache.iceberg.arrow.vectorized.parquet.VectorizedParquetDefinitionLevelReader$BaseReader.nextDictEncodedBatch(VectorizedParquetDefinitionLevelReader.java:356)
    at org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator$FixedLengthDecimalPageReader.nextDictEncodedVal(VectorizedPageIterator.java:421)
    at org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator$BagePageReader.nextBatch(VectorizedPageIterator.java:186)
    at org.apache.iceberg.arrow.vectorized.parquet.VectorizedColumnIterator$FixedLengthDecimalBatchReader.nextBatchOf(VectorizedColumnIterator.java:213)
    at org.apache.iceberg.arrow.vectorized.parquet.VectorizedColumnIterator$BatchReader.nextBatch(VectorizedColumnIterator.java:77)
    at org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.read(VectorizedArrowReader.java:146)
    at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.readDataToColumnVectors(ColumnarBatchReader.java:123)
    at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.loadDataToColumnBatch(ColumnarBatchReader.java:98)
    at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader.read(ColumnarBatchReader.java:72)
    at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader.read(ColumnarBatchReader.java:44)
    at org.apache.iceberg.parquet.VectorizedParquetReader$FileIterator.next(VectorizedParquetReader.java:147)
    at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:130)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:119)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:156)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
    at scala.Option.exists(Option.scala:376)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
        ...

As far as I can tell, this case has either never worked or never worked correctly.
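
For context on what a correct decode looks like: Parquet stores a FIXED_LEN_BYTE_ARRAY decimal as a big-endian two's-complement unscaled integer (16 bytes for precision 38), so a plain per-value decode is just the following (a minimal illustration in Scala, not the code path Iceberg actually uses):

// bytes is the fixed-length (16-byte) big-endian value for one row;
// for decimal(38,0) the scale is 0.
def decodeDecimal(bytes: Array[Byte], scale: Int): java.math.BigDecimal =
  new java.math.BigDecimal(new java.math.BigInteger(bytes), scale)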

wypoon commented 4 days ago

impala_test_data.txt

Please rename the file (I used a .txt extension just to work around my Mac preventing me from uploading the file).

wypoon commented 4 days ago

cc @nastra

wypoon commented 3 days ago

cc @bryanck @rdblue