NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
Apache License 2.0

Parsing a column containing invalid JSON into StructType with schema throws an exception. #10891

Open Feng-Jiang28 opened 1 month ago

Feng-Jiang28 commented 1 month ago

Parsing a column containing invalid JSON into a StructType with a schema throws an exception.


:~$ $SPARK_HOME/bin/spark-shell --master local[*] --jars ${SPARK_RAPIDS_PLUGIN_JAR} \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.rapids.sql.enabled=true \
--conf spark.rapids.sql.explain=ALL \
--driver-java-options '-ea -Duser.timezone=UTC ' \
--conf spark.rapids.sql.expression.JsonTuple=true \
--conf spark.rapids.sql.expression.GetJsonObject=true \
--conf spark.rapids.sql.expression.JsonToStructs=true \
--conf spark.rapids.sql.expression.StructsToJson=true


scala> import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.types.{IntegerType, StructType}

scala> val df = Seq("""{"a" 1}""").toDS()
df: org.apache.spark.sql.Dataset[String] = [value: string]

scala> df.write.mode("OVERWRITE").parquet("TEMP")
24/05/24 09:51:35 WARN GpuOverrides: 
*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHadoopFsRelationCommand> will run on GPU
  ! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
    @Expression <AttributeReference> value#1 could run on GPU

scala> val df2 = spark.read.parquet("TEMP")
df2: org.apache.spark.sql.DataFrame = [value: string]

scala> val schema = new StructType().add("a", IntegerType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true))

scala> var parsed = df2.select(from_json($"value", schema))
parsed: org.apache.spark.sql.DataFrame = [from_json(value): struct<a: int>]

scala> parsed.show()
24/05/24 09:51:39 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.IllegalStateException: No empty row count provided and the table read has no row count or columns
    at ai.rapids.cudf.Table.gatherJSONColumns(Table.java:1204)
    at ai.rapids.cudf.Table.readJSON(Table.java:1446)
    at ai.rapids.cudf.Table.readJSON(Table.java:1428)
    at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$doColumnar$2(GpuJsonToStructs.scala:179)
    at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    at org.apache.spark.sql.rapids.GpuJsonToStructs.$anonfun$doColumnar$1(GpuJsonToStructs.scala:177)
    at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)

For comparison, the same steps in a session that logs no GpuOverrides plan return a null struct instead of failing:


scala> import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.types.{IntegerType, StructType}

scala> val df = Seq("""{"a" 1}""").toDS()
df: org.apache.spark.sql.Dataset[String] = [value: string]

scala> df.write.mode("OVERWRITE").parquet("TEMP")

scala> val df2 = spark.read.parquet("TEMP")
df2: org.apache.spark.sql.DataFrame = [value: string]

scala> val schema = new StructType().add("a", IntegerType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true))

scala> var parsed = df2.select(from_json($"value", schema))
parsed: org.apache.spark.sql.DataFrame = [from_json(value): struct<a: int>]

scala> parsed.show()
+----------------+
|from_json(value)|
+----------------+
|          {null}|
+----------------+

revans2 commented 1 month ago

@Feng-Jiang28 Thanks for submitting this. I put some fixes into CUDF, but it still requires a small change to the plugin to help address it. I thought I had put up a PR for it, but I guess I forgot to. If you want to give it a try, you just need to change


So it passes numRows as the last argument after ds. This lets the CUDF code know that, if it hits a situation where no columns could be returned (a limitation in the current CUDF JSON parser), it can create null columns for the schema passed in. If you do want to do it, let me know; otherwise I'll try to get to it next week.
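
The fallback described above can be illustrated with a toy model in plain Scala. This is only a sketch of the idea, not the cuDF or plugin code: `Column`, `gatherColumns`, and `emptyRowCount` are hypothetical names chosen for illustration, and the column type is simplified to nullable integers.

```scala
// Toy model only: none of these names are the real cuDF or spark-rapids API.
object NullFallbackSketch {
  // A column is a name plus nullable integer values.
  final case class Column(name: String, values: Vector[Option[Int]])

  // `parsed` stands in for what a JSON parser returned; it may be empty when
  // every input row was invalid. `emptyRowCount` is the optional row count
  // supplied by the caller (the role numRows plays in the comment above).
  def gatherColumns(
      schema: Seq[String],
      parsed: Seq[Column],
      emptyRowCount: Option[Int]): Seq[Column] = {
    if (parsed.nonEmpty) {
      parsed
    } else {
      emptyRowCount match {
        // With a row count, build one all-null column per schema field.
        case Some(n) =>
          schema.map(name => Column(name, Vector.fill(n)(None)))
        // Without one, there is no way to size the result, so fail.
        case None =>
          throw new IllegalStateException(
            "No empty row count provided and the table read has no row count or columns")
      }
    }
  }
}
```

In this model, calling `NullFallbackSketch.gatherColumns(Seq("a"), Seq.empty, Some(1))` yields a single column `a` holding one null, which corresponds to the `{null}` row in the second session; passing `None` reproduces the failure mode instead.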

Feng-Jiang28 commented 1 month ago

No problem Bobby, I will put up a PR for it.