Spark的sortBy函数为什么会生成4个MapPartitionsRDD?

发布于 2022-09-11 17:57:09 字数 1826 浏览 18 评论 0

在spark-shell中执行两段程序:
第一段sortBy:

val list1: List[(String, Int)] = List(("the", 12), ("they", 2), ("do", 4), ("wild", 1), ("and", 5), ("into", 4))
val listRDD1: RDD[(String, Int)] = sc.parallelize(list1)
val result1: RDD[(String, Int)] = listRDD1.sortBy(_._2, false)
result1.collect()

在webui中查看程序的DAG,产生了3个Stage:

clipboard.png

clipboard.png

clipboard.png

其中出现了4个MapPartitionsRDD,3个在ShuffledRDD之前,一个在shuffledRDD之后。
查看sortBy的源码:

f sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.size)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
  this.keyBy[K](f).sortByKey(ascending, numPartitions).values

其中的keyBy会在shuffle之前生成一个MapPartitionsRDD, values会在shuffled之后生成一个MapPartitionsRDD。剩下的两个MapPartitionsRDD应该是sortByKey生成的。

用程序验证这个猜想:

val list2: List[(Int, (String, Int))] = List((12, ("the", 12)), (2, ("they", 2)), (4, ("do", 4)), (1, ("wild", 1)), (5, ("and", 5)), (4, ("into", 4)))
val listRDD2: RDD[(Int, (String, Int))] = sc.parallelize(list2)
val result2: RDD[(Int, (String, Int))] = listRDD2.sortByKey(false)
result2.collect()

查看程序的DAG,也是三个Stage:

clipboard.png

clipboard.png

clipboard.png

看DAG确实是生成了两个MapPartitionsRDD,但是这两个MapPartitionsRDD都是怎么生成的?而且为什么中间又出现了一个parallelize阶段?求大佬解答。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文