Flink AndientTimestampSandwatermark

发布于 2025-02-01 05:35:13 字数 564 浏览 4 评论 0原文

我想通过request_time来计算健康检查数据的状态代码,从当前时间开始使用1分钟的窗口。

众所周知,健康检查每分钟发送约60条请求。因此,结果应该像{环境:“ xx”,主机:“ xx”,200:60,300:0,400:0,500:0}

但实际结果是{环境:“ xx”,主机:“ xx”,200:50000,300:0,400:0,500:0},它计算了许多先前的数据。

我的代码就像

  env.fromSource(kafkasource).filter().flatMap()
    .assignTimeStampAndWaterMarks(
    WaterMarkStrategy.<OnjectNode>forboundedOutOfOrderness(DurationOfSeconds(60).withTimeStampAssigner(assigner))
    .keyBy("env","host")
    .window(1m)
    .reduce()

有人知道缺少什么或我在逻辑上错了吗?

I want to count the status code of health check data by request_time with a 1 minute window from the current time.

It's known that the health check sends about 60 requests per minute. So the result should be like {environment: "XX", host: "XX", 200: 60, 300: 0, 400: 0, 500: 0}

But the actual result is {environment: "XX", host: "XX", 200: 50000, 300: 0, 400: 0, 500: 0}, which counts a lot of previous data.

My code is like

  env.fromSource(kafkasource).filter().flatMap()
    .assignTimeStampAndWaterMarks(
    WaterMarkStrategy.<OnjectNode>forboundedOutOfOrderness(DurationOfSeconds(60).withTimeStampAssigner(assigner))
    .keyBy("env","host")
    .window(1m)
    .reduce()

Does anyone know what's missing or I am wrong in the logic?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

迷路的信 2025-02-08 05:35:13

可能是您应该在redain()flatmap()中提供代码的更多详细信息。基于当前可用的信息,此问题可能是中的状态没有重置。您可以在中检查中的代码,或者是否正确设置了TTL。

May be you should provide more detail info of your code in reduce() and flatmap(). Based on currently available information, this problem may be that the state in reduce is not being reset. you can check the code in reduce or whether is correctly ttl set.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文