NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

to_json throws an exception when serializing a map<struct, struct>. #10918

Open Feng-Jiang28 opened 5 months ago

Feng-Jiang28 commented 5 months ago

to_json throws an exception when serializing a map<struct, struct> column.

Reproduce:

CPU:

scala> import org.apache.spark.sql.SparkSession
scala> import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
scala> val spark = SparkSession.builder.appName("Test").getOrCreate()
24/05/28 13:44:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@24fec6d9
scala> import spark.implicits._
scala> val schema = MapType(
     |   StructType(Seq(StructField("a", IntegerType))),
     |   StructType(Seq(StructField("b", IntegerType)))
     | )
schema: org.apache.spark.sql.types.MapType = MapType(StructType(StructField(a,IntegerType,true)),StructType(StructField(b,IntegerType,true)),true)
scala> val data = Seq(
     |   Row(Map(Row(5) -> Row(2)))
     | )
data: Seq[org.apache.spark.sql.Row] = List([Map([5] -> [2])])
scala> val df = spark.createDataFrame(
     |   spark.sparkContext.parallelize(data),
     |   StructType(Seq(StructField("value", schema)))
     | )
df: org.apache.spark.sql.DataFrame = [value: map<struct<a:int>,struct<b:int>>]
scala> val jsonDF = df.select(to_json(col("value")).alias("json_string"))
jsonDF: org.apache.spark.sql.DataFrame = [json_string: string]
scala> jsonDF.show(false)
+-----------------+
|json_string      |
+-----------------+
|{"[0,5]":{"b":2}}|
+-----------------+
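Note that even the CPU result above does not render the struct key as JSON: the "[0,5]" key is plausibly the toString of Spark's internal row representation. A possible workaround (a sketch assuming the Spark 3.x Scala API, not taken from the issue) is to stringify the struct keys before calling to_json, so the map has plain string keys:

// Sketch only: rebuild the map with JSON-string keys before serializing.
import org.apache.spark.sql.functions._

val stringKeyed = df.select(
  map_from_entries(
    transform(
      map_entries(col("value")),
      e => struct(to_json(e.getField("key")), e.getField("value"))
    )
  ).alias("value")
)
stringKeyed.select(to_json(col("value")).alias("json_string")).show(false)
// Expected shape (roughly): {"{\"a\":5}":{"b":2}} -- the key is now the
// JSON rendering of the struct instead of the internal [0,5] form.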

GPU:

$ $SPARK_HOME/bin/spark-shell --master local[*] --jars ${SPARK_RAPIDS_PLUGIN_JAR} \
    --conf spark.plugins=com.nvidia.spark.SQLPlugin \
    --conf spark.rapids.sql.enabled=true \
    --conf spark.rapids.sql.explain=ALL \
    --driver-java-options '-ea -Duser.timezone=UTC' \
    --conf spark.rapids.sql.expression.JsonTuple=true \
    --conf spark.rapids.sql.expression.GetJsonObject=true \
    --conf spark.rapids.sql.expression.JsonToStructs=true \
    --conf spark.rapids.sql.expression.StructsToJson=true
scala> import org.apache.spark.sql.SparkSession
scala> import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
scala> val spark = SparkSession.builder.appName("Test").getOrCreate()
scala> import spark.implicits._
scala> val schema = MapType(
     |   StructType(Seq(StructField("a", IntegerType))),
     |   StructType(Seq(StructField("b", IntegerType)))
     | )
scala> val data = Seq(
     |   Row(Map(Row(1) -> Row(2)))
     | )
scala> val df = spark.createDataFrame(
     |   spark.sparkContext.parallelize(data),
     |   StructType(Seq(StructField("value", schema)))
     | )
scala> val jsonDF = df.select(to_json(col("value")).alias("json_string"))
scala> jsonDF.show(false)
24/05/28 05:47:36 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> to_json(value#21, Some(UTC)) AS json_string#26 will run on GPU
      *Expression <StructsToJson> to_json(value#21, Some(UTC)) will run on GPU
    ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
      @Expression <AttributeReference> value#21 could run on GPU

24/05/28 05:47:36 ERROR Executor: Exception in task 6.0 in stage 5.0 (TID 23)
java.lang.ClassCastException: org.apache.spark.sql.types.MapType cannot be cast to org.apache.spark.sql.types.StructType
    at org.apache.spark.sql.rapids.GpuStructsToJson.doColumnar(GpuStructsToJson.scala:86)
    at com.nvidia.spark.rapids.GpuUnaryExpression.doItColumnar(GpuExpressions.scala:250)
    at com.nvidia.spark.rapids.GpuUnaryExpression.$anonfun$columnarEval$1(GpuExpressions.scala:261)
    at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    at com.nvidia.spark.rapids.GpuUnaryExpression.columnarEval(GpuExpressions.scala:260)
    at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
    at com.nvidia.spark.rapids.GpuAlias.columnarEval(namedExpressions.scala:110)
    at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
    at com.nvidia.spark.rapids.GpuProjectExec$.$anonfun$project$1(basicPhysicalOperators.scala:110)
    at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1(implicits.scala:221)
    at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1$adapted(implicits.scala:218)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.safeMap(implicits.scala:218)
    at com.nvidia.spark.rapids.RapidsPluginImplicits$AutoCloseableProducingSeq.safeMap(implicits.scala:253)
    at com.nvidia.spark.rapids.GpuProjectExec$.project(basicPhysicalOperators.scala:110)
    at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$project$2(basicPhysicalOperators.scala:619)
    at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    at com.nvidia.spark.rapids.GpuTieredProject.recurse$2(basicPhysicalOperators.scala:618)
    at com.nvidia.spark.rapids.GpuTieredProject.project(basicPhysicalOperators.scala:631)
    at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$5(basicPhysicalOperators.scala:567)
    at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRestoreOnRetry(RmmRapidsRetryIterator.scala:272)
    at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$4(basicPhysicalOperators.scala:567)
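
The ClassCastException at GpuStructsToJson.scala:86 above is consistent with the GPU implementation casting the input column's dataType directly to StructType, while Spark's to_json also accepts map (and array) input. As an illustration only (this is not the plugin's actual code), a type guard of roughly this shape is the usual way to report the unsupported case so planning can fall back to the CPU instead of crashing at runtime:

// Illustrative sketch, not plugin code: tag non-struct to_json input.
import org.apache.spark.sql.types.{DataType, StructType}

def tagToJsonInput(dt: DataType): Option[String] = dt match {
  case _: StructType => None  // struct input is handled on the GPU
  case other =>
    Some(s"to_json on ${other.simpleString} is not supported on the GPU")
}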
revans2 commented 3 weeks ago

I don't think we want to try to support anything other than a String as the key for a map, at least not until we have a customer use case that asks for it. I know that in the past toString was called on the Java class that was the key, and the results were inconsistent or nonsensical at best.
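
The CPU output in the repro shows exactly that: the map key comes out as "[0,5]", plausibly the hex words of the internal UnsafeRow (a null-tracking word of 0, then the value 5) rather than any JSON rendering of the struct. The same positional, name-dropping rendering is visible on the external Row API (a small illustration, not from the issue):

// Hedged illustration: a struct used as a map key is rendered via the
// row's toString, not as JSON, so field names are lost.
import org.apache.spark.sql.Row
Row(5).toString  // "[5]" -- positional, no field names, not valid JSON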

revans2 commented 3 weeks ago

After https://github.com/NVIDIA/spark-rapids/pull/11642, this no longer throws an exception; instead, it falls back to the CPU.
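
For anyone verifying this on a plugin build that includes that PR (an assumption; the snippet is not from the issue), the explain output should report the fallback rather than an exception:

// Sketch: rerun the failing projection with explain enabled and expect
// GpuOverrides to tag StructsToJson as staying on the CPU for map input.
spark.conf.set("spark.rapids.sql.explain", "ALL")
df.select(to_json(col("value")).alias("json_string")).show(false)
// Expected: same output as the CPU run, with no ClassCastException.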