AggregateFunction 与 SessionWindow - 了解合并的工作原理
在使用 EventTimeSessionWindows 在 Flink 中实现 AggregateFunction 时,我无法理解在 SessionWindow 具有动态间隙的情况下何时发生合并。
代码片段:
SingleOutputStreamOperator<Tuple1<String>> aggregateData = parsedData.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(20)))
.keyBy(new ZeusRawKeyByFunction())
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<ZeusEvent>() {
@Override
public long extract(ZeusEvent event) {
if (event.getEventTypeName().equals("PlaybackSessionClosed")) {
return 100;
} else {
return Time.minutes(30).toMilliseconds();
}
}
}))
.allowedLateness(Time.minutes(10))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
.sideOutputLateData(lateEvents)
.aggregate(new ZeusAggregateFunction())
.setParallelism(parameterTool.getInt("zeus-aggregator-parallelism"))
.name("Zeus Aggregator")
我在聚合器中定义了四个函数:
createAccumulator
:这将创建一个新的累加器add
:这将继续将 1 分钟触发时间内的所有新事件添加到累加器getResult
:这将获取要写入接收器的最后一行那个触发器merge
:这何时起作用?每个触发器都会发生合并吗?
我试图了解合并是否会每分钟发生一次触发器,并且创建一个新的累加器并与前一个累加器合并。
While implementing the AggregateFunction
in Flink with EventTimeSessionWindows
, I am not able to understand when the merge happens in case of a SessionWindow with dynamic gap.
Code snippet:
SingleOutputStreamOperator<Tuple1<String>> aggregateData = parsedData.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(20)))
.keyBy(new ZeusRawKeyByFunction())
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<ZeusEvent>() {
@Override
public long extract(ZeusEvent event) {
if (event.getEventTypeName().equals("PlaybackSessionClosed")) {
return 100;
} else {
return Time.minutes(30).toMilliseconds();
}
}
}))
.allowedLateness(Time.minutes(10))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
.sideOutputLateData(lateEvents)
.aggregate(new ZeusAggregateFunction())
.setParallelism(parameterTool.getInt("zeus-aggregator-parallelism"))
.name("Zeus Aggregator")
I have defined four functions in the aggregator:
createAccumulator
: This creates a new accumulatoradd
: This will keep on adding all the new events in the 1 min trigger time to the accumulatorgetResult
: This will get the final row to write to sink for that triggermerge
: When does this work ? Does the merge happen for every trigger?
I am trying to understand if the merge will happen every min with the trigger and a new accumulator gets created and gets merged with the previous one.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
考虑一个带有时间戳 et 的事件 e 和计算为 gap(e) 的动态间隙。
当每个事件e到达窗口操作符时,它最初被分配给一个新的会话窗口,从
et
延伸到et+gap(e)
。然后,窗口运算符迭代所有会话(对于每个键独立),并且每当两个会话重叠(在时间上)时,它们就会合并以形成一个新的、更长的会话,覆盖两个会话的时间跨度的并集。这一直持续到不可能进一步合并为止。每次合并发生时,都会调用触发器的
onMerge
方法以及累加器的merge
方法。然后调用触发器的
onElement
方法,并传入事件 e。这将确保适当的计时器到位,以便当水印通过会话窗口末尾(包括适当的间隙)时,窗口将触发。因此,合并是在处理每个事件时完成的,并且与连续/周期性触发的定时无关。
Consider an event e with timestamp e.t and a dynamic gap computed as gap(e).
As each event e arrives at the window operator, it is initially assigned to a new session window extending from
e.t
toe.t + gap(e)
. Then the window operator iterates over all of the sessions (independently for each key), and whenever two sessions overlap (in time), they are merged to form a new, longer session covering the union of the timespans of both sessions. This continues until no further merging is possible.As each merge occurs, the
onMerge
method of the trigger is called, as well as themerge
method of the accumulator.Then the
onElement
method of the trigger is called, passing in the event e. This will ensure that the appropriate timer is in place so that when the watermark passes the end of the session window (which includes the appropriate gap) the window will FIRE.Thus the merging is done as each event is processed, and is not coupled to the timing of the continuous/periodic trigger.