Hadoop 切片机制

发布于 2024-02-08 09:53:48 字数 5215 浏览 19 评论 0

一. InputFormat

在运行 MapReduce 程序时,输入的文件格式包括:基于行的日志文件,二进制格式文件,数据库表等。那么,针对不同的数据类型,MapReduce 是如何读取这些数据的呢?

在数据传递给 MapTask 之前,需要对数据进行切片处理等工作。下图是一个完整的 Map,Reduce 数据处理流程。

Hadoop 数据处理的第一步由 Inputformat 实现类完成。Hadoop 框架提供了一个 InputFormat 接口。数据处理相关的 InputFormat 类都需要实现此接口。

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public abstract class InputFormat<K, V> {
    public InputFormat() {
    }

    public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}

这个接口有 2 个方法:

  • getSplits :此方法用于数据分片,返回一个数据分片的数组。
  • createRecordReader :此方法返回一个 RecordReader ,用来将数据转换成键值对的形式。

二. 数据分片

一个超大文件在 HDFS 上存储时,是以多个 Block 存储在不同的节点上,比如一个 512M 的文件,HDFS 默认一个 Block 为 128M,那么 1G 的文件分成 4 个 Block 存储在集群中 4 个节点上。

Hadoop 在 map 阶段处理上述 512M 的大文件时分成几个 MapTask 进行处理呢?Hadoop 的 MapTask 并行度与数据切片有有关系,数据切片是对输入的文件在逻辑上进行分片,对文件切成多少份,Hadoop 就会分配多少个 MapTask 任务进行并行执行该文件,原理如下图所示。

Block 与 Splite 区别:Block 是 HDFS 物理上把数据分成一块一块;数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

如上图所示,一个 512M 的文件在 HDFS 上存储时,默认一个 block 为 128M,那么该文件需要 4 个 block 进行物理存储;若对该文件进行切片,假设以 100M 大小进行切片,该文件在逻辑上需要切成 5 片,则需要 5 个 MapTask 任务进行处理。

三. 数据切片源码

/** 
 * Generate the list of files and make them into FileSplits.
 * @param job the job context
 * @throws IOException
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  /*
   * 	1、minSize 默认最小值为 1
   *     maxSize 默认最大值为 9,223,372,036,854,775,807‬
   * */
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  /*
   *   2、获取所有需要处理的文件
   * */
  List<FileStatus> files = listStatus(job);
  for (FileStatus file: files) {
    Path path = file.getPath();
    /*
     *   3、获取文件的大小
     * */
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        /*
         * 4、获取文件的 block,比如一个 500M 的文件,默认一个 Block 为 128M,500M 的文件会分布在 4 个 DataNode 节点上进行存储
         * */
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
      	/*
      	 * 5、Hadoop 如不特殊指定,默认用的 HDFS 文件系统,只会走上面 if 分支
      	 * */
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {
        /*
         * 6、获取 Block 块的大小,默认为 128M
         * */
        long blockSize = file.getBlockSize();
        /*
         * 7、计算 spliteSize 分片的尺寸,首先取 blockSize 与 maxSize 之间的最小值即 blockSize,
                        *         然后取 blockSize 与 minSize 之间的最大值,即为 blockSize=128M,所以分片尺寸默认为 128M
         * */
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        /*
         * 8、计算分片 file 文件可以在逻辑上划分为多少个数据切片,并把切片信息加入到 List 集合中
         * */
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        /*
         * 9、如果文件最后一个切片不满 128M,单独切分到一个数据切片中
         * */
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        /*
         * 10、如果文件不可以切分,比如压缩文件,会创建一个数据切片
         * */
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else { 
      //Create empty hosts array for zero length file
  	/*
  	 * 11、如果为空文件,创建一个空的数据切片
  	 * */
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

帥小哥

暂无简介

0 文章
0 评论
22 人气
更多

推荐作者

qq_E2Iff7

文章 0 评论 0

Archangel

文章 0 评论 0

freedog

文章 0 评论 0

Hunk

文章 0 评论 0

18819270189

文章 0 评论 0

wenkai

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文