返回介绍

12.3 使用 Python 编写 MapReduce

发布于 2024-01-29 22:54:22 字数 7652 浏览 0 评论 0 收藏 0

Map与Reduce为两个独立函数,为了加快各节点的处理速度,使用并行的计算方式,map运算的结果再由reduce继续进行合并。例如,要统计图书馆有多少本书籍,首先一人一排进行统计(map),其次将每个人的统计结果进行汇总(reduce),最终得出总数。Hadoop除了提供原生态的Java来编写MapReduce任务,还提供了其他语言操作的API——Hadoop Streaming,它通过使用标准的输入与输出来实现map与reduce之前传递数据,映射到Python中便是sys.stdin输入数据、sys.stdout输出数据。其他业务逻辑也直接在Python中编写。

下面实现一个统计文本文件(/home/test/hadoop/input.txt)中所有单词出现的词频功能,分别使用原生Python与框架方式来编写mapreduce。文本文件内容如下:

【/home/test/hadoop/input.txt】

foo foo quux labs foo bar quux abc bar see you by test welcome test
abc labs foo me python hadoop ab ac bc bec python

12.3.1 用原生Python编写MapReduce详解

(1)编写Map代码

见下面的mapper.py代码,它会从标准输入(stdin)读取数据,默认以空格分割单词,然后按行输出单词及其词频到标准输出(stdout),不过整个Map处理过程并不会统计每个单词出现的总次数,而是直接输出“word 1”,以便作为Reduce的输入进行统计,要求mapper.py具备可执行权限,执行chmod+x/home/test/hadoop/mapper.py。

【/home/test/hadoop/mapper.py】

#!/usr/bin/env python
import sys
#输入为标准输入stdin;
for line in sys.stdin:
  #删除开头和结尾的空格;
  line = line.strip
  #以默认空格分隔行单词到words列表;
  words = line.split
  for word in words:
     #输出所有单词,格式为“单词,1”以便作为Reduce的输入;
     print '%s\t%s' % (word, 1)

(2)编写Reduce代码

见下面的reducer.py代码,它会从标准输入(stdin)读取mapper.py的结果,然后统计每个单词出现的总次数并输出到标准输出(stdout),要求reducer.py同样具备可执行权限,执行chmod+x/home/test/hadoop/reducer.py。

【/home/test/hadoop/reducer.py】

#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# 获取标准输入,即mapper.py的输出;
for line in sys.stdin:
  #删除开头和结尾的空格;
  line = line.strip
  # 解析mapper.py输出作为程序的输入,以tab作为分隔符;
  word, count = line.split('\t', 1)
  # 转换count从字符型成整型;
  try:
     count = int(count)
  except ValueError:
     # count非数字时,忽略此行;
     continue
  # 要求mapper.py的输出做排序(sort)操作,以便对连续的word做判断;
  if current_word == word:
     current_count += count
  else:
     if current_word:
        # 输出当前word统计结果到标准输出
        print '%s\t%s' % (current_word, current_count)
     current_count = count
     current_word = word
# 输出最后一个word统计
if current_word == word:
  print '%s\t%s' % (current_word, current_count)

(3)测试代码

我们可以在Hadoop平台运行之前在本地进行测试,校验mapper.py与reducer.py运行的结果是否正确,测试结果如图12-4所示。

测试reducer.py时需要对mapper.py的输出做排序(sort)操作,当然,Hadoop环境会自动实现排序,如图12-5所示。

(4)在Hadoop平台运行代码

首先在HDFS上创建文本文件存储目录,本示例中为/user/root/word,运行命令:

# /usr/local/hadoop-1.2.1/bin/hadoop dfs -mkdir /user/root/word

上传文件至HDFS,本示例中为/home/test/hadoop/input.txt,如果有多个文件,可采用以下方法进行操作,因为Hadoop分析目标默认针对目录,目录下的文件都在运算范围中。

# /usr/local/hadoop-1.2.1/bin/hadoop fs –put /home/test/hadoop/input.txt /user/root/word/
# /usr/local/hadoop-1.2.1/bin/hadoop dfs -ls /user/root/word/
Found 1 items
-rw-r--r--  2 root supergroup  118 2014-02-10 09:49 /user/root/word/input.txt

图12-4 mapper执行结果(部分截图)

图12-5 reducer执行结果

下一步便是关键的执行MapReduce任务了,输出结果文件指定/output/word,执行以下命令:

# /usr/local/hadoop-1.2.1/bin/hadoop jar /usr/local/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar -file ./mapper.py -mapper ./mapper.py -file ./reducer.py -reducer ./reducer.py -input /user/root/word -output /output/word

图12-6为返回的执行结果,可以看到map及reduce计算的百分比进度。

图12-6 执行MapReduce任务结果

访问http://192.168.1.20:50030/jobtracker.jsp,点击生成的Jobid,查看mapreduce job信息,如图12-7所示。

图12-7 Web查看mapreduce job信息(部分截图)

查看生成的分析结果文件清单,其中/output/word/part-00000为分析结果文件,如图12-8所示。

图12-8 任务输出文件清单

最后查看结果数据,图12-9显示了单词个数统计的结果,整个分析过程结束。

图12-9 查看结果文件part-00000内容

提示  HDFS常用操作命令有:

1)创建目录,示例:bin/hadoop dfs-mkdir/data/root/test。

2)列出目录清单,示例:bin/hadoop dfs-ls/data/root。

3)删除文件或目录,示例:bin/hadoop fs-rmr/data/root/test。

4)上传文件,示例:bin/hadoop fs-put/home/test/hadoop/*.txt/data/root/test。

5)查看文件内容,示例:bin/hadoop dfs-cat/output/word/part-00000。

12.3.2 用Mrjob框架编写MapReduce详解

Mrjob(http://pythonhosted.org/mrjob/index.html)是一个编写MapReduce任务的开源Python框架,它实际上对Hadoop Streaming的命令行进行了封装,因此接触不到Hadoop的数据流命令行,使我们可以更轻松、快速编写MapReduce任务。Mrjob具有如下特点。

1)代码简洁,map及reduce函数通过一个Python文件就可以搞定;

2)支持多步骤的MapReduce任务工作流;

3)支持多种运行方式,包括内嵌方式、本地环境、Hadoop、远程亚马逊;

4)支持亚马逊网络数据分析服务Elastic MapReduce(EMR);

5)调试方便,无须任何环境支持。

安装Mrjob要求环境为Python 2.5及以上版本,源码下载地址:https://github.com/yelp/mrjob。

# pip install mrjob    #PyPI安装方式
# python setup.py install    #源码安装方式

回到实现一个统计文本文件(/home/test/hadoop/input.txt)中所有单词出现的词频功能,Mrjob通过mapper与reducer方法实现了MR操作,实现代码如下:

【/home/test/hadoop/word_count.py】

from mrjob.job import MRJob
class MRWordCounter(MRJob):
  def mapper(self, key, line):
     for word in line.split:
        yield word, 1
  def reducer(self, word, occurrences):
     yield word, sum(occurrences)
if __name__ == '__main__':
  MRWordCounter.run

可以看出代码行数只是原生Python的1/3,逻辑也比较清晰,代码中包含了mapper、reducer函数。mapper函数接收每一行的输入数据,处理后返回一对key:value,初始化value为数据1;reducer接收mapper输出的key-value对进行整合,把相同key的value作累加(sum)操作后输出。Mrjob利用Python的yield机制将函数变成一个Generators(生成器),通过不断调用next去实现key-value的初始化或运算操作。前面介绍Mrjob支持四种运行方式,包括内嵌(-r inline)、本地(-r local)、Hadoop(-r hadoop)、Amazon EMR(-r emr),下面主要介绍前三者的运行方式。

(1)内嵌(-r inline)方式

特点是调试方便,启动单一进程模拟任务执行状态及结果,默认(-r inline)可以省略,输出文件使用“>output-file”或“-o output-file”。下面两条命令是等价的:

# python word_count.py -r inline input.txt >output.txt
# python word_count.py input.txt –o output.txt

输出文件output.txt内容见图12-10。

图12-10 查看输出output.txt文件内容

(2)本地(-r local)方式

用于本地模拟Hadoop调试,与内嵌(inline)方式的区别是启动了多进程执行每一个任务,如:

# python word_count.py -r local input.txt >output.txt

执行的结果与inline一样,只是运行过程存在差异。

(3)Hadoop(-r hadoop)方式

用于Hadoop环境,支持Hadoop运行调度控制参数,如:

指定Hadoop任务调度优先级(VERY_HIGH|HIGH),如,--jobconf mapreduce.job.priority=VERY_HIGH。

Map及Reduce任务个数限制,如,--jobconf mapred.map.tasks=10--jobconf mapred.reduce.tasks=5。

注意,执行之前需要指定Hadoop环境变量,执行结果见图12-11。

访问http://192.168.1.20:50030/jobtracker.jsp,显示的最后一行便是任务执行的信息,从中可以看到任务的优先级、map及reduce的总数,如图12-12所示。

查看Hadoop分析结果文件,内容见图12-13。

Mrjob框架的介绍告一段落,下一节重点以实际案例进行说明。

图12-11 任务执行结果(部分截图)

图12-12 已完成任务清单(部分截图)

图12-13 查看任务结果文件内容

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文