返回介绍

数学基础

统计学习

深度学习

工具

Scala

一、累加器

发布于 2023-07-17 23:38:23 字数 3162 浏览 0 评论 0 收藏 0

  1. 在集群中执行代码时,一个难点是:理解变量和方法的范围、生命周期。下面是一个闭包的例子:

    ​x
    counter = 0
    rdd = sc.parallelize(data)
    ​
    def increment_counter(x):
        global counter
        counter += x
    rdd.foreach(increment_counter)
    print("Counter value: ", counter)

    上述代码的行为是不确定的,并且无法按照预期正常工作。

  2. 在执行作业时,spark 会分解RDD 操作到每个executortask 中。在执行之前,spark 计算任务的闭包

    • 所谓闭包:指的是executor 要在RDD 上进行计算时,必须对执行节点可见的那些变量和方法
    • 闭包被序列化,并被发送到每个executor
  3. 在上述代码中,闭包的变量的副本被发送给每个executor,当counterforeach 函数引用时,它已经不再是驱动器节点的counter

    • 虽然驱动器程序中,仍然有一个counter 在内存中;但是对于executors ,它是不可见的。
    • executor 看到的只是序列化的闭包的一个副本。所有对counter 的操作都是在executor 的本地进行。
    • 要想正确实现预期目标,则需要使用累加器

1.1 Accumulator

  1. 一个累加器(Accumulator)变量只支持累加操作

    • 工作节点和驱动器程序对它都可以执行+= 操作,但是只有驱动器程序可以访问它的值。

      在工作节点上,累加器对象看起来就像是一个只写的变量

    • 工作节点对它执行的任何累加,都将自动的传播到驱动器程序中。

  2. SparkContext 的累加器变量只支持基本的数据类型,如int、float 等。

    • 你可以通过AccumulatorParam 来实现自定义的累加器
  3. Accumulator 的方法:

    • .add(term):向累加器中增加值term
  4. Accumulator 的属性:

    • .value:获取累加器的值。只可以在驱动器程序中使用
  5. 通常使用累加器的流程为:

    • 在驱动器程序中调用SparkContext.accumulator(init_value) 来创建出带有初始值的累加器
    • 在执行器的代码中使用累加器的+= 方法或者.add(term) 方法来增加累加器的值
    • 在驱动器程序中使用累加器的.value 属性来访问累加器的值

    示例:

    
    
    xxxxxxxxxx
    file=sc.textFile('xxx.txt') acc=sc.accumulator(0) def xxx(line): global acc #访问全局变量 if yyy: acc+=1 return zzz rdd=file.map(xxx)

1.2 累加器与容错性

  1. spark 中同一个任务可能被运行多次:

    • 如果工作节点失败了,则spark 会在另一个节点上重新运行该任务
    • 如果工作节点处理速度比别的节点慢很多,则spark 也会抢占式的在另一个节点上启动一个投机性的任务副本
    • 甚至有时候spark 需要重新运行任务来获取缓存中被移出内存的数据
  2. spark 同一个任务被运行多次时,任务中的累加器的处理规则:

    • 在行动操作中使用的累加器,spark 确保每个任务对各累加器修改应用一次

      • 因此:如果想要一个无论在失败还是重新计算时,都绝对可靠的累加器,我们必须将它放在foreach() 这样的行动操作中
    • 在转化操作中使用的累加器,无法保证只修改应用一次。

      • 转化操作中累加器可能发生不止一次更新
      • 在转化操作中,累加器通常只用于调试目的

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文