
Using Hadoop MapReduce

Published on 2025-02-25 23:44:05

We want to count the number of times each word occurs in a set of books. We will do this in Python using Hadoop streaming. First, start the HDFS and YARN daemons:

start-dfs.sh
start-yarn.sh

This will generate a lot of chatter:

15/04/06 12:42:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [localhost]
localhost: starting namenode, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/hadoop-cliburn-namenode-lister.dulci.duhs.duke.edu.out
localhost: starting datanode, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/hadoop-cliburn-datanode-lister.dulci.duhs.duke.edu.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/hadoop-cliburn-secondarynamenode-lister.dulci.duhs.duke.edu.out
15/04/06 12:42:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
starting yarn daemons
starting resourcemanager, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/yarn-cliburn-resourcemanager-lister.dulci.duhs.duke.edu.out
localhost: starting nodemanager, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/yarn-cliburn-nodemanager-lister.dulci.duhs.duke.edu.out

Next, create a home directory in HDFS and copy the books into it:

hdfs dfs -mkdir /user
hdfs dfs -mkdir /user/cliburn
hdfs dfs -copyFromLocal books /user/cliburn/books

%%file mapper.py
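#!/usr/bin/env python
"""Streaming mapper: emit '<word><TAB>1' for every word read from stdin."""

import sys

def read_input(file):
    # Split each input line into whitespace-separated words.
    for line in file:
        yield line.split()

def main(sep='\t'):
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            # The parenthesized form works under both Python 2 and Python 3.
            print('%s%s%d' % (word, sep, 1))

if __name__ == '__main__':
    main()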
Writing mapper.py
%%file reducer.py
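#!/usr/bin/env python
"""Streaming reducer: sum the counts for each word read from stdin."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, sep):
    # Each line is '<word><sep><count>'; split it into a [word, count] pair.
    for line in file:
        yield line.rstrip().split(sep, 1)

def main(sep='\t'):
    data = read_mapper_output(sys.stdin, sep=sep)
    # groupby only merges adjacent keys; Hadoop sorts mapper output by key
    # before it reaches the reducer, so each word forms a single group.
    for word, group in groupby(data, itemgetter(0)):
        total_count = sum(int(count) for _, count in group)
        print('%s%s%d' % (word, sep, total_count))

if __name__ == '__main__':
    main()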
Overwriting reducer.py
! chmod +x mapper.py
! chmod +x reducer.py
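
Before submitting anything to Hadoop, it can help to check the map, sort and reduce logic on a few lines in a single process. The following is a minimal sketch of that idea, not part of the original scripts: the function names `map_words`, `reduce_counts` and `run_local` are hypothetical, and the call to `sorted` stands in for the shuffle-and-sort phase that Hadoop performs between the mapper and the reducer.

```python
from itertools import groupby
from operator import itemgetter

def map_words(lines):
    """Mapper logic: emit a (word, 1) pair for every whitespace-separated word."""
    for line in lines:
        for word in line.split():
            yield word, 1

def reduce_counts(sorted_pairs):
    """Reducer logic: sum the counts of each run of identical keys."""
    for word, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield word, sum(count for _, count in group)

def run_local(lines):
    """Map, sort by key (standing in for Hadoop's shuffle), then reduce."""
    pairs = sorted(map_words(lines), key=itemgetter(0))
    return dict(reduce_counts(pairs))

if __name__ == '__main__':
    # Made-up sample lines, used only to exercise the functions.
    sample = ['the cat sat', 'the cat ran']
    for word, count in sorted(run_local(sample).items()):
        print('%s\t%d' % (word, count))
```

The actual scripts can be checked the same way from a shell with `cat books/* | ./mapper.py | sort | ./reducer.py`, where `sort` plays the role of the shuffle.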

The native language for Hadoop is Java, but Hadoop streaming allows the mapper, combiner and reducer to be written as custom programs in other languages. For the full set of options, see http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/HadoopStreaming.html
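
A streaming program reads lines from standard input and writes tab-separated key/value lines to standard output. A combiner pre-aggregates mapper output before it is sent to the reducers, which cuts the amount of data shuffled. One way to get a similar effect, sketched below under the assumption that the distinct words of one input split fit in memory, is to aggregate inside the mapper itself. The name `combining_mapper` is hypothetical and this variant is not part of the original scripts; because addition is associative and commutative, the totals computed by reducer.py are unchanged.

```python
from collections import Counter
import io
import sys

def combining_mapper(infile, outfile, sep='\t'):
    """Count words in memory, then emit one line per distinct word."""
    counts = Counter()
    for line in infile:
        counts.update(line.split())
    for word, count in counts.items():
        outfile.write('%s%s%d\n' % (word, sep, count))

if __name__ == '__main__':
    # Demo on an in-memory file; in a streaming job, pass sys.stdin instead.
    combining_mapper(io.StringIO(u'a b a\nb a\n'), sys.stdout)
```

The trade-off is memory: the plain mapper emits each word immediately, while this version holds a dictionary of counts until its input is exhausted.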

hadoop jar $HADOOP_HOME/libexec/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-file ./mapper.py    -mapper ./mapper.py \
-file ./reducer.py   -reducer ./reducer.py \
-input /user/cliburn/books/* -output /user/cliburn/books-output

Once the job finishes, list the output directory and view the result:

hdfs dfs -ls /user/cliburn/books-output
hdfs dfs -cat /user/cliburn/books-output/part-00000
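
Each line of the reducer output has the form word, tab, count, so it is easy to post-process. As a small sketch, assuming those lines have been read into Python (for example from a saved copy of the output), the hypothetical helper `top_words` below returns the most frequent words. The sample lines in the demo are made up for illustration.

```python
import heapq

def top_words(lines, n=10, sep='\t'):
    """Return the n (word, count) pairs with the largest counts."""
    pairs = []
    for line in lines:
        line = line.rstrip('\n')
        if not line:
            continue  # skip blank lines
        word, count = line.rsplit(sep, 1)
        pairs.append((word, int(count)))
    return heapq.nlargest(n, pairs, key=lambda pair: pair[1])

if __name__ == '__main__':
    # Made-up lines in the same format as the reducer output.
    sample = ['the\t5\n', 'cat\t2\n', 'sat\t9\n']
    for word, count in top_words(sample, n=2):
        print('%s\t%d' % (word, count))
```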

When you are done, stop YARN and HDFS:

./sbin/stop-yarn.sh
./sbin/stop-dfs.sh

Using MrJob

The Python module mrjob removes a lot of the boilerplate and can also send jobs to Amazon's implementation of Hadoop, known as Elastic MapReduce (EMR).

! pip install mrjob
Requirement already satisfied (use --upgrade to upgrade): mrjob in /Users/cliburn/anaconda/lib/python2.7/site-packages
Requirement already satisfied (use --upgrade to upgrade): boto>=2.2.0 in /Users/cliburn/anaconda/lib/python2.7/site-packages (from mrjob)
Requirement already satisfied (use --upgrade to upgrade): PyYAML in /Users/cliburn/anaconda/lib/python2.7/site-packages (from mrjob)
Requirement already satisfied (use --upgrade to upgrade): simplejson>=2.0.9 in /Users/cliburn/anaconda/lib/python2.7/site-packages (from mrjob)
%%file word_count.py
# From http://mrjob.readthedocs.org/en/latest/guides/quickstart.html#writing-your-first-job
from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()
Writing word_count.py
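
Note that, despite the file name, this job reports total characters, words and lines, not a count per word. To see what it computes without installing mrjob or Hadoop, the mapper and reducer can be mimicked as plain generator functions. This is only a sketch: `run_job` is a hypothetical helper that groups values by key in a dictionary, standing in for the grouping the framework does between the two phases, and the input lines are assumed to arrive without trailing newlines.

```python
from collections import defaultdict

def mapper(_, line):
    """Same logic as MRWordFrequencyCount.mapper, without the class."""
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1

def reducer(key, values):
    """Same logic as MRWordFrequencyCount.reducer, without the class."""
    yield key, sum(values)

def run_job(lines):
    """Group mapper output by key, then apply the reducer to each group."""
    grouped = defaultdict(list)
    for line in lines:
        for key, value in mapper(None, line):
            grouped[key].append(value)
    results = {}
    for key in sorted(grouped):
        for out_key, out_value in reducer(key, grouped[key]):
            results[out_key] = out_value
    return results

if __name__ == '__main__':
    # Made-up input lines, used only to exercise the functions.
    print(run_job(['hello world', 'foo']))
```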

Running the job

As a single Python process for debugging

python word_count.py books/*

To run on a Hadoop cluster

python word_count.py -r hadoop books/*

To run on Amazon EMR using files on S3

python word_count.py -r emr s3://<path_to_books>

Java version

For comparison, here is the first Java version from the official tutorial:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Hadoop MapReduce Patterns

Most Hadoop workflows are organized as several rounds of map/reduce; this is known as job chaining. Because I/O is so expensive, two optimizations are common: chain folding, where jobs are rearranged to minimize inputs and outputs, and job merging, where unrelated jobs that use the same dataset are run together. In mrjob, job chaining is performed via the steps abstraction.
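
To make job chaining concrete, here is a minimal in-memory sketch in which the output of one map/reduce round becomes the input of the next. It uses neither Hadoop nor mrjob; `run_step` and `chain` are hypothetical helpers, and the dictionary grouping stands in for the shuffle. The first round counts words, and the second counts how many distinct words share each occurrence count.

```python
from collections import defaultdict

def run_step(records, mapper, reducer):
    """Run one map/reduce round in memory and return the reducer's output."""
    grouped = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            grouped[key].append(value)
    output = []
    for key in sorted(grouped):
        output.extend(reducer(key, grouped[key]))
    return output

def map_words(line):
    """Round 1 mapper: emit (word, 1) for each word in a line."""
    for word in line.split():
        yield word, 1

def map_by_count(pair):
    """Round 2 mapper: re-key each (word, count) record by its count."""
    _word, count = pair
    yield count, 1

def reduce_sum(key, values):
    """Reducer shared by both rounds: sum the values for a key."""
    yield key, sum(values)

def chain(lines):
    """Feed the output of round 1 into round 2."""
    word_counts = run_step(lines, map_words, reduce_sum)
    return run_step(word_counts, map_by_count, reduce_sum)

if __name__ == '__main__':
    # Made-up input lines, used only to exercise the chain.
    print(chain(['a b a', 'c d a']))
```

On a real cluster each round writes its output to disk before the next one reads it, which is why folding or merging rounds can pay off.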

There are several common patterns that are repeatedly used in Hadoop MapReduce programs:

  • Summarization (e.g. sum, mean, counting)
  • Filtering (e.g. subsampling, removing poor quality items, top 10 lists)
  • Data organization (e.g. converting to hierarchical format, binning)
  • Joins
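
As an illustration of the last pattern, the sketch below simulates a reduce-side inner join in memory: the map phase tags each row with the table it came from and keys it by the join field, and the reduce phase pairs rows from the two tables that share a key. The function names, the 'L'/'R' tags and the sample rows are all hypothetical, and the join key is assumed to be the first field of every row.

```python
from collections import defaultdict
from itertools import product

def map_tagged(tag, rows):
    """Mapper: emit (join_key, (tag, row)), keyed on the first field."""
    for row in rows:
        yield row[0], (tag, row)

def reduce_join(key, tagged_rows):
    """Reducer: pair every left row with every right row for one key."""
    left = [row for tag, row in tagged_rows if tag == 'L']
    right = [row for tag, row in tagged_rows if tag == 'R']
    for l_row, r_row in product(left, right):
        yield (key,) + tuple(l_row[1:]) + tuple(r_row[1:])

def reduce_side_join(left_rows, right_rows):
    """Group tagged rows by key (the shuffle), then join each group."""
    grouped = defaultdict(list)
    for tag, rows in (('L', left_rows), ('R', right_rows)):
        for key, tagged in map_tagged(tag, rows):
            grouped[key].append(tagged)
    joined = []
    for key in sorted(grouped):
        joined.extend(reduce_join(key, grouped[key]))
    return joined

if __name__ == '__main__':
    # Made-up tables: (author_id, name) and (author_id, title).
    authors = [(1, 'Austen'), (2, 'Dickens')]
    books = [(1, 'Emma'), (1, 'Persuasion'), (3, 'Ulysses')]
    for row in reduce_side_join(authors, books):
        print(row)
```

Keys that appear in only one table produce no output, which is what makes this an inner join.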

While it is certainly possible, it takes a lot of work to code, debug and optimize any non-trivial program, such as regularized logistic regression on a large data set, using only MapReduce constructs. Hence, we will switch our focus to more modern tools such as Spark and Impala that provide higher-level abstractions and often greater efficiency.
