Spark的sortBy函数为什么会生成4个MapPartitionsRDD?
在spark-shell中执行两段程序:
第一段sortBy:
val list1: List[(String, Int)] = List(("the", 12), ("they", 2), ("do", 4), ("wild", 1), ("and", 5), ("into", 4))
val listRDD1: RDD[(String, Int)] = sc.parallelize(list1)
val result1: RDD[(String, Int)] = listRDD1.sortBy(_._2, false)
result1.collect()
在webui中查看程序的DAG,产生了3个Stage:
其中出现了4个MapPartitionsRDD,3个在ShuffledRDD之前,一个在shuffledRDD之后。
查看sortBy的源码:
f sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.size)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
this.keyBy[K](f).sortByKey(ascending, numPartitions).values
其中的keyBy会在shuffle之前生成一个MapPartitionsRDD, values会在shuffled之后生成一个MapPartitionsRDD。剩下的两个MapPartitionsRDD应该是sortByKey生成的。
用程序验证这个猜想:
val list2: List[(Int, (String, Int))] = List((12, ("the", 12)), (2, ("they", 2)), (4, ("do", 4)), (1, ("wild", 1)), (5, ("and", 5)), (4, ("into", 4)))
val listRDD2: RDD[(Int, (String, Int))] = sc.parallelize(list2)
val result2: RDD[(Int, (String, Int))] = listRDD2.sortByKey(false)
result2.collect()
查看程序的DAG,也是三个Stage:
看DAG确实是生成了两个MapPartitionsRDD,但是这两个MapPartitionsRDD都是怎么生成的?而且为什么中间又出现了一个parallelize阶段?求大佬解答。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论