apache / kyuubi

Apache Kyuubi is a distributed and multi-tenant gateway to provide serverless SQL on data warehouses and lakehouses.
https://kyuubi.apache.org/
Apache License 2.0

[Bug] spark hive connector failed to read tpcds parquet table #6503

Open FANNG1 opened 6 days ago

FANNG1 commented 6 days ago

Code of Conduct

Search before asking

Describe the bug

I followed the steps at https://github.com/yaooqinn/tpcds-for-spark/tree/master to generate TPC-DS data for Spark. When querying the data, I get the exceptions below:

select * from catalog_sales limit 1;

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/Users/fanng/opensource/tpcds/tpcds-for-spark/spark-warehouse/tpcds.db/catalog_sales/cs_sold_date_sk=2450815/part-00005-8942c9cb-bf45-4521-b53d-ab272c62ce58.c000.gz.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:98)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:60)
    at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
    at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.liftedTree1$1(HivePartitionReaderFactory.scala:130)
    at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.<init>(HivePartitionReaderFactory.scala:129)
    at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.createPartitionWritableReader(HivePartitionReaderFactory.scala:122)
    at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.buildReaderInternal(HivePartitionReaderFactory.scala:91)
    at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.$anonfun$createReader$1(HivePartitionReaderFactory.scala:75)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.getNextReader(SparkFilePartitionReader.scala:99)
    at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.next(SparkFilePartitionReader.scala:46)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
    at scala.Option.exists(Option.scala:376)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
    at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
    at org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter$BinaryConverter.setDictionary(ETypeConverter.java:283)
    at org.apache.parquet.column.impl.ColumnReaderBase.<init>(ColumnReaderBase.java:415)
    at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:46)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:82)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:177)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:141)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:230)

select * from store_returns limit 1;

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/Users/fanng/opensource/tpcds/tpcds-for-spark/spark-warehouse/tpcds.db/store_returns/sr_returned_date_sk=2450820/part-00006-99abfdab-303f-4af4-8be1-46e581a8b189.c000.gz.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:98)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:60)
    at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
    at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.liftedTree1$1(HivePartitionReaderFactory.scala:130)
    at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.<init>(HivePartitionReaderFactory.scala:129)
    at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.createPartitionWritableReader(HivePartitionReaderFactory.scala:122)
    at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.buildReaderInternal(HivePartitionReaderFactory.scala:91)
    at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.$anonfun$createReader$1(HivePartitionReaderFactory.scala:75)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.getNextReader(SparkFilePartitionReader.scala:99)
    at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.next(SparkFilePartitionReader.scala:46)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
    at scala.Option.exists(Option.scala:376)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter$8$1
    at org.apache.parquet.io.api.PrimitiveConverter.addInt(PrimitiveConverter.java:98)
    at org.apache.parquet.column.impl.ColumnReaderBase$2$3.writeValue(ColumnReaderBase.java:297)
    at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
    at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:234)
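
For context, roughly how the connector catalog gets registered (a sketch only; the catalog name "hive" and the metastore URI are illustrative placeholders, and the catalog class name is assumed, not confirmed, beyond the org.apache.kyuubi.spark.connector.hive package visible in the traces above):

    // Register the Kyuubi Spark Hive connector as a DataSource V2 catalog
    // (assumed class name and placeholder metastore URI).
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.sql.catalog.hive",
        "org.apache.kyuubi.spark.connector.hive.HiveTableCatalog")
      .config("spark.sql.catalog.hive.hive.metastore.uris",
        "thrift://localhost:9083")
      .enableHiveSupport()
      .getOrCreate()

    // Queries routed through this catalog use the Hive SerDe-based reader
    // that fails above.
    spark.sql("SELECT * FROM hive.tpcds.catalog_sales LIMIT 1").show()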

Affects Version(s)

1.8.2

Kyuubi Server Log Output

No response

Kyuubi Engine Log Output

No response

Kyuubi Server Configurations

No response

Kyuubi Engine Configurations

No response

Additional context

No response

Are you willing to submit PR?

pan3793 commented 6 days ago

There are known limitations in the Hive Parquet reader implementation; you may want to try enabling spark.sql.parquet.writeLegacyFormat when generating the TPC-DS data with Spark.
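
For illustration, a minimal sketch, assuming the tables are regenerated from an active Spark session (spark below); the tpcds-for-spark project may expose this differently, e.g. as a --conf on spark-submit:

    // Ask Spark to write Parquet in the legacy (Hive/Impala-compatible) layout,
    // e.g. decimals as fixed-length binary rather than int32/int64, which is
    // what the Hive 2.3.x Parquet converters expect and appears to match the
    // PlainIntegerDictionary/decodeToBinary failure above.
    spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
    // ... then re-run the TPC-DS table generation in this session.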

FANNG1 commented 6 days ago

There are known limitations in the Hive Parquet reader implementation; you may want to try enabling spark.sql.parquet.writeLegacyFormat when generating the TPC-DS data with Spark.

Yes, it works after setting spark.sql.parquet.writeLegacyFormat to true. Are there any other limitations of the Kyuubi Spark Hive connector? And is there any plan to support the new Parquet format?

pan3793 commented 5 days ago

The limitations were listed at our first meeting. KSHC (the Kyuubi Spark Hive Connector) only supports using Hive SerDe to read/write Hive tables; consequently, it has the same limitations as Spark's built-in Hive implementation (specifically, Hive 2.3.9), e.g. poor performance because the reader is not vectorized, and no support for the new Parquet logical types.

This requires a mechanism to respect spark.sql.hive.convertMetastoreParquet (or to define a new dedicated configuration) to convert reads of Hive Parquet tables into Spark DataSource table reads.
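
For comparison, a minimal sketch of the conversion being referred to, using Spark's built-in session catalog (database/table names taken from the stack traces above):

    // With Spark's built-in Hive support, Parquet-backed Hive tables are read
    // through the vectorized Parquet DataSource reader when this flag is true
    // (the default), instead of the Hive SerDe path used by KSHC.
    spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
    spark.sql("SELECT * FROM spark_catalog.tpcds.catalog_sales LIMIT 1").show()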