NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] Spark UT framework: SPARK-34212 Parquet should read decimals correctly #11433

Open Feng-Jiang28 opened 2 months ago

Feng-Jiang28 commented 2 months ago

Exception: reading a Parquet file with a user-supplied schema whose decimal precision/scale differs from the schema stored in the file throws on the GPU, while the CPU reader converts the values correctly.

Reproduce:

Start plugin:

$SPARK_HOME/bin/spark-shell --master local[*] --jars ${SPARK_RAPIDS_PLUGIN_JAR} --conf spark.plugins=com.nvidia.spark.SQLPlugin --conf spark.rapids.sql.enabled=true

CPU:

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

scala> val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
df: org.apache.spark.sql.DataFrame = [a: decimal(2,1), b: decimal(17,2) ... 1 more field]

scala> df.show()
+---+----+----+
|  a|   b|   c|
+---+----+----+
|1.0|1.23|1.23|
+---+----+----+

scala> df.write.parquet("/home/fejiang/Documents/temp")

scala> val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
schema1: String = a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)

scala> val df2 = spark.read.schema(schema1).parquet("/home/fejiang/Documents/temp")
df2: org.apache.spark.sql.DataFrame = [a: decimal(3,2), b: decimal(18,3) ... 1 more field]

scala> df2.show()
+----+-----+-----+
|   a|    b|    c|
+----+-----+-----+
|1.00|1.230|1.230|
+----+-----+-----+
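For context, the widening the CPU reader performs here is an exact decimal rescale: a value stored as DECIMAL(17, 2) is representable without loss at the requested DECIMAL(18, 3). A minimal sketch of that rescaling with `java.math.BigDecimal` (illustration only, not the plugin's actual code path):

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

// Value as stored in the file: precision 17, scale 2.
val stored = new JBigDecimal("1.23")

// Requested read schema is DECIMAL(18, 3): wider precision, larger scale.
// Rescaling upward is exact, so RoundingMode.UNNECESSARY never throws here.
val widened = stored.setScale(3, RoundingMode.UNNECESSARY)

println(widened)            // 1.230
println(widened.precision)  // 4, well within precision 18
```

Because the conversion is lossless, the CPU reader can serve `1.230` for a stored `1.23`; the GPU reader currently rejects the schema mismatch up front instead.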

GPU:

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

scala> val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
df: org.apache.spark.sql.DataFrame = [a: decimal(2,1), b: decimal(17,2) ... 1 more field]

scala> df.show()
24/09/06 15:30:30 WARN GpuOverrides: 
  ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec

+---+----+----+
|  a|   b|   c|
+---+----+----+
|1.0|1.23|1.23|
+---+----+----+

scala> df.write.parquet("/home/fejiang/Documents/temp")
24/09/06 15:30:31 WARN GpuOverrides: 
    ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec

scala> val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
schema1: String = a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)

scala> val df2 = spark.read.schema(schema1).parquet("/home/fejiang/Documents/temp")
df2: org.apache.spark.sql.DataFrame = [a: decimal(3,2), b: decimal(18,3) ... 1 more field]

scala> df2.show()
24/09/06 15:30:32 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU

24/09/06 15:30:32 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///home/fejiang/Documents/temp/part-00000-9f83dfb2-74c0-4b2a-9bd4-5faadaeb893d-c000.snappy.parquet. Column: a, Expected: decimal(3,2), Found: required int32 a (DECIMAL(2,1))
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.throwTypeIncompatibleError(GpuParquetScan.scala:1025)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$12(GpuParquetScan.scala:757)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$12$adapted(GpuParquetScan.scala:757)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$6(GpuParquetScan.scala:878)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.checkPrimitiveCompat(GpuParquetScan.scala:1009)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.checkSchemaCompat(GpuParquetScan.scala:878)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$3(GpuParquetScan.scala:830)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$3$adapted(GpuParquetScan.scala:821)
    at scala.Option.foreach(Option.scala:407)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$2(GpuParquetScan.scala:821)
    at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$2$adapted(GpuParquetScan.scala:820)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
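Until the GPU reader supports this conversion, one possible workaround (a sketch using Spark's standard `Column.cast` API, assuming the same file path as above) is to read with the schema stored in the file and widen the decimals afterwards with explicit casts:

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

// Let Spark infer the file's own schema, then cast to the wider decimals.
// The casts are ordinary projections, which the plugin can run on the GPU.
val raw = spark.read.parquet("/home/fejiang/Documents/temp")
val widened = raw
  .withColumn("a", col("a").cast(DecimalType(3, 2)))
  .withColumn("b", col("b").cast(DecimalType(18, 3)))
  .withColumn("c", col("c").cast(DecimalType(37, 3)))
```

This sidesteps the schema-compatibility check in `GpuParquetFileFilterHandler` because the scan itself uses the file's schema; whether the casts themselves stay on the GPU depends on the types involved.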
nartal1 commented 3 days ago

Will test whether all the tests in UT SPARK-34212 are passing before closing this issue. Need to enable the UT before closing: https://github.com/NVIDIA/spark-rapids/blob/f2ea943dfd911bd92d3342f91e966ade3dcc5510/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/utils/RapidsTestSettings.scala#L88