Sort an RDD by two values and get the top 10 per group

Posted 2025-01-12 13:34:19 · 1296 words · 0 views · 0 comments

Suppose I have the following RDD in pyspark, where each row is a list:

[foo, apple]
[foo, orange]
[foo, apple]
[foo, apple]
[foo, grape]
[foo, grape]
[foo, plum]
[bar, orange]
[bar, orange]
[bar, orange]
[bar, grape]
[bar, apple]
[bar, apple]
[bar, plum]
[scrog, apple]
[scrog, apple]
[scrog, orange]
[scrog, orange]
[scrog, grape]
[scrog, plum]

I would like to show the top 3 fruits (index 1) for each group (index 0), ordered by fruit count. For simplicity, assume I don't care much about ties (e.g. scrog has count 1 for both grape and plum; I don't care which one is shown).

My goal is output like:

foo, apple, 3
foo, grape, 2
foo, orange, 1
bar, orange, 3
bar, apple, 2
bar, plum, 1   # <------- NOTE: could also be "grape" of count 1
scrog, orange, 2  # <---------- NOTE: "scrog" has many ties, which is okay
scrog, apple, 2
scrog, grape, 1

I can think of a likely inefficient approach:

  • get unique groups and .collect() as list
  • filter full rdd by group, count and sort fruits
  • use something like zipWithIndex() to get top 3 counts
  • save as new RDD with format (<group>, <fruit>, <count>)
  • union all RDDs at end

But I'm interested not only in more Spark-specific approaches, but also in ones that might skip expensive operations like collect() and zipWithIndex().

As a bonus (not required): if I did want to apply sorting/filtering to break ties, where would that best be done?

Any advice much appreciated!

UPDATE: in my context, unable to use dataframes; must use RDDs only.


Comments (2)

深空失忆 2025-01-19 13:34:19

map and reduceByKey operations in pyspark

Sum the counts with .reduceByKey, gather each group's (count, fruit) pairs with .groupByKey, then select the top 3 of each group with .map and heapq.nlargest.

# sc is the SparkContext (predefined in the pyspark shell)
rdd = sc.parallelize([
    ["foo", "apple"], ["foo", "orange"], ["foo", "apple"], ["foo", "apple"],
    ["foo", "grape"], ["foo", "grape"], ["foo", "plum"], ["bar", "orange"],
    ["bar", "orange"], ["bar", "orange"], ["bar", "grape"], ["bar", "apple"],
    ["bar", "apple"], ["bar", "plum"], ["scrog", "apple"], ["scrog", "apple"],
    ["scrog", "orange"], ["scrog", "orange"], ["scrog", "grape"], ["scrog", "plum"]
])

from operator import add
from heapq import nlargest

n = 3

results = rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add) \
             .map(lambda x: (x[0][0], (x[1], x[0][1]))).groupByKey() \
             .map(lambda x: (x[0], nlargest(n, x[1])))

print( results.collect() )
# [('bar', [(3, 'orange'), (2, 'apple'), (1, 'plum')]),
#  ('scrog', [(2, 'orange'), (2, 'apple'), (1, 'plum')]),
#  ('foo', [(3, 'apple'), (2, 'grape'), (1, 'plum')])]
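
A possible variation on the snippet above, offered as a sketch rather than a tested drop-in: groupByKey gathers every (count, fruit) pair of a group before nlargest trims it, whereas aggregateByKey can keep at most n candidates per key at each step. The helper names (rank_key, add_candidate, merge_candidates, top_n_per_group) are made up for this illustration. Sorting on (-count, fruit) also breaks ties alphabetically, which is one way to handle the bonus question, and the final flatMap produces the (group, fruit, count) rows the question asks for.

```python
from operator import add

N = 3  # how many fruits to keep per group


def rank_key(pair):
    """Sort key for a (fruit, count) pair: higher count first, then fruit A-Z."""
    fruit, count = pair
    return (-count, fruit)


def add_candidate(best, pair):
    """seqFunc: fold one (fruit, count) pair into a partition-local top-N list."""
    return sorted(best + [pair], key=rank_key)[:N]


def merge_candidates(left, right):
    """combFunc: merge two top-N lists that were built on different partitions."""
    return sorted(left + right, key=rank_key)[:N]


def top_n_per_group(rdd):
    """Return an RDD of (group, fruit, count) rows, at most N rows per group.

    `rdd` is assumed to hold [group, fruit] rows, as in the question.
    """
    counts = rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add)
    by_group = counts.map(lambda x: (x[0][0], (x[0][1], x[1])))
    best = by_group.aggregateByKey([], add_candidate, merge_candidates)
    return best.flatMap(
        lambda kv: [(kv[0], fruit, count) for fruit, count in kv[1]]
    )
```

With the rdd defined earlier, top_n_per_group(rdd).collect() should return nine (group, fruit, count) tuples; the order of the groups depends on partitioning. For only a handful of fruits per group the difference from groupByKey is negligible; the bounded lists matter when a group has many distinct values.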

Standard Python

For comparison, if you have a plain Python list instead of an RDD, the easiest way to do the grouping is with dictionaries:

data = [
    ["foo", "apple"], ["foo", "orange"], ["foo", "apple"], ["foo", "apple"],
    ["foo", "grape"], ["foo", "grape"], ["foo", "plum"], ["bar", "orange"],
    ["bar", "orange"], ["bar", "orange"], ["bar", "grape"], ["bar", "apple"],
    ["bar", "apple"], ["bar", "plum"], ["scrog", "apple"], ["scrog", "apple"],
    ["scrog", "orange"], ["scrog", "orange"], ["scrog", "grape"], ["scrog", "plum"]
]

from heapq import nlargest
from operator import itemgetter

d = {}
for k,v in data:
    d.setdefault(k, {})
    d[k][v] = d[k].get(v, 0) + 1
print(d)
# {'foo': {'apple': 3, 'orange': 1, 'grape': 2, 'plum': 1}, 'bar': {'orange': 3, 'grape': 1, 'apple': 2, 'plum': 1}, 'scrog': {'apple': 2, 'orange': 2, 'grape': 1, 'plum': 1}}

n = 3
results = [(k,v,c) for k,subd in d.items()
                   for v,c in nlargest(n, subd.items(), key=itemgetter(1))]
print(results)
# [('foo', 'apple', 3), ('foo', 'grape', 2), ('foo', 'orange', 1), ('bar', 'orange', 3), ('bar', 'apple', 2), ('bar', 'grape', 1), ('scrog', 'apple', 2), ('scrog', 'orange', 2), ('scrog', 'grape', 1)]
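
As a side note, the same plain-Python grouping can probably be written more compactly with collections.Counter, whose most_common(n) method returns the n highest counts. This is only an alternative sketch of the dictionary approach above, not part of the original answer; the variable names are illustrative.

```python
from collections import Counter, defaultdict

data = [
    ["foo", "apple"], ["foo", "orange"], ["foo", "apple"], ["foo", "apple"],
    ["foo", "grape"], ["foo", "grape"], ["foo", "plum"], ["bar", "orange"],
    ["bar", "orange"], ["bar", "orange"], ["bar", "grape"], ["bar", "apple"],
    ["bar", "apple"], ["bar", "plum"], ["scrog", "apple"], ["scrog", "apple"],
    ["scrog", "orange"], ["scrog", "orange"], ["scrog", "grape"], ["scrog", "plum"]
]

# One Counter of fruit counts per group
counts = defaultdict(Counter)
for group, fruit in data:
    counts[group][fruit] += 1

n = 3
# most_common(n) keeps the n largest counts; equal counts stay in first-seen order
results = [(group, fruit, c)
           for group, fruit_counts in counts.items()
           for fruit, c in fruit_counts.most_common(n)]
print(results)
```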

感情旳空白 2025-01-19 13:34:19

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
spark = (SparkSession.builder.appName("foo").getOrCreate())

initial_list = [["foo", "apple"], ["foo", "orange"],
            ["foo", "apple"], ["foo", "apple"],
            ["foo", "grape"], ["foo", "grape"],
            ["foo", "plum"], ["bar", "orange"],
            ["bar", "orange"], ["bar", "orange"],
            ["bar", "grape"], ["bar", "apple"],
            ["bar", "apple"], ["bar", "plum"],
            ["scrog", "apple"], ["scrog", "apple"],
            ["scrog", "orange"], ["scrog", "orange"],
            ["scrog", "grape"], ["scrog", "plum"]]
# creating rdd
rdd = spark.sparkContext.parallelize(initial_list)
# converting rdd to dataframe
df = rdd.toDF()

# group by index 0 and index 1 to get count of each
df2 = df.groupby(df._1, df._2).count()

window = Window.partitionBy(df2['_1']).orderBy(df2['count'].desc())
# keep rows ranked 1-3 by descending count; rank() gives tied counts the same rank, so ties can yield more than 3 rows per group
df3 = df2.select('*', rank().over(window).alias('rank')).filter(col('rank') <= 3)
# display the required dataframe
df3.select('_1', '_2', 'count').orderBy('_1', col('count').desc()).show()
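
One thing to watch in the window approach above is how ties interact with the rank() filter. The plain-Python sketch below only emulates the ranking semantics (it does not call Spark), using foo's counts from the question; the function names simply mirror the SQL functions. If exactly three rows per group are required, swapping rank() for row_number() from pyspark.sql.functions is a likely fix, at the cost of an arbitrary choice among tied rows.

```python
def rank(counts):
    """Emulate SQL rank() over descending counts: ties share a rank, then ranks skip."""
    ordered = sorted(counts, reverse=True)
    return [ordered.index(c) + 1 for c in ordered]


def row_number(counts):
    """Emulate SQL row_number() over descending counts: always 1, 2, 3, ..."""
    ordered = sorted(counts, reverse=True)
    return list(range(1, len(ordered) + 1))


foo_counts = [3, 2, 1, 1]  # apple, grape, orange, plum

print(rank(foo_counts))        # [1, 2, 3, 3] -> a "<= 3" filter keeps 4 rows
print(row_number(foo_counts))  # [1, 2, 3, 4] -> a "<= 3" filter keeps 3 rows
```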