Spark Structured Streaming - update mode following groupByKey and mapGroupsWithState gives duplicate results per key
I am trying to execute the following stateful aggregation in Databricks (Scala):
import org.apache.spark.sql.streaming.GroupStateTimeout

sig_df
  .as[InputRow]
  .groupByKey(_.uid)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("events_per_window_2")
  .format("memory")
  .outputMode("update")
  .start()
The functions managing the state are these:
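import org.apache.spark.sql.streaming.GroupState

// Called once per uid in each trigger that has new rows for that uid.
def updateAcrossEvents(uid: String,
                       inputs: Iterator[InputRow],
                       oldState: GroupState[UState]): UState = {
  // Start from the stored state, or from sentinel values for a new uid.
  var state: UState = if (oldState.exists) oldState.get else UState(uid, -999999, -999999, -999999)
  for (input <- inputs) {
    state = updateUStateWithEvent(state, input)
    oldState.update(state)
  }
  state
}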
And this:
def updateUStateWithEvent(state: UState, input: InputRow): UState = {
  // no timestamp, just ignore it
  if (Option(input.timestamp).isEmpty) {
    return state
  }
  if (input.sig_id == 10) {
    state.front_in = input.sig_value.toInt
  }
  else if (input.sig_id == 17) {
    state.rear_in = input.sig_value.toInt
  }
  else if (input.sig_id == 25) {
    state.top_in = input.sig_value.toInt
  }
  // return the updated state
  state
}
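The definitions of InputRow and UState are not shown above. As a rough sketch for anyone trying to reproduce this, the following shapes would fit the code; the field types (java.sql.Timestamp, Int, Double) and the names UStateSketch, Unset and foldEvents are assumptions made for illustration, not the actual definitions. It also uses an immutable variant of the update function (copy instead of field assignment) so the per-key logic can be exercised without Spark:

```scala
import java.sql.Timestamp

// Assumed shapes: the real definitions are not part of the post.
case class InputRow(uid: String, timestamp: Timestamp, sig_id: Int, sig_value: Double)
case class UState(uid: String, front_in: Int, rear_in: Int, top_in: Int)

object UStateSketch {
  // Sentinel used in the post for "no value seen yet".
  val Unset: Int = -999999

  // Immutable variant of updateUStateWithEvent: returns a copy instead of mutating.
  def updateUStateWithEvent(state: UState, input: InputRow): UState =
    if (input.timestamp == null) state // no timestamp, ignore the event
    else input.sig_id match {
      case 10 => state.copy(front_in = input.sig_value.toInt)
      case 17 => state.copy(rear_in = input.sig_value.toInt)
      case 25 => state.copy(top_in = input.sig_value.toInt)
      case _  => state // other signals leave the state unchanged
    }

  // Hypothetical helper: folds all events of one uid into a single state,
  // mirroring the loop in updateAcrossEvents but without GroupState.
  def foldEvents(uid: String, inputs: Iterator[InputRow], previous: Option[UState]): UState =
    inputs.foldLeft(previous.getOrElse(UState(uid, Unset, Unset, Unset)))(updateUStateWithEvent)
}
```

With these assumed types, folding the events of one uid always yields a single UState, so this part of the logic on its own does not produce more than one row per key for a given call.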
The issue I am facing is that the output has duplicates for the key uid. The following query returns plenty of results:
SELECT uid, count(*)
FROM events_per_window_2
WHERE front_in <> -999999
   OR rear_in <> -999999
   OR top_in <> -999999
GROUP BY uid
HAVING count(*) > 1
My understanding was that, because the outputMode is update, the output would not contain duplicates. What might be going wrong with my approach here?