Open zqhxuyuan opened 8 years ago
@zqhxuyuan Spark 的 stage 和 stage 是按照前后顺序来下发的,所以先有左上和左下的 RDD 分别计算完成,再有右边的 RDD 通过 iterator 的方式由后面触发同一个 stage 里前面的元素的计算。Hope this helps!
谢谢两位作者的分享! @lw-lin 您好!正好在看 Jerry 文章的时候对照着 Spark Streaming 遇到了一个问题,想请教一下。
groupByKey 在 map 端是不做 combine 的。PariRDDFunctions.scala 中,groupByKey 在调用 combineByKeyWithTag 的时候,mapSideCombine 的参数设了 false(默认是true)
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
但是在 Streaming 里,我发现它没用做这个处理。PairDStreamFunctions.scala 中,groupByKey 在调用 combineByKey 的时候却没有设置 mapSideCombine 的值。您觉得这里是它忘了吗?还是有什么道理?
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = ssc.withScope {
val createCombiner = (v: V) => ArrayBuffer[V](v)
val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
.asInstanceOf[DStream[(K, Iterable[V])]]
@Angryrou good catch.
我翻一下 git blame,5 年前的时候 core 和 streaming 都没有特别设置 mapSideCombine, 参见 streaming 的这里。然后 4 年前,core 这里做了修改,设置了 mapSideCombine = false, 但没有同时修改 streaming 这边。所以就这样了。
你可以给 streaming 这边提个 PR,fix 下 :)
@lw-lin 恩恩 谢谢!
这里划分task的时候我的理解是从最后的task往前倒退,如果依赖什么就计算什么 那么以FlatMappedValuesRDD的第一个Partition为例,回退到左上角的时候ShuffleRDD的第二个和第三个Partition不应该被计算吧。这两个Partition的线不应该是粗线吧。