在Python中使用Hadoop处理大型csv文件

发布于 2025-01-05 02:36:11 字数 1189 浏览 1 评论 0原文

我有一个巨大的 CSV 文件,想在 Amazon EMR (python) 上使用 Hadoop MapReduce 进行处理。

该文件有 7 个字段,但是,我只查看日期数量字段。

 "date" "receiptId" "productId" "quantity"  "price" "posId" "cashierId"

首先,我的mapper.py

import sys

def main(argv):
    line = sys.stdin.readline()
    try:
        while line:
            list = line.split('\t')

            #If date meets criteria, add quantity to express key
                if int(list[0][11:13])>=17 and int(list[0][11:13])<=19:
                    print '%s\t%s' % ("Express", int(list[3]))
            #Else, add quantity to non-express key
                else:
                    print '%s\t%s' % ("Non-express", int(list[3]))

            line =  sys.stdin.readline()
except "end of file":
        return None
if __name__ == "__main__":
        main(sys.argv)

对于reducer,我将使用流命令:aggregate。

问题:

  1. 我的代码正确吗?我在 Amazon EMR 中运行它,但输出为空。

  2. 所以我的最终结果应该是:快递,XXX 和非快递,YYY。我可以让它在返回结果之前进行除法运算吗?只是 XXX/YYY 的结果。我应该把这段代码放在哪里?减速器??

  3. 此外,这是一个巨大的 CSV 文件,因此映射会将其分成几个分区吗?或者我需要显式调用 FileSplit 吗?如果是这样,我该怎么做?

I have a huge CSV file I would like to process using Hadoop MapReduce on Amazon EMR (python).

The file has 7 fields, however, I am only looking at the date and quantity field.

 "date" "receiptId" "productId" "quantity"  "price" "posId" "cashierId"

Firstly, my mapper.py

import sys

def main(argv):
    line = sys.stdin.readline()
    try:
        while line:
            list = line.split('\t')

            #If date meets criteria, add quantity to express key
                if int(list[0][11:13])>=17 and int(list[0][11:13])<=19:
                    print '%s\t%s' % ("Express", int(list[3]))
            #Else, add quantity to non-express key
                else:
                    print '%s\t%s' % ("Non-express", int(list[3]))

            line =  sys.stdin.readline()
except "end of file":
        return None
if __name__ == "__main__":
        main(sys.argv)

For the reducer, I will be using the streaming command: aggregate.

Question:

  1. Is my code right? I ran it in Amazon EMR but i got an empty output.

  2. So my end result should be: express, XXX and non-express, YYY. Can I have it do a divide operation before returning the result? Just the result of XXX/YYY. Where should i put this code? A reducer??

  3. Also, this is a huge CSV file, so will mapping break it up into a few partitions? Or do I need to explicitly call a FileSplit? If so, how do I do that?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

长亭外,古道边 2025-01-12 02:36:11

在这里回答我自己的问题!

  1. 代码错误。如果您使用聚合库来减少,您的输出不会遵循通常的键值对。它需要一个“前缀”。

    if int(list[0][11:13])>=17 且 int(list[0][11:13])<=19:
        #这才是聚合库正确的打印方式
        #将所有内容打印为字符串。
        print "LongValueSum:" + "Express" + "\t" + list[3]
    

    其他可用的“前缀”有:DoubleValueSum、LongValueMax、LongValueMin、StringValueMax、StringValueMin、UniqValueCount、ValueHistogram。有关更多信息,请查看此处 http://hadoop.apache.org/common/docs/r0.15.2/api/org/apache/hadoop/mapred/lib/aggregate/package-summary.html

  2. 是的,如果您想做的不仅仅是基本总和、最小值、最大值或计数,您需要编写自己的减速器。

    是的

  3. 我还没有答案。

Answering my own question here!

  1. The code is wrong. If you're using aggregate library to reduce, your output does not follow the usual key value pair. It requires a "prefix".

    if int(list[0][11:13])>=17 and int(list[0][11:13])<=19:
        #This is the correct way of printing for aggregate library
        #Print all as a string.
        print  "LongValueSum:" + "Express" + "\t" + list[3]
    

    The other "prefixes" available are: DoubleValueSum, LongValueMax, LongValueMin, StringValueMax, StringValueMin, UniqValueCount, ValueHistogram. For more info, look here http://hadoop.apache.org/common/docs/r0.15.2/api/org/apache/hadoop/mapred/lib/aggregate/package-summary.html.

  2. Yes, if you want to do more than just the basic sum, min, max or count, you need to write your own reducer.

  3. I do not yet have the answer.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文