AggregateFunction with SessionWindow - understanding how merging works

Posted on 2025-01-10 05:53:13

While implementing an AggregateFunction in Flink with EventTimeSessionWindows, I cannot work out when merge is called for a session window with a dynamic gap.

Code snippet:

// Watermarks tolerate up to 20 minutes of out-of-order events.
SingleOutputStreamOperator<Tuple1<String>> aggregateData = parsedData
        .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(20)))
        .keyBy(new ZeusRawKeyByFunction())
        // The session gap is chosen per event: a close event ends the session almost immediately.
        .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<ZeusEvent>() {
            @Override
            public long extract(ZeusEvent event) {
                // Constant-first equals() avoids a NullPointerException if the type name is null.
                if ("PlaybackSessionClosed".equals(event.getEventTypeName())) {
                    return 100L; // gap in milliseconds
                } else {
                    return Time.minutes(30).toMilliseconds();
                }
            }
        }))
        .allowedLateness(Time.minutes(10))
        // Fire every minute of event time, as the watermark advances.
        .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
        .sideOutputLateData(lateEvents)
        .aggregate(new ZeusAggregateFunction())
        .setParallelism(parameterTool.getInt("zeus-aggregator-parallelism"))
        .name("Zeus Aggregator");

I have defined four functions in the aggregator:

  • createAccumulator: creates a new accumulator

  • add: adds each new event that arrives within the 1-minute trigger interval to the accumulator

  • getResult: returns the final row to write to the sink for that trigger firing

  • merge: When is this called? Does a merge happen on every trigger firing?

I am trying to understand whether a merge happens every minute with the trigger, i.e. whether a new accumulator is created each time and merged with the previous one.

Comments (1)

笑忘罢 2025-01-17 05:53:13

Consider an event e with timestamp e.t and a dynamic gap computed as gap(e).

As each event e arrives at the window operator, it is initially assigned to a new session window extending from e.t to e.t + gap(e). Then the window operator iterates over all of the sessions (independently for each key), and whenever two sessions overlap (in time), they are merged to form a new, longer session covering the union of the timespans of both sessions. This continues until no further merging is possible.
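The assign-then-merge step described above can be sketched in plain Java for a single key. This is only an illustration, not Flink's implementation: the class SessionMergeSketch, its Session type, and the addEvent method are hypothetical names, the accumulator is reduced to a simple event count, and treating sessions that merely touch as overlapping is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java illustration of session merging for one key; hypothetical, not Flink code. */
class SessionMergeSketch {

    /** A session spanning start..end (ms) with an event count standing in for the accumulator. */
    static final class Session {
        final long start;
        final long end;
        final long count;

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    /**
     * Adds one event (timestamp and gap in ms) to the existing sessions of a key
     * and returns the sessions that remain once no further merging is possible.
     */
    static List<Session> addEvent(List<Session> sessions, long timestamp, long gap) {
        List<Session> all = new ArrayList<>(sessions);
        // Step 1: the event initially gets its own session from t to t + gap.
        all.add(new Session(timestamp, timestamp + gap, 1));
        all.sort(Comparator.comparingLong((Session s) -> s.start));

        // Step 2: sweep in start order and fold overlapping sessions together.
        List<Session> merged = new ArrayList<>();
        Session current = all.get(0);
        for (int i = 1; i < all.size(); i++) {
            Session next = all.get(i);
            if (next.start <= current.end) {
                // Overlap (assumption: touching counts): take the union of both
                // spans and combine the accumulators, here by summing the counts.
                current = new Session(current.start,
                        Math.max(current.end, next.end),
                        current.count + next.count);
            } else {
                merged.add(current);
                current = next;
            }
        }
        merged.add(current);
        return merged;
    }
}
```

For example, starting from two separate sessions created by events at t = 0 and t = 5000 (each with a 1000 ms gap), an out-of-order event at t = 900 with a 4500 ms gap overlaps both, so the three sessions collapse into a single one from 0 to 6000 holding all three events. The merging happens at the moment that event is processed.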

As each merge occurs, the onMerge method of the trigger is called, as well as the merge method of the accumulator.
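To make the role of the accumulator's merge method concrete, here is a minimal plain-Java sketch that mirrors the four methods of Flink's AggregateFunction (createAccumulator, add, getResult, merge) without depending on Flink. The class SessionStatsSketch, its Acc type, and the choice of statistics are hypothetical and are not the asker's ZeusAggregateFunction. In a real job these bodies would live in a class implementing AggregateFunction.

```java
/** Plain-Java sketch mirroring the four AggregateFunction methods; all names are hypothetical. */
class SessionStatsSketch {

    /** Accumulator: event count plus the first and last event timestamps seen so far. */
    static final class Acc {
        long count = 0;
        long firstTs = Long.MAX_VALUE;
        long lastTs = Long.MIN_VALUE;
    }

    /** Called when a window (session) needs a fresh accumulator. */
    static Acc createAccumulator() {
        return new Acc();
    }

    /** Called once per incoming event, independent of any trigger firing. */
    static Acc add(long eventTs, Acc acc) {
        acc.count++;
        acc.firstTs = Math.min(acc.firstTs, eventTs);
        acc.lastTs = Math.max(acc.lastTs, eventTs);
        return acc;
    }

    /**
     * Called when two sessions are merged: the result must equal what a single
     * accumulator would hold had it seen the events of both sessions.
     */
    static Acc merge(Acc a, Acc b) {
        Acc out = new Acc();
        out.count = a.count + b.count;
        out.firstTs = Math.min(a.firstTs, b.firstTs);
        out.lastTs = Math.max(a.lastTs, b.lastTs);
        return out;
    }

    /** Called when the trigger fires, to turn the accumulator into an output row. */
    static String getResult(Acc acc) {
        return acc.count + " events from " + acc.firstTs + " to " + acc.lastTs;
    }
}
```

The design point is that merge should be insensitive to how the events were split between the two accumulators, since which sessions get merged depends on arrival order.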

Then the onElement method of the trigger is called, passing in the event e. This will ensure that the appropriate timer is in place so that when the watermark passes the end of the session window (which includes the appropriate gap) the window will FIRE.

Thus the merging is done as each event is processed, and is not coupled to the timing of the continuous/periodic trigger.
