apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[VL] TPCDS q05 failed due to exception in ColumnarBuildSideRelation #7807

Closed leoluan2009 closed 3 weeks ago

leoluan2009 commented 3 weeks ago

Backend

VL (Velox)

Bug description

error message:

2024-11-04 22:41:56,466 [ERROR] [dynamicpruning-0] TaskResources: Task -1 failed by error:
java.lang.IllegalStateException: Couldn't find d_date_sk#376 in [d_date_sk#103]
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:405)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:73)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:94)
  at scala.collection.immutable.List.map(List.scala:293)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReferences(BoundAttribute.scala:94)
  at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:161)
  at org.apache.spark.sql.execution.ColumnarBuildSideRelation$$anon$2.next(ColumnarBuildSideRelation.scala:144)
  at org.apache.spark.sql.execution.ColumnarBuildSideRelation$$anon$2.next(ColumnarBuildSideRelation.scala:111)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at scala.collection.TraversableOnce$FlattenOps$$anon$2.hasNext(TraversableOnce.scala:521)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
  at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
  at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
  at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
  at scala.collection.AbstractIterator.to(Iterator.scala:1431)
  at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
  at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
  at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
  at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
  at org.apache.spark.sql.execution.ColumnarBuildSideRelation.$anonfun$transform$1(ColumnarBuildSideRelation.scala:175)
  at org.apache.spark.task.TaskResources$.runUnsafe(TaskResources.scala:99)
  at org.apache.spark.sql.execution.ColumnarBuildSideRelation.transform(ColumnarBuildSideRelation.scala:88)
  at org.apache.spark.sql.execution.ColumnarSubqueryBroadcastExec.$anonfun$relationFuture$3(ColumnarSubqueryBroadcastExec.scala:80)
  at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25)
  at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37)
  at org.apache.spark.sql.execution.ColumnarSubqueryBroadcastExec.$anonfun$relationFuture$2(ColumnarSubqueryBroadcastExec.scala:75)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:177)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
  at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:175)
  at org.apache.spark.sql.execution.ColumnarSubqueryBroadcastExec.$anonfun$relationFuture$1(ColumnarSubqueryBroadcastExec.scala:73)
  at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
  at scala.util.Success.$anonfun$map$1(Try.scala:255)
  at scala.util.Success.map(Try.scala:213)
  at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
  at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

Spark version

Spark-3.5.x

Spark configurations

No response

System information

No response

Relevant logs

No response

leoluan2009 commented 3 weeks ago

Caused by https://github.com/apache/incubator-gluten/pull/7704

PHILO-HE commented 3 weeks ago

@yikf, could you help take a look?

FelixYBW commented 3 weeks ago

@PHILO-HE why did our CI pass?

PHILO-HE commented 3 weeks ago

@PHILO-HE why did our CI pass?

It's strange. @leoluan2009, could you share your key configuration with us?

leoluan2009 commented 3 weeks ago

@PHILO-HE my config is:

spark.plugins org.apache.gluten.GlutenPlugin
spark.memory.offHeap.enabled true
spark.memory.offHeap.size 20g
spark.executor.memoryOverhead 2g
spark.gluten.sql.columnarToRowMemoryThreshold 256MB
spark.shuffle.manager org.apache.spark.shuffle.sort.ColumnarShuffleManager
spark.driver.extraClassPath /usr/local/service/spark/gluten/gluten-velox-bundle-spark3.5_2.12-xxx_x86_64-1.3.0-SNAPSHOT.jar
spark.executor.extraClassPath /usr/local/service/spark/gluten/gluten-velox-bundle-spark3.5_2.12-xxx_x86_64-1.3.0-SNAPSHOT.jar

yikf commented 3 weeks ago

I will take a look

yikf commented 3 weeks ago

@PHILO-HE @FelixYBW @leoluan2009 This should be because the output of ColumnarBuildSideRelation is the output of the current side's child node, while buildKeys may reference attributes from the opposite side. I will try to use an index to bind them instead. Sorry for the confusion.
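To illustrate the index-based binding idea, here is a minimal, self-contained sketch (not Gluten code; `Attr`, `IndexBinding`, and the helper names are hypothetical stand-ins for Spark's `AttributeReference` and projection machinery): resolve each build key to its ordinal position in the child's output once, where the expression ids are still connected, then at run time read values by ordinal only, so the binding no longer depends on ids matching later.

```scala
// Hypothetical sketch of binding build keys by ordinal index instead of
// carrying attribute references downstream. Attr mimics Spark's
// AttributeReference (name + exprId); none of this is real Gluten API.
case class Attr(name: String, exprId: Long)

object IndexBinding {
  // Resolve each key to its position in `output` once, at plan time,
  // while key.exprId and output exprIds still refer to the same plan.
  def bindByIndex(keys: Seq[Attr], output: Seq[Attr]): Seq[Int] =
    keys.map { k =>
      val i = output.indexWhere(_.exprId == k.exprId)
      require(i >= 0, s"Couldn't find ${k.name}#${k.exprId} in $output")
      i
    }

  // At run time, project a row using only the precomputed ordinals;
  // no attribute lookup is needed anymore.
  def project(row: Seq[Any], ordinals: Seq[Int]): Seq[Any] =
    ordinals.map(row)
}
```

The point of the design is that the fragile id-based lookup happens exactly once, in a place where it cannot fail; everything downstream works with plain integers.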

yikf commented 3 weeks ago

To summarize the problem: both the symptom and the cause are that a key's attribute reference cannot be found in the child's output.

During the dynamic pruning process, the execution plan is usually ColumnarSubqueryBroadcastExec -> ColumnarBroadcastExchangeExec -> ...

The buildKeys and child of ColumnarSubqueryBroadcastExec usually come from the same side of the join, so ideally this reference-binding approach causes no problems. However, when reuse exchange is applied, the child's output may change, and it may no longer contain the attributes that buildKeys reference.
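A toy illustration of the mismatch (again with `Attr` as a hypothetical stand-in for Spark's `AttributeReference`; this is not Spark code): Spark's reference binding matches strictly by expression id, so when reuse exchange substitutes a child whose output carries the same column name but a different exprId, the lookup fails exactly like the `d_date_sk#376` vs `d_date_sk#103` case in the stack trace above.

```scala
// Toy model of exprId-based attribute binding, as BindReferences does it.
// Attr and ReuseDemo are illustrative names, not Spark/Gluten classes.
case class Attr(name: String, exprId: Long)

object ReuseDemo {
  // Match strictly by exprId; the name plays no role in the lookup.
  def bindByExprId(key: Attr, output: Seq[Attr]): Option[Int] =
    output.indexWhere(_.exprId == key.exprId) match {
      case -1 => None // corresponds to "Couldn't find <key> in <output>"
      case i  => Some(i)
    }
}
```

With the ids from the error message, `bindByExprId(Attr("d_date_sk", 376), Seq(Attr("d_date_sk", 103)))` finds nothing despite the matching name.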

When reuse exchange is applied, names can be used for binding references instead. But that does not support a child whose output contains multiple attributes with the same name, and this limitation is unreasonable.

The root cause is that Gluten currently transforms the relation in ColumnarSubqueryBroadcastExec. We should follow Spark's approach and perform the transformation in the child. That way, even when reuse exchange occurs, the buildKeys (which come from the BroadcastMode, not from SubqueryBroadcastExec) and the child's output stay connected, and ColumnarSubqueryBroadcastExec only needs to fetch the required values from the child's build relation by index (at that point the relation is already the output of buildKeys).

The permanent fix for this problem is divided into two stages:

  1. Short-term solution: after reuse exchange, bind by name. This is similar to the previous implementation in https://github.com/apache/incubator-gluten/pull/7704, but covers more cases; for example, it supports keys containing multiple attributes. Its limitation is that the child's output after reuse exchange must not contain multiple attributes with the same name.
  2. Long-term solution: perform the transformation in the child, handling the different cases separately. For example, a regular BroadcastHashJoinExec still executes columnar and needs no extra transformation, whereas ColumnarSubqueryBroadcastExec does.
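The short-term, name-based fallback and its stated limitation can be sketched as follows (a hypothetical illustration; `Attr` and `NameBinding` are not real Spark/Gluten identifiers): match by column name instead of exprId, but reject the child output when the name is ambiguous, since there is no way to tell which duplicate the key meant.

```scala
// Sketch of name-based binding with the duplicate-name guard described
// for the short-term solution. Illustrative names, not Gluten API.
case class Attr(name: String, exprId: Long)

object NameBinding {
  // Fall back to matching by column name; require the match be unique.
  def bindByName(key: Attr, output: Seq[Attr]): Int = {
    val hits = output.zipWithIndex.collect { case (a, i) if a.name == key.name => i }
    require(hits.size == 1,
      s"need exactly one '${key.name}' in output, found ${hits.size}")
    hits.head
  }
}
```

This succeeds in the reuse-exchange case above (same name, different exprId) but deliberately fails when the child's output contains two columns with the same name, which is exactly why the long-term solution moves the transformation into the child.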

I filed a PR for the short-term solution. Could you help verify it in your environment? @leoluan2009