wangbo123855842 / Learning

15 stars 2 forks source link

Hadoop #43

Open wangbo123855842 opened 3 years ago

wangbo123855842 commented 3 years ago
スクリーンショット 2020-10-13 21 14 42

大数据技术栈

スクリーンショット 2020-10-13 21 24 09 スクリーンショット 2020-10-13 21 28 31

Hadoop

Hadoop是一个开源框架,允许使用简单的编程模型在跨计算机集群的分布式环境中存储和处理大数据。 它的设计是从单个服务器扩展到数千个机器,每个都提供本地计算和存储。

Hadoop 的三大核心组件

安装 Hadoop 集群

假如在 3 台VM 环境下安装 Hadoop,构成 Hadoop 集群,node-1,node-2,node-3

通过 ntpdate 等方法

/etc/sysconfig/network
HOSTNAME=node-1

修改 /etc/hosts

service iptables stop
chkconfig iptables off

生成公钥 id_rsa.pub ,私钥 id_rsa

ssh-keygen -r rsa

然后将公钥拷贝到要免密码登录的机器上 ( 要配置从 Node-1 到 Node-2,Node-3 的免密码登录 )

ssh-copy-id node2
ssh-copy-id node3
tar -zxvf hadoop-2.7.5.tar.gz

修改 JDK 的路径

vi hadoop-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_171   #JAVA_HOME写上自己jdk 的安装路径

在 configuration 属性中,添加下面的属性

<configuration>

<!-- 指定Hadoop所使用的文件系统schema(URI),HDFS的老大(NameNode)的地址 -->
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://node-1:9000</value>
</property>
<!-- 定义 Hadoop 运行时产生文件的存储目录。默认 -->
<property>
    <name>hadoop.tmp.dir</name>
    <value>/export/data/hddata</value>
</property>

</configuration>

在 configuration 属性中,添加下面的属性

<configuration>

<!-- 指定HDFS副本的数量,不修改默认为3个 -->
<property>
    <name>dfs.replication</name>
    <value>2</value>
</property>
<!-- dfs的SecondaryNameNode在哪台主机上 -->
<property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>node-2:50090</value>
</property>

</configuration>

把模板复制一份

cp mapred-site.xml.template mapred-site.xml
vi mapred-site.xml

在 configuration 属性中,添加下面的属性

<configuration>

<!-- 指定MapReduce运行是框架,这里指定在yarn上,默认是local -->
<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>

</configuration>
<!-- 指定yarn的老大ResourceManager的地址 -->
<property>
    <name>yarn.resourcemanager.hostname</name>
    <value>node-1</value>
</property>

<!-- NodeManager上运行的附属服务。需要配置成mapreduce_shuffle,才可以运行MapReduce程序默认值 -->
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
vi slaves
node-1
node-2
node-3
vi /etc/profile
export HADOOP_HOME=/export/server/hadoop-2.7.6
export PATH=$HADOOP_HOME/bin:$PATH
scp -r /export/server/hadoop-2.7.6/ root@node-2:/export/server/
scp -r /export/server/hadoop-2.7.6/ root@node-3:/export/server/
scp -r /etc/profile root@node-2:/etc/
scp -r /etc/profile root@node-3:/etc/

可以看出来 Hadoop 的配置文件,跟什么相关,就是 xxx-site.xml,另外官方文档的 xxx-default.xml 介绍了默认的设定。

Hadoop 启动

要启动 Hadoop 集群,需要启动 HDFS 和 YARN 两个集群首次启动 HDFS 集群,必须要对其进行格式化。格式化会做一些初始化的工作,格式化只能做一次格式化操作在 HDFS 集群的 NameNode 中执行

hdfs namenode -format

在主节点的机器上执行下面的命令来启动 HDFS 集群和 YARN 集群。

Hadoop Home/sbin/start-dfs.sh
Hadoop Home/sbin/start-yarn.sh

或者使用

Hadoop Home/sbin/start-all.sh
Hadoop Home/sbin/start-stop.sh

另外,也可以单节点逐个启动。 在主节点启动 HDFS 的 NameNode

hadoop-daemon.sh start namenode

在每个从节点启动 HDFS 的 DataNode(包括主节点)

hadoop-daemon.sh start datanode

在主节点启动 YARN 的 resourcemanager

yarn-daemon.sh start resourcemanager

在每个从节点启动 YARN 的 nodemanager

yarn-daemon.sh start nodemanager
スクリーンショット 2020-10-19 21 22 24

集群部署之后,可以通过 web-ui 来查看集群。

NameNode 的节点IP:port 来访问,port 默认是 50070

ResourceManager 的节点IP:port 来访问,默认是 8080

HDFS

Hadoop 分布式文件系统。 分布式系统解决的问题就是,大数据的存储。它们是横跨多台计算机的存储系统。

HDFS 组成

スクリーンショット 2020-10-19 21 58 14

HDFS 设计目标

硬件组件错误是常态,而非异常情况。 HDFS 可能由成百上千的服务器组成,任何一个组件都有可能一直失效,因此错误检测和快速、自动恢复是HDFS的核心架构目标,同时能够通过自身持续的状态监控快速检测冗余并恢复失效的组件。

运行在 HDFS 上的应用和普通的应用不同,需要流式访问它们的数据集。 流式数据,特点就是,像流水一样,不是一次过来而是一点一点“流”过来,而你处理流式数据也是一点一点处理。如果是全部收到数据以后再处理,那么延迟会很大,而且在很多场合会消耗大量内存。

HDFS 的设计中更多考虑到了数据批处理,而不是用户交互处理。相比数据访问的低延迟,HDFS 应用要求能够高速率、大批量地处理数据,极少有程序对单一的读写操作有严格的响应时间要求,更关键的问题在于数据访问的高吞吐量

运行在 HDFS 上的应用具有很大的数据集。HDFS 上的一个典型文件,大小一般都在GB至TB。 因此,需要调节 HDFS 以支持大文件存储

一个文件经过创建、写入和关闭之后就不需要改变了。 数据集通常由数据源生成或从数据源复制而来,接着长时间在此数据集上进行各种分析。

HDFS 特性

HDFS 首先是一个文件系统,用于存储文件,通过统一的命名空间目录树来定位文件。另外它是分布式的。

HDFS 采用了主从 Master/Slave 结构模型。 一个 HDFS 集群包括一个名称节点(NameNode)和若干个数据节点(DataNode)。

HDFS 的文件在物理上是分块存储(block)的。块的大小可以通过配置调整,在 Hadoop 2.x 中默认是大小是 128M。 每个块都会被复制到多台机器,默认复制 3 份。

一个大文件会被拆分成一个个的块,然后存储于不同的机器。如果一个文件少于Block大小,那么实际占用的空间为其文件的大小。 块是基本的读写单位类似于磁盘的页,每次都是读写一个块

HDFS 支持传统的层次型文件组织结构。 用户或者应用程序可以创建目录,然后将文件保存在这些目录里。 文件系统名字空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件。

NameNode 负责维护文件系统的名字空间,任何对文件系统名字空间或属性的修改都将被 Namenode 记录下来。

HDFS 会给客户端提供一个统一的目录树客户端通过路径来访问文件。 如 hdfs://namenode:port/dir-a/dir-b/dir-c/file.data

我们把目录结构和文件分块位置信息叫做元数据。 NameNode 负责维护整个 HDFS 文件系统的目录树结构,以及每一个文件所对应的 block 块信息。

文件的各个 block 的具体存储管理由 DataNode 节点承担。 每一个 block 都可以在多个 DataNode 上。 DataNode 需要定时向 NameNode 汇报自己持有的 block 信息。

为了容错,文件的所有 block 都会有副本,默认是 3 个。

HDFS 设计成使用 一次写入,多次读出的模型,并且 不支持文件的修改

HDFS Shell 客户端

Hadoop 提供了文件系统的 shell 命令行客户端。 比如,查看 HDFS 根目录的文件列表

hadoop fs -ls hdfs://node-1:9000/

简写

hadoop fs -ls

上面是 Hadoop V2 的写法,V1 的时候,使用如下命令

hdfs dfs <args>

但是,V1的局限性比较大,只支持 HDFSV2 的命令还可以支持其他的文件系统,比如本地FS等。 比如,查看 Linux 文件系统 root 下的文件列表

hadoop fs -ls file:///root

Hadoop 的 shell 跟 Linux 的命令很相似 创建文件夹

hadoop fs -mkdir /hdfs 路径

移动文件

hadoop fs -mv /hdfs 路径 /hdfs 路径

上传本地文件(Linux 文件系统)到 HDFS 中

hadoop fs -put /本地路径 /hdfs 路径

下载文件到 Linux 本地文件系统

hadoop fs -get /hdfs 路径 /本地路径

追加内容到已存在的文件,HDFS 不可以修改文件,但是可以在文件的末尾追加内容。

hadoop fs -appendToFile /本地文件 /hdfs 文件

修改一个文件的副本数。 -R 用于递归改变目录下所有的文件的副本数。

hadoop fs -setrep -w 3 -R /hdfs 路径

并行复制 distcp

hadoop distcp dir1 dir2

distcp 是作为一个 MapReduce 作业来实现的,该复制作业通过集群中并行运行的 map 来完成。这里没有 Reducer。 每一个文件通过一个 map 进行复制,默认情况,有大约 20个 map 被使用,map的数量可以通过 distcp -m 指定。 distcp 的一个常见的实例是在两个 HDFS 集群间传送数据。

hadoop distcp -update -delete -p hdfs://namenode1/foo hdfs://namenode2/foo

如果两个集群运行的 HDFS 版本不兼容,可以使用 webhdfs 协议

hadoop distcp -update -delete -p webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/foo

NameNode 概述

  1. NameNdoe 是 HDFS 的核心。也被称为 Master。
  2. NameNode 仅存储 HDFS 的元数据存储文件系统所有文件的目录树,以及文件的块列表
  3. NameNode 不存储实际的数据或数据集,数据本身的存储在 DataNodes 中。
  4. NameNode 并不持久化存储每个文件中各个块所在的 DataNode 的位置,这些信息会在系统启动的时从 DataNode 重建。
  5. NameNode 把数据保存在内存中,所以 NameNode 所在的机器通常会配置有大量的内存 RAM
  6. NameNode 失效则整个HDFS都失效了,所以要保证 NameNode 的高可用性

NameNode 把元数据保存在内存中,所以当 NameNode 重启之后,元数据会丢失。 所以 NameNode 本身有文件系统的镜像用来保存系统的目录树等元数据,而文件的块保存在哪个 DataNode 的元数据,并不会持久化,会从 DataNode 节点中重建

スクリーンショット 2020-10-20 7 49 18

下面的这张图片展示了 NameNode 是怎么把元数据保存到磁盘上的。

スクリーンショット 2020-10-21 15 37 32

只有在 NameNode 重启时,edit logs 才会合并到 fsimage 文件中,从而得到一个文件系统的最新快照。 但是在产品集群中 NameNode 是很少重启的,这也意味着当 NameNode 运行了很长时间后,edit logs 文件会变得很大。

在这种情况下就会出现下面一些问题:

  1. edit logs 文件会变的很大
  2. NameNode 的重启会花费很长时间,因为在edit logs中有很多改动要合并到fsimage文件上
  3. fsimage 文件非常旧

因此为了克服这个问题,我们需要一个易于管理的机制来帮助我们减小 edit logs 文件的大小和得到一个最新的 fsimage 文件,这样也会减小在NameNode上的压力。

SecondaryNameNode 就是来帮助解决这个问题的。

Secondary NameNode 概述

スクリーンショット 2020-10-21 15 53 51

Secondary NameNode是怎么工作的。

  1. 它定时到 NameNode 去获取 edit logs,并更新到 fsimage 上。( Secondary NameNode自己的fsimage )
  2. 一旦它有了新的 fsimage 文件,它将其拷贝回 NameNode 中。
  3. NameNode 在下次重启时会使用这个新的 fsimage 文件,从而减少重启的时间。

所以说,Secondary NameNode 并不简答的是 NameNode 的备份。 它不是要取代掉 NameNode 也不是 NameNode 的备份。我们可以称呼它为检查点节点

配置 Secondary NameNode

<property>
    <name>dfs.http.address</name>
    <value>master:50070</value>
    <description>
        The address and the base port where the dfs namenode 
    </description>
</property>
<property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>slave1:50090</value>
</property>

fs.checkpoint.period 表示多长时间记录一次hdfs的镜像。默认是1小时。 fs.checkpoint.size 表示一次记录多大的size,默认64M

<property>
  <name>fs.checkpoint.period</name>
  <value>3600</value>
  <description>The number of seconds between two periodic checkpoints.
  </description>
</property>

<property>
  <name>fs.checkpoint.size</name>  // 以日志大小间隔  做备份间隔
  <value>67108864</value>
</property>

<property>
  <name>fs.checkpoint.dir</name>
  <value>/app/user/hdfs/namesecondary</value>
  <description>Determines where on the local filesystem the DFS secondary namenode should store the temporary images to merge.If this is a comma-delimited list of directories then the image is replicated in all of the directories for redundancy. 
 </description>
</property>

DataNode 概述

HDFS 写操作

首先,HDFS 的内部工作机制对客户端保持透明,客户端请求访问 HDFS 都是通过向 NameNode 申请来进行

HDFS 写文件的流程

スクリーンショット 2020-10-21 18 04 47
  1. client 发起文件上传请求,通过 RPC 与 NameNode 建立连接,NameNode 检查目标文件是否已经存在,父目录是否存在,并检查用户是否有相应的权限,若检查通过,会为该文件创建一个新的记录,否则的话文件创建失败,客户端得到异常信息

  2. client 请求 NameNode,第一个 block(128M) 应该传输到哪些 DataNode 服务器上。

  3. NameNode 根据配置文件中指定的备份 (replica) 数量及机架感知原理进行文件分配,返回可用的DataNode的地址。 以三台 DataNode 为例,A B C。Hadoop 在设计时考虑到数据的安全与高效,数据文件默认在 HDFS 上存放三份。 存储策略是,第一个备份放在客户端相同的 DataNode 上( 若客户端在集群外运行,就随机选取一个 DataNode 来存放第一个replica),第二个 replica 放在与第一个 replica 不同机架的一个随机 DataNode 上,第三个 replica 放在与第二个 replica 相同机架的随机 DateNode 上,如果 replica 数大于3,则随后的 replica 在集群中随机存放,Hadoop会尽量避免过多的 replica 存放在同一个机架上。

  4. client 请求3台的 DataNode 的一台上传数据。(本质是一个RPC调用,建立pipeline),A收到请求会继续调用B,然后B调用C,将整个 pipeline 建立完成后,逐级返回 client

  5. client 开始往 A 上传第一个 block ( 先从磁盘读取数据放到一个本地内存缓存 ), 以 packet 为单位 ( 默认 64K )。 A 收到一个 packet 就会传给 B,B 传递给 C。 数据被分割成一个个 packet 数据包在 pipeline 上传输,在 pipeline 反方向上,逐个发送 ack(命令正确应答),最终由pipeline中第一个 DataNode 节点 A 将 pipeline ack 发送给 client

  6. 完成向文件写入数据,Client 在文件输出流(FSDataOutputStream)对象上调用 close 方法,关闭流。

  7. 调用 DistributedFileSystem 对象的 complete 方法,通知 NameNode 文件写入成功。

スクリーンショット 2020-10-21 18 19 37

HDFS 读操作

客户端将要读取的文件路径发送给 NameNode,NameNode 获取文件的元信息(主要是 block 的存放位置信息)返回给客户端。 客户端根据返回的信息找到相应 DataNode 逐个获取文件的 Block 并在客户端本地进行数据追加合并从而获得整个文件。

スクリーンショット 2020-10-21 19 44 03
  1. Client 向 NameNode 发起 RPC 请求,来确定请求文件 block 所在的位置

  2. NameNode 会视情况返回文件的部分或者全部block列表,对于每个block,NameNode 都会返回含有该 block 副本的 DataNode地址

  3. 这些返回的 DN 地址,会按照集群拓扑结构得出 DataNode 与客户端的距离,然后进行排序,排序两个规则: 网络拓扑结构中距离Client的排在前,心跳机制中超时汇报的DN状态为STALE,这样的排在后。

  4. Clietn 选取排序靠前的 DataNode 来读取 block如果客户端本身就是 DataNode,那么将从本地直接获取数据

  5. 底层本质是建立 Socket Stream ( FSDataInputStream ) ,重复调用父类 DataInputStream 的 read 方法,直到这个块上的数据读取完毕

  6. 当读完列表的 block 后,若文件读取还没有结束,客户端会继续向 NameNode 获取下一批的 block 列表。

  7. 读取完一个 Block 都会进行 checksum 验证,如果读取 DataNode 时出现错误,客户端会通知 NameNode,然后再从下一个拥有该 block 副本的 DataNode 继续读取

  8. read 方法是并行的读取 block 信息,不是一块一块的读取,NameNode 只是返回 Client 请求包含块的 DataNode 地址,并不是返回请求块的数据。

  9. 终读取所有的 block 会合并成一个完整的最终文件

スクリーンショット 2020-10-23 21 33 11

客户端通过 FileSystem 对象的 Open 方法来打开希望读取的文件,对于 HDFS 来说,这个对象就是 DistributedFileSystem 的一个实例。DistributedFileSystem 通过 RPC 调用 NameNode 返回 Block 信息。 DistributedFileSystem 返回一个 FSDataInputStream 对象给客户端以便读取数据。客户端通过调用 read 方法,将数据从 DataNode 传输到客户端,传输完毕,关闭和 DataNode 的连接。

如果与 DataNode 通信遇到错误,会尝试从另外一个最邻近的 DataNode 读取数据。也会记住这个 DataNode,避免反复读取这个节点上的后续块。也会通过校验确认从 DataNode 发来的数据是否完整。如果有损坏,也会试图从其他的 DataNode 读取,并通知 NameNode。

HDFS 归档

Hadoop archives 是特殊的归档格式。 一个 Hadoop archive 对应一个文件系统目录。 Hadoop archive 的扩展名是 *.har。 Hadoop archive包含元数据(形式是_index和_masterindx)和数据(part-*)文件。_index文件包含了归档文件的文件名和位置信息。

スクリーンショット 2020-10-24 22 53 45

HDFS 并不擅长存储小文件,因为每个文件最少占用一个 Block,每个 Block 的元数据都会在 NameNode 节点占用内存,如果存在这样大量的小文件,它们会吃掉 NameNode 节点的大量内存。 Hadoop Archives 可以有效的处理以上问题,他可以把多个文件归档成为一个文件,归档成一个文件后还可以透明的访问每一个文件,并且可以做为 MapReduce 任务的输入。

创建

hadoop archive -archiveName zoo.har -p /foo/bar /outputdir

-archiveName 选项指定你要创建的 archive 的名字。比如 foo.har。 archive 的名字的扩展名应该是*.har。输入是文件系统的路径名。创建的 archive 会保存到目标目录下。注意创建 archives 是一个Map/Reduce job。你应该在 map reduce 集群上运行这个命令

归档多个文件夹

hadoop archive -archiveName NAME -p <parent path> <src>* <dest>

/foo/bar 文件夹下面的 a/b/ce/f/g 两个目录的内容压缩归档到 /user/outputdir/ 文件夹下,并且源文件不会被更改或者删除

hadoop archive -archiveName test_save_foo.har -p  /foo/bar   a/b/c e/f/g  /user/outputdir/

删除与恢复 HDFS 文件被归档后,系统不会自动删除源文件,需要手动删除

hadoop fs -rmr /user/hadoop/xxx/201310/

归档文件存在,源文件不在了,如果要恢复怎么办,其实这也很简单,直接从har 文件中 cp出来就可以了

hadoop fs -cp /user/xxx/201310/201310.har/*  /user/hadoop/xxx/201310/

获得创建的 archive 中的文件列表

hadoop dfs -lsr har:///user/hadoop/foo.har

查看 archive 中的 fileA 文件

hadoop dfs -cat har:///user/hadoop/foo.har/dir/fileA

MapReduce 程序中使用 Hadoop Archives 归档文件 在 MapReduce 中,与输入数据使用默认文件系统一样,也可以使用 Hadoop Archives 归档文件作为输入文件系统。 如果你有存储在HDFS目录下 /user/zoo/foo.har 的 Hadoop Archives 归档文件 ,然后你在 MapReduce 程序中就可以使用如下路径 har:///user/zoo/foo.har 作为输入文件。

HDFS JAVA API

HDFS 在生产应用中主要是客户端的开发,其核心的步骤是从 HDFS 提供的 API 中构建一个 HDFS 的客户端对象。 然后通过客户端来增删改查 HDFS 上的文件。

Maven依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.3</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
</dependency>

核心就是 FileSystem 这个类,然后通过这个类的各种 API 来操作 HDFS。

Configuration conf = new Configuration();
System.setProperty("HADOOP_USER_NAME", "hadoop");
conf.addResource("config/core-site.xml");
conf.addResource("config/hdfs-site.xml");
conf.addResource("config/mapred-site.xml");
conf.addResource("config/yarn-site.xml");

conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
// 参数优先级: 1、客户端代码中设置的值 2、classpath下的用户自定义配置文件 3、然后是服务器的默认配置
conf.set("dfs.replication", "2");
conf.set("dfs.block.size", "64m");

FileSystem fs = FileSystem.get(conf);

Java 操作 HDFS 常用API

可以通过 FileSystem 的各种 API 来操作,也可以使用更加原始,效率的流方式来操作

上传

Configuration conf = new Configuration();
System.setProperty("HADOOP_USER_NAME", "hadoop");
conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
FileSystem fs = FileSystem.get(conf);

// 创建本地系统的输入流
InputStream in = new FileInputStream(new File("c:/base.sh"));

// 创建 HDFS 的输出流,把文件上传到 HDFS 并重命名
FSDataOutputStream out = fs.create(new Path("/aa/new_base"));
// 使用 Apache 的 IO 工具类
IOUtils.copyBytes(in, out, 4096, true);

fs.close();

下载

Configuration conf = new Configuration();
System.setProperty("HADOOP_USER_NAME", "hadoop");
conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
FileSystem fs = FileSystem.get(conf);

FSDataInputStream in = fs.open(new Path("/aa/new_base"));
OutputStream out =new FileOutputStream(new File("c:/ab.sh"));
IOUtils.copyBytes(in, out, 4096, true);
fs.close();

一致模型

HDFS 写入的内容不能保证立即可见,当写入的数据超过一个块之后,第一个数据块对新的 reader 就是可见的FSDataOutputStream 调用 hflush 方法,可以保证文件中到目前为止写入的数据对所有的 reader 可见。

SequenceFile 和 MapFile

Hadoop 的 HDFS 和 MapReduce 子框架主要是针对大数据文件来设计的,在小文件的处理上效率低下。 小文件多的话,对 NameNode 造成很大的压力,因为每一个文件都会有一条元数据信息存储在 NameNode 上, 解决办法通常是选择一个容器将这些小文件组织起来统一存储。HDFS 提供了两种类型的容器,分别是 SequenceFileMapFile。通过 SequenceFile 将小文件合并起来,可以更高效率的存储和计算。

SequenceFile 是 Hadoop 提供的一种对二进制文件的支持,二进制文件直接将<Key, Value>对序列化到文件中。 SequenceFile 中的 key 和 value 可以是任意类型的 Writable 或者自定义的 Writable 类型。

在存储结构上,SequenceFile 主要由一个 Header 后跟多条 Record 组成。 Header 主要包含了,key 和 value 的类名,存储压缩算法,用户自定义元数据等信息。 每条 Record 以键值对的方式进行存储SequenceFile 的每条记录是可序列化的字符数组

SequenceFile 可通过如下 API 来完成新记录的添加操作。

fileWriter.append(key,value)

可以看到,每条记录以键值对的方式进行组织,但前提是 Key 和 Value 需具备序列化和反序列化的功能

スクリーンショット 2020-10-25 20 50 36

SequenceFile 写操作

public class SequenceFileWriter {
    private static Configuration configuration = new Configuration();
    private static String HDFS_PATH = "hdfs://master002:9000";
    private static String[] data = {"a,b,c,d,e,f,g","e,f,g,h,j,k","l,m,n,o,p,q,r,s","t,y,v,w,x,y,z"};

    public static void main(String[] args) throws  Exception {
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fileSystem = FileSystem.get(URI.create(HDFS_PATH), configuration);
        Path outputPath = new Path("MySequenceFile.seq");
        IntWritable key = new IntWritable();
        Text value = new Text();

        SequenceFile.Writer writer = SequenceFile.createWriter(fileSystem, configuration,
                outputPath, IntWritable.class, Text.class);

        for(int i=0;i<10;i++){
            key.set(10-i);
            value.set(data[i%data.length]);
            writer.append(key, value);
        }
        IOUtils.closeStream(writer);

    }
}

SequenceFile 读操作

public class SequenceFileReader {
    private static Configuration configuration = new Configuration();
    private static String HDFS_PATH = "hdfs://master002:9000";

    public static void main(String[] args) throws Exception{
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fileSystem = FileSystem.get(URI.create(HDFS_PATH), configuration);
        Path inputPath = new Path("MySequenceFile.seq");

        SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, inputPath, configuration);
        Writable keyClass = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), configuration);
        Writable valueClass = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), configuration);
        while(reader.next(keyClass, valueClass)){
            System.out.println("key:"+keyClass);
            System.out.println("value:"+valueClass);
            System.out.println("position:"+reader.getPosition());
        }
        IOUtils.closeStream(reader);
    }
}

SeqeunceFile 支持两种格式的数据压缩,分别是 record compressionblock compression

SequenceFile.Writer writer = SequenceFile.createWriter(fileSystem, configuration, outputPath,
                IntWritable.class, Text.class, SequenceFile.CompressionType.RECORD, new BZip2Codec());

MapFile 是排序后的 SequenceFile,通过观察其目录结构可以看到 MapFile 由两部分组成,分别是 data 和 index。

public class MapFileWriter {
    private static Configuration configuration = new Configuration();
    private static String HDFS_PATH = "hdfs://master002:9000";

    public static void main(String[] args) throws Exception{
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fileSystem = FileSystem.get(URI.create(HDFS_PATH), configuration);
        Path outputPath = new Path("MyMapFile.map");
        Text key = new Text();
        key.set("mymapkey");
        Text value = new Text();
        value.set("mymapvalue");

        MapFile.Writer writer = new MapFile.Writer(configuration, fileSystem, outputPath.toString(), Text.class, Text.class);

        writer.append(key,value);
        IOUtils.closeStream(writer);
    }
}

Yarn

Apache Hadoop YARN 是 Hadoop 分布式处理框架中的资源管理和作业调度技术YARN 被引入 Hadoop 2

YARN 提供请求和使用集群资源的 API,但是这些 API 很少直接用于用户代码。 用户代码中用的是分布式计算框架提供的更高层的 API。这些 API 建立在 YARN 上并且向用户隐藏了资源管理的细节。

スクリーンショット 2020-10-23 22 01 41

一些分布式计算框架 MapReduce,Spark 等作为 YARN 应用,运行在 YARN 和 集群的存储层 HDFS,HBase 之上。 还有一层应用,如 Pig,Hive 都是运行在 MapReduce,Spark ,Tez 之上的处理框架,它们不和 Yarn 直接打交道。

作为 Apache Hadoop 的核心组件之一,YARN 负责将系统资源分配给在 Hadoop 集群中运行的各种应用程序,并调度要在不同集群节点上执行的任务

YARN 的基本思想是将资源管理和作业调度/监视的功能分解为单独的 daemon(守护进程)。 其拥有一个全局 ResourceManager(RM) 和 每个应用程序的 ApplicationMaster(AM)。 当用户提交应用程序时,将启动名为 ApplicationMaster 的轻量级进程实例,以协调应用程序中所有任务的执行。

ResourceManager和 NodeManager 构成了数据计算框架。 ResourceManager 是在系统中的所有应用程序之间仲裁资源的最终权限。NodeManager 是每台机器框架代理监视其资源使用情况(CPU,内存,磁盘,网络)并将其报告给 ResourceManager。

YARN 架构

YARN 总体上是 master/slave 结构,在整个资源管理框架中,ResourceManager 为 master,NodeManager 是 slave

YARN的基本组成结构,YARN 主要由 ResourceManager、NodeManager、ApplicationMaster 和 Container 等几个组件构成。

Master上一个独立运行的进程,负责集群统一的资源管理、调度、分配等

Slave上一个独立运行的进程,负责上报节点的状态

相当于这个 Application 的监护人和管理者,负责监控、管理这个 Application 的所有 Attempt 在 cluster 中各个节点上的具体运行,同时负责向 Yarn ResourceManager 申请资源、返还资源等。

YARN 中分配资源的一个单位,包涵内存、CPU等等资源,YARN以 Container 为单位分配资源。 当 AM 向 RM 申请资源时,RM 为 AM 返回的资源便是用 Container 表示的YARN 会为每个任务分配一个 Container,且该任务只能使用该 Container 中描述的资源

Container 和集群节点的关系 一个节点会运行多个 Container,但一个 Container 不会跨节点。任何一个 job 或 application 必须运行在一个或多个 Container 中,在 Yarn 框架中,ResourceManager 只负责告诉 ApplicationMaster 哪些 Containers 可以用,ApplicationMaster 还需要去找 NodeManager 请求分配具体的 Container。

スクリーンショット 2020-10-22 22 03 36

Client 向 ResourceManager 提交的每一个应用程序都必须有一个 ApplicationMaster,它经过 ResourceManager 分配资源后,运行于某一个 Slave 节点的 Container 中。另外,具体做事情的 Task,同样也运行某一个 Slave 节点的 Container 中

YARN 应用提交过程

Application在Yarn中的执行过程

スクリーンショット 2020-10-22 22 08 48
  1. 用户将应用程序提交到 ResourceManager 上。

  2. ResourceManager 为应用程序 ApplicationMaster 申请资源,并与某个 NodeManager 通信启动第一个 Container,以启动ApplicationMaster。

  3. ApplicationMaster 与 ResourceManager 注册进行通信,为内部要执行的任务申请资源,一旦得到资源后,将于 NodeManager 通信,以启动对应的 Task。

  4. 所有任务运行完成后,ApplicationMaster 向 ResourceManager 注销,整个应用程序运行结束。

向 YARN 以 jar 包的方式提交作业

hadoop jar jar包名 参数

例如

hadoop jar mapreduce_test.jar 

MapReduce

官方图

スクリーンショット 2020-10-23 13 26 00

MapReduce计算模型主要由三个阶段构成: Map,shuffle,Reduce

Map 是映射,负责数据的过滤分法,将原始数据转化为键值对。 Reduce 是合并,将具有相同key值的value进行处理后再输出新的键值对作为最终结果。 为了让 Reduce 可以并行处理 Map 的结果,必须对 Map 的输出进行一定的排序与分割,然后再交给对应的Reduce,而这个将Map 输出进行进一步整理并交给 Reduce 的过程就是 Shuffle

スクリーンショット 2020-10-23 15 09 29

Map 和 Reduce 操作需要我们自己定义相应 Map 类和 Reduce 类,以完成我们所需要的化简、合并操作。 而 shuffle 则是系统自动帮我们实现的。

Shuffle过程包含在 Map 和 Reduce 两端,即 Map shuffle 和 Reduce shuffle。

Map shuffle

在 Map 端的 shuffle 过程是对 Map 的结果进行分区、排序、分割,然后将属于同一划分区的输出合并在一起并写在磁盘上,最终得到一个分区有序的文件。 分区有序的含义是 map 输出的键值对按分区进行排列,具有相同 partition 值的键值对存储在一起。 每个分区里面的键值对又默认按 key 值进行升序排列

スクリーンショット 2020-10-23 15 29 08

对于 map 输出的每一个键值对,系统都会给定一个 partition,partition 值默认是通过计算 key 的 hash 值后对 Reduce task 的数量取模获得如果一个键值对的 partition 值为1,意味着这个键值对会交给第一个 Reducer 处理

简单的说,Partition 是分割 Map 每个节点的结果,按照 key 分别映射给不同的 Reducer,也是可以自定义的。这里其实可以理解归类

Map 的输出结果是由 Collector 处理的,每个 Map Task 不断地将键值对输出到在内存中构造的一个环形数据结构中。 使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。这个数据结构其实就是个字节数组,叫Kvbuffer默认情况下,Kvbuffer 使用到 80% 的情况下就会触发 Spill 。可以通过 io.sort.spill.percent 来设定。

当 Spill 触发后,先把 Kvbuffer 中的数据按照 partition 值和 key 两个关键字升序排序。 排序结果是 Kvmeta 中数据按照 partition 为单位聚集在一起,同一 partition 内的按照 key 有序。

Spill 线程为这次 Spill 过程创建一个磁盘文件。 Spill 从所有的本地目录中查找能存储这么大空间的目录,找到之后在其中创建一个类似于spill12.out的文件。 Spill 线程根据排过序的 Kvmeta 逐个 partition 的把数据存到这个文件中,一个 partition 对应的数据存完之后顺序地存下个partition,直到把所有的 partition 遍历完。

在这个过程中如果用户配置了 Combine 类,那么在写之前会先调用 combineAndSpill(),对结果进行进一步合并后再写出

所有的 partition 对应的数据都放在这个文件里,虽然是顺序存放的,但是怎么直接知道某个 partition 在这个文件中存放的起始位置呢?是依靠索引文件。

スクリーンショット 2020-10-23 15 50 15

在 Spill 线程 如火如荼的进行 SortAndSpill 工作的同时,Map Task 不会因此而停歇,而是一无既往地进行着数据输出。Map 还是把数据写到 kvbuffer 中。 Map Task 总要把输出的数据写到磁盘上,即使输出数据量很小在内存中全部能装得下,在最后也会把数据刷到磁盘上。

Combine 可以理解为是在 Map 端的 Reduce 的操作,对单个 Map Task 的输出结果数据进行合并的操作。 Combine 是作为一个优化手段,可选项,不是所有的 MR 程序都适合 Combine。 Combine 的优化是一定不能够改变最终的输出的结果。 Combine 的好处是 减少网络的传输,减轻磁盘IO负载

combiner 函数实现的逻辑,基本和 reduce 函数一样

Map Task 如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同的磁盘上。 最后把这些文件进行合并,就是 Merge 过程。

Merge 过程从所有的本地目录上扫描得到产生的 Spill 和 Spill 索引文件。然后 Merge 过程创建一个叫 file.out 的文件和一个叫file.out.Index 的文件用来存储最终的输出和索引,一个partition 一个partition的进行合并输出

スクリーンショット 2020-10-23 16 12 22

Reduce shuffle

在 Reduce 端,shuffle主要分为复制Map输出、排序合并两个阶段。

Reduce Task 通过 HTTP 向各个 Map Task 拖取它所需要的数据。 Map Task 完成后,会通知父 TaskTracker 状态已经更新,TaskTracker 进而通知 JobTracker。 Reducer 会定期向 JobTracker 获取Map的输出位置,一旦拿到输出位置,Reduce Task 就会从此输出对应的 TaskTracker 上复制输出到本地,而不会等到所有的 Map Task 结束。

Copy 过来的数据会先放入内存缓冲区中,如果内存缓冲区中能放得下这次数据的话就直接把数据写到内存中,即内存到内存merge。 Reducer 要向每个 Map 去拖取数据,在内存中每个 Map 对应一块数据,当内存缓存区中存储的 Map 数据占用空间达到一定程度的时候,开始启动内存中 merge,把内存中的数据 merge 输出到磁盘上一个文件中,即内存到磁盘merge。 在将 buffer 中多个 map 输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的 map 输出。 Reduce 的内存缓冲区可通过 mapred.job.shuffle.input.buffer.percent 配置,默认是JVM的heap size的70%。 内存到磁盘 merge 的启动可以通过 mapred.job.shuffle.merge.percent 配置,默认是66%。

当属于该 reducer 的 map 输出全部拷贝完成,则会在 reducer 上生成多个文件,这时开始执行合并操作,Map 的输出数据已经是有序的,Merge 进行一次合并排序,所谓 Reduce 端的 sort 过程就是这个合并的过程。 一般 Reduce 是一边 copy 一边 sort,即 copy 和 sort 两个阶段是重叠而不是完全分开的。 最终 Reduce shuffle 过程会输出一个整体有序的数据块

总结一下,整体的流程

96995816-64acf680-1561-11eb-8060-09e45c5c2d4a

组一个常见的例子,查找文件中词语出现的次数

スクリーンショット 2020-10-23 18 58 20

Map Reduce 数量

在 MapReduce 中,第一步就是对文件进行逻辑的 Split。HDFS 中的数据是按 Block 来存储的。 那么 Split 和 Block 之间是什么关系呢?

首先,要确认一个 splitSize

mapreduce.input.fileinputformat.split.minsize // 启动map最小的split size大小,默认0
mapreduce.input.fileinputformat.split.maxsize // 启动map最大的split size大小,默认256M
dfs.block.size  // block块大小,默认128M

splitSize 计算公式

splitSize =  Math.max(minSize, Math.min(maxSize, blockSize));

所以,默认情况下,splitSize = blockSize

Split 是逻辑意义上的 Split。 通常在 M/R 程序或者其他数据处理技术上用到。 splitSize 定义好了之后,可以控制 M/R 中 Mapper 的数量。如果M/R中没有定义 split size , 就用 默认的HDFS配置作为 input split。

例如一个文件 800M,Block大小是 128M,那么Mapper数目就是7个。 6个Mapper处理的数据是128M,1个Mapper处理的数据是32M;

再例如一个目录下有三个文件大小分别为:5M,10M,150M。 这个时候其实会产生四个Mapper处理的数据分别是5M,10M,128M,22M。

Reduce 的数量,可以通过客户端程序制定。

job.setNumReduceTasks(100)

Hadoop的 I/O 操作

HDFS 会对写入的所有数据计算校验和(checksum),并在读取数据时验证校验和。 DataNode 负责在收到数据后,在存储该数据及其校验和之前,对数据进行校验。 客户端从 DataNode 读取数据时,也会验证校验和,将它们与 DataNode 中存储的校验和进行比较

如果客户端在读取数据块时,如果检测错误,首先向 NameNode 报告已损坏的数据块及其正在尝试读操作的这个 DataNode, 在抛出 ChecksumException 异常。NameNode 将这个数据块副本标记为已损坏,这样它不再将客户端的请求直接发送到这个节点。之后它会安排这个数据块的一个副本复制到另一个 DataNode。这样副本数又回到了期望水平。之后,已损坏的数据块副本就会被删除。

可以使用 hadoop 命令来检查一个文件的校验和。

hadoop fs -checksum 文件

Hadoop 默认支持 DEFLATE,Gzip,bzip2 三种压缩格式,多数情况下会使用 bzip2

Codec 使用相关的算法对数据进行编解码。在 Hadoop 中,一个对 CompressionCodec 接口的实现代表一个 codec

スクリーンショット 2020-10-24 21 50 38

对于不同的压缩算法有不同的编解码器。我们要对一个文件进行压缩需要编码器,对一个压缩文件进行解压需要解码器。

スクリーンショット 2020-10-24 21 54 43

CompressionCodec 包含两个函数,如果要对输出数据流的数据进行压缩,可用 CreateOutputStream 方法创建一个 CompressionOutputStream。如果要对输入数据流读取的数据进行解压缩,可用 CreateInputStream 方法创建一个 CompressionInputStream

使用 Java API 压缩・解压缩

先将 bzip2 格式解压,再将文件压缩为 gzip 格式。 通常压缩的时候,指定特定算法的编码器,读取的时候,CompressionCodecFactory 会根据后缀名来推断出 特定算法的解码器。

public class FileCompress {
    public static void main(String[] args) throws IOException {

        // 解压 1.bz2 为 1 -- createInputStream
        String uri = "hdfs://master:9000/data/1.bz2";
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(URI.create(uri.toString()), conf);
        Path inputPath = new Path(uri);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(inputPath);

        // 根据文件后缀判断生成何种类型的CompressionCodec
        if (codec == null) {
            System.out.println("No codec found for " + uri);
            System.exit(1); // 异常退出
        }

        // 解压的路径名
        String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());

        CompressionInputStream inputStream = null;
        CompressionOutputStream outputStream = null;
        try {
            // 对输入流进行解压
            inputStream = codec.createInputStream(fileSystem.open(inputPath)); 
            // 创建输出文件,获得输出流
            outputStream = fileSystem.create(new Path(outputUri)); 
            // 从输入流到输出流复制, 实现解压
            IOUtils.copyBytes(inputStream, outputStream, conf);  
        } finally {
            IOUtils.closeStream(inputStream);
            IOUtils.closeStream(outputStream);
        }

        // 压缩 1 为 1.gz  -- createOutputStream
        CompressionOutputStream compressionOutputStream = null;
        // 压缩
        Path gzPath = new Path("hdfs://master:9000/data/2.gz");
        try {
            inputStream = fileSystem.open(new Path(outputUri)); // 获得源文件输入流
            GzipCodec gzipCodec = new GzipCodec(); // 获得gz格式实例
            gzipCodec.setConf(conf); // 设置Configuration
            compressionOutputStream = gzipCodec.createOutputStream(fileSystem.create(gzPath)); // 创建输出文件获得输出流
            IOUtils.copyBytes(inputStream, compressionOutputStream, 4096, false); // 从输入流复制到输出流 buffsize 4096
            compressionOutputStream.close();            
        } finally {
            // TODO: handle finally clause
            IOUtils.closeStream(inputStream);
            IOUtils.closeStream(outputStream);
        }
    }
}

MapReduce 中使用压缩

  1. 输入的文件的压缩

如果输入的文件是压缩过的,那么在被 MapReduce 读取时,它们会被自动解压,根据文件扩展名来决定应该使用哪一个压缩解码器

  1. MapReduce 作业的输出的压缩

如果要压缩 MapReduce 作业的输出,请在job配置文件中将 mapred.output.compress 属性设置为 true。 将 mapred.output.compression.codec 属性设置为自己打算使用的压缩编码/解码器的类名。

conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class);

序列化 Writable

在 Hadoop 中,系统中多个节点上进程间的通信通过远程方法调用 RPC 实现的。 RPC 协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息。

Hadoop 使用的是自己的序列化格式 Writable。 Writable 是 Hadoop 的核心,大多数 MapReduce 程序的键值类型都使用它

Writable 接口定义了两个方法,一个将其状态写入 DataOutput 二进制流,另一个从 DataInput 二进制流读取状态。 write 和 readFields 方法。

Writable 类的使用,通过 set / get 方法。比如

IntWritable writable = new IntWritable();
writable.set(123);
writable.get();

对 MapReduce 来说,这个类型很重要,因为中间有一个基于键的排序阶段。

Hadoop 自带很多 Writable 类。

Java 基本类型的 Writable 类

スクリーンショット 2020-10-25 10 34 46

Text 类型 Text 是针对 UTF-8 序列的 Writable 类。一般可以认为它是 java.lang.String 的 Writable 类

BytesWritable 类型 BytesWritable 是对二进制数据数组的封装。

NullWritable 类型 NullWritable 是 Writable 的特殊类型,它的序列化长度是0。

Writable 集合类 常用的,ArrayWritable ,MapWritable ,SortedMapWritable

尽管大多数 MapReduce 程序使用的都是 Writable 类型的键值,但不是 MapReduce API 强制要求的,事实上,可以使用任何类型,只要能有一种机制对每个类进行类型与二进制类型来回转换就可以。

MapReduce 的应用开发

MapReduce 编码都遵循特定的流程,首先写 map 函数和 reduce 函数,最好使用单元测试来确保函数符合预期。 之后写一个驱动程序来运行作业。 然后在本地 IDE 中用一个小数据集运行它,OK之后,把它放在集群中运行。

一个 Configuration 类的实例代表配置属性及其取值的一个集合。 比如一个 configuration-1.xml

<configuration>
    <property>
       <name>color</name>
       <value>yello</value>
    </property>
</configuration>

Configuration 从资源中读取属性值。

Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");
assertThat(conf.get("color"), is("yellow"));

开发 Hadoop 应用时,经常需要从本地运行和集群运行之间进行切换。 我们可以配置不同的 Hadoop 配置文件,比如 hadoop-local.xml,hadoop-localhost.xml,hadoop-cluster.xml 。

hadoop-local.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>file:///</value>
    </property>
</configuration>

hadoop-localhost.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost/</value>
    </property>
</configuration>

hadoop-cluster.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://namenode/</value>
    </property>
</configuration>

命令行,可以通过 -conf 指定配置。

hadoop fs -conf conf/hadoop-localhost.xml -ls .

现在要实现一些 wordCouter 的 MR 程序。

继承 Hadoop 提供的 Mapper 类。

package cn.abc.hadoop.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 *
 * 这里就是mapreduce程序  mapper阶段业务逻辑实现的类
 *
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *
 * KEYIN:表示mapper数据输入的时候key的数据类型,在默认的读取数据组件下,叫TextInputFormat,它的行为是一行一行的读取待处理的数据
 * 读取一行,返回一行给我们的mr程序,这种情况下,keyin就表示每一行的起始偏移量,因此数据类型是Long
 *
 * VALUEIN:表述mapper数据输入的时候value的数据类型,在默认的读取数据组件下 valuein就表示读取的这一行内容  因此数据类型是String
 *
 * KEYOUT 表示mapper数据输出的时候key的数据类型  在本案例当中 输出的key是单词  因此数据类型是 String
 *
 * VALUEOUT表示mapper数据输出的时候value的数据类型  在本案例当中 输出的key是单词的次数  因此数据类型是 Integer
 *
 * 这里所说的数据类型 String Long 都是 jdk 自带的类型   在序列化的时候  效率低下 因此hadoop自己封装一套数据类型
 *   long---->LongWritable
 *   String-->Text
 *   Integer--->Intwritable
 *   null-->NullWritable
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 拿到传入进来的一行内容,把数据类型转化为String
        String line = value.toString();

        // 将这一行内容按照分隔符进行一行内容的切割 切割成一个单词数组
        String[] words = line.split(" ");

        // 遍历数组,每出现一个单词  就标记一个数字1  <单词,1>
        // 类似这个格式 hadoop hadoop spark -->   <hadoop,1><hadoop,1><spark,1>
        for (String word : words) {
            // 使用mr程序的上下文context 把mapper阶段处理的数据发送出去,作为reduce节点的输入数据
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
package cn.abc.hadoop.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 *
 * 这里是MR程序 reducer阶段处理的类
 *
 * KEYIN:就是reducer阶段输入的数据key类型,对应mapper的输出key类型  在本案例中  就是单词  Text
 *
 * VALUEIN就是reducer阶段输入的数据value类型,对应mapper的输出value类型  在本案例中  就是单词次数  IntWritable
 * .
 * KEYOUT就是reducer阶段输出的数据key类型 在本案例中  就是单词  Text
 *
 * VALUEOUTreducer阶段输出的数据value类型 在本案例中  就是单词的总次数  IntWritable
 */
public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

    /**
     * 这里是reduce阶段具体业务类的实现方法
     *
     * reduce接收所有来自map阶段处理的数据之后,按照key的字典序进行排序
     * <hello,1><hadoop,1><spark,1><hadoop,1>
     * 排序后:
     * <hadoop,1><hadoop,1><hello,1><spark,1>
     *
     * 按照key是否相同作为一组去调用reduce方法
     * 本方法的key就是这一组相同kv对的共同key
     * 把这一组所有的v作为一个迭代器传入我们的reduce方法
     *
     * <hadoop,[1,1]>
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 定义一个计数器
        int count = 0;
        // 遍历一组迭代器,把每一个数量1累加起来就构成了单词的总次数
        for(IntWritable value:values){
            count +=value.get();
        }

        // 把最终的结果输出
        context.write(key,new IntWritable(count));
    }
}

ReduceTask 设置为3,就会生成3个结果文件,part-r-00000 , part-r-00001 ,part-r-00002

package cn.abc.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *
 * 这个类就是mr程序运行时候的主类,本类中组装了一些程序运行时候所需要的信息
 * 比如:使用的是那个Mapper类  那个Reducer类  输入数据在那 输出数据在什么地方
 */
public class WordCountDriver {
    public static void main(String[] args) throws Exception{
        // 通过Job来封装本次mr的相关信息
        Configuration conf = new Configuration();
        // 即使没有下面这行,也可以本地运行
        // 因为hadoop-mapreduce-client-core-2.7.4.jar里的mapred-default.xml 中默认的参数就是 local
        conf.set("mapreduce.framework.name","local");
        Job job = Job.getInstance(conf);

        // 指定本次 mr job jar包运行主类
        job.setJarByClass(WordCountDriver.class);

        // 指定本次 mr 所用的mapper reducer类分别是什么
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 指定本次 mr mapper 阶段的输出 k v 类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 指定本次 mr 最终输出的 k v 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // ReduceTask个数
        job.setNumReduceTasks(3); 

        // 如果业务有需求,就可以设置combiner组件,可以跟 Reducer 一样
        job.setCombinerClass(WordCountReducer.class);

        // 指定本次 mr 输入的数据路径和最终输出结果存放在什么位置
        FileInputFormat.setInputPaths(job,"/wordcount/input");
        FileOutputFormat.setOutputPath(job,new Path("/wordcount/output"));

        // 提交程序  并且监控打印程序执行情况
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

自定义 KV 类型,自定义分区,排序的例子

总结

狭义 Hadoop 是大数据的处理平台,包括

广义 Hadoop 是指 Hadoop 生态体系,生态圈。

MapReduce 计算模型是大数据离线计算模型。MapReduce 是处理 HDFS 上的数据。 MapReduce 的思想是把一个大任务拆分成多个小任务,再把小任务的结果汇总,得到最后的结果。而数据都是历史数据、数据已经存在 HDFS 上。而实时计算框架,就要使用 SparkStreaming,Storm,Flume 等框架。

スクリーンショット 2020-10-25 13 47 39 スクリーンショット 2020-10-25 13 44 27 スクリーンショット 2020-10-25 13 44 47 スクリーンショット 2020-10-25 13 45 18