liusheng / liusheng.github.io

Liusheng's blog
http://liusheng.github.io
5 stars 1 forks source link

Hadoop TeraSort和TeraGen流程解析(WIP) #30

Open liusheng opened 4 years ago

liusheng commented 4 years ago

0. 背景

TeraSort是Hadoop中自带的一个常用的用来做基准测试的应用,主要包含:

1. TeraGen生成测试数据

整个TeraSort流程中,第一步是使用TeraGen工具生成测试数据,TeraGen通过定义的Map任务(无Reduce),来根据输入参数生成测试数据。 首先我们来看一下Hadoop自带的TeraGen生成的数据形式: image

TeraGen生成的数据具有如下格式:

最终格式如下: (10 bytes key) (constant 2 bytes) (32 bytes rowid) (constant 4 bytes) (48 bytes filler) (constant 4 bytes) 其中rowid是行号的16进制数字右对齐

注意: 虽然TeraGen生成的数据是使用随机数生成器生成的,但是每一次执行TeraGen生成的数据都是相同的,当然,具体生成的数据长度是根据输入参数决定的。 TeraGen生成的测试数据是业界比较认可的排序基准测试数据GraySort的输入数据,具体参见这里

2. TeraSort处理流程

  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      usage();
      return 2;
    }
    LOG.info("starting");
    Job job = Job.getInstance(getConf()); //定义Job对象,会依次根据core-default.xml, core-site.xml,
    // mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml中所有的配置项来生成一个Job的定义。
    Path inputDir = new Path(args[0]); //输入数据路径
    Path outputDir = new Path(args[1]); //输出数据路径
    boolean useSimplePartitioner = getUseSimplePartitioner(job); //是否使用简单分区,默认False
    TeraInputFormat.setInputPaths(job, inputDir); //设置输入数据位置
    FileOutputFormat.setOutputPath(job, outputDir);// 设置输出数据位置
    job.setJobName("TeraSort");
    job.setJarByClass(TeraSort.class);
    job.setOutputKeyClass(Text.class);// 定义每一条记录Key的类型
    job.setOutputValueClass(Text.class);// 定义每一条记录的Value类型
    job.setInputFormatClass(TeraInputFormat.class); //设置输入数据的格式
    job.setOutputFormatClass(TeraOutputFormat.class);// 设置输出数据的格式
    if (useSimplePartitioner) {
      job.setPartitionerClass(SimplePartitioner.class); //使用简单分区,会根据输入数据的key的前3字节平分成reduce数量个组
    } else {
      long start = System.currentTimeMillis();
      Path partitionFile = new Path(outputDir, 
                                    TeraInputFormat.PARTITION_FILENAME);
      URI partitionUri = new URI(partitionFile.toString() +
                                 "#" + TeraInputFormat.PARTITION_FILENAME);
      try {
        TeraInputFormat.writePartitionFile(job, partitionFile); //写分区文件
      } catch (Throwable e) {
        LOG.error("{}", e.getMessage(), e);
        return -1;
      }
      job.addCacheFile(partitionUri);  
      long end = System.currentTimeMillis();
      System.out.println("Spent " + (end - start) + "ms computing partitions.");
      job.setPartitionerClass(TotalOrderPartitioner.class); // 使用全局排序的分区方法
    }

    job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
    int ret = job.waitForCompletion(true) ? 0 : 1;
    LOG.info("done");
    return ret;
  }

这里主要做的事情,包含对job的input和outpu的位置设置,key/value的类型等,需要重点关注的是:

FileInputFormat.java中定义了getSplits方法,用于分割输入文件

首先会根据Input输入的位置,获取所有输入数据的文件信息,然后遍历所有的文件,对每一个文件分割Split,代码如下:

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);

    boolean ignoreDirs = !getInputDirRecursive(job)
      && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
    for (FileStatus file: files) {
......

在对每一个输入文件分割Split的时候,会计算输入数据的Split大小,代码如下:

  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

根据这里的定义,可以发现SplitSize是根据blockSize及最大和最小取值区间决定的,如果blockSize在最大最小值内,则取blockSize作为SplitSize,否则,如果blockSize小于最小值,则取最小值作为blockSize,反之,取最大值。

然后对每一个输入的文件根据SplitSize进行分割,代码如下:

        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }

上面的代码可以看出,每一个Split通常就是一个blockSize大小,将输入文件根据blockSize分割成为多个Split,传给Mapper,这里需要注意的是,每一个split不一定严格是blockSize大小,上面有一个SPLIT_SLOP定义,值为1.1也就是分割的时候,剩余的字节数如果小于blockSize的1.1倍,那么就把剩余的字节当做一个Split,放到最后一个Split记录中。