Spark Structured Streaming - update mode following groupByKey and mapGroupsWithState gives duplicate results per key

Published 2025-01-31 21:46:10

I am trying to execute the following stateful aggregation in Databricks (Scala):

sig_df
  .as[InputRow]
  .groupByKey(_.uid)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("events_per_window_2")
  .format("memory")
  .outputMode("update")
  .start()
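
For completeness, the snippet assumes the usual Structured Streaming imports, which are not shown here (in a Databricks notebook the spark session is already available):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import spark.implicits._  // needed for .as[InputRow] and .groupByKey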

The functions managing the state are these:

def updateAcrossEvents(uid: String,
                       inputs: Iterator[InputRow],
                       oldState: GroupState[UState]): UState = {
  // start from the previously stored state, or a sentinel-initialized one for a new key
  var state: UState = if (oldState.exists) oldState.get else UState(uid, -999999, -999999, -999999)

  // fold every event of this trigger into the state and persist it
  for (input <- inputs) {
    state = updateUStateWithEvent(state, input)
    oldState.update(state)
  }
  state
}

And this:

def updateUStateWithEvent(state: UState, input: InputRow): UState = {
  // no timestamp, just ignore it
  if (Option(input.timestamp).isEmpty) {
    return state
  }

  if (input.sig_id == 10) {
    state.front_in = input.sig_value.toInt
  } else if (input.sig_id == 17) {
    state.rear_in = input.sig_value.toInt
  } else if (input.sig_id == 25) {
    state.top_in = input.sig_value.toInt
  }

  // return the updated state
  state
}
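
The InputRow and UState definitions are not included in the question; inferred from the fields used above, they would look roughly like this (the exact column types, in particular sig_value, are assumptions). UState needs var fields because updateUStateWithEvent mutates it in place:

// Rough shape of the types used above, inferred from the snippets (not part of the original post)
case class InputRow(uid: String, timestamp: java.sql.Timestamp, sig_id: Int, sig_value: String)

case class UState(uid: String,
                  var front_in: Int,
                  var rear_in: Int,
                  var top_in: Int)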

The issue I am facing is that the output has duplicates for the key uid. The following query returns plenty of results:

SELECT uid, count(*)
FROM events_per_window_2
WHERE front_in <> -999999
   OR rear_in <> -999999
   OR top_in <> -999999
GROUP BY uid
HAVING count(*) > 1

I was under the impression that since the outputMode is update, we would not get any duplicates. What might be going wrong with my approach here?
