rainit2006 / My_AWS-Cloud

0 stars 0 forks source link

Hadoop #19

Open rainit2006 opened 6 years ago

rainit2006 commented 6 years ago

rainit2006 commented 6 years ago

rainit2006 commented 6 years ago

Hadoop编程入门(2011年的内容了,有些old。特别是map函数的参数也变了) http://www.jiacheo.org/blog/233

  1. 通过split之后, 他变成了若干的分片, 每个分片交给一个Map处理

  2. map处理完后, tasktracker会把数据进行复制和排序, 然后通过输出的key 和value进行 partition的划分, 并把partition相同的map输出, 合并为相同的reduce的输入.

  3. ruducer通过处理, 把数据输出, 每个相同的key, 一定在一个reduce中处理完, 每一个reduce至少对应一份输出(可以通过扩展MultipleOutputFormat来得到多分输出)

  4. 来看一个例子, 如下图:(来自 《hadoop权威指南》 一书) image 实例 说明几点: 5.1 输入的数据可能就是一堆文本 5.2 mapper会解析每行数据, 然后提取有效的数据, 作为输出. 这里的例子是 从日志文件中提取每一年每天的气温, 最后会计算每年的最高气温 5.3 map的输出就是一条一条的 key-value 5.4 通过shuffle之后, 变成reduce的输入, 这是相同的key对应的value被组合成了一个迭代器 5.5 reduce的任务是提取每一年的最高气温, 然后输出

二. Mapper

  1. mapper可以选择性地继承 MapreduceBase这个基类, 他只是把一些方法实现了而已, 即使方法体是空的.
  2. mapper必须实现 Mapper 接口(0.20以前的版本), 这是一个泛型接口, 需要执行输入和输出的key-value的类型, 这些类型通常都是Wriable接口的实现类
  3. 实现map方法, 方法有四个参数, 前面两个就是输入的 Key 和 value, 第三个参数是 OuputCollector, 用于收集输出的, 第四个是reporter,用来报告一些状态的,可以用于debug 3.1 input 默认是一行一条记录, 每天记录都放在value里边 3.2 output 每次搜集一条 K-V记录, 一个K可以对应多个value, 在reduce 里面体现为一个 iterator
  4. 覆盖 configure方法可以得到JobConf的实例, 这个JobConf是在Job运行时传递过来的, 可以跟外部资源进行数据交互 三. Reducer
  5. reduce也可以选择继承 MapreduceBase这个基类, 功能跟mapper一样.
  6. reducer必须实现Reducer接口, 这个接口同样是泛型接口, 意义跟Mapper的类似
  7. 实现reduce方法, 这个方法也有四个参数, 第一个是输入的key, 第二个是输入的 value的迭代器, 可以遍历所有的value,相当于一个列表, outputCollector跟map的一样, 是输出的搜集器, 每次搜集都是key-value的形式, report的作用跟map的相同.
  8. 在新版本中, hadoop已经将后面两个参数合并到一个context对象里边了, 当然还会兼容就版本的 接口. >0.19.x
  9. 覆盖configure方法, 作用跟map的相同
  10. 覆盖close 方法,可以做一些reduce结束后的处理工作.(clean up) 四. Combiner
  11. combiner的作用是, 将map的输出,先计算一遍,得到初步的合并结果, 减少reduce的计算压力.
  12. combiner的编写方法跟reduce是一样的, 他本来就是一个Reducer的实现类
  13. 当reducer符合函数 F(a,b) = F(F(a), F(b)) 时, combinner可以与reduce相同. 比如 sum(a,b,c,d,e,f,g) = sum(sum(a,b) ,sum(c,d,e,f) , sum(g)) 还有max, min等等.
  14. 编写正确的combiner可以优化整个mapreduce程序的性能.(特别是当reduce是性能瓶颈的时候.)
  15. combiner可以跟reducer不同. 五. Configuration
  16. 后加的属性的值会覆盖前面定义的相同名称的属性的值.
  17. 被定义为 final的属性(在属性定义中加上true标签)不会被后面的同名属性定义的值给覆盖.
  18. 系统属性比通过资源定义的属性优先级高, 也就是通过System.setProperty()方法会覆盖在资源文件中定义的属性的值.
  19. 系统属性定义必须在资源文件中有相应的定义才会生效.
  20. 通过 -D 选项定义的属性, 比在资源文件中定义的属性优先级要高. 六. Run Jobs
  21. 设置 inputs & output 1.1 先判断输入是否存在 (不存在会导致出错,最好利用程序来判断.) 1.2 判断输出是否已经存在(存在也会导致出错) 1.3 养成一种好的习惯(先判断,再执行)
  22. 设置 mapper、reducer、combiner. 各个实现类的class对象. XXXX.class
  23. 设置 inputformat & outputformat & types 3.1 input和output format都有两种, 一种是 textfile, 一种是sequencefile. 简单理解, textfile是文本组织的形式,sequence file是 二进制组织的形式. 3.2 Types的设置, 根据输入和输出的数据类型, 设置各种Writable接口的实现类的class对象.
  24. 设置reduce count 4.1 reduce count可以为0, 当你的数据无需reduce的时候. 4.2 reduce数量最好稍微少于当前可用的slots的数量, 这样reduce就能在一波计算中算好. (一个slot可以理解为一个计算单元(资源).) 七. 其他的一些细节.
  25. ChainMapper可以实现链式执行mapper 他本身就是一个Mapper的实现类. 提供了一个addMapper的方法.
  26. ChainReducer 跟ChainMapper类似, 可以实现链式执行reducer, 他是Reducer的实现类.
  27. 多个job先后运行, 可以通过先后执行 JobClient.runJob方法来实现先后顺序
  28. 扩展MultipleOutputFormat接口, 可以实现一个reduce对应多份输出 (而且可以指定文件名哦)
  29. Partitioner 接口用于将 Map的输出结果进行分区, 分区相同的key对应的数据会被同一个reducer处理 5.1 提供了一个接口方法: public int getPartition(K2 key, V2 value, int numReduceTasks) 5.2 可以自己定义, 根据key的某些特指来划分, 也可以根据value的某些特质来划分. 5.3 numReduceTasks就是设置的reduce的个数.一般返回的partition的值应该都小于这个值.(%)
  30. reporter的作用 6.1 reporter.incrCounter(key, amount). 比如对数据计算是, 一些不合规范的脏数据, 我们可以通过counter来记录有多少 6.2 reporter.setStatus(status); 方法可以设置一条状态消息, 当我们发现job运行出现这条消息是, 说明出现了我们预期的(错误或者正确)的情况, 用于debug. 6.3 reporter.progress(), 像mapreduce框架报告当前运行进度. 这个progress可以起到心跳的作用. 一个task要是超过10分钟没有想mapreduce框架报告情况, 这个reduce会被kill掉. 当你的任务处理会比较旧是, 最好定时向mapreduce汇报你的状态.
  31. 通过实现Wriable接口, 我们可以自定义key和value的类型, 使用起来就像pojo, 不需要每次都进行parse. 如果你的自定义类型是Key的类型, 则需要同时实现Comparable 接口, 用于排序. 比如MapWritable就是一个例子.
rainit2006 commented 6 years ago

Hadoop编程初级实例(2011年的例子,old了) http://www.jiacheo.org/blog/233

  1. 需求: 统计某个站点每天的PV
  2. 数据输入: 以天为分区存放着的日志数据, 一条日志代表一个PV
  3. 数据输出: 日期 PV
  4. Mapper编写 image 主要的工作很简单, split每一条日志, 取出日期, 并对该日期的PV搜集一条记录, 记录的value为ONE(1, 一条记录代表一个PV)
  5. Reducer编写 image reduce的任务是将每天(key相同的为同一天) 的日志进行汇总(sum), 最后以天为key输出汇总结果.
  6. 设置环境, 指定job(Run) 6.1 设置输入路径. image 6.2 设置输出路径 image 6.3 设置Mapper/Reducer 和 输入数据的数据格式和数据类型 image 6.4 执行命令: hadoop jar site-pv-job.jar org.jiacheo.SitePVSumSampleJob 6.5 查看hadoop的web 工具, 显示当前job进度. image 可以看出, 此次输入产生了14292个map,和29个reduce. reduce数这么少是因为我的reduce的slots数只有30, 所以设置为29, 以防一个挂了, 还能在一波reduce中算好. 6.6 计算结果. image 上面部分是hadoop cli客户端显示的进度, 中间是web工具显示的输入输出的一些数据的统计.可以看出, 此次输入数据总共有1.6TB大小, 设计的总记录数为69.6亿. 也就是这份数据记录了该站点的69.6亿的PV. 左下角可以看出, 执行时间比较长, 用了18分钟+46秒.这里慢的原因不在于reduce, 而是我的map的slots太少, 只有300个, 总共一万多个map, 那要分好几百波才能算完map, 所以瓶颈在map这里.右下角是统计的结果数据, 可以看出, 该站点的整体的PV是呈现上升趋势的.
rainit2006 commented 6 years ago

http://www.cnblogs.com/sunddenly/p/3985386.html

1.2.1 Mapper类  map 函数定义如下代码

protected void map(KEYIN key, VALUEIN value, Context context) 
throws IOException, InterruptedException 
{
     context.write((KEYOUT) key, (VALUEOUT) value);
}

1.2.2 Reducer类 reduce 函数定义如下代码

protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException 
{
     for(VALUEIN value: values) {
       context.write((KEYOUT) key, (VALUEOUT) value);
     }
 }

二、 MapReduce 执行原理 2.1 MapRduce执行流程 image 2.2 Mapper 任务的执行过程 image 其中, 第二阶段是对输入片中的记录按照一定的规则解析成键值对。有个默认规则是把每一行文本内容解析成键值对。“键”是每一行的起始位置(单位是字节),“值”是本行的文本内容。 第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。 2.3 Reducer执行过程 image 在整个MapReduce 程序的执行过程中如图2.4,我可以根据上面的讲解来分析下面MapReducer执行过程,从下图可知每个Mapper任务分了两个区,因此会有两个Reducer任务,最终产生两个HDFS副本。 image

其他: Mapper 类的泛型不是java 的基本类型,而是Hadoop 的数据类型LongWritable、Text、IntWritable。读者可以简单的等价为java 的类long、String、int。

rainit2006 commented 6 years ago

Hadoop初级实验, 很多入门的例子,很好 http://www.jianshu.com/p/7328bb45a7cd

rainit2006 commented 6 years ago

JobClient.runJob(JobConf) 和 job.waitForCompletion 的区别

JobConf and everything else in the org.apache.hadoop.mapred package is part of the old API used to write hadoop jobs,

Job and everything in the org.apache.hadoop.mapreduce package is part of the new and preferred API to write hadoop jobs.

Both APIs generally provide equivalent core functionality. Job control is performed through the Job class in the new API, rather than the old JobClient, which no longer exists in the new API.


Hadoop旧版本0.2. x与新版本1. x的 API比较 1.存放的位置 一个位于org.apache.hadoop.mapred,这个是老的API; 一个位于org.apache.hadoop.mapreduce,这个是新的API。

2.接口与抽象类 在老的API体系中,InputFormat是一个接口,而在新的API体系中,InputFormat是一个抽象类。 熟悉java的同学都知道接口最大的优点是允许一个类实现多个接口,从而达到类似多重继承的目的。但是在hadoop体系中,这个优势体现并不明显。而且接口的实现类(不含抽象类),必须实现接口的每个方法。相对来说,抽象类的约束较弱。抽象类可以为某些方法默认实现(实际上我们在阅读hadoop的源码时,能看到很多抽象类,大部分抽象类里既有abstract方法需要自己重写,又有不带abstract默认已经实现的方法)。这样做的好处就是,当需要给抽象类添加新方法时,为了保持向后兼容,只需要给抽象类的新添加方法提供默认实现即可,之前的代码完全无须修改。

3 上下文封装 新版API 将变量和函数封装成各种上下文(Context)类,使得API 具有更好的易用性和扩展性。首先,函数参数列表经封装后变短,使得函数更容易使用;其次,当需要修改或添加某些变量或函数时,只需修改封装后的上下文类即可,用户代码无须修改,这样保证了向后兼容性,具有良好的扩展性。 这些Context 各自封装了一种实体的基本信息及对应的操作(setter 和getter 函数),如JobContext、TaskAttemptContext 分别封装了Job 和Task 的基本信息,TaskInputOutputContext 封装了Task 的各种输入输出操作,MapContext 和ReduceContext 分别封装了Mapper 和Reducer 对外的公共接口。

Hadoop1.x和2.x的其他变化: 1.配置文件的路径 在1.x中,Hadoop的配置文件是放在$HADOOP_HOME/conf目录下的,关键的配置文件在src目录都有对应的存放着默认值的文件,如下: image

在2.x中,Hadoop的架构发生了变化,配置文件的路径也发生了变化,放到了$HADOOP_HOME/etc/hadoop目录,这样修改的目的,应该是让其更接近于Linux的目录结构吧,让Linux用户理解起来更容易。 Hadoop 2.x中配置文件的几个主要的变化: l 去除了原来1.x中包括的$HADOOP_HOME/src目录,该目录包括关键配置文件的默认值; l 默认不存在mapred-site.xml文件,需要将当前mapred-site.xml.template文件copy一份并重命名为mapred-site.xml,并且只是一个具有configuration节点的空文件; l 默认不存在mapred-queues.xml文件,需要将当前mapred-queues.xml.template文件copy一份并重命名为mapred-queues.xml; l 删除了master文件,现在master的配置在hdfs-site.xml通过属性dfs.namenode.secondary.http-address来设置, 如下:

<property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>nginx1:9001</value>
</property>

l 增加了yarn-env.sh,用于设置ResourceManager需要的环境变量,主要需要修改JAVA_HOME; l 增加yarn-site.xml配置文件,用于设置ResourceManager;

2,命令文件目录的变化 在1.x中,所有的命令文件,都是放在bin目录下,没有区分客户端和服务端命令,并且最终命令的执行都会调用hadoop去执行; 而在2.x中将服务端使用的命令单独放到了sbin目录,其中有几个主要的变化: http://blog.csdn.net/fenglibing/article/details/32916445

rainit2006 commented 6 years ago

Hadoop Chain Mapper Example Pattern : Mapper1 -> Mapper2 -> Reducer-> Mapper3 关键: ChainMapper类。

ChainMapper.addMapper(job,         //主作业
                Mapper1.class,             //待加入的map class
                LongWritable.class,        //待加入map class的输入key类型
                Text.class,                //待加入map class的输入value类型 
                Text.class,                //待加入map class的输出key类型
                VLongWritable.class,       //待加入map class的输出value类型
                map1Conf);                 //待加入map class的配置信息

        //配置mapper2
        ChainMapper.addMapper(job, Mapper2.class, Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false));

        /**
         * 配置Reducer
         * 注意此处使用的是setReducer()方法
         */
        ChainReducer.setReducer(job, Reducer_Only.class, Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false));

        //配置mapper3
        ChainReducer.addMapper(job, Mapper3.class, Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false));

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);

对任意MR作业,Map和Reduce阶段可以有无限个Mapper,但reduer只能有一个. ChainReducer专门提供了一个setRreducer()方法来设置整个作业唯一的Reducer。

rainit2006 commented 6 years ago

Fila项目里到的Hadoop 类

-- getBaseRecordWriter method

protected abstract RecordWriter<K,V> getBaseRecordWriter(FileSystem fs,
                                                         JobConf job,
                                                         String name,
                                                         Progressable arg3)
                                                  throws IOException

Parameters:

fs - the file system to use
job - a job conf object
name - the name of the file over which a record writer object will be constructed
arg3 - a progressable object

Returns: A RecordWriter object over the given file

项目里重写了该函数中,没有用fs参数,而是用自己定义的RecordWriter对象来实现文件的写入。 利用FileOutputFormat的getTaskOutputPath函数创建了一个自己的ouput directory. 接着利用orcOutputFormat来生成一个新的RecordWriter作为函数的返回值。

@Override
    protected RecordWriter getBaseRecordWriter(FileSystem fileSystem, JobConf job, String name, Progressable progressable) throws IOException {
        // Because we know that each key results in a file and each file only holds data for one key we can close
        // the previous writer (if any) to prevent running into OOM exceptions.
        if (recordWriter != null) {
            recordWriter.close(null);
            recordWriter = null;
        }

        LOG.info("Creating writer for {}", name);

        // it turns out OrcOutputFormat ignores the file system and creates the file on hdfs instead
        Path file = FileOutputFormat.getTaskOutputPath(job, name);
        final String fsName = file.toString();
        LOG.info("Name converted to {}", fsName);

        recordWriter = orcOutputFormat.getRecordWriter(fileSystem, job, fsName, progressable);
        return recordWriter;
    }
rainit2006 commented 6 years ago

Spark image

RDD(Resilient Distributed Dataset) 不変(イミュータブル)で並列実行可能な(分割された)コレクションです。

RDDのメソッドの2種類: 「Transformations」と「Actions」。 「Transformations」はRDDを操作し、結果を新しいRDDとして返します。「Actions」はRDDのデータを操作し、結果をRDD以外の形式で返すか保存を行います。

「Transformations」の代表的なメソッドは、mapやfilterです。 mapはRDD内のデータの一つ一つに対して記述した処理を行い、結果を返します。 filterは文字通りフィルタリングするメソッド unionはRDD同士を連結するものです。 flatMapでは各カラムを分割し、一つのカラムにしています。サンプルではunionした結果をスペースで分割し、一つのカラムにしています.

「Actions」の代表的なメソッドは、reduceやcountです。 reduceはRDD内の2つの要素に対して操作を行い、結果を返します。 countは文字通りRDDのデータの件数を返すメソッドです。

https://dev.classmethod.jp/etc/apache-spark_rdd_investigation/

<KEN_All_ROME.CSV>
"0600000","北海道","札幌市 中央区","以下に掲載がない場合","HOKKAIDO","SAPPORO SHI CHUO KU","IKANIKEISAIGANAIBAAI"
"0640941","北海道","札幌市 中央区","旭ケ丘","HOKKAIDO","SAPPORO SHI CHUO KU","ASAHIGAOKA"
"0600041","北海道","札幌市 中央区","大通東","HOKKAIDO","SAPPORO SHI CHUO KU","ODORIHIGASHI"
(以降略)

サンプルソースです。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object Startup{
  def printRDD(filterName: String, rdd: org.apache.spark.rdd.RDD[_]) = {
    println(filterName)

    rdd.foreach {r => {
        println(r)
      }
    }
  }

  def main(args: Array[String]) :Unit = {
    val conf = new SparkConf().setAppName("RddSample").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val inputRDD = sc.textFile("KEN_All_ROME.CSV")

    //mapの例
    val addresses = inputRDD.map{line =>
      val splited = line.replace("\"", "").split(",")

      var result: Array[String] = null
      if (splited(6) == "IKANIKEISAIGANAIBAAI")
        result = Array(splited(0), splited(4), splited(5))
      else
        result = Array(splited(0), splited(4), splited(5), splited(6))

      result.mkString(" ")
    }

    printRDD("mappedRDD", addresses)

    //filterとunionの例
    val filtered1 = addresses.filter(line => line.contains("OSAKA")).filter(line => line.contains("AOBADAI"))
    val filtered2 = addresses.filter(line => line.contains("KANAGAWA")).filter(line => line.contains("WAKABADAI"))
    val unioned = filtered1.union(filtered2)

    printRDD("filtered RDD 1", filtered1)
    printRDD("filtered RDD 2", filtered2)
    printRDD("unioned RDD", unioned)

    //flatMapの例
    val flatmapped = unioned.flatMap(line => line.split(" "))
    printRDD("flatmapped", flatmapped)

    //reduceの例
    val reduced = flatmapped.reduce((x, y) => x + " " + y)
    println("reduced")
    println(reduced)

    //countの例
    val count = inputRDD.count
    println("count")
    println(count)
  }
}

image