无法访问 RDD foreach 函数内的 scala 值/变量(空)

发布于 2025-01-12 16:54:19 字数 1164 浏览 1 评论 0原文

我有一个 Spark 结构化流作业,需要按照以下代码使用 forEachBatch 函数内的 rdd.forEach

val tableName = "ddb_table"

df
    .writeStream
    .foreachBatch { (batchDF: DataFrame, _: Long) =>
      batchDF
        .rdd
        .foreach(
          r => updateDDB(r, tableName, "key")
        )

      curDate= LocalDate.now().toString.replaceAll("-", "/")
      prevDate= LocalDate.now().minusDays(1).toString.replaceAll("-", "/")
    }
    .outputMode(OutputMode.Append)
    .option("checkpointLocation", "checkPointDir")
    .start()
    .awaitTermination()

发生的情况是 tableName 变量在 rdd.forEach 函数内无法识别,因为在 updateDDB 内调用 DynamoDB API 会引发异常,指出 tableName 不能为 null。

问题显然出在 rdd/forEach 及其处理变量的方式上。我阅读了一些有关广播变量的内容,但我没有足够的经验在较低级别上使用 RDD 和 Spark 来确定要走的路。

一些注意事项:

  1. 我需要将其放在 forEachBatch 函数内,因为除了写入 DDB 之外,我还需要更新其他变量(在本例中为 curDate 和 prevDate) 变量)
  2. 当我直接在函数调用中传递 tableName 参数时,代码运行成功。
  3. 我有一个扩展 ForEachWriter 的类,在使用 forEach 而不是 forEachBatch 时可以正常工作,但如第 1 点所述,我需要使用第二个,因为我需要在流批处理时间更新一些内容。

I have a Spark Structured Streaming job that needs to use the rdd.forEach inside the forEachBatch function as per the bellow code:

val tableName = "ddb_table"

df
    .writeStream
    .foreachBatch { (batchDF: DataFrame, _: Long) =>
      batchDF
        .rdd
        .foreach(
          r => updateDDB(r, tableName, "key")
        )

      curDate= LocalDate.now().toString.replaceAll("-", "/")
      prevDate= LocalDate.now().minusDays(1).toString.replaceAll("-", "/")
    }
    .outputMode(OutputMode.Append)
    .option("checkpointLocation", "checkPointDir")
    .start()
    .awaitTermination()

What happens is that the tableName variable is not recognized inside the rdd.forEach function because the call to the DynamoDB API inside the updateDDB raises an exception stating that the tableName cannot be null.

The issue is clearly in the rdd/forEach and the way it works with variables. I read some things about broadcast variables, but I don't have enough experience working with RDDs and Spark in a much lower level to be sure what is the way to go.

Some notes:

  1. I need this to be inside the forEachBatch function because I need to update other variables apart from this write to DDB (in this case the curDate and prevDate variables)
  2. The code runs successfully when I pass the tableName parameter directly in the function call.
  3. I have one class that extends the ForEachWriter that works ok when using the forEach instead of the forEachBatch, but as stated in point 1) I need to use the second because I need to update several things at a streaming batch time.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文