apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

read from Iceberg table throw java.lang.ArrayIndexOutOfBoundsException: 3 #10103

Open jiantao-vungle opened 5 months ago

jiantao-vungle commented 5 months ago

Apache Iceberg version

1.3.1

Query engine

Spark

Please describe the bug 🐞

Environment

Spark: 3.4.1 Iceberg: 1.3.1

Description

The following exception is thrown when reading from an Iceberg table. I did some research and found that the table has a column named publisher_payout_type_at_delivery with only three distinct values (CPM, REVENUE_SHARE, and FLAT_CPM), but the exception suggests the reader is looking up a fourth dictionary entry. It looks like something went wrong when the Parquet file was written.

The problem is also quite random: re-running the corresponding DML SQL to regenerate the Iceberg table makes subsequent queries succeed.
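
For context, a query shaped roughly like the sketch below would exercise the same code path (the catalog/table identifier here is hypothetical); the stack trace shows a hash aggregation grouping on this dictionary-encoded string column read through Iceberg's vectorized Arrow path:

import org.apache.spark.sql.SparkSession

// Minimal sketch: group by the dictionary-encoded string column so that Spark's
// generated code calls IcebergArrowColumnVector.getUTF8String for each row,
// which is where the ArrayIndexOutOfBoundsException surfaces.
val spark = SparkSession.builder()
  .appName("iceberg-dictionary-read-check")
  .getOrCreate()

spark.sql(
  """SELECT publisher_payout_type_at_delivery, COUNT(*) AS cnt
    |FROM my_catalog.my_db.my_table  -- hypothetical table identifier
    |GROUP BY publisher_payout_type_at_delivery""".stripMargin
).show()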

Exception

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:63)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 38 in stage 4.0 failed 4 times, most recent failure: Lost task 38.3 in stage 4.0 (TID 314) (172.26.2.4 executor 4): java.lang.ArrayIndexOutOfBoundsException: 3
    at org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory$DictionaryStringAccessor.getUTF8String(GenericArrowVectorAccessorFactory.java:423)
    at org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector.getUTF8String(IcebergArrowColumnVector.java:138)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 3
    at org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory$DictionaryStringAccessor.getUTF8String(GenericArrowVectorAccessorFactory.java:423)
    at org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector.getUTF8String(IcebergArrowColumnVector.java:138)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
nastra commented 5 months ago

@jiantao-vungle do you have a small reproducible example? Without one it's quite difficult to investigate this.

amogh-jahagirdar commented 5 months ago

> It looks like something went wrong when the Parquet file was written.

To add to what @nastra said: if you're able to share the file that led to the failing query, that would be very helpful for determining whether the file was written correctly. Specifically, I'm looking to check whether the dictionary encoding of this particular column is correct; based on the exception, there is some invalid offset.

If you're not able to share the file, it would still be helpful to use parquet-cli or another Parquet inspection tool to do the dictionary-encoding verification I mentioned.
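
Roughly the kind of check I mean, as a sketch against the parquet-java API (the local path is hypothetical; this mirrors what parquet-cli's dictionary command does internally):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Open the suspect data file and dump the dictionary for the problematic column,
// row group by row group. Data pages must only reference ids 0..getMaxId.
val input = HadoopInputFile.fromPath(
  new Path("/tmp/suspect-data-file.parquet"),  // hypothetical local copy
  new Configuration())
val reader = ParquetFileReader.open(input)
try {
  val schema = reader.getFileMetaData.getSchema
  val descriptor = schema.getColumnDescription(Array("publisher_payout_type_at_delivery"))
  var rowGroup = 0
  var dictReader = reader.getNextDictionaryReader
  while (dictReader != null) {
    // readDictionaryPage returns null if the column is not dictionary-encoded in this row group
    val page = dictReader.readDictionaryPage(descriptor)
    if (page != null) {
      val dict = page.getEncoding.initDictionary(descriptor, page)
      println(s"row group $rowGroup: dictionary ids 0..${dict.getMaxId}")
      (0 to dict.getMaxId).foreach { i =>
        println(s"  $i: ${dict.decodeToBinary(i).toStringUsingUTF8}")
      }
    }
    reader.skipNextRowGroup()
    rowGroup += 1
    dictReader = reader.getNextDictionaryReader
  }
} finally {
  reader.close()
}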

jiantao-vungle commented 5 months ago

Thanks @nastra and @amogh-jahagirdar. Whether we can share the Parquet file still needs to be evaluated internally; in the meantime, I did some exploration, for example:

➜ parquet dictionary 00339-809-290052b9-087e-4bda-b9a6-716fb7ef841c-00001.parquet -c publisher_payout_type_at_delivery

the result is:

Row group 0 dictionary for "publisher_payout_type_at_delivery":
     0: "CPM"
     1: "REVENUE_SHARE"
     2: "FLAT_CPM"

➜ parquet check-stats 00339-809-290052b9-087e-4bda-b9a6-716fb7ef841c-00001.parquet

00339-809-290052b9-087e-4bda-b9a6-716fb7ef841c-00001.parquet has no corrupt stats

➜ parquet pages 00339-809-290052b9-087e-4bda-b9a6-716fb7ef841c-00001.parquet -c publisher_payout_type_at_delivery

Column: publisher_payout_type_at_delivery
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  G _  3       12.00 B    36 B
  0-1    data  G R  20000   0.14 B     2.729 kB
  0-2    data  G R  20000   0.13 B     2.621 kB
  0-3    data  G R  20000   0.17 B     3.289 kB
  0-4    data  G R  20000   0.13 B     2.615 kB
  0-5    data  G R  20000   0.14 B     2.674 kB
  0-6    data  G R  20000   0.14 B     2.665 kB
  0-7    data  G R  20000   0.14 B     2.669 kB
  0-8    data  G R  20000   0.14 B     2.726 kB
  0-9    data  G R  20000   0.13 B     2.602 kB
  0-10   data  G R  20000   0.14 B     2.818 kB
  0-11   data  G R  20000   0.13 B     2.585 kB
  0-12   data  G R  20000   0.15 B     2.834 kB
  0-13   data  G R  20000   0.13 B     2.585 kB
  0-14   data  G R  14872   0.14 B     2.022 kB

➜ parquet scan 00339-809-290052b9-087e-4bda-b9a6-716fb7ef841c-00001.parquet -c publisher_payout_type_at_delivery

Unknown error
java.lang.RuntimeException: Failed on record 52315
    at org.apache.parquet.cli.commands.ScanCommand.run(ScanCommand.java:69)
    at org.apache.parquet.cli.Main.run(Main.java:163)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.parquet.cli.Main.main(Main.java:193)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 52317 in block 0 in file file:/Users/Env/parquet/00339-809-290052b9-087e-4bda-b9a6-716fb7ef841c-00001.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.cli.BaseCommand$1$1.advance(BaseCommand.java:363)
    at org.apache.parquet.cli.BaseCommand$1$1.next(BaseCommand.java:357)
    at org.apache.parquet.cli.commands.ScanCommand.run(ScanCommand.java:64)
    ... 3 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 3 out of bounds for length 3
    at org.apache.parquet.avro.AvroConverters$BinaryConverter.addValueFromDictionary(AvroConverters.java:87)
    at org.apache.parquet.column.impl.ColumnReaderBase$1.writeValue(ColumnReaderBase.java:186)
    at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
    at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:234)
    ... 7 more

Or could you share other possible methods to check the dictionary encoding, @amogh-jahagirdar?
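
One more thing I can try on my side (a sketch, not yet run) is to read the local copy of the file with Spark's own Parquet source, with the vectorized reader switched off, to see whether the row-by-row decoder trips over the same out-of-range dictionary id:

import org.apache.spark.sql.SparkSession

// Sketch: read the local copy of the suspect file (path taken from the parquet-cli
// error above) through Spark's non-vectorized Parquet path for comparison.
val spark = SparkSession.builder()
  .appName("suspect-parquet-check")
  .master("local[*]")
  .getOrCreate()

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

spark.read
  .parquet("/Users/Env/parquet/00339-809-290052b9-087e-4bda-b9a6-716fb7ef841c-00001.parquet")
  .groupBy("publisher_payout_type_at_delivery")
  .count()
  .show()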

jiantao-vungle commented 4 months ago

And yesterday we encountered a similar problem again:

Py4JJavaError: An error occurred while calling o140.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 5.0 failed 4 times, most recent failure: Lost task 2.3 in stage 5.0 (TID 15) (172.26.33.207 executor 1): org.apache.parquet.io.ParquetDecodingException: Failed to read from input stream
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readNextGroup(VectorizedRleValuesReader.java:942)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readIntegers(VectorizedRleValuesReader.java:696)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$IntegerUpdater.readValues(ParquetVectorUpdaterFactory.java:256)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readIntegers(VectorizedRleValuesReader.java:193)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:204)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:316)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:212)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:554)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at org.apache.parquet.bytes.SingleBufferInputStream.slice(SingleBufferInputStream.java:116)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readNextGroup(VectorizedRleValuesReader.java:933)
    ... 27 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.parquet.io.ParquetDecodingException: Failed to read from input stream
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readNextGroup(VectorizedRleValuesReader.java:942)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readIntegers(VectorizedRleValuesReader.java:696)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$IntegerUpdater.readValues(ParquetVectorUpdaterFactory.java:256)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readIntegers(VectorizedRleValuesReader.java:193)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:204)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:316)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:212)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:554)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at org.apache.parquet.bytes.SingleBufferInputStream.slice(SingleBufferInputStream.java:116)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readNextGroup(VectorizedRleValuesReader.java:933)
    ... 27 more