Hadoop order of operations


According to the image in Yahoo's Hadoop tutorial, the order of operations is map > combine > partition, which should be followed by reduce.

Here is an example key emitted by the map operation:

LongValueSum:geo_US|1311722400|E        1
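
For context, here is a minimal sketch of a mapper.py that would emit keys in this format. The input layout (tab-separated country, timestamp, and event code) is an assumption; the only parts the Aggregate package actually requires are the LongValueSum: prefix and the tab-separated count.

#!/usr/bin/env python
# Hypothetical mapper sketch. The input layout (country, timestamp,
# event code per line) is assumed; only the "LongValueSum:" prefix and
# the tab-separated count matter to the Aggregate package.
import sys

for line in sys.stdin:
    fields = line.rstrip("\n").split("\t")
    if len(fields) < 3:
        continue
    country, timestamp, event = fields[0], fields[1], fields[2]
    # Key: geo_<country>|<timestamp>|<event>, value: 1
    print("LongValueSum:geo_%s|%s|%s\t1" % (country, timestamp, event))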

Assuming there are 100 keys of this type, they should get combined as:

geo_US|1311722400|E     100

Then I'd like to partition the keys by the value before the first pipe (|):
http://hadoop.apache.org/common/docs/r0.20.2/streaming.html#A+Useful+Partitioner+Class+%28secondary+sort%2C+the+-partitioner+org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner+option%29

geo_US
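
That is, every key whose first pipe-delimited field is geo_US should land on the same reducer. As a rough illustration of that idea (not the actual Java implementation of KeyFieldBasedPartitioner, which uses its own hash over the selected key fields), the intended behavior is something like:

# Sketch of the intended behavior: keys that share the first
# pipe-delimited field go to the same partition. The real
# KeyFieldBasedPartitioner is Java code with its own hash function;
# this is only an illustration of the idea.
def partition(key, num_reducers=8):
    first_field = key.split("|", 1)[0]  # e.g. "geo_US"
    return hash(first_field) % num_reducers

# Both records map to the same reducer:
assert partition("geo_US|1311722400|E") == partition("geo_US|1311722500|W")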

So here's my streaming command:

hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-streaming-0.20.203.0.jar \
-D mapred.reduce.tasks=8 \
-D stream.num.map.output.key.fields=1 \
-D mapred.text.key.partitioner.options=-k1,1 \
-D stream.map.output.field.separator=\| \
-file mapper.py \
-mapper mapper.py \
-file reducer.py \
-reducer reducer.py \
-combiner org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input input_file \
-output output_path

This is the error I get:

java.lang.NumberFormatException: For input string: "1311722400|E    1"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Long.parseLong(Long.java:419)
at java.lang.Long.parseLong(Long.java:468)
at org.apache.hadoop.mapred.lib.aggregate.LongValueSum.addNextValue(LongValueSum.java:48)
at org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer.reduce(ValueAggregatorReducer.java:59)
at org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer.reduce(ValueAggregatorReducer.java:35)
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1349)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1435)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1297)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:371)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)

It looks like the partitioner is running before the combiner. Any thoughts?

Comments (2)

梦途 2024-12-05 10:40:17


There is no guarantee that the combiner will actually be run for Hadoop versions > 0.16.
In Hadoop 0.17, the combiner is not run if a single <K,V> occupies the entire sort buffer. In versions > 0.18, the combiner can be run multiple times, in both the map and reduce phases.

Basically, your algorithm should not depend on whether the combine function is called, since it's meant to be just an optimization. For more information, check out the book Hadoop: The Definitive Guide; the snippet that talks about combine functions can be found on Google Books here.
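
One way to respect that constraint with a count like the one in the question is to make the combiner the same associative, commutative sum as the reducer, so the result is identical whether the combiner runs zero, one, or several times. A minimal sketch, assuming sorted tab-separated key/count lines on stdin, as Hadoop Streaming provides:

#!/usr/bin/env python
# Sketch of a sum function safe to use as both combiner and reducer:
# summing is associative and commutative, so the result is the same
# no matter how many times (if at all) the combiner runs.
# Assumes sorted "key<TAB>count" lines on stdin.
import sys

current_key, total = None, 0
for line in sys.stdin:
    key, _, value = line.rstrip("\n").partition("\t")
    if key != current_key:
        if current_key is not None:
            print("%s\t%d" % (current_key, total))
        current_key, total = key, 0
    total += int(value)
if current_key is not None:
    print("%s\t%d" % (current_key, total))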

秋叶绚丽 2024-12-05 10:40:17


I have checked Hadoop: The Definitive Guide, Chapter 6, Shuffle and Sort. Map output is buffered in memory first. When the memory exceeds its threshold, the map output is written to disk. Before it is written to disk, the data is partitioned. Within each partition, the data is sorted by key. After that, if there is a combiner function, it combines the sorted output.

There may be many spill files on disk; if there are at least 3 spill files, the combiner is run again before the output is written to disk.

Finally, all spill files are merged into one file to reduce the number of I/O operations.

In short,
for the mapper: map --> partition --> sort --> combine

and for the reducer: copy from mapper --> merge (combiner called if it exists) --> reduce
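
A toy simulation may make that map-side order concrete; the records, partition count, and sum combiner below are made up purely for illustration:

# Toy simulation of the map-side spill order described above:
# records are partitioned first, sorted within each partition, and only
# then handed to the combiner. Records and sizes are made up.
from itertools import groupby

records = [("geo_US|a", 1), ("geo_FR|b", 1), ("geo_US|a", 1), ("geo_US|c", 1)]
num_partitions = 2

# 1. partition by the first pipe-delimited field
partitions = {p: [] for p in range(num_partitions)}
for key, value in records:
    partitions[hash(key.split("|", 1)[0]) % num_partitions].append((key, value))

# 2. sort within each partition, then 3. combine (sum) the sorted run
for p, part in partitions.items():
    part.sort(key=lambda kv: kv[0])
    combined = [(k, sum(v for _, v in g)) for k, g in groupby(part, key=lambda kv: kv[0])]
    print(p, combined)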
