Open Triple-Z opened 3 years ago
主要贡献:提供了一个简单而功能强大的界面,通过该界面提交的大规模计算任务能够自动分布式并行化,且仅需使用大量的商用 PC 上就能达到很高的计算性能。
编程模型:
map
:一个由用户编写的函数,将输入转换成一个中间 key/value 值的集合,之后 MapReduce 库将这个结果输送给 reduce
函数。reduce
:也是一个由用户编写的函数,接受中间值的 key I ,和各个 map 任务对应产生的 value 值的集合。reduce 会将这些值组合起来,形成一个更小的值集合。通常每个 reduce 函数只产生零或一个输出值。类型:
map (k1, v1) -> list(k2,v2)
reduce(k2, list(v2)) -> list(v2)
k1
一般为文档名称(document name)。k2
, v2
为中间键值对结果(intermediate key/value pairs)。文中提供的 MapReduce 能够实现的样例: | map | reduce | |
---|---|---|---|
分布式 Grep 程序 | 遍历行内容是否匹配模式 | 将 map 输出都拷贝在一起(恒等函数) | |
计算 URL 访问频率 | 处理 Web 访问日志,并输出 <URL, 1> | 将相同中间 key (URL)的值相加,输出 <URL, total count> | |
反向网页链接图 | 输入网页链接 source -> target ,输出 <target, source> | 将相同 target 的 source 放在一起组成一个数组,即输出 <target, list(source)> | |
每个 Host 的词条向量 | 输入 <word, frequency> 列表的文件,输出 <hostname, term-vector> (hostname 从文档 URL 中获取) | 将相同 hostname 的词条向量相加,输出 <hostname, (total) term-vector> 结果 | |
倒排索引 | 对每个文档进行解析(分词等),输出 <word, document ID> | 将中间结果按照 key 对 value 进行聚合,输出 <word, list(document ID)> | |
分布式 Sort 程序 | 从记录中提取 key,输出 <key, record> 中间结果对 | 原封不动地输出结果 |
Google 实现的 MapReduce 库的环境背景:
Map 调用:将用户输入(并行)拆分为 M 个数据块。
Reduce 调用:将中间结果用分割函数(如:hash(key) mod R
)来分割为 R 个数据块。
MapReduce 计算执行的过程见下图。
当 MapReduce 任务成功之后,任务的输出将为 R 个文件(每个 reduce 任务输出一个文件)。通常地,用户不需要将 R 个输出文件整合为一个,一般我们直接将它们一起作为下一个 MapReduce 任务的输入(就像 UNIX pipeline 一样,管道模式),或者将其用在能够处理分块输入的程序中。
Master 的数据结构:存储每个 map 和 reduce 任务的信息。
对于每个 map 任务,master 都需要存储 R 个中间文件的路径和大小。而这个信息会被增量推送至拥有状态为运行中 reduce 任务的 worker 上。
应对 Worker 失效:master 周期性地 ping 所有的 worker 节点来验活。若在一定时间内,master 收不到来自某个 worker 的响应,则认为该 worker 已经失效。所有被分配到该 worker 上的已完成和运行中的 map 任务都需要重新回到“空闲”状态(idle),再由调度器去安排其他 worker 重做 这些 map 任务;而对于 reduce 任务,只需要将处于“运行中”状态(in-progress)的 reduce 任务重新退回“空闲”状态即可(调度器同样会重新安排这些 reduce 任务重做)。
对于使用了重做的 map 任务的 reduce 任务来说,当 map 任务重做后,所有正在进行 reduce 任务的 worker 都会收到这个 map 任务被重做的信息。因此若某个 reduce 任务需要使用该 map 任务的结果,它将会重新从新 worker 上读取结果数据。
关于 worker 失效后 map 和 reduce 任务重做状态不同的原因如下:
应对 master 失效:在以上提到的 master 数据结构,将 master 内数据状态定期进行快照存储。若 master 发生失效,则新建一个新的 master 实例,并从最近的快照检查点进行数据恢复。此时 worker 向 master 发送的请求将失败,worker 将检查当前情况,并根据自己的策略重试 MapReduce 操作。
失效发生的语义(更合适的应该是:失效定义,见 https://en.wikipedia.org/wiki/Failure_semantics):
用户自定义的 map 和 reduce 函数若为确定函数,则 MapReduce 运行的结果应该要与串行执行结果相同。MapReduce 依赖于对 map 和 reduce 任务的原子提交(atomic commit)来实现这点。原子性提交实现:
大部分情况下,map 和 reduce 操作都是具有确定性的。这种情况下我们可以将语义等同于串行操作,这样非常易于程序员理解 MapReduce 程序的执行逻辑。
而当 map 和 reduce 操作不具有确定性时,我们仍然提供程度较弱但合理的语义。
这段没看懂
网络带宽是一个非常稀缺的资源。因此 MapReduce 尽所能来使得数据不需要被传输,MapReduce 的调度器会从全局文件系统(如 GFS)获取输入文件的元信息,尽可能将 map 任务安排在已经有文件部分副本的机器上直接运行(这样能够直接读取本地磁盘来获取数据,而不需要通过网络传输大量数据)。若无法直接安排在有数据块的机器上,则会寻找与其数据目标块机器在同一个交换机下的(同一个网络节点)机器来运行 map 任务,尽可能降低系统整体的网络负载。
map 阶段分了 M 个任务片,reduce 阶段分了 R 个任务片。 理想情况下,M 和 R 都应该远远大于 worker 机器的数量。
实践中的限制:master 必须要做出 O(M+R) 个调度任务决定,且要在内存中存储 O(M*R) 个状态信息(每个完成的 map 任务都有 R 个文件信息)。
R 一般由用户来指定,在具体实践中,我们一般设置 M 的依据是其能将输入文件的分片大小刚好在 16-64 MB 范围内(这样文件本地化的效果最优);R 一般选用一个对于 worker 机器数量的较小的乘数,见下面的例子。
M = 200000
R = 5000
#workers = 2000
M/#workers = 100
R/#workers = 2.5
在现实中,经常发现“掉队者”(straggler)会导致整个 MapReduce 任务的运行用时增加。掉队者一般是需要长时间才能运行完一个或少量的 map 或 reduce 计算任务的节点。硬件上和系统上的问题经常会导致集群中出现“掉队者”,比如一个机器上的磁盘出现问题,总是需要不停地做数据错误纠正,就可能导致其读性能从 30MB/s 下降为 1MB/s。
因此我们在集群控制器上配置了“备份任务“这一策略。当控制器发现某些 map 或 reduce 任务在一个设置的时间内还没完成且整个 MapReduce 任务马上就要接近尾声,就会把这个没完成的任务再分发一份给另外的一个 worker 运行,这称为“备份任务“。当主任务和备份任务任意一个完成后,控制器就认为这个计算任务已经完成。
这个机制很有效地提高了计算集群的资源利用率,能在极大减少大规模 MapReduce 计算任务的用时。在 Google 的实践中,当“备份任务”策略被关闭后,集群需要多耗时 44% 才能完成相同计算任务。
优化:
hash(key) mod R
)。经过测试,这个简单的函数也能将结果分布的比较均匀。然而,在一些场景下,用户希望得到的结果里有一些特殊要求(比如在 URL 数目输出中,我们会希望同一个域名的 URL 都尽量输出在一个文件中)。这时用户就可以自定义分区函数了,如 hash(Hostname(urlkey)) mod R
,就能够满足如上要求。
https://pdos.csail.mit.edu/6.824/papers/mapreduce.pdf
MIT 6.824 Lecture 1.