Flink assignTimestampsAndWatermarks
I want to count the status codes of health-check data by request_time, using a 1-minute window starting from the current time.
The health check sends about 60 requests per minute, so the result should look like {environment: "XX", host: "XX", 200: 60, 300: 0, 400: 0, 500: 0}.
But the actual result is {environment: "XX", host: "XX", 200: 50000, 300: 0, 400: 0, 500: 0}, which counts a lot of earlier data.
My code looks like this:
```java
env.fromSource(kafkaSource, ...)
    .filter(...).flatMap(...)
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(60))
            .withTimestampAssigner(assigner))
    .keyBy("env", "host")
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(...);
```
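For reference, Flink's TumblingEventTimeWindows (with zero offset) places each record in a window by its event timestamp, aligned to the epoch, so a 1-minute window covers [k·60 s, (k+1)·60 s) rather than starting at the current time. If the timestamp assigner extracts request_time correctly, each window should therefore see only about 60 health checks. The plain-Java sketch below is not Flink code; it assumes hypothetical (timestampMs, status) records at one request per second and reproduces that window alignment and the expected per-window count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {
    static final long WINDOW_SIZE_MS = 60_000L;

    // Hypothetical record: event time in epoch millis plus HTTP status code.
    record HealthCheck(long timestampMs, int status) {}

    // Start of the epoch-aligned tumbling window containing ts (zero offset, ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // Counts status codes per window: windowStart -> (status -> count).
    static Map<Long, Map<Integer, Long>> countPerWindow(List<HealthCheck> events) {
        Map<Long, Map<Integer, Long>> result = new TreeMap<>();
        for (HealthCheck e : events) {
            result.computeIfAbsent(windowStart(e.timestampMs()), k -> new TreeMap<>())
                  .merge(e.status(), 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<HealthCheck> events = new ArrayList<>();
        // Three minutes of one-per-second health checks, all returning 200.
        for (long s = 0; s < 180; s++) {
            events.add(new HealthCheck(s * 1000L, 200));
        }
        // Prints {0={200=60}, 60000={200=60}, 120000={200=60}}
        System.out.println(countPerWindow(events));
    }
}
```

A count far above 60 in a single window therefore points either at timestamps that are not what you expect or at something accumulating outside the window itself.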
Does anyone know what's missing, or where my logic is wrong?
Comments (1)
Maybe you should provide more details of the code in reduce() and flatMap(). Based on the information currently available, the problem may be that the state in reduce() is not being reset. You can check the code in reduce(), or whether the state TTL is set correctly.
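To illustrate the "state is not being reset" hypothesis: in Flink, the reduced value for each key and window lives in window state that is cleared once the window has fired and its allowed lateness has passed, so a reduce function that only combines its two arguments starts fresh in every window. Counts that grow window after window usually mean something outside that state is accumulating, for example a mutable map held as a field of the function object. The plain-Java analogy below is not Flink code; WindowCounter, LeakyCounter and FreshCounter are hypothetical names used only to show how such a field turns 60 per window into 60, 120, 180, and so on.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowStateLeakSketch {
    // Hypothetical per-window counting step: status codes in, counts out.
    interface WindowCounter {
        Map<Integer, Long> count(List<Integer> statusesInWindow);
    }

    // BUGGY: the map is a field created once and never cleared,
    // so every window adds to the totals of all previous windows.
    static class LeakyCounter implements WindowCounter {
        private final Map<Integer, Long> counts = new HashMap<>();

        @Override
        public Map<Integer, Long> count(List<Integer> statusesInWindow) {
            for (int status : statusesInWindow) {
                counts.merge(status, 1L, Long::sum);
            }
            return new HashMap<>(counts);
        }
    }

    // FIXED: all state lives in a local variable scoped to one window.
    static class FreshCounter implements WindowCounter {
        @Override
        public Map<Integer, Long> count(List<Integer> statusesInWindow) {
            Map<Integer, Long> counts = new HashMap<>();
            for (int status : statusesInWindow) {
                counts.merge(status, 1L, Long::sum);
            }
            return counts;
        }
    }

    // Runs one counter over several identical windows of 60 "200" responses
    // and returns the 200-count reported for the last window.
    static long countOfOkInLastWindow(WindowCounter counter, int windows) {
        List<Integer> window = Collections.nCopies(60, 200);
        Map<Integer, Long> last = Map.of();
        for (int i = 0; i < windows; i++) {
            last = counter.count(window);
        }
        return last.getOrDefault(200, 0L);
    }

    public static void main(String[] args) {
        System.out.println(countOfOkInLastWindow(new LeakyCounter(), 3)); // 180
        System.out.println(countOfOkInLastWindow(new FreshCounter(), 3)); // 60
    }
}
```

In the real job, the equivalent fix would be to keep reduce() free of instance fields and build its result only from the two values it receives.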