NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0
822 stars 235 forks source link

[BUG] Spark UT framework: select explode of nested field of array of struct: Encountered an exception applying GPU overrides #11653

Open Feng-Jiang28 opened 4 weeks ago

Feng-Jiang28 commented 4 weeks ago

Description:

The contacts parquet table is defined as follows and has been saved here: contacts.zip

+---+--------------------+---------------+----+--------------------+----------------------------+-------------------------------+----------------------------+---+
|id |name                |address        |pets|friends             |relatives                   |employer                       |relations                   |p  |
+---+--------------------+---------------+----+--------------------+----------------------------+-------------------------------+----------------------------+---+
|0  |{Jane, X., Doe}     |123 Main Street|1   |[{Susan, Z., Smith}]|{brother -> {John, Y., Doe}}|{0, {abc, 123 Business Street}}|{{John, Y., Doe} -> brother}|1  |
|1  |{John, Y., Doe}     |321 Wall Street|3   |[]                  |{sister -> {Jane, X., Doe}} |{1, null}                      |{{Jane, X., Doe} -> sister} |1  |
|2  |{Janet, null, Jones}|567 Maple Drive|null|null                |null                        |null                           |null                        |2  |
|3  |{Jim, null, Jones}  |6242 Ash Street|null|null                |null                        |null                           |null                        |2  |
+---+--------------------+---------------+----+--------------------+----------------------------+-------------------------------+----------------------------+---+

Code to reproduce:

    val dataSourceName = "parquet"
    val path = "/home/fejiang/Desktop"
    val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
      "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
      "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
      "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
      "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
      "`last`: STRING>,STRING>,`p` INT")
    spark.read.format(dataSourceName).schema(schema).
      load(path + "/contacts").createOrReplaceTempView("contacts")
    spark.sql("set spark.sql.optimizer.nestedSchemaPruning.enabled=false")
    spark.sql("set spark.sql.optimizer.expression.nestedPruning.enabled=true")
    val query1 = spark.table("contacts").select(explode(col("friends.first")))

    query1.show()

CPU:

scala>     val dataSourceName = "parquet"
dataSourceName: String = parquet

scala>     val path = "/home/fejiang/Desktop"
path: String = /home/fejiang/Desktop

scala>     val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
     |       "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
     |       "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
     |       "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
     |       "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
     |       "`last`: STRING>,STRING>,`p` INT")
schema: String = `id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, `address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>,STRING>,`p` INT

scala>     spark.read.format(dataSourceName).schema(schema).
     |       load(path + "/contacts").createOrReplaceTempView("contacts")

scala>     spark.sql("set spark.sql.optimizer.nestedSchemaPruning.enabled=false")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala>     spark.sql("set spark.sql.optimizer.expression.nestedPruning.enabled=true")
res2: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala>     val query1 = spark.table("contacts").select(explode(col("friends.first")))
query1: org.apache.spark.sql.DataFrame = [col: string]

scala> 

scala>     query1.show()
+-----+                                                                         
|  col|
+-----+
|Susan|
+-----+

GPU:

Start spark-shell with the plugin in strict (test) mode:

$SPARK_HOME/bin/spark-shell --master local[*] \
  --jars ${SPARK_RAPIDS_PLUGIN_JAR} \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.rapids.sql.enabled=true \
  --conf spark.rapids.sql.explain=ALL \
  --conf spark.rapids.sql.test.enabled=true \
  --conf spark.rapids.sql.test.allowedNonGpu=SerializeFromObjectExec,DeserializeToObjectExec,ExternalRDDScanExec
scala> val dataSourceName = "parquet"
dataSourceName: String = parquet

scala> val path = "/home/fejiang/Desktop"
path: String = /home/fejiang/Desktop

scala> val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
     |   "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
     |   "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>,STRING>,`p` INT")
schema: String = `id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, `address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>,STRING>,`p` INT

scala> spark.read.format(dataSourceName).schema(schema).load(path + "/contacts").createOrReplaceTempView("contacts")

scala> spark.sql("set spark.sql.optimizer.nestedSchemaPruning.enabled=false")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("set spark.sql.optimizer.expression.nestedPruning.enabled=true")
res2: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> val query1 = spark.table("contacts").select(explode(col("friends.first")))
query1: org.apache.spark.sql.DataFrame = [col: string]

scala> query1.explain()
24/10/24 15:35:06 WARN GpuOverrides: 
*Exec <GenerateExec> will run on GPU
  *Expression <Explode> explode(friends#4.first) will run on GPU
    *Expression <GetArrayStructFields> friends#4.first will run on GPU
  *Exec <ProjectExec> will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

== Physical Plan ==
GpuColumnarToRow false
+- GpuGenerate gpuexplode(friends#4.first), false, [col#48]
   +- GpuProject [friends#4]
      +- GpuFileGpuScan parquet [friends#4] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/fejiang/Desktop/contacts], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<friends:array<struct<first:string,middle:string,last:string>>>

scala> query1.show()
24/10/24 15:35:07 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <GenerateExec> will run on GPU
    *Expression <Explode> explode(friends#4.first) will run on GPU
      *Expression <GetArrayStructFields> friends#4.first will run on GPU
    *Exec <ProjectExec> will run on GPU
      *Exec <FileSourceScanExec> will run on GPU

24/10/24 15:35:07 ERROR GpuOverrideUtil: Encountered an exception applying GPU overrides java.lang.IllegalArgumentException: Part of the plan is not columnar class org.apache.spark.sql.execution.CollectLimitExec
CollectLimit 21
+- GpuColumnarToRow false
   +- GpuGenerate gpuexplode(friends#4.first), false, [col#48], [loreId=2]
      +- GpuProject [friends#4], [loreId=1]
         +- GpuFileGpuScan parquet [friends#4] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/fejiang/Desktop/contacts], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<friends:array<struct<first:string,middle:string,last:string>>>

java.lang.IllegalArgumentException: Part of the plan is not columnar class org.apache.spark.sql.execution.CollectLimitExec
CollectLimit 21
+- GpuColumnarToRow false
   +- GpuGenerate gpuexplode(friends#4.first), false, [col#48], [loreId=2]
      +- GpuProject [friends#4], [loreId=1]
         +- GpuFileGpuScan parquet [friends#4] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/fejiang/Desktop/contacts], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<friends:array<struct<first:string,middle:string,last:string>>>

    at com.nvidia.spark.rapids.GpuTransitionOverrides.assertIsOnTheGpu(GpuTransitionOverrides.scala:671)
    at com.nvidia.spark.rapids.GpuTransitionOverrides.$anonfun$apply$3(GpuTransitionOverrides.scala:819)
    at com.nvidia.spark.rapids.GpuOverrides$.logDuration(GpuOverrides.scala:457)
    at com.nvidia.spark.rapids.GpuTransitionOverrides.$anonfun$apply$1(GpuTransitionOverrides.scala:793)
    at com.nvidia.spark.rapids.GpuOverrideUtil$.$anonfun$tryOverride$1(GpuOverrides.scala:4646)
    at com.nvidia.spark.rapids.GpuTransitionOverrides.apply(GpuTransitionOverrides.scala:852)
    at com.nvidia.spark.rapids.GpuTransitionOverrides.apply(GpuTransitionOverrides.scala:46)
    at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$2(Columnar.scala:555)
    at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$2$adapted(Columnar.scala:555)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.apply(Columnar.scala:555)
    at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.apply(Columnar.scala:514)
    at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$1(QueryExecution.scala:440)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
****