Calculating the median of a list of values using Hadoop map-reduce

I'm new to Hadoop mrjob. I have a text file where each line consists of "id groupId value". I am trying to calculate the median of all the values in the file using Hadoop map-reduce, but I'm stuck when it comes to calculating the single overall median. What I get instead is a median value for each id, like:

"123213"        5.0
"123218"        2
"231532"        1
"234634"        7
"234654"        2
"345345"        9
"345445"        4.5
"345645"        2
"346324"        2
"436324"        6
"436456"        2
"674576"        10
"781623"        1.5

The output should be like "median value of all values is: ####". I was influenced by this article: https://computehustle.com/2019/09/02/getting-started-with-mapreduce-in-python/
My Python file median-mrjob.py:

from mrjob.job import MRJob
from mrjob.step import MRStep

class MRMedian(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_stats, combiner=self.reducer_count_stats),
            MRStep(reducer=self.reducer_sort_by_values),
            MRStep(reducer=self.reducer_retrieve_median)
        ]

    def mapper_get_stats(self, _, line):
        line_arr = line.split(" ")
        values = int(float(line_arr[-1]))
        id = line_arr[0]
        yield id, values

    def reducer_count_stats(self, key, values):
        yield str(sum(values)).zfill(2), key

    def reducer_sort_by_values(self, values, ids):
        for id in ids:
            yield id, values

    def reducer_retrieve_median(self, id, values):
        valList=[]
        median = 0
        for val in values:
            valList.append(int(val))
        N = len(valList)
        #find the median
        if N % 2 == 0:
            #if N is even
            m1 = N / 2
            m2 = (N / 2) + 1
            #Convert to integer, match post
            m1 = int(m1) - 1
            m2 = int(m2) - 1
            median = (valList[m1] + valList[m2]) / 2 
        else:
            m = (N + 1) / 2
            # Convert to integer, match position
            m = int(m) - 1
            median = valList[m]
        yield (id, median)

if __name__ == '__main__':
   MRMedian.run()

My original text file is about 1 million and 1 billion lines of data, but I have created a test file with arbitrary data. It has the name input.txt:

781623 2 2.3243
781623 1 1.1243
234654 1 2.122
123218 8 2.1245
436456 22 2.26346
436324 3 6.6667
346324 8 2.123
674576 1 10.1232
345345 1 9.56135
345645 7 2.1231
345445 10 6.1232
231532 1 1.1232
234634 6 7.124
345445 6 3.654376
123213 18 8.123
123213 2 2.1232

What I care about is the values, considering that there might be duplicates. I run the code from the terminal with: python median-mrjob.py input.txt

Update: The point of the assignment is not to use any libraries, so I need to sort the list manually (or maybe some of it, as I understood) and calculate the median manually (hard-coding it). Otherwise the goal of using MapReduce disappears. Using PySpark is not allowed in this assignment. Check this link for more inspiration: Computing median in map reduce
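
For illustration only, here is a minimal pure-Python sketch of the "manual" part, i.e. sorting a list by hand and picking the middle element without any library calls (the function name and the insertion sort are illustrative choices, not part of the assignment):

def manual_median(values):
    # work on a copy so the caller's list is not modified
    data = list(values)
    # hand-written insertion sort instead of sorted() or statistics
    for i in range(1, len(data)):
        current = data[i]
        j = i - 1
        while j >= 0 and data[j] > current:
            data[j + 1] = data[j]
            j -= 1
        data[j + 1] = current
    n = len(data)
    middle = n // 2
    if n % 2 == 1:
        return data[middle]  # odd count: the single middle element
    return (data[middle - 1] + data[middle]) / 2  # even count: average of the two middle elements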


Comments (1)

ˇ宁静的妩媚 2025-01-28 18:51:38

The output should be like "median value of all values is: ####"

Then you need to force all data to one reducer first (effectively defeating the purpose of using MapReduce).

You'd do that by not using the ID as the key and discarding it

def mapper_get_stats(self, _, line):
    line_arr = line.split()
    if line_arr:  # prevent empty lines
        value = float(line_arr[-1])
        yield None, value

After that, sort and find the median (I fixed your parameter order)

def reducer_retrieve_median(self, key, values):
    import statistics
    yield None, f"median value of all values is: {statistics.median(values)}"  # automatically sorts the data

So, only two steps

class MRMedian(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_stats),
            MRStep(reducer=self.reducer_retrieve_median)
        ]
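
Putting the fragments together, a complete sketch of the two-step job might look like the following (assembled from the snippets above; the file name median-mrjob.py comes from the question and is only an assumption here):

from mrjob.job import MRJob
from mrjob.step import MRStep
import statistics


class MRMedian(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_stats),
            MRStep(reducer=self.reducer_retrieve_median)
        ]

    def mapper_get_stats(self, _, line):
        line_arr = line.split()
        if line_arr:  # prevent empty lines
            # the value is the last column of "id groupId value"
            yield None, float(line_arr[-1])

    def reducer_retrieve_median(self, key, values):
        # every value reaches this single reducer because the key is always None
        yield None, f"median value of all values is: {statistics.median(values)}"


if __name__ == '__main__':
    MRMedian.run()

Run it locally with python median-mrjob.py input.txt, or point mrjob's Hadoop runner at an HDFS path (for example python median-mrjob.py -r hadoop hdfs:///path/to/input.txt) once the file has been uploaded.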

For the given file, you should see

null    "median value of all values is: 2.2938799999999997"

original text file is about 1 million and 1 billion lines of data

Not that it matters, but which is it?

You should upload the file to HDFS first; then you can use better tools than mrjob for this, such as Hive or Pig.
