Hadoop Distributed Cache (Cloudera CH3)
I am trying to run a simple example using a binary executable and the cached archive, and it does not seem to be working.

The example I am trying to run has a mapper which generates three random doubles and a key, and the reducer averages those three numbers together and logs the average. Very simple stuff. I wrote a simple executable in C to generate the random numbers:
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

/* Prints the key passed as argv[1], followed by three
   pseudo-random values between 0.2 and 2.0. */
int main(int argc, char *argv[]) {
    srand(time(NULL));
    int rand1 = rand() % 10 + 1;
    int rand2 = rand() % 10 + 1;
    int rand3 = rand() % 10 + 1;
    printf("%s, %f, %f, %f\n", argv[1], (float)rand1/5, (float)rand2/5, (float)rand3/5);
    return 0;
}
So if I call ./a.out [key], I will see:

key, random1, random2, random3
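As a quick sanity check outside Hadoop, the program can be compiled and run like this (the source filename rand3.c is my own placeholder, since the post does not name the file, and the output values are just an illustration):

gcc -o a.out rand3.c
./a.out 37
# prints something like: 37, 0.400000, 1.200000, 1.800000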
I'm using Python streaming, and here is my mapper, written in Python:
#!/usr/bin/python
import os
import sys
import random
import shlex, subprocess
from subprocess import PIPE
from misc import *

for line in sys.stdin:
    line = line.strip()
    sline = line.split(',')
    # parse out the au and cost from the line ....
    key = int(sline[0])
    au = int(sline[1])
    cost = float(sline[2])
    # shell out to the cached executable and echo its output
    command = "./a.out %d" % (key)
    cli_parts = shlex.split(command)
    mp = subprocess.Popen(cli_parts, stdin=PIPE, stderr=PIPE, stdout=PIPE)
    print mp.communicate()[0].strip('\r\n')
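For a single input line, the mapper shells out once and echoes the executable's output, so a local run looks roughly like this (the sample values are made up; only the key,au,cost layout comes from the script):

echo "5,1,2.5" | python mapper1.py
# prints something like: 5, 0.400000, 1.600000, 0.800000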
Here is the reducer, which just does the averaging:
#!/usr/bin/python
import os
import sys
import math
import re
from misc import *

for line in sys.stdin:
    line = line.strip()
    # expect mapper output of the form: key, num1, num2, num3
    m = re.match(r"(\w+),\s+(.*),\s+(.*),\s+(.*)", line)
    if m:
        average = (float(m.groups(0)[1]) + float(m.groups(0)[2]) +
                   float(m.groups(0)[3])) / 3
        print "key: %d average: %f" % (int(m.groups(0)[0]), average)
    else:
        print "not found"
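Feeding the reducer one line of mapper-style output shows the averaging (the input numbers here are only an illustration):

echo "5, 0.4, 1.6, 0.8" | python reducer1.py
# prints: key: 5 average: 0.933333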
So after reading the documentation, it seems like I need to compile the binary and then tar.gz it:

1) tar cvaf a.out.tar.gz a.out
Now I should be able to pass this to the datanodes via the -cacheArchive parameter and everything should work fine. Here is my Hadoop command:
hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2+737.jar \
-numReduceTasks 1 \
-mapper mapper1.py \
-file mapper1.py \
-reducer reducer1.py \
-file reducer1.py \
-file misc.py \
-cacheArchive a.out.tar.gz \
-input input/* \
-output testsvmoutput \
-verbose
Needless to say, this does not work, and it seems to be because the mapper is not generating data.
I confirmed my code works by testing it on the command line:
cat input/svminput1.txt | python mapper1.py | sort | python reducer1.py
I would love for someone to explain why this is not working, how passing an exe via the -cacheArchive parameter works on the datanodes, and/or how to debug this, because the error messages coming out of the Cloudera HTML panel are not that helpful.
Thanks
Here is the error I am seeing:
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:317)
at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1063)
at org.apache.hadoop.mapred.Child.main(Child.java:211)
Comments (2)
I see a few things you are doing wrong. You want to make your Python scripts executable with chmod a+x and test like this: cat input/svminput1.txt | ./mapper1.py | sort | ./reducer1.py, because that is basically what Hadoop does in streaming: it launches the script, and the OS handles executing it with the right interpreter.
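Concretely, the local smoke test from the paragraph above looks like this (same filenames as in the question):

chmod a+x mapper1.py reducer1.py
cat input/svminput1.txt | ./mapper1.py | sort | ./reducer1.py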
Now, for the other files moving into the job for use by your mapper and reducer, you just add them through the command line with -file whateveryouwant (like you have with misc.py), and when your map/reduce launches those files are local (".") to your script, so you can import and use them or do whatever you want with them (open a text file, whatever). You should do this with the -cacheArchive stuff too; just pushing each file as -file should be fine, as sketched below.
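A minimal sketch of that invocation, shipping the binary with -file instead of -cacheArchive (same jar, script, and input/output paths as in the question; untested, and it assumes a.out sits in the directory the job is submitted from):

hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2+737.jar \
-numReduceTasks 1 \
-mapper mapper1.py \
-reducer reducer1.py \
-file mapper1.py \
-file reducer1.py \
-file misc.py \
-file a.out \
-input input/* \
-output testsvmoutput \
-verbose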
Here is a very basic writeup on Python streaming, if you have not seen it already: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

And here is a little more advanced Python streaming tutorial using joins and keys, which might also be helpful: http://allthingshadoop.com/2010/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

Hope this helps; if not, I think the specific errors would be needed to do any more.
Are you sure Python is available at /usr/bin/python on the cluster machines? A good practice is to always use #!/usr/bin/env python at the top of your scripts; that way it's not hardcoded. Also be sure to check the Python installation on your cluster machines and make sure the imports work.
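For example, the first line of mapper1.py and reducer1.py would become:

#!/usr/bin/env python
# resolves the interpreter from PATH instead of requiring
# one to exist at exactly /usr/bin/python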
You aren't using any try/excepts in your code, so it will be very difficult to debug what the problem is. I suggest wrapping your code in try/except and printing log messages to a well-known location such as /tmp. For more, you can check out davidvhill.com/articles; my actual production code is captured there.
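A minimal sketch of that pattern for the mapper (the log filename and the helper structure are my own illustration, not from the original scripts):

#!/usr/bin/env python
import sys
import traceback

def run():
    # the real mapper body from mapper1.py would go here
    for line in sys.stdin:
        sline = line.strip().split(',')
        key = int(sline[0])
        print key

try:
    run()
except Exception:
    # dump the traceback somewhere easy to find on the task node
    log = open('/tmp/mapper1.log', 'a')
    log.write(traceback.format_exc())
    log.close()
    raise  # still let the task fail so Hadoop reports the error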