InitialDLab / Simba

Spatial In-Memory Big data Analytics
Apache License 2.0

Why in the partition folder of core(of sql) should add xxxPartition.scala file #68

Closed pzz2011 closed 7 years ago

pzz2011 commented 7 years ago

The Simba commit I mentioned in the title is https://github.com/InitialDLab/Simba/commit/871e5930fbdcb7e5ab35e010ee6a41ed03dadb3c

I am trying to understand the thinking behind your implementation. Although I have read some of the Spark core code, I am still quite confused.

I know that Spark core implements several partitioners, while the original Spark SQL does not seem to implement any. Why did you add your own partition implementation in the commit linked above after adding indexes to Spark SQL?

Hope to see your reply, thanks very much!

dongx-psu commented 7 years ago

There are several reasons why we build something like that:

pzz2011 commented 7 years ago

It's still not very clear to me. On the Spark mailing list I found some simple custom Partitioner examples, but the XXXXPartitioner.scala code in Simba still looks very strange to me. (I compared it with Spark core's implementation in RangePartitioner.scala.)
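For readers following along, here is a minimal sketch of what a range-style partitioner does, written in plain Scala without any Spark dependency. The names SimplePartitioner and SimpleRangePartitioner are hypothetical and are not part of Spark or Simba; the trait only mirrors the two members (numPartitions and getPartition) that a Spark Partitioner is expected to provide, and the bounds are made-up values.

```scala
// Hypothetical stand-in for the two members a Spark Partitioner provides.
trait SimplePartitioner {
  def numPartitions: Int
  def getPartition(key: Double): Int
}

// Assumption: keys <= bounds(0) go to partition 0, keys in
// (bounds(0), bounds(1)] go to partition 1, and so on. Keys above the
// last bound go to the final partition.
final class SimpleRangePartitioner(bounds: Array[Double]) extends SimplePartitioner {
  require(
    bounds.zip(bounds.drop(1)).forall { case (a, b) => a <= b },
    "bounds must be sorted in ascending order")

  val numPartitions: Int = bounds.length + 1

  def getPartition(key: Double): Int = {
    // Index of the first bound that is >= key, or -1 if there is none.
    val idx = bounds.indexWhere(key <= _)
    if (idx < 0) bounds.length else idx
  }
}

object RangePartitionerDemo {
  // Made-up bounds, giving three partitions.
  val part = new SimpleRangePartitioner(Array(10.0, 20.0))

  def assign(keys: Seq[Double]): Seq[Int] = keys.map(part.getPartition)

  def main(args: Array[String]): Unit =
    println(assign(Seq(3.0, 10.0, 15.5, 42.0)))
}
```

With the bounds 10.0 and 20.0, the keys 3.0, 10.0, 15.5 and 42.0 land in partitions 0, 0, 1 and 2. A real partitioner would typically derive its bounds by sampling the data rather than hard-coding them.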

Here is what confuses me at the moment:

  1. Not only in the RangePartition object but in the apply method of almost every XXXPartition object in Simba, you use SortBasedShuffleOn to check whether to use MutablePair or not. I cannot make sense of that.

  2. I also cannot make good sense of how ShuffledRDD is used in this snippet:

          val shuffled = new ShuffledRDD[Double, InternalRow, InternalRow](rdd, part)
          shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
          (shuffled, part.rangeBounds)
        }
      }

     Why setSerializer(new SparkSqlSerializer(new SparkConf(false)))?

Please send me your reply in your free time, thanks!

dongx-psu commented 7 years ago

This is legacy code migrated from the Exchange execution plan of Spark SQL 1.3. In that version, the code ensured compatibility with the old hash-based Spark shuffle and made sure that a better default serializer (Kryo) was used during the shuffle. Since the code still works and has not caused any issues, I kept it, as I don't really have time to look through the new Exchange plan in later versions of Spark SQL.
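To illustrate why such a check might exist, here is a small plain-Scala sketch. It rests on an assumption that is not stated in this thread: a sort-based shuffle buffers record references before writing them, whereas the old hash-based shuffle handles each record as it arrives, so reusing one mutable pair object is only safe in the second case. ReusablePair is a hypothetical stand-in for a MutablePair-like class, and none of this is the actual Simba or Spark code.

```scala
// Hypothetical stand-in for a reusable, MutablePair-like object.
final class ReusablePair(var key: Int, var value: String) {
  def update(k: Int, v: String): ReusablePair = { key = k; value = v; this }
}

object PairReuseDemo {
  val input: Seq[(Int, String)] = Seq(1 -> "a", 2 -> "b", 3 -> "c")

  private def show(p: ReusablePair): String = s"${p.key}:${p.value}"

  // Each record is consumed before the next one overwrites the shared
  // pair (assumed behaviour of a write-as-you-go shuffle).
  def streamedWithReuse(): List[String] = {
    val pair = new ReusablePair(0, "")
    input.iterator.map { case (k, v) => pair.update(k, v) }.map(show).toList
  }

  // References are buffered first (assumed behaviour of a sort-based
  // shuffle), so every buffered entry points at the same object.
  def bufferedWithReuse(): List[String] = {
    val pair = new ReusablePair(0, "")
    val buffer = input.iterator.map { case (k, v) => pair.update(k, v) }.toList
    buffer.map(show)
  }

  // Allocating a fresh object per record makes buffering safe again.
  def bufferedWithCopy(): List[String] = {
    val buffer = input.iterator.map { case (k, v) => new ReusablePair(k, v) }.toList
    buffer.map(show)
  }

  def main(args: Array[String]): Unit = {
    println(streamedWithReuse())
    println(bufferedWithReuse())
    println(bufferedWithCopy())
  }
}
```

In this sketch the streamed and copying variants both yield 1:a, 2:b, 3:c, while the buffered variant with reuse yields 3:c three times. Reuse saves allocations but is only correct when nothing holds on to the object between records.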

dongx-psu commented 7 years ago

Well, after a quick pass over the new Exchange plan, everything I did is still there. It is just buried under more heuristics that determine whether the Tungsten hash shuffle strategy can be used...

This is the particular Spark Plan I am referring to: https://github.com/InitialDLab/Simba/blob/612053d6d159eb97ae935118fac09e784d41b8e6/engine/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

If you want to learn how it works, you should read and think through it.

pzz2011 commented 7 years ago

They are legacy codes migrated from

It's true that they come from the version you referred to above.

Since you mentioned Exchange.scala, I found the following on the Spark dev mailing list: http://apache-spark-developers-list.1001551.n3.nabble.com/question-about-catalyst-and-TreeNode-td16663.html#a16669

Trees are immutable, and TreeNode takes care of copying unchanged parts of the tree when you are doing transformations. As a result, even if you do construct a DAG with the Dataset API, the first transformation will turn it back into a tree.

The only exception to this rule is when we share the results of plans after an Exchange operator. This is the last step before execution and sometimes turns the query into a DAG to avoid redundant computation.
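The idea in the quoted passage, that transforming an immutable tree copies only the changed path and shares everything else, can be sketched in plain Scala. The types Node, Leaf and Op and the plan below are hypothetical; this is a simplified illustration of the general technique, not Catalyst's actual TreeNode code.

```scala
// Hypothetical, simplified immutable tree; not Catalyst's TreeNode.
sealed trait Node {
  def children: Seq[Node]
  def withChildren(newChildren: Seq[Node]): Node

  // Apply the rule to this node, then to its children. A node is only
  // copied when at least one child actually changed; otherwise the
  // existing instance is returned and therefore shared.
  def transform(rule: PartialFunction[Node, Node]): Node = {
    val afterRule = rule.applyOrElse(this, identity[Node])
    val newChildren = afterRule.children.map(_.transform(rule))
    val changed =
      newChildren.zip(afterRule.children).exists { case (a, b) => !(a eq b) }
    if (changed) afterRule.withChildren(newChildren) else afterRule
  }
}

final case class Leaf(name: String) extends Node {
  def children: Seq[Node] = Nil
  def withChildren(newChildren: Seq[Node]): Node = this
}

final case class Op(name: String, children: Seq[Node]) extends Node {
  def withChildren(newChildren: Seq[Node]): Node = copy(children = newChildren)
}

object TreeDemo {
  val left: Node = Op("Filter", Seq(Leaf("t1")))
  val right: Node = Op("Project", Seq(Leaf("t2")))
  val plan: Node = Op("Join", Seq(left, right))

  // Rewrite one leaf only; the original plan is left untouched.
  val rewritten: Node = plan.transform { case Leaf("t2") => Leaf("t2_indexed") }

  def main(args: Array[String]): Unit = {
    println(rewritten)
    println(rewritten.children.head eq left) // the untouched subtree is shared
  }
}
```

Here the rewritten plan gets a new root and a new Project node, while the Filter subtree is the very same object as in the original plan.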

From the quoted passage, I understand that the Exchange operation happens when a query is turned into a DAG. (Does that mean tree to DAG, i.e. query to RDD?)
Looking at execution/SparkStrategies.scala, it seems that the only call site of Exchange is in BasicOperation. So why do other operations, e.g. SpatialJoinExtractor (your implementation), not need Exchange to transform a query into a DAG?

By the way, it is amazing that you have published 3 papers in the past year. :-)

dongx-psu commented 7 years ago

Reading the following rules of the Spark planner will answer your question. To understand the whole thing, you should take a higher-level view before diving into individual implementations. That shows the elegance of this architecture, and it is the right way to read it.

https://github.com/InitialDLab/Simba/blob/612053d6d159eb97ae935118fac09e784d41b8e6/engine/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala#L273

https://github.com/InitialDLab/Simba/blob/master/engine/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L931