Spark 中的 group 和 group by

发布于 2024-07-21 09:52:13 字数 2747 浏览 11 评论 0

spark k-v pair 在 Python 里是一 tuple。

rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])

对这个 kv pair 常用操作是 groupByKeygroupBy

groupBy

# Return an RDD of grouped items.
rdd.groupBy(f, numPartitions=None, partitionFunc=<function portable_hash at 0x10335d2a8>)

元素 item 通过函数返回一个 key,key + item 组成 k-v,然后将 key 相同的 item 组成一组:

rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
In [23]: rdd.groupBy(lambda x: x % 2).collect()
# 这里 0 和 1 为通过函数 f 返回的 key
# key 对应的 value 是一个 ResultIterable 对象,里面包含相同 key 的 item
Out[23]:
[(0, <pyspark.resultiterable.ResultIterable at 0x10375c4d0>),
 (1, <pyspark.resultiterable.ResultIterable at 0x10375c990>)]

In [24]: for i, v in rdd.groupBy(lambda x: x % 2).collect():
    ...:     print(i)
    ...:     print(list(v))
    ...:
0
[2, 8]
1
[1, 1, 3, 5]

groupByKey

签名如下:

rdd.groupByKey(numPartitions=None, partitionFunc=<function portable_hash at 0x10335d2a8>)

"""
Docstring:
Group the values for each key in the RDD into a single sequence.
Hash-partitions the resulting RDD with numPartitions partitions.

.. note:: If you are grouping in order to perform an aggregation (such as a
    sum or average) over each key, using reduceByKey or aggregateByKey will
    provide much better performance.

"""

对 Key-Value 形式的 RDD 的操作。与 groupBy 类似。但是其分组所用的 key 不是由指定的函数生成的,而是采用元素本身中的 key。

In [25]: rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])

In [26]: rdd.groupByKey().collect()
Out[26]:
[('a', <pyspark.resultiterable.ResultIterable at 0x103751b50>),
 ('b', <pyspark.resultiterable.ResultIterable at 0x103787050>)]

In [27]: for i in rdd.groupByKey().collect():
    ...:     print(i[0], list(i[1]))
    ...:
('a', [1, 1])
('b', [1])

In [28]: rdd.groupByKey().mapValues(len).collect()
Out[28]: [('a', 2), ('b', 1)]

mapValues

一般有 groupByKey 就有 mapValues,因为 groupByKey 只是把相同 key 组合起来(list -> value),mapValues 就要对这些 k-v pair 的 value(list) 进行操作。

函数签名如下:

"""
Signature: rdd.mapValues(f)
Docstring:
Pass each value in the key-value pair RDD through a map function
without changing the keys; this also retains the original RDD's
partitioning.
"""

>>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
>>> def f(x): return len(x)
>>> x.mapValues(f).collect()
[('a', 3), ('b', 1)]

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

0 文章
0 评论
495 人气
更多

推荐作者

内心激荡

文章 0 评论 0

JSmiles

文章 0 评论 0

左秋

文章 0 评论 0

迪街小绵羊

文章 0 评论 0

瞳孔里扚悲伤

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文