NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] GpuScalarSubquery serializes the subquery plan #5889

Open jlowe opened 2 years ago

jlowe commented 2 years ago

Describe the bug

Serializing GpuScalarSubquery also serializes the underlying subquery plan, which appears to be unnecessary. The subquery plan should only be needed on the driver, which collects the subquery result and stores the scalar value in this node so executors can reference it.

Steps/Code to reproduce bug

Apply a serialization "poison pill" to GpuExec with a patch like this:

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
index 5284cf8c5..ed2957757 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -208,7 +208,11 @@ object GpuExec {
   }
 }

+class SerializationPoisonPill {}
+
 trait GpuExec extends SparkPlan with Arm {
+  protected val poisonPill = new SerializationPoisonPill()
+
   import GpuMetric._
   def sparkSession: SparkSession = {
     SparkShimImpl.sessionFromPlan(this)

Then run a query that uses a scalar subquery. NDS query 58 is one such query.
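The poison pill works because Java serialization, which Spark uses for task closures, throws `NotSerializableException` as soon as it reaches a field whose class does not implement `java.io.Serializable`. The minimal sketch below reproduces that effect in isolation. `FakeExecNode`, `ExprHoldingPlan`, `PoisonPillDemo`, and `serializes` are hypothetical stand-ins, not spark-rapids classes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Same idea as the patch: a class that deliberately does NOT implement
// java.io.Serializable, so serializing anything that references it fails.
class SerializationPoisonPill

// Hypothetical stand-in for a GpuExec node. Case classes are Serializable,
// like real SparkPlan nodes, but this one carries the poison pill.
case class FakeExecNode(name: String) {
  protected val poisonPill = new SerializationPoisonPill()
}

// Hypothetical stand-in for an expression that keeps a reference to its plan.
case class ExprHoldingPlan(plan: FakeExecNode, exprId: Long)

object PoisonPillDemo {
  /** Returns true if Java serialization of obj succeeds. */
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // false: the reference to the plan drags the poison pill along
    println(serializes(ExprHoldingPlan(FakeExecNode("subquery"), 1L)))
    // true: nothing reachable from the tuple is non-serializable
    println(serializes(("no plan", 1L)))
  }
}
```

With the pill in place, any closure that still reaches a GpuExec node fails loudly instead of silently shipping the plan.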

Expected behavior

The query plan should not be needlessly serialized in an RDD evaluation closure.

jlowe commented 2 years ago

Applying the following patch appears to fix it, but I'm not sure whether this change has any other ramifications:

diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalarSubquery.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalarSubquery.scala
index be494bbbd..61098f410 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalarSubquery.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalarSubquery.scala
@@ -31,11 +31,11 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
  * other GPU overrides.
  */
 case class GpuScalarSubquery(
-    plan: BaseSubqueryExec,
+    @transient plan: BaseSubqueryExec,
     exprId: ExprId)
   extends ExecSubqueryExpression with GpuExpression with ShimExpression {

-  override def dataType: DataType = plan.schema.fields.head.dataType
+  override lazy val dataType: DataType = plan.schema.fields.head.dataType
   override def children: Seq[Expression] = Seq.empty
   override def nullable: Boolean = true
   override def toString: String = plan.simpleString(SQLConf.get.maxToStringFields)

Note that Apache Spark's ScalarSubquery has the same issue.
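A minimal sketch of what the `@transient` plus `lazy val` change does under plain Java serialization, assuming the `lazy val` is forced on the driver before the expression is shipped. `FakeSubqueryPlan`, `FakeScalarSubquery`, `TransientPlanDemo`, and `roundTrip` are hypothetical names; no real spark-rapids or Spark classes are involved.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for BaseSubqueryExec. It deliberately does NOT
// implement Serializable, so shipping it to an executor would fail.
class FakeSubqueryPlan(val resultTypeName: String)

// Hypothetical stand-in with the same shape as the patched GpuScalarSubquery:
// the plan is @transient and dataType is a lazy val derived from it.
case class FakeScalarSubquery(@transient plan: FakeSubqueryPlan, exprId: Long) {
  lazy val dataType: String = plan.resultTypeName

  // Stand-in for the scalar value the driver stores after running the subquery.
  @volatile var result: Any = null
}

object TransientPlanDemo {
  /** Serializes and deserializes obj, mimicking a driver-to-executor hop. */
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()

    val loader = getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with this object's loader so the demo also works
      // inside REPLs and build tools that use layered class loaders.
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val onDriver = FakeScalarSubquery(new FakeSubqueryPlan("LongType"), 1L)
    onDriver.dataType     // force the lazy val while the plan is still present
    onDriver.result = 42L // the driver stores the collected scalar value

    val onExecutor = roundTrip(onDriver)
    println(onExecutor.plan == null) // true: the plan was not serialized
    println(onExecutor.dataType)     // LongType: the cached value was serialized
    println(onExecutor.result)       // 42
  }
}
```

In this sketch, a copy serialized before `dataType` was forced carries only a null `plan`, so reading `dataType` on it throws a `NullPointerException`; forcing the value on the driver is what makes the `lazy val` safe. The `toString` in the patch still dereferences `plan`, so it would presumably fail the same way if called on an executor-side copy.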