KTurnura / paper-notes

1 stars 0 forks source link

Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing #19

Open KTurnura opened 5 months ago

KTurnura commented 5 months ago

本论文介绍了RDD的基本概念,介绍了RDD中最重要的Lineage 概念,可以通过Lineage 结合Checkpoint 实现快速容错恢复。使用RDD实现了PageRank算法和逻辑回归算法,介绍了宽依赖和窄依赖的概念。

KTurnura commented 5 months ago

实际应用场景:基于RDD实现的Spark被认为是Mapreduce的一种演进。相较于Mapreduce将中间文件写入磁盘中,Spark将数据放在内存中,并将Mapreduce的两部分总结为多步骤数据流图(DAG)。极大的提高了分布式并行计算的速度。

  1. 方法:使用内存存放数据

    解决:读取中间数据过慢的问题

  2. 方法:lineage

    解决:当某个节点故障后,可以根据lineage推断某个RDD的来源,重做该数据(宽依赖情况后面讨论)即可恢复该节点数据

KTurnura commented 5 months ago

RDD 内容

RDD包含五种信息:

  1. RDD 自身数据的分区信息
  2. RDD的父依赖
  3. metadata
  4. 父RDD转换到子RDD所依赖的函数
  5. 分区方案

Spark-based PageRank algorithm

PageRank 是谷歌最开始用来估计网页重要性的一个算法

输入:

使用Scala语言实现的PageRank算法

val lines = spark.read.textFile("input.txt").rdd
val links1 = lines.map{s => val parts = s.split("\\s+")
    (parts(0), parts(1))
}
val links2 = links1.distinct()  //wide transformation 
val links3 = links2.groupByKey() //wide transformation
val links4 = links3.cache()
val ranks = links4.mapValue(v => 1.0)

val jj = link4.join(ranks)
val contribs = jj.values.flatMap{case (urls,rank) => urls.map(url => (url, rank/ url.size))}
// Array((u1,0,5),(u2,1.0))

// wide transformation shuffle
ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)

val output = ranks.colletct()
output.foreach(tup => println(s"${tup._1} has rank : ${tup_.2}." ))

lineage图

image-20240115165226407

links在Spark处理过程中多次使用,因此需要cache持久化

在调用Action的过程中,发生了很多事,Graph会进行处理并生成Java字节码,用来描述所有不同的Transformation,当调用collect,driver会让一组worker来处理该输入数据的不同分区,他会对lineage graph进行编译,讲lineage graph 中的transformation 编译为Java字节码,会将这些字节码发送给Spark所选择的所有worker机器上,这些wokrer就会执行该字节码

字节码会告诉worker去读取他们各自所负责的那部分分区并将数据进行收集,它会告诉每个worker去读取它所负责的那个分区输入数据

算子操作

算子 操作意义 输入列 输出列
dinstinct 去重 [<u1,u2>,<u1,u2>] [<u1,u2>]
groupByKey 按照Key对RDD进行分区 [<u1,u2>,<u1,u3>] [(u1,CompactBuffer(u2,u3))]
cache 论文中的persist
mapValues(v => 1.0) 对每个Key添加一个值 [(u1,CompactBuffer(u2,u3))] Array((u1,1.0))
join 可以理解为两张表的拼接(大概率) Array((u1,(CompactBuffer(u2,u3),1.0)))

拼接的过程大概率会应用和已分区的数据的分区方法一样,以此来减轻数据移动的问题

Spark 持久化

Spark可以自定义数据分区的分区方法

宽依赖和窄依赖

宽依赖的由来:distinct、join、groupByKey等,需要知道某个键在所有分区的相关值

而窄依赖的算子则是对自身节点的数据进行操作

宽依赖的过程中,在对本分区处理结束后,输出结果会被==拆分到==不同的bucket并交由不同的worker来进行下一步transformation操作

Spark自身对lineage的优化:在开始对数据进行任何处理前,Spark会去创建一个完整的lineage graph, 所以,Spark 能够对lineage graph进行检查,并寻找机会对他进行优化

比如在pageRank的代码中

val links2 = links1.distinct()  //wide transformation
val links3 = links2.groupByKey() //wide transformation

这两个可能做了相同的事情,当使用groupByKey的时候,Spark可能并没有使用网络通信,无需重复对数据进行shuffle操作

Spark容错

Spark的分区信息等都存放在Driver端,当Driver崩溃时,需要完全重启任务

如果只是一个分区容错,可以通过lineage graph,重启一个新的worker重算该分区的数据

lineage 可以很方便的推断出某个节点的数据来自于哪些分区的父RDD,但仅仅使用lineage,那些宽依赖算子得出的RDD,也需要重算其他分区的RDD

当图中的 1节点出错,他可以很轻松的重算自身的lineage的过程,

但当图中的2节点出错,恢复他需要重算所有分区的lineage过程

因此Spark依然引入了checkpoint的概念,他可以在关键的宽依赖前设立检查点,保存不同分区的数据值,当同样情况下的2节点出错时,可以很轻易的通过检查点来回复数据

支持容错是分布式内存计算系统中能横向扩展的必要条件