apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

RecordBatch field 0 should be integer when using HashPartitioning #3942

Open felipepessoto opened 12 months ago

felipepessoto commented 12 months ago

Backend

VL (Velox)

Bug description

When I use ColumnarShuffleExchangeExec and ShuffleExchangeExec with HashPartitioning I receive the error below. I tried to isolate the issue and created the repro code below. It only fails when the partitioning is HashPartitioning; SinglePartition and RoundRobinPartitioning work fine.

I created it based on examples I found in the Gluten source, like https://github.com/oap-project/gluten/blob/81bb6c9b0652ec4df39e6f50c0405756b59d5a3d/gluten-core/src/main/scala/io/glutenproject/execution/TakeOrderedAndProjectExecTransformer.scala#L103

Repro code:

val sourcePath = "/tmp/test/source"
val outputPath = "/tmp/test/target"
val pColumn = "colC"
val isTablePartitioned = true
val targetFileSize = 134217728
val numShufflePartitions = spark.sessionState.conf.numShufflePartitions

val dfSource = spark
.range(5000)
.map { _ =>
  (10L,
  11,
  scala.util.Random.nextInt(2))
}
.repartition(100)
.toDF("colA", "colB", "colC")

dfSource.write.partitionBy(pColumn).format("parquet").mode("overwrite").save(sourcePath)

val df = spark.read.format("parquet").load(sourcePath)

val physicalPlan = df.queryExecution.executedPlan

println("\n\n physicalPlan: " + physicalPlan)

val originalPlan = physicalPlan

// Remove the top C2R (columnar-to-row) node
val planWithoutC2R = physicalPlan.children(0)

val partitionColumnsExpr = Array(planWithoutC2R.output.find(c => c.name.equals(pColumn)).get)
val partitioning = HashPartitioning(partitionColumnsExpr, numShufflePartitions)
// These two work:
// SinglePartition
// RoundRobinPartitioning(numShufflePartitions)

val shuffleExec = ShuffleExchangeExec(partitioning, planWithoutC2R)
val transformedShuffleExec =
      ColumnarShuffleExchangeExec(shuffleExec, planWithoutC2R, shuffleExec.child.output)

// TransformHints.tag(shuffleExec, transformedShuffleExec.doValidate().toTransformHint)

// Add C2R back
val planWithC2R = VeloxColumnarToRowExec(transformedShuffleExec)

val planToWrite = planWithC2R

println("\n\n planToWrite: " + planToWrite)
println("X: " + planToWrite.execute().first().getLong(0))

Spark version

Spark-3.3.x

Spark configurations

Running using unit tests

System information

No response

Relevant logs

[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 104) (172.31.117.175 executor driver): java.lang.RuntimeException: Exception: VeloxRuntimeError
[info] Error Source: RUNTIME
[info] Error Code: INVALID_STATE
[info] Reason: RecordBatch field 0 should be integer
[info] Retriable: False
[info] Expression: firstChild->type()->isInteger()
[info] Function: getFirstColumn
[info] File: /__w/1/s/Gluten/cpp/velox/shuffle/VeloxShuffleWriter.cc
[info] Line: 96
[info] Stack trace:
[info] # 0  _ZN8facebook5velox7process10StackTraceC1Ei
[info] # 1  _ZN8facebook5velox14VeloxExceptionC1EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_
[info] # 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_17VeloxRuntimeErrorEPKcEEvRKNS1_18VeloxCheckFailArgsET0_
[info] # 3  0x0000000000000000
[info] # 4  _ZN6gluten18VeloxShuffleWriter5splitESt10shared_ptrINS_13ColumnarBatchEEl
[info] # 5  Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper_split
[info] # 6  0x00007f1855018427
[info] 
[info]  at io.glutenproject.vectorized.ShuffleWriterJniWrapper.split(Native Method)
[info]  at org.apache.spark.shuffle.ColumnarShuffleWriter.internalWrite(ColumnarShuffleWriter.scala:163)
[info]  at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:218)
[info]  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
[info]  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
[info]  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
[info]  at org.apache.spark.scheduler.Task.run(Task.scala:136)
[info]  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
[info]  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
[info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]  at java.lang.Thread.run(Thread.java:750)
[info] 
[info] Driver stacktrace:
[info]   at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2682)
[info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2618)
[info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2617)
[info]   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
[info]   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
[info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2617)
[info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1190)
[info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1190)
[info]   at scala.Option.foreach(Option.scala:407)
[info]   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1190)
[info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2870)
[info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2812)
[info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2801)
[info]   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
[info]   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:958)
[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2344)
[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2365)
[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2384)
[info]   at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1477)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[info]   at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
[info]   at org.apache.spark.rdd.RDD.take(RDD.scala:1450)
[info]   at org.apache.spark.rdd.RDD.$anonfun$first$1(RDD.scala:1491)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[info]   at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
[info]   at org.apache.spark.rdd.RDD.first(RDD.scala:1491)
[info]   at org.apache.spark.sql.TestGluten2.$anonfun$new$1(TestGluten2.scala:95)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:203)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:64)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.sql.TestGluten2.org$scalatest$BeforeAndAfterEachTestData$$super$runTest(TestGluten2.scala:41)
[info]   at org.scalatest.BeforeAndAfterEachTestData.runTest(BeforeAndAfterEachTestData.scala:213)
[info]   at org.scalatest.BeforeAndAfterEachTestData.runTest$(BeforeAndAfterEachTestData.scala:206)
[info]   at org.apache.spark.sql.TestGluten2.runTest(TestGluten2.scala:41)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
[info]   at org.scalatest.Suite.run(Suite.scala:1112)
[info]   at org.scalatest.Suite.run$(Suite.scala:1094)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:64)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:64)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:750)
[info]   Cause: java.lang.RuntimeException: Exception: VeloxRuntimeError
[info] Error Source: RUNTIME
[info] Error Code: INVALID_STATE
[info] Reason: RecordBatch field 0 should be integer
[info] Retriable: False
[info] Expression: firstChild->type()->isInteger()
[info] Function: getFirstColumn
[info] File: /__w/1/s/Gluten/cpp/velox/shuffle/VeloxShuffleWriter.cc
[info] Line: 96
[info] Stack trace:
[info] # 0  _ZN8facebook5velox7process10StackTraceC1Ei
[info] # 1  _ZN8facebook5velox14VeloxExceptionC1EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_
[info] # 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_17VeloxRuntimeErrorEPKcEEvRKNS1_18VeloxCheckFailArgsET0_
[info] # 3  0x0000000000000000
[info] # 4  _ZN6gluten18VeloxShuffleWriter5splitESt10shared_ptrINS_13ColumnarBatchEEl
[info] # 5  Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper_split
[info] # 6  0x00007f1855018427
[info]   at io.glutenproject.vectorized.ShuffleWriterJniWrapper.split(Native Method)
[info]   at org.apache.spark.shuffle.ColumnarShuffleWriter.internalWrite(ColumnarShuffleWriter.scala:163)
[info]   at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:218)
[info]   at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
[info]   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
[info]   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
[info]   at org.apache.spark.scheduler.Task.run(Task.scala:136)
[info]   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
[info]   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
[info]   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:750)
marin-ma commented 12 months ago

For the Velox backend, the HashPartitioning partition id is computed by inserting a pre-projection operator before the shuffle operator, so the native side expects the first column of the shuffled batch to be of integer type.

https://github.com/oap-project/gluten/blob/06b9d476c5b602a7ea337e28391ea0133559888b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala#L384-L396

We should always rely on Gluten's columnar rules to make sure the SQL is planned correctly.
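
For reference, a minimal sketch (not Gluten's actual rule code) of what that pre-projection produces, assuming the hash function is Spark's Murmur3Hash expression as in the getProjectWithHash examples later in this thread: an int32 hash column is prepended in front of the child's output, which is why the native shuffle writer expects field 0 of the RecordBatch to be an integer.

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Murmur3Hash, NamedExpression}
import org.apache.spark.sql.types.IntegerType

// Sketch only: build the project list of the pre-projection inserted before a
// hash-partitioned shuffle. The hash column comes first, so the shuffled
// RecordBatch carries an int32 column at index 0.
def hashProjectList(keys: Seq[Expression], childOutput: Seq[Attribute]): Seq[NamedExpression] = {
  val hash = new Murmur3Hash(keys) // Spark's Murmur3Hash has IntegerType (int32) as its data type
  assert(hash.dataType == IntegerType)
  Alias(hash, "hash_partition_key")() +: childOutput
}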

felipepessoto commented 12 months ago

I tried to add a projection similar to the getProjectWithHash method, but then started to receive this error.

I'll try again to double-check.

[info] Error Source: RUNTIME
[info] Error Code: INVALID_STATE
[info] Reason: (16384 vs. 32768)
[info] Retriable: False
[info] Expression: values->capacity() >= byteSize
[info] Function: FlatVector
[info] File: /__w/1/s/Velox/velox/vector/FlatVector.h

It seems it is trying to read the first column, which should be the hash result (int32), but it is actually the first column of the table (which is an int64).

marin-ma commented 12 months ago

I tried to add a projection similar to the getProjectWithHash method, but then started to receive this error.

I'll try again to double-check.

[info] Error Source: RUNTIME
[info] Error Code: INVALID_STATE
[info] Reason: (16384 vs. 32768)
[info] Retriable: False
[info] Expression: values->capacity() >= byteSize
[info] Function: FlatVector
[info] File: /__w/1/s/Velox/velox/vector/FlatVector.h

It seems it is trying to read the first column, which should be the hash result (int32), but it is actually the first column of the table (which is an int64).

Could you share the code for the implemented getProjectWithHash function? The murmurhash3 in Velox has a return type of int32, so the expected data type of the "hash partition id" column is also int32. Additionally, I'm curious about the approach you're using to invoke the API, as it appears to differ from the typical usage of Gluten.

felipepessoto commented 11 months ago

This is the version with the projection added:

  test("test spark gluten code only - smaller code") {
    val sourcePath = "/tmp/test/source"
    val outputPath = "/tmp/test/target"
    val pColumn = "colC"
    val isTablePartitioned = true
    val targetFileSize = 134217728
    val numShufflePartitions = spark.sessionState.conf.numShufflePartitions

    val df = createDf(sourcePath, pColumn)
    val physicalPlan = df.queryExecution.executedPlan

    println("\n\n physicalPlan: " + physicalPlan)

    val originalPlan = physicalPlan

    // Remove the top C2R (columnar-to-row) node
    val planWithoutC2R = physicalPlan.children(0)

    val partitionColumnsExpr = Array(planWithoutC2R.output.find(c => c.name.equals(pColumn)).get)
    val partitioning = HashPartitioning(partitionColumnsExpr, numShufflePartitions)
    // These two work:
    // SinglePartition
    // RoundRobinPartitioning(numShufflePartitions)

    val planWithProject = getProjectWithHash(partitionColumnsExpr, planWithoutC2R) // Added this
    val shuffleExec = ShuffleExchangeExec(partitioning, planWithProject)
    val transformedShuffleExec =
          ColumnarShuffleExchangeExec(shuffleExec, planWithProject, planWithProject.output.drop(1)) // planWithProject.output)

    // TransformHints.tag(shuffleExec, transformedShuffleExec.doValidate().toTransformHint)

    val addC2R = false // true

    val finalPlan = if (addC2R) {
      VeloxColumnarToRowExec(transformedShuffleExec)
    } else {
      transformedShuffleExec
    }

    println("\n\n finalPlan: " + finalPlan)
    println("\n\n finalPlan.supportsColumnar: " + finalPlan.supportsColumnar)

    if(finalPlan.supportsColumnar) {
      println("executeColumnar: " + finalPlan.executeColumnar().first().getRow(0))
    } else {
      println("execute: " + finalPlan.execute().first().getLong(0))
    }  
  }

  def createDf(sourcePath: String, pColumn: String): DataFrame = {
    val dfSource = spark
    .range(5000)
    .map { _ =>
      (10L,
      11,
      scala.util.Random.nextInt(2))
    }
    .repartition(100)
    .toDF("colA", "colB", "colC")

    dfSource.write.partitionBy(pColumn).format("parquet").mode("overwrite").save(sourcePath)

    val df = spark.read.format("parquet").load(sourcePath)
    df
  }

  private def getProjectWithHash(exprs: Seq[Expression], child: SparkPlan): SparkPlan = {
    val hashExpression = new Murmur3Hash(exprs)
    hashExpression.withNewChildren(exprs)

    val project = ProjectExec( // Also tried ProjectExec / ProjectExecTransformer
        Seq(Alias(hashExpression, "hash_partition_key")()) ++ child.output, child)
    AddTransformHintRule().apply(project)

    val projectWithHint = TransformHints.getHint(project) match {
      case _: TRANSFORM_SUPPORTED =>
        // Tested with TransformPreOverrides(true/false)
        println("TRANSFORM_SUPPORTED")
        TransformPreOverrides(true).replaceWithTransformerPlan(project)
      case _: TRANSFORM_UNSUPPORTED =>
        println("TRANSFORM_UNSUPPORTED")
        project
    }

    val transformStageCounter = ColumnarCollapseTransformStages.transformStageCounter
    WholeStageTransformer(projectWithHint)(transformStageCounter.incrementAndGet())
  }

And this is the error:

[info] - test spark gluten code only - smaller code *** FAILED *** (6 seconds, 88 milliseconds)
[info]   java.lang.UnsupportedOperationException: This operator doesn't support doTransform with SubstraitContext.
[info]   at io.glutenproject.execution.TransformSupport.doTransform(WholeStageTransformer.scala:71)
[info]   at io.glutenproject.execution.TransformSupport.doTransform$(WholeStageTransformer.scala:69)
[info]   at io.glutenproject.execution.WholeStageTransformer.doTransform(WholeStageTransformer.scala:96)
[info]   at io.glutenproject.execution.ProjectExecTransformer.doTransform(BasicPhysicalOperatorTransformer.scala:221)
[info]   at io.glutenproject.execution.WholeStageTransformer.generateWholeStageTransformContext(WholeStageTransformer.scala:189)
[info]   at io.glutenproject.execution.WholeStageTransformer.doWholeStageTransform(WholeStageTransformer.scala:222)
[info]   at io.glutenproject.execution.WholeStageTransformer.$anonfun$doExecuteColumnar$1(WholeStageTransformer.scala:275)
[info]   at io.glutenproject.metrics.GlutenTimeMetric$.withNanoTime(GlutenTimeMetric.scala:41)
[info]   at io.glutenproject.metrics.GlutenTimeMetric$.withMillisTime(GlutenTimeMetric.scala:46)
[info]   at io.glutenproject.execution.WholeStageTransformer.doExecuteColumnar(WholeStageTransformer.scala:290)
[info]   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:257)
felipepessoto commented 11 months ago

@marin-ma I tried to change it to rely on the Gluten API to add the projection. I'm still seeing some errors; am I using the API incorrectly?

In summary, I'm adding a ShuffleExchangeExec and calling replaceWithTransformerPlan, which automatically adds the projection. It fails because the projection doesn't support columnar execution, so I need to wrap it with a WholeStageTransformer, and then it throws another error:

Without WholeStage:

[info] - test spark gluten code only - smaller code *** FAILED *** (5 seconds, 920 milliseconds)
[info]   java.lang.UnsupportedOperationException: This operator doesn't support doExecuteColumnar().
[info]   at io.glutenproject.execution.ProjectExecTransformer.doExecuteColumnar(BasicPhysicalOperatorTransformer.scala:297)
[info]   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:257)

After adding WholeStage:

[info]   java.lang.UnsupportedOperationException: This operator doesn't support doTransform with SubstraitContext.
[info]   at io.glutenproject.execution.TransformSupport.doTransform(WholeStageTransformer.scala:71)
[info]   at io.glutenproject.execution.TransformSupport.doTransform$(WholeStageTransformer.scala:69)
[info]   at io.glutenproject.execution.WholeStageTransformer.doTransform(WholeStageTransformer.scala:96)
[info]   at io.glutenproject.execution.ProjectExecTransformer.doTransform(BasicPhysicalOperatorTransformer.scala:221)
[info]   at io.glutenproject.execution.WholeStageTransformer.generateWholeStageTransformContext(WholeStageTransformer.scala:189)

    val df = createDf(sourcePath, pColumn)
    val physicalPlan = df.queryExecution.executedPlan

    println("\n\n physicalPlan: " + physicalPlan)

    // Remove top C2R
    val planWithoutC2R = physicalPlan.children(0)
    println("\n\n planWithoutC2R: " + planWithoutC2R)

    // Add ShuffleExchangeExec
    val partitionColumnsExpr = Array(planWithoutC2R.output.find(c => c.name.equals(pColumn)).get)
    val partitioning: Partitioning = HashPartitioning(partitionColumnsExpr, numShufflePartitions)
    val shuffleExec = ShuffleExchangeExec(partitioning, planWithoutC2R, REPARTITION_BY_COL)
    println("\n\n shuffleExec: " + shuffleExec)

    val finalPlan = {
      // Add C2R
      val planWithC2R = VeloxColumnarToRowExec(shuffleExec)
      AddTransformHintRule().apply(planWithC2R) 
      val afterTransform = TransformPreOverrides(false).replaceWithTransformerPlan(planWithC2R).asInstanceOf[VeloxColumnarToRowExec]
      println("\n\n afterTransform: " + afterTransform)

      val wrapProjectWithWholeStage = true
      if (wrapProjectWithWholeStage) {
        // If I don't wrap the project, it throws an error: This operator doesn't support doExecuteColumnar().
        val transformStageCounter = ColumnarCollapseTransformStages.transformStageCounter
        val projectWrapped = WholeStageTransformer(afterTransform.child.asInstanceOf[ColumnarShuffleExchangeExec].child)(transformStageCounter.incrementAndGet())
        val afterSurgery = afterTransform.makeCopy(Array(afterTransform.child.asInstanceOf[ColumnarShuffleExchangeExec].copy(child = projectWrapped)))

        AddTransformHintRule().apply(afterSurgery) 
        TransformPreOverrides(false).replaceWithTransformerPlan(afterSurgery)
      } else {
        afterTransform
      }
    }

    println("\n\n finalPlan: " + finalPlan)
    println("\n\n finalPlan.supportsColumnar: " + finalPlan.supportsColumnar)

    println("execute: " + finalPlan.execute().first().getLong(0))
felipepessoto commented 11 months ago

Forgot to mention: another test I did was wrapping ShuffleExchangeExec with ColumnarShuffleExchangeExec:

    val transformedShuffleExec = // shuffleExec
          ColumnarShuffleExchangeExec(shuffleExec, planWithProject, planWithoutC2R.output)

In this case I don't receive the java.lang.UnsupportedOperationException: This operator doesn't support doExecuteColumnar(), but then I'm back to the RecordBatch field 0 should be integer error.

  test("test spark gluten code only - smaller code") {
    val sourcePath = "/tmp/test/source"
    val outputPath = "/tmp/test/target"
    val pColumn = "colC"
    val numShufflePartitions = spark.sessionState.conf.numShufflePartitions

    val df = createDf(sourcePath, pColumn)
    val physicalPlan = df.queryExecution.executedPlan

    println("\n\n physicalPlan: " + physicalPlan)

    val originalPlan = physicalPlan

    // Remove the top C2R (columnar-to-row) node
    val planWithoutC2R = physicalPlan.children(0)
    println("\n\n planWithoutC2R: " + planWithoutC2R)

    val partitionColumnsExpr = Array(planWithoutC2R.output.find(c => c.name.equals(pColumn)).get)
    val partitioning: Partitioning = HashPartitioning(partitionColumnsExpr, numShufflePartitions)
    // These two work:
    // SinglePartition
    // RoundRobinPartitioning(numShufflePartitions)

    val shuffleExec = ShuffleExchangeExec(partitioning, planWithoutC2R, REPARTITION_BY_COL)
    val transformedShuffleExec = ColumnarShuffleExchangeExec(shuffleExec, planWithoutC2R, planWithoutC2R.output)

    AddTransformHintRule().apply(transformedShuffleExec) 
    val finalPlan = TransformPreOverrides(false).replaceWithTransformerPlan(transformedShuffleExec)

    println("\n\n finalPlan: " + finalPlan)
    println("execute: " + VeloxColumnarToRowExec(finalPlan).execute.first().getLong(0))
  }

  def createDf(sourcePath: String, pColumn: String): DataFrame = {
    val dfSource = spark
    .range(5000)
    .map { _ =>
      (10L,
      11,
      scala.util.Random.nextInt(2))
    }
    .repartition(100)
    .toDF("colA", "colB", "colC")

    dfSource.write.partitionBy(pColumn).format("parquet").mode("overwrite").save(sourcePath)

    val df = spark.read.format("parquet").load(sourcePath)
    df
  }
felipepessoto commented 11 months ago

I partially found the problem and fixed it with a hack, explained inline below. Chances are I'm doing something wrong, as this is my first time using Gluten/Velox.

Assuming planWithoutC2R is a simple scan:

+- ^(11) NativeFileScan parquet [colA#552L,colB#553,colC#554] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[file:/tmp/spark-8db878d9-c7e8-4f16-8c53-ebfe2e28f35b], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<colA:bigint,colB:int>

    val partitionColumnsExpr = Array(planWithoutC2R.output.find(c => c.name.equals(pColumn)).get)
    val partitioning: Partitioning = HashPartitioning(partitionColumnsExpr, numShufflePartitions)

    val shuffleExec = ShuffleExchangeExec(partitioning, planWithoutC2R, REPARTITION_BY_COL)

    AddTransformHintRule().apply(shuffleExec)
    val finalPlanP = TransformPreOverrides(true).replaceWithTransformerPlan(shuffleExec)
      .asInstanceOf[ColumnarShuffleExchangeExec]
    println("\n\n finalPlanP: " + finalPlanP.supportsColumnar + " - " + finalPlanP)
    // VeloxColumnarToRowExec(finalPlanP).execute.count
    // I expect VeloxColumnarToRowExec(finalPlan).execute.count to work
    // But it throws an error saying ProjectExecTransformer doesn't support columnar

    // ProjectExecTransformer needs to be wrapped with WholeStageTransformer.
    // For some reason replaceWithTransformerPlan doesn't wrap it, and if
    // we wrap it manually it doesn't work, failing with this error:
// [info]   java.lang.UnsupportedOperationException: This operator doesn't support doTransform with SubstraitContext.
// [info]   at io.glutenproject.execution.TransformSupport.doTransform(WholeStageTransformer.scala:71)
// [info]   at io.glutenproject.execution.TransformSupport.doTransform$(WholeStageTransformer.scala:69)
// [info]   at io.glutenproject.execution.WholeStageTransformer.doTransform(WholeStageTransformer.scala:96)
// [info]   at io.glutenproject.execution.ProjectExecTransformer.doTransform(BasicPhysicalOperatorTransformer.scala:221)

  // This first hack didn't work
    // val transformStageCounter = ColumnarCollapseTransformStages.transformStageCounter
    // val projectWrapped = WholeStageTransformer(project)(transformStageCounter.incrementAndGet())
    // val finalPlanP2 = finalPlanP.makeCopy(Array(WholeStageTransformer(
    //   finalPlanP.children(0))(transformStageCounter.incrementAndGet())))
    // println("\n\n finalPlanP2: " + finalPlanP2.supportsColumnar + " - " + finalPlanP2)

    // I worked around it by using TakeOrderedAndProjectExecTransformer, which supports columnar execution without a WholeStageTransformer
    val project = finalPlanP.child.asInstanceOf[ProjectExecTransformer]
    val projectWrapped = TakeOrderedAndProjectExecTransformer(Int.MaxValue, Seq.empty, project.projectList, project.child)
    val finalPlanP2 = finalPlanP.copy(outputPartitioning = finalPlanP.outputPartitioning, child = projectWrapped)

    AddTransformHintRule().apply(finalPlanP2)
    val finalPlan = finalPlanP2

    println("\n\n finalPlan: " + finalPlan)

    println("execute: " + VeloxColumnarToRowExec(finalPlan).execute.count)
felipepessoto commented 11 months ago

Another thing I may need to work around: I was planning to inherit from Exchange or ShuffleExchangeLike, but Gluten's rule uses the concrete ShuffleExchangeExec (which is a case class): https://github.com/oap-project/gluten/blob/18b33d0cb4dfcd54edbfebeb13f0d2d7eb3dcaf6/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala#L381
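
To illustrate the constraint (a sketch only, not the actual rule code): a rule that pattern-matches the concrete ShuffleExchangeExec case class will not pick up a custom exchange node that only extends the ShuffleExchangeLike trait.

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike}

// Hypothetical helper, illustration only.
def matchedByGlutenShuffleRule(plan: SparkPlan): Boolean = plan match {
  case _: ShuffleExchangeExec => true  // the concrete case class matched by the linked rule
  case _: ShuffleExchangeLike => false // a custom subclass of the trait falls through to here
  case _                      => false
}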

marin-ma commented 11 months ago

@felipepessoto Could you let us know why you are using the API in this manner? Gluten provides many columnar rules to ensure the final physical plan is formulated correctly. It appears, however, that you might not be fully utilizing Gluten's columnar rules to guarantee the final physical plan is optimized, and are instead attempting to rewrite some of the logic inside those rules. Understanding your rationale or method here would be helpful.

felipepessoto commented 11 months ago

@marin-ma I'm writing a Columnar version of Optimized Write: https://github.com/delta-io/delta/pull/1198/files#diff-5648029472acd991211c6216d31879fe87f33e6aba46653e7a10ccd3bcff8389 (OptimizeWriteExchangeExec).

My initial expectation was that I'd only need to add a doExecuteColumnar(), replacing ShuffledRowRDD with ShuffledColumnarBatchRDD and using a shuffle dependency built by ColumnarShuffleExchangeExec.prepareShuffleDependency instead of ShuffleExchangeExec.prepareShuffleDependency. This works, as long as I don't use HashPartitioning.

To use HashPartitioning I had to add the call to replaceWithTransformerPlan, plus the hack to fix the plan it generates.

Thanks.
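
For context, a heavily simplified structural sketch of that expectation (hypothetical class and placeholder body, not the Delta PR's OptimizeWriteExchangeExec): a unary exec node that advertises columnar support and overrides doExecuteColumnar. In the real implementation that method would return a ShuffledColumnarBatchRDD built from a dependency prepared by ColumnarShuffleExchangeExec.prepareShuffleDependency; here the columnar path is just a pass-through placeholder.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch only; not the actual Delta/Gluten code.
case class OptimizeWriteExchangeSketch(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output
  override def supportsColumnar: Boolean = true

  override protected def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException("row-based path not shown in this sketch")

  override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
    child.executeColumnar() // placeholder for the ShuffledColumnarBatchRDD built via Gluten

  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}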

felipepessoto commented 11 months ago

@marin-ma, with https://github.com/oap-project/gluten/pull/4167 and by changing OptimizeWriteExchangeExec to inherit from ShuffleExchangeLike, I can make it work (at least it doesn't throw errors; I'm still validating the behavior). But it still requires workarounds.

Could you help me understand what I'm doing wrong? I have some questions to understand how Gluten works:

  1. Whenever I manually change the plan (like we do here: https://github.com/delta-io/delta/pull/1198/files#diff-da2c9be25dd00a5a2abbdabc45387415eb511c7dba019cfbb175c222286fc5f5R306), do I need to call replaceWithTransformerPlan, like this?

    AddTransformHintRule().apply(owPlanWraped)
    TransformPreOverrides(false).replaceWithTransformerPlan(owPlanWraped)

     In some cases we don't call it:

     https://github.com/oap-project/gluten/blob/fcb31fc6b09ece3be76131caaba81681a3b7b53c/gluten-delta/src/main/scala/io/glutenproject/extension/DeltaRewriteTransformerRules.scala#L160

  2. How can I know whether I should call TransformPreOverrides with a false or true parameter?
  3. I had to work around the lack of columnar support in ProjectExecTransformer (which was created by replaceWithTransformerPlan) by replacing it with TakeOrderedAndProjectExecTransformer. Do you know what I'm doing wrong?
     3.1. Why does ProjectExecTransformer need a WholeStageTransformer wrapper while TakeOrderedAndProjectExecTransformer doesn't?
     3.2. Could the missing WholeStageTransformer be a bug on the Gluten side, given that the ProjectExecTransformer is created by replaceWithTransformerPlan?
     3.3. Why do I receive a "This operator doesn't support doTransform with SubstraitContext" error if I try to manually wrap the ProjectExecTransformer?
marin-ma commented 11 months ago

@marin-ma I'm writing a Columnar version of Optimized Write: https://github.com/delta-io/delta/pull/1198/files#diff-5648029472acd991211c6216d31879fe87f33e6aba46653e7a10ccd3bcff8389 (OptimizeWriteExchangeExec).

My initial expectation was that I'd only need to add a doExecuteColumnar(), replacing ShuffledRowRDD with ShuffledColumnarBatchRDD and using a shuffle dependency built by ColumnarShuffleExchangeExec.prepareShuffleDependency instead of ShuffleExchangeExec.prepareShuffleDependency. This works, as long as I don't use HashPartitioning.

To use HashPartitioning I had to add the call to replaceWithTransformerPlan, plus the hack to fix the plan it generates.

Thanks.

@felipepessoto Whether or not hash partitioning is used, Gluten's rules are what ensure the final physical plan is generated correctly. Therefore, I still suggest using GlutenPlugin rather than rewriting this logic.

https://github.com/delta-io/delta/pull/1198/files#diff-da2c9be25dd00a5a2abbdabc45387415eb511c7dba019cfbb175c222286fc5f5R306-R309 I have a question here: it looks like this adds OptimizedWriteExec by directly modifying QueryExecution.executedPlan. Does this modification take effect with AQE both on and off?

felipepessoto commented 11 months ago

Yes, it should work even when AQE is enabled.

felipepessoto commented 10 months ago

The reason I need to rewrite the physical plan is that the existing repartition/rebalance can generate a very large partition and write it out as a single file. Delta Optimized Write improves on this by generating files close to the target size, which is only possible by changing the physical plan.
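
For illustration (arithmetic only, with an assumed input size; not code from the Delta PR): with the 128 MiB targetFileSize constant used in the repro above, Optimized Write sizes the shuffle so the data lands in files close to that target instead of in one file per oversized partition.

// Assumed numbers, illustration only.
val totalBytesToWrite = 10L * 1024 * 1024 * 1024 // assume roughly 10 GiB written by the job
val targetFileSize    = 134217728L               // 128 MiB, the same constant as in the repro
val numOutputFiles    = math.max(1L, math.ceil(totalBytesToWrite.toDouble / targetFileSize).toLong)
// numOutputFiles == 80: about eighty ~128 MiB files rather than a single huge file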