NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0
824 stars 236 forks

[BUG] Spark UT framework: schema mismatch failure error message for parquet reader #11434

Open Feng-Jiang28 opened 2 months ago

Feng-Jiang28 commented 2 months ago

When running the code below, which triggers a schema mismatch failure, the Spark UT framework intercepts the resulting exception. On the CPU, the exception is an instance of QueryExecutionException with a ParquetDecodingException as its cause. In the GPU version, it is only an instance of QueryExecutionException; the ParquetDecodingException cause is missing.
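The difference matters because a test typically asserts on the cause chain, not just the outer exception. The following is a minimal, self-contained sketch (not the actual Spark UT code) of such a check; the two exception classes are local stand-ins for `org.apache.spark.sql.execution.QueryExecutionException` and `org.apache.parquet.io.ParquetDecodingException`, and the messages are taken from the logs below.

```scala
// Stand-in exception classes mirroring the real ones' cause-chaining behavior.
class QueryExecutionException(msg: String, cause: Throwable = null)
  extends Exception(msg, cause)
class ParquetDecodingException(msg: String) extends Exception(msg)

object CauseChainCheck {
  // Walk the cause chain looking for an exception of type T, which is
  // effectively what a unit test's assertion on the cause does.
  def hasCause[T <: Throwable](e: Throwable)(
      implicit ct: scala.reflect.ClassTag[T]): Boolean =
    Iterator.iterate(e)(_.getCause).takeWhile(_ != null)
      .exists(ct.runtimeClass.isInstance)

  def main(args: Array[String]): Unit = {
    // CPU behavior: the decoding error is wrapped as the cause.
    val cpuStyle = new QueryExecutionException(
      "Encountered error while reading file ...",
      new ParquetDecodingException("Can not read value at 1 in block 0"))

    // GPU behavior: only the outer exception is thrown, with no cause.
    val gpuStyle = new QueryExecutionException(
      "Parquet column cannot be converted in file ...")

    assert(hasCause[ParquetDecodingException](cpuStyle))
    assert(!hasCause[ParquetDecodingException](gpuStyle))
    println("ok")
  }
}
```

A test written against the CPU behavior passes the first assertion but would fail against the GPU exception, since its cause chain never contains a ParquetDecodingException.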

Reproduce:

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
Seq(("bcd", 2)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/home/fejiang/Documents/temp3")
Seq((1, "abc")).toDF("a", "b").coalesce(1).write.mode("append").parquet("/home/fejiang/Documents/temp3")
spark.read.parquet("/home/fejiang/Documents/temp3").collect()

CPU:

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

scala> Seq(("bcd", 2)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/home/fejiang/Documents/temp3")

scala> Seq((1, "abc")).toDF("a", "b").coalesce(1).write.mode("append").parquet("/home/fejiang/Documents/temp3")

scala> spark.read.parquet("/home/fejiang/Documents/temp3").collect()
24/09/06 17:56:48 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
org.apache.spark.sql.execution.QueryExecutionException: Encountered error while reading file file:///home/fejiang/Documents/temp3/part-00000-5e54d78f-90b5-4323-a827-62d8b73076d3-c000.snappy.parquet. Details: 
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:713)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:283)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/home/fejiang/Documents/temp3/part-00000-5e54d78f-90b5-4323-a827-62d8b73076d3-c000.snappy.parquet

GPU:


scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

scala> Seq(("bcd", 2)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/home/fejiang/Documents/temp3")
24/09/06 18:32:09 WARN GpuOverrides: 
    ! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
      @Expression <AttributeReference> a#31 could run on GPU
      @Expression <AttributeReference> b#32 could run on GPU

scala> Seq((1, "abc")).toDF("a", "b").coalesce(1).write.mode("append").parquet("/home/fejiang/Documents/temp3")
24/09/06 18:32:10 WARN GpuOverrides: 
    ! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
      @Expression <AttributeReference> a#44 could run on GPU
      @Expression <AttributeReference> b#45 could run on GPU

scala> spark.read.parquet("/home/fejiang/Documents/temp3").collect()
24/09/06 18:32:10 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 6)
org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///home/fejiang/Documents/temp3/part-00000-d52c3c9d-9ef0-41a0-b0d7-7a3679ee4bd7-c000.snappy.parquet. Column: a, Expected: string, Found: required int32 a
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.throwTypeIncompatibleError(GpuParquetScan.scala:1025)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$12(GpuParquetScan.scala:757)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$12$adapted(GpuParquetScan.scala:757)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$6(GpuParquetScan.scala:878)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.checkPrimitiveCompat(GpuParquetScan.scala:1009)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.checkSchemaCompat(GpuParquetScan.scala:878)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$3(GpuParquetScan.scala:830)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$3$adapted(GpuParquetScan.scala:821)
    at scala.Option.foreach(Option.scala:407)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$2(GpuParquetScan.scala:821)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$2$adapted(GpuParquetScan.scala:820)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)

mattahrens commented 2 months ago

@GaryShen2008 this is only a difference in the exception type, so it is low priority to address.