Pig streaming through a Python script with an imported module
Working with Pig:
tmp$ pig --version
Apache Pig version 0.8.1-cdh3u1 (rexported)
compiled Jul 18 2011, 08:29:40
I have a Python script (CPython) that imports another script; both are very simple in my example:
DATA
example$ hadoop fs -cat /user/pavel/trivial.log
1 one
2 two
3 three
EXAMPLE WITHOUT INCLUDE - works fine
example$ pig -f trivial_stream.pig
(1,1,one)
()
(1,2,two)
()
(1,3,three)
()
where
1) trivial_stream.pig:
DEFINE test_stream `test_stream.py` SHIP ('test_stream.py');
A = LOAD 'trivial.log' USING PigStorage('\t') AS (mynum: int, mynumstr: chararray);
C = STREAM A THROUGH test_stream;
DUMP C;
2) test_stream.py
#! /usr/bin/env python
import sys
import string
for line in sys.stdin:
    if len(line) == 0: continue
    new_line = line
    print "%d\t%s" % (1, new_line)
So essentially I just aggregate lines with one key, nothing special.
EXAMPLE WITH INCLUDE - bombs!
Now I'd like to append a string from an imported Python module that sits in the same directory as test_stream.py. I've tried shipping the imported module in many different ways, but I always get the same error (see below).
1) trivial_stream.pig:
DEFINE test_stream `test_stream.py` SHIP ('test_stream.py', 'test_import.py');
A = LOAD 'trivial.log' USING PigStorage('\t') AS (mynum: int, mynumstr: chararray);
C = STREAM A THROUGH test_stream;
DUMP C;
2) test_stream.py
#! /usr/bin/env python
import sys
import string
import test_import
for line in sys.stdin:
    if len(line) == 0: continue
    new_line = ("%s-%s") % (line.strip(), test_import.getTestLine())
    print "%d\t%s" % (1, new_line)
3) test_import.py
def getTestLine():
    return "test line"
Now
example$ pig -f trivial_stream.pig
Backend error message
org.apache.pig.backend.executionengine.ExecException: ERROR 2055: Received Error while processing the map plan: 'test_stream.py ' failed with exit status: 1
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:265)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.cleanup(PigMapBase.java:103)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
at org.apache.hadoop.mapred.Child.main(Child.java:264)
Pig Stack Trace
ERROR 2997: Unable to recreate exception from backed error: org.apache.pig.backend.executionengine.ExecException: ERROR 2055: Received Error while processing the map plan: 'test_stream.py ' failed with exit status: 1
org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1066: Unable to open iterator for alias C. Backend error : Unable to recreate exception from backed error: org.apache.pig.backend.executionengine.ExecException: ERROR 2055: Received Error while processing the map plan: 'test_stream.py ' failed with exit status: 1
at org.apache.pig.PigServer.openIterator(PigServer.java:753)
at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:615)
at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:303)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:168)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:144)
at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:90)
at org.apache.pig.Main.run(Main.java:396)
at org.apache.pig.Main.main(Main.java:107)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 2997: Unable to recreate exception from backed error: org.apache.pig.backend.executionengine.ExecException: ERROR 2055: Received Error while processing the map plan: 'test_stream.py ' failed with exit status: 1
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher.getErrorMessages(Launcher.java:221)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher.getStats(Launcher.java:151)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher.launchPig(MapReduceLauncher.java:337)
at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.execute(HExecutionEngine.java:382)
at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1209)
at org.apache.pig.PigServer.storeEx(PigServer.java:885)
at org.apache.pig.PigServer.store(PigServer.java:827)
at org.apache.pig.PigServer.openIterator(PigServer.java:739)
... 7 more
Thank you very much for your help!
-Pavel
Correct answer from comment above:
The dependencies aren't shipped. If you want your Python app to work with Pig, you need to tar it (don't forget the __init__.py files!) and then include the .tar file in Pig's SHIP statement. The first thing your script should do is untar the app. There might be issues with paths, so I'd suggest running the following even before the tar extraction: sys.path.insert(0, os.getcwd()).
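A minimal sketch of the unpack step described above, assuming the shipped archive ends up in the streaming task's current working directory. The helper name unpack_shipped_app and the archive name myapp.tar are hypothetical, not something from the original post; the code avoids print so it should behave the same on Python 2 and Python 3.

```python
import os
import sys
import tarfile


def unpack_shipped_app(tar_name, work_dir=None):
    """Make work_dir importable, then extract the shipped archive into it.

    Assumption: Pig's SHIP places tar_name in the task's working directory.
    """
    work_dir = work_dir or os.getcwd()
    # Fix the import path first, before touching the archive.
    if work_dir not in sys.path:
        sys.path.insert(0, work_dir)
    archive = tarfile.open(os.path.join(work_dir, tar_name))
    try:
        # Unpack the package (including its __init__.py files) next to the script.
        archive.extractall(work_dir)
    finally:
        archive.close()
```

In the streaming script this would be called once at the top, for example unpack_shipped_app("myapp.tar"), before importing anything that lives inside the archive, with the archive listed in the SHIP clause alongside test_stream.py.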
You need to append the current directory to sys.path in your test_stream.py. The SHIP command you had there does ship the Python script; you just need to tell Python where to look.
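A sketch of what this could look like in test_stream.py, assuming the shipped test_import.py sits in the task's current working directory. The function names ensure_cwd_on_path and main are illustrative and not from the original post; the output is written with stdout.write so that the same code runs on Python 2 and Python 3 and does not emit the extra blank line seen in the first example.

```python
import os
import sys


def ensure_cwd_on_path():
    """Append the current working directory to the import path if missing."""
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.append(cwd)


def main(stdin, stdout):
    ensure_cwd_on_path()
    # Imported only after the path fix, so the shipped module can be found.
    import test_import
    for line in stdin:
        line = line.strip()
        if not line:
            continue
        # Same key (1) for every row, with the imported string appended.
        stdout.write("1\t%s-%s\n" % (line, test_import.getTestLine()))
```

The script would then end with a call such as main(sys.stdin, sys.stdout), and the SHIP clause stays as in the question: SHIP ('test_stream.py', 'test_import.py').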