Spark 中的 group 和 group by
spark k-v pair 在 Python 里是一 tuple。
rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
对这个 kv pair 常用操作是 groupByKey
和 groupBy
。
groupBy
# Return an RDD of grouped items.
rdd.groupBy(f, numPartitions=None, partitionFunc=<function portable_hash at 0x10335d2a8>)
元素 item 通过函数返回一个 key,key + item 组成 k-v,然后将 key 相同的 item 组成一组:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
In [23]: rdd.groupBy(lambda x: x % 2).collect()
# 这里 0 和 1 为通过函数 f 返回的 key
# key 对应的 value 是一个 ResultIterable 对象,里面包含相同 key 的 item
Out[23]:
[(0, <pyspark.resultiterable.ResultIterable at 0x10375c4d0>),
(1, <pyspark.resultiterable.ResultIterable at 0x10375c990>)]
In [24]: for i, v in rdd.groupBy(lambda x: x % 2).collect():
...: print(i)
...: print(list(v))
...:
0
[2, 8]
1
[1, 1, 3, 5]
groupByKey
签名如下:
rdd.groupByKey(numPartitions=None, partitionFunc=<function portable_hash at 0x10335d2a8>)
"""
Docstring:
Group the values for each key in the RDD into a single sequence.
Hash-partitions the resulting RDD with numPartitions partitions.
.. note:: If you are grouping in order to perform an aggregation (such as a
sum or average) over each key, using reduceByKey or aggregateByKey will
provide much better performance.
"""
对 Key-Value 形式的 RDD 的操作。与 groupBy 类似。但是其分组所用的 key 不是由指定的函数生成的,而是采用元素本身中的 key。
In [25]: rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
In [26]: rdd.groupByKey().collect()
Out[26]:
[('a', <pyspark.resultiterable.ResultIterable at 0x103751b50>),
('b', <pyspark.resultiterable.ResultIterable at 0x103787050>)]
In [27]: for i in rdd.groupByKey().collect():
...: print(i[0], list(i[1]))
...:
('a', [1, 1])
('b', [1])
In [28]: rdd.groupByKey().mapValues(len).collect()
Out[28]: [('a', 2), ('b', 1)]
mapValues
一般有 groupByKey
就有 mapValues
,因为 groupByKey 只是把相同 key 组合起来(list -> value),mapValues 就要对这些 k-v pair 的 value(list) 进行操作。
函数签名如下:
"""
Signature: rdd.mapValues(f)
Docstring:
Pass each value in the key-value pair RDD through a map function
without changing the keys; this also retains the original RDD's
partitioning.
"""
>>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
>>> def f(x): return len(x)
>>> x.mapValues(f).collect()
[('a', 3), ('b', 1)]
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
上一篇: Hive 权限控制
下一篇: 彻底找到 Tomcat 启动速度慢的元凶
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论