Unable to access a Scala value/variable inside the RDD foreach function (null)
I have a Spark Structured Streaming job that needs to call rdd.foreach inside the foreachBatch function, as in the code below:
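    import java.time.LocalDate
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.streaming.OutputMode

    val tableName = "ddb_table"

    df
      .writeStream
      .foreachBatch { (batchDF: DataFrame, _: Long) =>
        // Write every row of the micro-batch to DynamoDB
        batchDF
          .rdd
          .foreach(
            r => updateDDB(r, tableName, "key")
          )
        // Driver-side variables (declared elsewhere), refreshed once per batch
        curDate = LocalDate.now().toString.replaceAll("-", "/")
        prevDate = LocalDate.now().minusDays(1).toString.replaceAll("-", "/")
      }
      .outputMode(OutputMode.Append)
      .option("checkpointLocation", "checkPointDir")
      .start()
      .awaitTermination()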
What happens is that the tableName variable is not recognized inside the rdd.foreach function: the call to the DynamoDB API inside updateDDB raises an exception stating that tableName cannot be null.
The issue is clearly in rdd.foreach and the way it handles variables from the enclosing scope. I have read a bit about broadcast variables, but I don't have enough experience working with RDDs and Spark at this lower level to be sure which way to go.
Some notes:
- I need this to be inside the foreachBatch function because, apart from the write to DDB, I need to update other variables (in this case the curDate and prevDate variables).
- The code runs successfully when I pass the table name directly as a literal in the function call instead of through the tableName variable.
- I have a class that extends ForeachWriter that works fine when using foreach instead of foreachBatch, but as stated in point 1 I need to use the latter because I need to update several things once per streaming batch.
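The second note suggests that the value is lost on its way to the executors rather than inside updateDDB. A workaround commonly suggested for this symptom is to copy the value into a local val inside foreachBatch, so that the closure passed to rdd.foreach carries the string itself. The following is only a minimal sketch under stated assumptions, not a confirmed fix for this job: the object name ForeachBatchSketch, the helper datePath, the name localTableName, the stub body of updateDDB, the local[2] master, and the built-in "rate" source standing in for df are all hypothetical.

```scala
import java.time.LocalDate

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.streaming.OutputMode

object ForeachBatchSketch {
  // Field on the enclosing object, as in the question.
  val tableName = "ddb_table"

  // Hypothetical stand-in for the question's updateDDB; the real one calls DynamoDB.
  def updateDDB(row: Row, tableName: String, key: String): Unit = {
    require(tableName != null, "tableName cannot be null")
    // A real implementation would write `row` to the DynamoDB table here.
  }

  // Hypothetical helper: formats a date as yyyy/MM/dd, like the replaceAll calls in the question.
  def datePath(date: LocalDate): String = date.toString.replace("-", "/")

  // Driver-side state that is refreshed once per micro-batch.
  @volatile var curDate: String = datePath(LocalDate.now())
  @volatile var prevDate: String = datePath(LocalDate.now().minusDays(1))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("foreachBatch-sketch")
      .master("local[2]") // assumption: local run for illustration only
      .getOrCreate()

    // Assumption: the built-in "rate" test source stands in for the question's df.
    val df = spark.readStream.format("rate").option("rowsPerSecond", "1").load()

    // An explicitly typed function value avoids the foreachBatch overload
    // ambiguity that some Scala 2.12 builds report for inline lambdas.
    val writeBatch: (DataFrame, Long) => Unit = (batchDF, _) => {
      // Copy the field into a local val on the driver. The closure below then
      // captures this String by value and ships it to the executors.
      val localTableName = tableName
      batchDF.rdd.foreach(r => updateDDB(r, localTableName, "key"))

      // Runs on the driver after the batch has been written.
      curDate = datePath(LocalDate.now())
      prevDate = datePath(LocalDate.now().minusDays(1))
    }

    df.writeStream
      .foreachBatch(writeBatch)
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", "checkPointDir")
      .start()
      .awaitTermination()
  }
}
```

The sketch uses an explicit main method instead of extending scala.App, because the Spark documentation warns that subclasses of scala.App may not work correctly. Whether that applies here is unknown, since the question does not show how tableName is declared. A broadcast variable would also carry the value to the executors, but for a single short string a local copy is usually enough.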