如何使用 PIG 加载文件夹中的每个文件?

发布于 2024-12-03 14:05:49 字数 443 浏览 1 评论 0原文

我有一个每天创建的文件文件夹,所有文件都存储相同类型的信息。我想制作一个脚本来加载最新的 10 个,将它们联合起来,然后在它们上运行一些其他代码。由于 Pig 已经有一个 ls 方法,我想知道是否有一种简单的方法可以获取最后 10 个创建的文件,并使用相同的加载器和选项以通用名称加载它们。我猜它看起来会是这样的:

REGISTER /usr/local/lib/hadoop/hadoop-lzo-0.4.13.jar;
REGISTER /usr/local/lib/hadoop/elephant-bird-2.0.5.jar;
FOREACH file in some_path:
    file = LOAD 'file' 
    USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t') 
    AS (i1, i2, i3);

I have a folder of files created daily that all store the same type of information. I'd like to make a script that loads the newest 10 of them, UNIONs them, and then runs some other code on them. Since pig already has an ls method, I was wondering if there was a simple way for me to get the last 10 created files, and load them all under generic names using the same loader and options. I'm guessing it would look something like:

REGISTER /usr/local/lib/hadoop/hadoop-lzo-0.4.13.jar;
REGISTER /usr/local/lib/hadoop/elephant-bird-2.0.5.jar;
FOREACH file in some_path:
    file = LOAD 'file' 
    USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t') 
    AS (i1, i2, i3);

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

○闲身 2024-12-10 14:05:49

这不是我能够立即完成的事情,而是可以使用某种包装脚本或帮助程序脚本(bash、perl 等)在脚本之外完成的事情。如果您编写一个名为 last10.sh 的脚本,它将输出最后 10 个文件,以逗号分隔:

$ ./last10.sh
/input/file38,/input/file39,...,/input/file48

对于最近 10 个文件,类似这样的操作应该可以解决问题:

hadoop fs -ls /input/ | sort -k6,7 | tail -n10 | awk '{print $8}' | tr '\n' ','

您可以这样做:

$ pig -p files="`last10.sh`" my_mr.pig

然后,在你的 Pig 脚本,这样做:

data = LOAD '$files'
       USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t')
       AS (i1, i2, i3);

Pig 加载单独的文件,如果它们像这样用逗号分隔。这相当于做:

data = LOAD '/input/file38,/input/file39,...,/input/file48'
       USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t')
       AS (i1, i2, i3);

This is not something I've been able to do out of the box, and is something that can be done outside of the script with some sort of wrapper script or helper script (bash, perl, etc.). If you write a script, called last10.sh, that would output your last 10 files, comma separated:

$ ./last10.sh
/input/file38,/input/file39,...,/input/file48

Something like this should do the trick for the most recent 10 files:

hadoop fs -ls /input/ | sort -k6,7 | tail -n10 | awk '{print $8}' | tr '\n' ','

you could do:

$ pig -p files="`last10.sh`" my_mr.pig

Then, in your pig script, do:

data = LOAD '$files'
       USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t')
       AS (i1, i2, i3);

Pig loads up the separate files if they are comma separated like this. This would be equivalent to doing:

data = LOAD '/input/file38,/input/file39,...,/input/file48'
       USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t')
       AS (i1, i2, i3);
静若繁花 2024-12-10 14:05:49

Donald Miner 的答案仍然非常有效,但 IMO 现在有更好的方法,使用 Python 中的嵌入式 Pig。 O'Reilly有一个简短的解释在这里这里还有一个演示,介绍了为什么这是您想要做的事情,以及它是如何工作的。长话短说,在运行 Pig 脚本以确定脚本的各个部分之前,最好能够访问很多功能。通过在 Jython 中包装和/或动态生成脚本的一部分,您可以做到这一点。麾!

Donald Miner's answer still works perfectly well, but IMO there's a better approach to this now using Embedded Pig in Python. O'Reilly has a brief explanation here. There's also a presentation on why this is something you'd want to do, and how it works here. Long story short, there's a lot of functionality it would be nice to have access to before running a pig script to determine parts of the script. Wrapping and/or dynamically generating parts of the script in Jython let's you do that. Rejoice!

悸初 2024-12-10 14:05:49

我喜欢以上两种方法。只是想为 oozie 爱好者多一种选择。 oozie 中的 Java 操作在由“oozie.action.output.properties”配置的位置吐出一个文件,Pig 操作将其传递给 Pig 脚本。与上面的 2 相比,这绝对不是一个优雅的解决方案。我在 oozie 中使用 java 调度配置嵌入式 Pig 时遇到了麻烦,所以我不得不采用这个解决方案。

<workflow-app xmlns='uri:oozie:workflow:0.1' name='java-wf'>
<start to='java1' />

<action name='java1'>
    <java>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
           <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <main-class>org.apache.oozie.test.MyTest</main-class>
        <arg>${outputFileName}</arg>
        <capture-output/>
    </java>
    <ok to="pig1" />
    <error to="fail" />
</action>


<action name='pig1'>
    <pig>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <script>script.pig</script>
        <param>MY_VAR=${wf:actionData('java1')['PASS_ME']}</param>
    </pig>
    <ok to="end" />
    <error to="fail" />
</action>

<kill name="fail">
    <message>Pig failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />

I like above 2 approaches. Just wanted to give one more option for oozie enthusiasts. Java action in oozie spits out a file in location configured by "oozie.action.output.properties" and Pig action takes it that passes to pig script. This is definitely not elegant solution compared to above 2. I have had trouble configuring embedded pig using java schedule in oozie so I had to go with this solution.

<workflow-app xmlns='uri:oozie:workflow:0.1' name='java-wf'>
<start to='java1' />

<action name='java1'>
    <java>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
           <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <main-class>org.apache.oozie.test.MyTest</main-class>
        <arg>${outputFileName}</arg>
        <capture-output/>
    </java>
    <ok to="pig1" />
    <error to="fail" />
</action>


<action name='pig1'>
    <pig>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <script>script.pig</script>
        <param>MY_VAR=${wf:actionData('java1')['PASS_ME']}</param>
    </pig>
    <ok to="end" />
    <error to="fail" />
</action>

<kill name="fail">
    <message>Pig failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文