bartosz25 / spark-scala-playground

Sample processing code using Spark 2.1+ and Scala

whole stage generated code for a simple query #23

Open bithw1 opened 4 years ago

bithw1 commented 4 years ago

Hi, @bartosz25 ,

I have a simple test case for which I would like to see the whole-stage generated code:

  test("whole stage code gen test") {
    val spark = SparkSession.builder().enableHiveSupport().master("local").appName("whole stage code gen test").getOrCreate()
    import spark.implicits._
    Seq(("A", 1), ("B", 2), ("C", 3)).toDF("name", "age").createOrReplaceTempView("t")
    val df = spark.sql(
      """
        select name, age from t  where age > 2
      """.stripMargin(' '))
    df.explain(true)
    df.show()
    spark.stop()
  }

The following is a snippet of the generated code:

final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
private Object[] references;
private scala.collection.Iterator[] inputs;  
private scala.collection.Iterator localtablescan_input_0;
private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] filter_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];

public GeneratedIteratorForCodegenStage1(Object[] references) {
this.references = references;
}
public void init(int index, scala.collection.Iterator[] inputs) {
partitionIndex = index;
this.inputs = inputs; 
localtablescan_input_0 = inputs[0]; //LocalTableScanExec
filter_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 32);
filter_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 64);

}

I don't understand the variable filter_mutableStateArray_0. It is created for FilterExec (the variable name starts with filter), but since it is of type UnsafeRowWriter, I think it should be created for ProjectExec and named project_mutableStateArray_0.

I am not sure why this variable is created for FilterExec. Could you please have a look? Thanks!

bartosz25 commented 4 years ago

Hi @bithw1

I will take a look at your example today or tomorrow. Thank you.

bartosz25 commented 4 years ago

Re @bithw1

I couldn't reproduce the same generated code. Do you have any specific setup? The code I get every time looks like this:

/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 009 */
/* 010 */   public SpecificUnsafeProjection(Object[] references) {
/* 011 */     this.references = references;
/* 012 */     mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 64);
/* 013 */
/* 014 */   }
/* 015 */
/* 016 */   public void initialize(int partitionIndex) {
/* 017 */
/* 018 */   }

bithw1 commented 4 years ago

Thanks @bartosz25 .

I printed the generated code from WholeStageCodegenExec#doExecute:

  override def doExecute(): RDD[InternalRow] = {
    val (ctx, cleanedSource) = doCodeGen()
    println("cleanedSource#" + cleanedSource.body)  // added println to print the generated code

I am using the code from Spark's master branch, so I am running against the latest Spark code base. I am not sure whether older versions will print similar generated code.

bartosz25 commented 4 years ago

OK, that explains why we got different results :) I launched the code against Spark 3 preview 2 and also got a single scan stage:

[2020-01-07 06:20:10,399] org.apache.spark.internal.Logging DEBUG 
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private boolean resultIsNull_0;
/* 009 */   private java.lang.String[] mutableStateArray_0 = new java.lang.String[1];
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 011 */
/* 012 */   public SpecificUnsafeProjection(Object[] references) {
/* 013 */     this.references = references;
/* 014 */
/* 015 */     mutableStateArray_1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 32);
/* 016 */
/* 017 */   }
/* 018 */

And the plans + result:

== Parsed Logical Plan ==
'Project ['name, 'age]
+- 'Filter ('age > 2)
   +- 'UnresolvedRelation [t]

== Analyzed Logical Plan ==
name: string, age: int
Project [name#7, age#8]
+- Filter (age#8 > 2)
   +- SubqueryAlias `t`
      +- Project [_1#2 AS name#7, _2#3 AS age#8]
         +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
LocalRelation [name#7, age#8]

== Physical Plan ==
LocalTableScan [name#7, age#8]

+----+---+
|name|age|
+----+---+
|   C|  3|
+----+---+

Maybe check the preview branch, because master still seems to be a work in progress (26 127 commits vs 26 004 for the preview), or stay with 2.4.4 for now :) To debug the generated code, you can also use this tip instead of adding print statements to the framework: https://www.waitingforcode.com/tips/spark-sql/how_show_generated_code :)
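
For readers who do not want to follow the link, here is a minimal sketch of one way to print the generated code without patching Spark. It assumes the `debugCodegen()` helper from the `org.apache.spark.sql.execution.debug` package; the object name and the session settings are only illustrative, and the linked tip may describe a different variant.

```scala
import org.apache.spark.sql.SparkSession
// Brings debugCodegen() into scope for Datasets and DataFrames.
import org.apache.spark.sql.execution.debug._

// Illustrative name, not part of the original thread.
object ShowGeneratedCode {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("show generated code")
      .getOrCreate()
    import spark.implicits._

    Seq(("A", 1), ("B", 2), ("C", 3)).toDF("name", "age").createOrReplaceTempView("t")
    val df = spark.sql("select name, age from t where age > 2")

    // Prints the generated source of every whole-stage codegen subtree
    // found in the physical plan of df.
    df.debugCodegen()

    spark.stop()
  }
}
```

If the physical plan contains no whole-stage codegen subtree, as with the `LocalTableScan`-only plan shown above, there is no generated iterator class to print.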

bithw1 commented 4 years ago

Thanks @bartosz25. Yes, the debugCodegen feature prints exactly the same thing as I did by inserting a println statement in the code, because debugCodegen calls the WholeStageCodegenExec#doCodeGen method.

Hmm... I am kind of surprised that we saw different output.
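
The same source can also be collected from user code rather than printed from inside Spark. The following is only a sketch: `GeneratedSources` is a hypothetical helper name, and it assumes `doCodeGen()` stays publicly callable on `WholeStageCodegenExec`, as it is in the versions discussed here. It does not look into subqueries.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.WholeStageCodegenExec

// Hypothetical helper, not part of Spark.
object GeneratedSources {
  // Returns the generated Java source of each whole-stage codegen
  // subtree found in the DataFrame's executed physical plan.
  def of(df: DataFrame): Seq[String] =
    df.queryExecution.executedPlan.collect {
      case stage: WholeStageCodegenExec =>
        // Same call as the one made at the start of doExecute.
        val (_, source) = stage.doCodeGen()
        source.body
    }
}
```

Calling `GeneratedSources.of(df).foreach(println)` would then print one source per codegen stage, and an empty result means the plan has no such stage.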

https://github.com/apache/spark/blob/v2.4.0/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

above is 2.4.0, we should see line 549, which is:

/* 011 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {

bartosz25 commented 4 years ago

Let's first confirm whether the output is really different for you when you run the code against 2.4.0.

above is 2.4.0, we should see line 549, which is:

/* 011 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {

Maybe there is no change in WholeStageCodegenExec but somewhere earlier in the planning? I don't follow what happens on master every day, I only do it when I write new posts ;-) IMO if you are trying to understand what happens, it's better to test on a stable version, or possibly a beta if you're really curious :P

Could you then try to run the code on top of 2.4.0 to see if you also get a different plan than I do?
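
One way to test the "earlier in the planning" idea is sketched below. It rests on an assumption that is not confirmed anywhere in this thread: that the optimizer rule `ConvertToLocalRelation` is what folds the Filter and Project into the single `LocalRelation` seen in the optimized plan. Excluding that rule through `spark.sql.optimizer.excludedRules` (a setting available in Spark 2.4 and later, to my knowledge) should then keep both operators in the physical plan. The object and method names are illustrative.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative names, not part of the original thread.
object KeepFilterInPlan {
  def session(): SparkSession =
    SparkSession.builder()
      .master("local[1]")
      .appName("keep filter in plan")
      // Assumption: this is the rule that collapses the query into LocalRelation.
      .config("spark.sql.optimizer.excludedRules",
        "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
      .getOrCreate()

  // Same query as in the original test case.
  def query(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("A", 1), ("B", 2), ("C", 3)).toDF("name", "age").createOrReplaceTempView("t")
    spark.sql("select name, age from t where age > 2")
  }

  def main(args: Array[String]): Unit = {
    val spark = session()
    // Compare the optimized and physical plans with the ones posted above.
    query(spark).explain(true)
    spark.stop()
  }
}
```

If the Filter and Project survive in the printed plans, the difference between the two outputs in this thread would come from the session configuration rather than from WholeStageCodegenExec itself.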