opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices to remote data stores.
Apache License 2.0

[BUG] Timestamp queries with Iceberg throw ClassCastException #511

Open engechas opened 3 months ago

engechas commented 3 months ago

What is the bug? When running certain queries that involve timestamp fields against Iceberg tables, a ClassCastException is thrown during query execution:

24/07/26 20:16:19 ERROR Executor: Exception in task 4.3 in stage 0.0 (TID 17)
java.lang.ClassCastException: class org.apache.iceberg.shaded.org.apache.arrow.vector.TimeStampMicroVector cannot be cast to class org.apache.iceberg.shaded.org.apache.arrow.vector.BigIntVector (org.apache.iceberg.shaded.org.apache.arrow.vector.TimeStampMicroVector and org.apache.iceberg.shaded.org.apache.arrow.vector.BigIntVector are in unnamed module of loader 'app')
at org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.allocateVectorBasedOnOriginalType(VectorizedArrowReader.java:273) ~[iceberg-spark-runtime-3.3_2.12-1.2.0-amzn-0.jar:?]
at org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.allocateFieldVector(VectorizedArrowReader.java:218) ~[iceberg-spark-runtime-3.3_2.12-1.2.0-amzn-0.jar:?]
at org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.read(VectorizedArrowReader.java:132) ~[iceberg-spark-runtime-3.3_2.12-1.2.0-amzn-0.jar:?]
at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.readDataToColumnVectors(ColumnarBatchReader.java:123) ~[iceberg-spark-runtime-3.3_2.12-1.2.0-amzn-0.jar:?]
at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.loadDataToColumnBatch(ColumnarBatchReader.java:98) ~[iceberg-spark-runtime-3.3_2.12-1.2.0-amzn-0.jar:?]
at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader.read(ColumnarBatchReader.java:72) ~[iceberg-spark-runtime-3.3_2.12-1.2.0-amzn-0.jar:?]
at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader.read(ColumnarBatchReader.java:44) ~[iceberg-spark-runtime-3.3_2.12-1.2.0-amzn-0.jar:?]
at org.apache.iceberg.parquet.VectorizedParquetReader$FileIterator.next(VectorizedParquetReader.java:147) ~[iceberg-spark-runtime-3.3_2.12-1.2.0-amzn-0.jar:?]
at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:136) ~[iceberg-spark-runtime-3.3_2.12-1.2.0-amzn-0.jar:?]
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:119) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:156) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at scala.Option.exists(Option.scala:376) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source) ~[?:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source) ~[?:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source) ~[?:?]
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:955) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at java.lang.Thread.run(Thread.java:840) ~[?:?]

More info:

The exception comes from here: https://github.com/apache/iceberg/blob/1.2.x/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java#L273

It looks like the TimeStampMicroVector is coming from here: https://github.com/apache/iceberg/blob/main/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java#L103-L107
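
To make the failure mode concrete, here is a minimal, self-contained sketch of the kind of mismatch the trace suggests: a vector is created from the column's type in the table schema, and a later step casts it to BigIntVector. The classes below are hypothetical stand-ins that only borrow the names from the stack trace; they are not the shaded Arrow classes, and the switch on a type string is an assumption made for illustration, not Iceberg's actual logic.

```java
// Illustrative stand-ins only: these are NOT the shaded Arrow classes from the stack trace.
final class VectorCastSketch {
    // Common parent so that both vector kinds can be held in one variable (assumption).
    static abstract class FixedWidthVector {
        int capacity;
        void allocateNew(int valueCount) { capacity = valueCount; }
    }
    static final class BigIntVector extends FixedWidthVector {}
    static final class TimeStampMicroVector extends FixedWidthVector {}

    // Plays the role of schema-driven vector creation: the table column type picks the class.
    static FixedWidthVector createVector(String tableColumnType) {
        switch (tableColumnType) {
            case "timestamp": return new TimeStampMicroVector();
            case "long": return new BigIntVector();
            default: throw new IllegalArgumentException("unsupported type: " + tableColumnType);
        }
    }

    // Plays the role of an allocation step that assumes it always receives a BigIntVector.
    static String allocateAsBigInt(String tableColumnType) {
        FixedWidthVector vec = createVector(tableColumnType);
        try {
            ((BigIntVector) vec).allocateNew(1024);
            return "allocated " + vec.getClass().getSimpleName();
        } catch (ClassCastException e) {
            // Sibling classes cannot be cast to each other, even with a shared parent.
            return "ClassCastException for " + vec.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(allocateAsBigInt("long"));
        System.out.println(allocateAsBigInt("timestamp"));
    }
}
```

In this sketch the "long" column allocates normally, while the "timestamp" column hits the same kind of cast failure reported above. Whether the real reader reaches the BigIntVector cast for the reason modeled here has not been confirmed.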

How can one reproduce the bug? The exact conditions that trigger this are unknown. The query below causes the exception:

SELECT accountid, region, count(*) AS total
FROM <table>
WHERE accountid IN ('<redacted>')
  AND region = 'us-east-1'
  AND time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '1' MONTH AND CURRENT_TIMESTAMP
GROUP BY accountid, region
ORDER BY total DESC

What is the expected behavior? The query should execute successfully instead of throwing a ClassCastException

dai-chen commented 3 months ago

Just trying to understand: is this a bug in the Spark Iceberg reader itself?

engechas commented 3 months ago

Yes, it looks like a bug in the Spark Iceberg reader.

dai-chen commented 3 months ago

> Yes it looks like a bug in the Spark Iceberg reader

Thanks for confirming! If possible, could you test it with Spark 3.5? We've bumped the version and are planning to release 0.5 soon.

engechas commented 3 months ago

Peng encountered this in some of his testing with EMRs 7.2/Spark 3.5, so unfortunately it doesn't look like the version bump will fix it.

anirudha commented 3 months ago

What's the path ahead here?