When is ResultIterable needed?
The groupByKey function for RDDs returns pairs of type (<key_type>, ResultIterable). While I was trying to understand why the ResultIterable class exists, I found the following docstring in the class definition:
A special result iterable. This is used because the standard iterator can not be pickled
Question 1: Why is this iterator "pickleable" while the standard one is not?
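To make the docstring's claim concrete, here is a minimal sketch of the difference (assuming a local PySpark installation; the exact error message may vary by Python version):

import pickle
from pyspark.resultiterable import ResultIterable

gen = (x for x in [1, 2, 3])                  # a standard generator/iterator
try:
    pickle.dumps(gen)
except TypeError as e:
    print(e)                                  # e.g. "cannot pickle 'generator' object"

ri = ResultIterable([1, 2, 3])                # wraps already-materialized data
print(list(pickle.loads(pickle.dumps(ri))))   # [1, 2, 3]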
I even tried to re-write the groupByKey function locally, commenting out the final .mapValues(ResultIterable) and basically copy-pasting only the code needed to make the groupByKey function work:
import os
import sys

import pyspark
from pyspark.shuffle import Aggregator, ExternalMerger, ExternalGroupBy
from pyspark.resultiterable import ResultIterable

sc = pyspark.SparkContext('local[*]')

# Copied from pyspark.rdd; note that this requires PYTHONHASHSEED to be
# set in the environment (e.g. PYTHONHASHSEED=0), otherwise it raises.
def portable_hash(x):
    if 'PYTHONHASHSEED' not in os.environ:
        raise RuntimeError("Randomness of hash of string should be disabled via PYTHONHASHSEED")
    if x is None:
        return 0
    if isinstance(x, tuple):
        h = 0x345678
        for i in x:
            h ^= portable_hash(i)
            h *= 1000003
            h &= sys.maxsize
        h ^= len(x)
        if h == -1:
            h = -2
        return int(h)
    return hash(x)

def groupByKey(rdd, numPartitions=None, partitionFunc=portable_hash):
    def createCombiner(x):
        return [x]

    def mergeValue(xs, x):
        xs.append(x)
        return xs

    def mergeCombiners(a, b):
        a.extend(b)
        return a

    memory = rdd._memory_limit()
    serializer = rdd._jrdd_deserializer
    agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

    def combine(iterator):
        # Pre-aggregate values within each partition before the shuffle
        merger = ExternalMerger(agg, memory * 0.9, serializer)
        merger.mergeValues(iterator)
        return merger.items()

    locally_combined = rdd.mapPartitions(combine, preservesPartitioning=True)
    shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)

    def groupByKey(it):
        # Merge the per-partition lists for each key after the shuffle
        merger = ExternalGroupBy(agg, memory, serializer)
        merger.mergeCombiners(it)
        return merger.items()

    return shuffled.mapPartitions(groupByKey, True)  # .mapValues(ResultIterable)

test_rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(groupByKey(test_rdd).collect())
And this produces the expected result, that is:
[('a', [1, 1]), ('b', [1])]
Question 2: starting from my UDF groupByKey above, could you provide an example where the omitted .mapValues(ResultIterable) becomes necessary?
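For reference, one behavioral difference I can already observe (not necessarily the full answer to Question 2): ResultIterable is backed by materialized data and can be iterated more than once, whereas a plain iterator is exhausted after a single pass. A minimal sketch:

from pyspark.resultiterable import ResultIterable

one_shot = iter([1, 1])
print(list(one_shot))   # [1, 1]
print(list(one_shot))   # [] -- a plain iterator is exhausted

ri = ResultIterable([1, 1])
print(list(ri))         # [1, 1]
print(list(ri))         # [1, 1] -- ResultIterable can be re-iterated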