Motivation:
These applications all need one key feature: an efficient primitive for data sharing. MapReduce, however, can only share data through stable storage, and that path is slow. The slowness comes from replication and disk I/O. Hence the goal of RDDs: in-memory data sharing, which in turn requires in-memory fault tolerance.
Quoting a senior student's summary:
Existing abstractions for in-memory storage on clusters, such as distributed shared memory, key-value stores, databases, and Piccolo, offer an interface based on fine-grained updates to mutable state (e.g., cells in a table). With this interface, the only ways to provide fault tolerance are to replicate the data across machines or to log updates across machines. Both approaches are expensive for data-intensive workloads, as they require copying large amounts of data over the cluster network, whose bandwidth is far lower than that of RAM, and they incur substantial storage overhead.
To make in-memory storage fault tolerant, the options are to replicate or to log updates; because the updates are fine-grained, both approaches are expensive. The natural idea is therefore to provide fault tolerance around coarse-grained operations instead.
In contrast to these systems, RDDs provide an interface based on coarse-grained transformations (e.g., map, filter and join) that apply the same operation to many data items. This allows them to efficiently provide fault tolerance by logging the transformations used to build a dataset (its lineage) rather than the actual data. If a partition of an RDD is lost, the RDD has enough information about how it was derived from other RDDs to recompute just that partition. Thus, lost data can be recovered, often quite quickly, without requiring costly replication.
If the operations are just map, filter, and join, they are all stateless functions, so the simplest way to tolerate faults is of course to recompute. If an operator carries state (e.g., operators in machine learning), things get much harder.
Section 2 walks through some examples. The point is that Spark RDDs provide a set of stateless, coarse-grained functions, and every RDD is derived step by step from RDDs that originally live in stable storage. Distributed Shared Memory allows reads and writes to arbitrary memory locations; RDDs do not allow such operations.
Lineage records this sequence of coarse-grained functions.
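To make this concrete, here is a minimal sketch in Scala (assuming a local Spark installation; the object name and toy data are my own). Each transformation becomes one step in the lineage graph, which Spark can print via `toDebugString`:

```scala
import org.apache.spark.sql.SparkSession

object LineageDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("lineage-demo")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    val base     = sc.parallelize(1 to 1000000)  // stands in for an RDD loaded from stable storage
    val squared  = base.map(x => x.toLong * x)   // coarse-grained: the same function on every element
    val filtered = squared.filter(_ % 2 == 0)

    // Prints the lineage chain: filter <- map <- parallelize.
    // Losing a partition of `filtered` only requires re-running these
    // functions on the corresponding input partition, not restoring data.
    println(filtered.toDebugString)

    spark.stop()
  }
}
```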
As discussed in the Introduction, RDDs are best suited for batch applications that apply the same operation to all elements of a dataset. In these cases, RDDs can efficiently remember each transformation as one step in a lineage graph and can recover lost partitions without having to log large amounts of data. RDDs would be less suitable for applications that make asynchronous fine-grained updates to shared state, such as a storage system for a web application or an incremental web crawler.
For ML tasks, although they fit the batch pattern, the parameters keep changing across iterations, so in practice "apply the same operation to all elements of a dataset" does not hold. The updated parameters after each iteration effectively define a different operator, so if we wanted to use the lineage approach we would still need to record the model parameters after every iteration.
Recalling the GShard figure: with the lineage approach, we might only need to record a small part of the model's parameters at each iteration (which amounts to checkpointing every iteration?). That part would be recovered via lineage, and the rest recovered by other means.
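Spark itself ships an API for this "materialize state so the lineage stays short" idea: `RDD.checkpoint()`. Below is a hedged sketch of using it in an iterative loop; the update step and the every-10-iterations policy are illustrative assumptions, not something from the paper:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object IterativeCheckpoint {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("iterative-checkpoint")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setCheckpointDir("/tmp/rdd-checkpoints") // stable storage for checkpoint files

    var params: RDD[Double] = sc.parallelize(Seq.fill(1000)(0.0))
    for (iter <- 1 to 50) {
      params = params.map(w => w + 0.1) // stand-in for a real parameter update
      // Without this, the lineage grows by one map per iteration and recovery
      // would replay every previous iteration. checkpoint() materializes the
      // RDD to stable storage and truncates its lineage.
      if (iter % 10 == 0) {
        params.checkpoint()
        params.count() // force evaluation so the checkpoint is actually written
      }
    }
    println(params.take(3).mkString(", "))
    spark.stop()
  }
}
```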
Lineage enables automatic recovery: if an RDD is lost partway through the computation, its children (which depend on it) cannot proceed, so the scheduler automatically launches tasks to recursively regenerate that RDD. This process does, however, require a scheduler. A toy sketch of that recursive regeneration follows.
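In this toy model (hypothetical names; this is not Spark's actual scheduler), each node in the lineage knows how to compute one of its partitions from its parent, so recovering a lost partition is a recursive walk back toward stable storage, touching only the partitions on that path:

```scala
sealed trait ToyRDD[A] {
  def compute(partition: Int): Seq[A]
}
// Leaf of the lineage: data that lives on "stable storage".
final case class Source[A](data: Vector[Seq[A]]) extends ToyRDD[A] {
  def compute(partition: Int): Seq[A] = data(partition)
}
// A narrow, coarse-grained transformation: partition i depends only on
// partition i of the parent, so recomputation recurses one level up.
final case class Mapped[A, B](parent: ToyRDD[A], f: A => B) extends ToyRDD[B] {
  def compute(partition: Int): Seq[B] = parent.compute(partition).map(f)
}

object RecoveryDemo {
  def main(args: Array[String]): Unit = {
    val source  = Source(Vector(Seq(1, 2), Seq(3, 4))) // 2 partitions
    val doubled = Mapped(source, (x: Int) => x * 2)
    val shifted = Mapped(doubled, (x: Int) => x + 1)

    // Suppose partition 1 of `shifted` is lost: the "scheduler" just calls
    // compute(1), which regenerates only that partition's chain.
    println(shifted.compute(1)) // List(7, 9)
  }
}
```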
In Section 4 the authors introduce an interesting pair of concepts: narrow dependencies & wide dependencies.
narrow dependencies, where each partition of the parent RDD is used by at most one partition of the child RDD; wide dependencies, where multiple child partitions may depend on it. For example, map leads to a narrow dependency, while join leads to wide dependencies (unless the parents are hash-partitioned).
Narrow: the parent-to-child partition mapping is n:1. Left half of the figure (join): when two RDDs are joined and each partition of one RDD joins with only a known, fixed number of partitions of the other, the join is a narrow dependency, e.g., the join in the left half of Figure 1 (join with inputs co-partitioned).
Wide: the parent-to-child partition mapping is n:m. Right half of the figure (join): all other joins are wide dependencies, e.g., the join in the right half of Figure 1 (join with inputs not co-partitioned). Because such a join needs every partition of the parent RDDs, it involves a shuffle, which is what makes it a wide dependency.
Narrow vs. wide is thus defined by how many child partitions use each partition of the parent RDD. In one sentence: if a partition of the parent RDD is used by at most one partition of the child RDD, the dependency is narrow; otherwise it is wide. Because a narrow dependency involves a fixed number of partitions, we get a corollary: narrow dependencies include not only one-to-one dependencies but also one-to-a-fixed-number dependencies.
Understanding one-to-a-fixed-number narrow dependencies: the number of parent partitions that a child partition depends on does not change with the size of the data. In other words, whether the dataset is 100 TB or 1 PB, in a narrow dependency each child partition depends on a fixed number of parent partitions. A wide dependency, by contrast, operates at shuffle level: the larger the data, the more parent partitions each child partition ends up depending on.
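Spark exposes this classification directly on `rdd.dependencies`, so a minimal sketch (again assuming local Spark) can confirm it: `mapValues` produces a `NarrowDependency`, while `groupByKey` produces a `ShuffleDependency`:

```scala
import org.apache.spark.{Dependency, NarrowDependency, ShuffleDependency}
import org.apache.spark.sql.SparkSession

object DependencyDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dependency-demo")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    val pairs   = sc.parallelize(1 to 100).map(x => (x % 10, x))
    val mapped  = pairs.mapValues(_ + 1) // each child partition reads exactly one parent partition
    val grouped = pairs.groupByKey()     // each child partition may read all parent partitions

    def kind(deps: Seq[Dependency[_]]): String = deps.map {
      case _: ShuffleDependency[_, _, _] => "wide (shuffle)"
      case _: NarrowDependency[_]        => "narrow"
      case other                         => other.getClass.getSimpleName
    }.mkString(", ")

    println(s"mapValues  -> ${kind(mapped.dependencies)}")  // narrow
    println(s"groupByKey -> ${kind(grouped.dependencies)}") // wide (shuffle)

    spark.stop()
  }
}
```

Consistent with the quote's caveat, joining two RDDs that were pre-partitioned with the same `HashPartitioner` would likewise show narrow dependencies, since no shuffle is needed.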
recovery after a node failure is more efficient with a narrow dependency, as only the lost parent partitions need to be recomputed, and they can be recomputed in parallel on different nodes. In contrast, in a lineage graph with wide dependencies, a single failed node might cause the loss of some partition from all the ancestors of an RDD, requiring a complete re-execution.
Because narrow dependencies are fixed and known, recovery can be pipelined directly.
Still to think about: in the different stages of a model, e.g., under pipeline parallelism, which kind of dependency does each stage correspond to? Or are they simply unrelated? I can't quite work this out yet.
Not yet read in detail.
PDF & Talk: https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/zaharia
This paper is quite dense; understand the overall logic and approach first, record the parts that were skipped, and come back to them later.