Flink assignTimestampsAndWatermarks
I want to count the status codes of health check data by request_time, using a 1-minute window from the current time.
The health check is known to send about 60 requests per minute, so the result should look like {environment: "XX", host: "XX", 200: 60, 300: 0, 400: 0, 500: 0}.
But the actual result is {environment: "XX", host: "XX", 200: 50000, 300: 0, 400: 0, 500: 0}, which includes a lot of earlier data.
My code looks like this:
env.fromSource(kafkasource).filter().flatMap()
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(60)).withTimestampAssigner(assigner))
    .keyBy("env", "host")
    .window(1m)
    .reduce()
Does anyone know what's missing, or where my logic is wrong?
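To make the expected result concrete, here is a minimal plain-Java sketch (no Flink involved) of what a 1-minute tumbling window keyed by (env, host) and bucketed by request_time should count. The class, the `Event` type, and the string key layout are hypothetical names chosen for illustration, and it assumes non-negative millisecond timestamps and a tumbling window aligned to the epoch; the question does not say which window assigner is actually used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowCountSketch {
    // Assumed window size: one minute, in milliseconds.
    static final long WINDOW_MS = 60_000L;

    // Hypothetical event shape; the real job works on ObjectNode records.
    static final class Event {
        final String env;
        final String host;
        final long requestTimeMs;
        final int status;

        Event(String env, String host, long requestTimeMs, int status) {
            this.env = env;
            this.host = host;
            this.requestTimeMs = requestTimeMs;
            this.status = status;
        }
    }

    // Start of the tumbling window that contains the given event timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Count events per "windowStart|env|host|statusClass" key.
    static Map<String, Integer> countByWindow(List<Event> events) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            int statusClass = e.status / 100 * 100; // 204 -> 200, 404 -> 400
            String key = windowStart(e.requestTimeMs) + "|" + e.env + "|" + e.host + "|" + statusClass;
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two minutes of health checks, one request per second.
        List<Event> events = new ArrayList<>();
        for (int i = 0; i < 120; i++) {
            events.add(new Event("xx", "xx", i * 1000L, 200));
        }
        System.out.println(countByWindow(events));
    }
}
```

With one request per second, each window key ends up with a count of 60. A count such as 50000 for a single key means that far more than one minute of records was folded into the same result.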
Comments (1)
Maybe you should provide more detail about your code in reduce() and flatMap(). Based on the currently available information, the problem may be that the state in reduce is not being reset. You can check the code in reduce, or check whether the TTL is set correctly.
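As an illustration of what "state not being reset" could look like, here is a plain-Java sketch (no Flink involved) that contrasts a reducer that only combines its two inputs with one that accumulates into a field. The asker's actual reduce() is not shown, so everything here is hypothetical: the class names, the `int[]` layout of counts for 200/300/400/500, and the `reduceWindow` helper that stands in for a window folding its records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceStateSketch {
    // Pure reducer: the result depends only on the two inputs.
    static final BinaryOperator<int[]> PURE = (a, b) -> new int[] {
        a[0] + b[0], a[1] + b[1], a[2] + b[2], a[3] + b[3]
    };

    // Leaky reducer: accumulates into a field that outlives a single window.
    static final class Leaky implements BinaryOperator<int[]> {
        private final int[] total = new int[4];

        @Override
        public int[] apply(int[] a, int[] b) {
            if (a != total) {
                addInto(total, a); // first record of a window is folded in too
            }
            addInto(total, b);
            return total; // never reset, so later windows keep growing
        }

        private static void addInto(int[] target, int[] source) {
            for (int i = 0; i < target.length; i++) {
                target[i] += source[i];
            }
        }
    }

    // Stand-in for one window folding its records with a reduce function.
    static int[] reduceWindow(List<int[]> records, BinaryOperator<int[]> f) {
        int[] acc = records.get(0);
        for (int i = 1; i < records.size(); i++) {
            acc = f.apply(acc, records.get(i));
        }
        return acc;
    }

    public static void main(String[] args) {
        // One window's worth of records: 60 requests, all with status 200.
        List<int[]> window = new ArrayList<>();
        for (int i = 0; i < 60; i++) {
            window.add(new int[] {1, 0, 0, 0});
        }
        Leaky leaky = new Leaky();
        System.out.println("pure, window 1:  " + reduceWindow(window, PURE)[0]);
        System.out.println("pure, window 2:  " + reduceWindow(window, PURE)[0]);
        System.out.println("leaky, window 1: " + reduceWindow(window, leaky)[0]);
        System.out.println("leaky, window 2: " + reduceWindow(window, leaky)[0]);
    }
}
```

The pure reducer reports 60 for each window, while the leaky one reports 60 and then 120, because its field carries the first window's counts into the second. If the real reduce() keeps counters in a member variable or in keyed state that is never cleared, the output would grow in the same way.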