- 本书赞誉
- 前言
- 第一部分 基础篇
- 第1章 系统基础信息模块详解
- 第2章 业务服务监控详解
- 第3章 定制业务质量报表详解
- 第4章 Python 与系统安全
- 第二部分 高级篇
- 第5章 系统批量运维管理器 pexpect 详解
- 第6章 系统批量运维管理器 paramiko 详解
- 第7章 系统批量运维管理器Fabric详解
- 第8章 从零开发一个轻量级 WebServer
- 第9章 集中化管理平台 Ansible 详解
- 第10章 集中化管理平台 Saltstack 详解
- 第11章 统一网络控制器 Func 详解
- 第12章 Python 大数据应用详解
- 第三部分 案例篇
- 第13章 从零开始打造 B/S 自动化运维平台
- 第14章 打造 Linux 系统安全审计功能
- 第15章 构建分布式质量监控平台
- 第16章 构建桌面版 C/S 自动化运维平台
12.3 使用 Python 编写 MapReduce
Map与Reduce为两个独立函数,为了加快各节点的处理速度,使用并行的计算方式,map运算的结果再由reduce继续进行合并。例如,要统计图书馆有多少本书籍,首先一人一排进行统计(map),其次将每个人的统计结果进行汇总(reduce),最终得出总数。Hadoop除了提供原生态的Java来编写MapReduce任务,还提供了其他语言操作的API——Hadoop Streaming,它通过使用标准的输入与输出来实现map与reduce之前传递数据,映射到Python中便是sys.stdin输入数据、sys.stdout输出数据。其他业务逻辑也直接在Python中编写。
下面实现一个统计文本文件(/home/test/hadoop/input.txt)中所有单词出现的词频功能,分别使用原生Python与框架方式来编写mapreduce。文本文件内容如下:
【/home/test/hadoop/input.txt】
foo foo quux labs foo bar quux abc bar see you by test welcome test abc labs foo me python hadoop ab ac bc bec python
12.3.1 用原生Python编写MapReduce详解
(1)编写Map代码
见下面的mapper.py代码,它会从标准输入(stdin)读取数据,默认以空格分割单词,然后按行输出单词及其词频到标准输出(stdout),不过整个Map处理过程并不会统计每个单词出现的总次数,而是直接输出“word 1”,以便作为Reduce的输入进行统计,要求mapper.py具备可执行权限,执行chmod+x/home/test/hadoop/mapper.py。
【/home/test/hadoop/mapper.py】
#!/usr/bin/env python import sys #输入为标准输入stdin; for line in sys.stdin: #删除开头和结尾的空格; line = line.strip #以默认空格分隔行单词到words列表; words = line.split for word in words: #输出所有单词,格式为“单词,1”以便作为Reduce的输入; print '%s\t%s' % (word, 1)
(2)编写Reduce代码
见下面的reducer.py代码,它会从标准输入(stdin)读取mapper.py的结果,然后统计每个单词出现的总次数并输出到标准输出(stdout),要求reducer.py同样具备可执行权限,执行chmod+x/home/test/hadoop/reducer.py。
【/home/test/hadoop/reducer.py】
#!/usr/bin/env python from operator import itemgetter import sys current_word = None current_count = 0 word = None # 获取标准输入,即mapper.py的输出; for line in sys.stdin: #删除开头和结尾的空格; line = line.strip # 解析mapper.py输出作为程序的输入,以tab作为分隔符; word, count = line.split('\t', 1) # 转换count从字符型成整型; try: count = int(count) except ValueError: # count非数字时,忽略此行; continue # 要求mapper.py的输出做排序(sort)操作,以便对连续的word做判断; if current_word == word: current_count += count else: if current_word: # 输出当前word统计结果到标准输出 print '%s\t%s' % (current_word, current_count) current_count = count current_word = word # 输出最后一个word统计 if current_word == word: print '%s\t%s' % (current_word, current_count)
(3)测试代码
我们可以在Hadoop平台运行之前在本地进行测试,校验mapper.py与reducer.py运行的结果是否正确,测试结果如图12-4所示。
测试reducer.py时需要对mapper.py的输出做排序(sort)操作,当然,Hadoop环境会自动实现排序,如图12-5所示。
(4)在Hadoop平台运行代码
首先在HDFS上创建文本文件存储目录,本示例中为/user/root/word,运行命令:
# /usr/local/hadoop-1.2.1/bin/hadoop dfs -mkdir /user/root/word
上传文件至HDFS,本示例中为/home/test/hadoop/input.txt,如果有多个文件,可采用以下方法进行操作,因为Hadoop分析目标默认针对目录,目录下的文件都在运算范围中。
# /usr/local/hadoop-1.2.1/bin/hadoop fs –put /home/test/hadoop/input.txt /user/root/word/ # /usr/local/hadoop-1.2.1/bin/hadoop dfs -ls /user/root/word/ Found 1 items -rw-r--r-- 2 root supergroup 118 2014-02-10 09:49 /user/root/word/input.txt
图12-4 mapper执行结果(部分截图)
图12-5 reducer执行结果
下一步便是关键的执行MapReduce任务了,输出结果文件指定/output/word,执行以下命令:
# /usr/local/hadoop-1.2.1/bin/hadoop jar /usr/local/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar -file ./mapper.py -mapper ./mapper.py -file ./reducer.py -reducer ./reducer.py -input /user/root/word -output /output/word
图12-6为返回的执行结果,可以看到map及reduce计算的百分比进度。
图12-6 执行MapReduce任务结果
访问http://192.168.1.20:50030/jobtracker.jsp,点击生成的Jobid,查看mapreduce job信息,如图12-7所示。
图12-7 Web查看mapreduce job信息(部分截图)
查看生成的分析结果文件清单,其中/output/word/part-00000为分析结果文件,如图12-8所示。
图12-8 任务输出文件清单
最后查看结果数据,图12-9显示了单词个数统计的结果,整个分析过程结束。
图12-9 查看结果文件part-00000内容
提示 HDFS常用操作命令有:
1)创建目录,示例:bin/hadoop dfs-mkdir/data/root/test。
2)列出目录清单,示例:bin/hadoop dfs-ls/data/root。
3)删除文件或目录,示例:bin/hadoop fs-rmr/data/root/test。
4)上传文件,示例:bin/hadoop fs-put/home/test/hadoop/*.txt/data/root/test。
5)查看文件内容,示例:bin/hadoop dfs-cat/output/word/part-00000。
12.3.2 用Mrjob框架编写MapReduce详解
Mrjob(http://pythonhosted.org/mrjob/index.html)是一个编写MapReduce任务的开源Python框架,它实际上对Hadoop Streaming的命令行进行了封装,因此接触不到Hadoop的数据流命令行,使我们可以更轻松、快速编写MapReduce任务。Mrjob具有如下特点。
1)代码简洁,map及reduce函数通过一个Python文件就可以搞定;
2)支持多步骤的MapReduce任务工作流;
3)支持多种运行方式,包括内嵌方式、本地环境、Hadoop、远程亚马逊;
4)支持亚马逊网络数据分析服务Elastic MapReduce(EMR);
5)调试方便,无须任何环境支持。
安装Mrjob要求环境为Python 2.5及以上版本,源码下载地址:https://github.com/yelp/mrjob。
# pip install mrjob #PyPI安装方式 # python setup.py install #源码安装方式
回到实现一个统计文本文件(/home/test/hadoop/input.txt)中所有单词出现的词频功能,Mrjob通过mapper与reducer方法实现了MR操作,实现代码如下:
【/home/test/hadoop/word_count.py】
from mrjob.job import MRJob class MRWordCounter(MRJob): def mapper(self, key, line): for word in line.split: yield word, 1 def reducer(self, word, occurrences): yield word, sum(occurrences) if __name__ == '__main__': MRWordCounter.run
可以看出代码行数只是原生Python的1/3,逻辑也比较清晰,代码中包含了mapper、reducer函数。mapper函数接收每一行的输入数据,处理后返回一对key:value,初始化value为数据1;reducer接收mapper输出的key-value对进行整合,把相同key的value作累加(sum)操作后输出。Mrjob利用Python的yield机制将函数变成一个Generators(生成器),通过不断调用next去实现key-value的初始化或运算操作。前面介绍Mrjob支持四种运行方式,包括内嵌(-r inline)、本地(-r local)、Hadoop(-r hadoop)、Amazon EMR(-r emr),下面主要介绍前三者的运行方式。
(1)内嵌(-r inline)方式
特点是调试方便,启动单一进程模拟任务执行状态及结果,默认(-r inline)可以省略,输出文件使用“>output-file”或“-o output-file”。下面两条命令是等价的:
# python word_count.py -r inline input.txt >output.txt # python word_count.py input.txt –o output.txt
输出文件output.txt内容见图12-10。
图12-10 查看输出output.txt文件内容
(2)本地(-r local)方式
用于本地模拟Hadoop调试,与内嵌(inline)方式的区别是启动了多进程执行每一个任务,如:
# python word_count.py -r local input.txt >output.txt
执行的结果与inline一样,只是运行过程存在差异。
(3)Hadoop(-r hadoop)方式
用于Hadoop环境,支持Hadoop运行调度控制参数,如:
指定Hadoop任务调度优先级(VERY_HIGH|HIGH),如,--jobconf mapreduce.job.priority=VERY_HIGH。
Map及Reduce任务个数限制,如,--jobconf mapred.map.tasks=10--jobconf mapred.reduce.tasks=5。
注意,执行之前需要指定Hadoop环境变量,执行结果见图12-11。
访问http://192.168.1.20:50030/jobtracker.jsp,显示的最后一行便是任务执行的信息,从中可以看到任务的优先级、map及reduce的总数,如图12-12所示。
查看Hadoop分析结果文件,内容见图12-13。
Mrjob框架的介绍告一段落,下一节重点以实际案例进行说明。
图12-11 任务执行结果(部分截图)
图12-12 已完成任务清单(部分截图)
图12-13 查看任务结果文件内容
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论