How do I implement multiple levels of window aggregation in Apache Flink using KeyedProcessFunction?

Posted 2025-01-20 03:57:34

I want to extend my lower window aggregations to compute higher window aggregations.

My lower window aggregation uses a KeyedProcessFunction, with onTimer implemented to flush the data to the sink at the end of each window.

My code is similar to what is explained here: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/learn-flink/event_driven/ . But I also need to compute aggregations over higher windows:

Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
 ... ->window(1 day) -> out_1 -> out_2 -> out_3 ... ->out_n

How do I extend the lower window results to calculate the higher window aggregations?

Comments (1)

倥絔 2025-01-27 03:57:34

You can take the output from a lower layer of windowing and send it into a higher one. For example:

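import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// First level: total tips per driver for each hour
val hourlyTips = fares
    .map((f: TaxiFare) => (f.driverId, f.tip))
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .reduce((f1: (Long, Float), f2: (Long, Float)) => { (f1._1, f1._2 + f2._2) })

// Second level: consumes the hourly results rather than the raw fares
val dailyTips = hourlyTips
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(Time.hours(24)))
    .reduce((f1: (Long, Float), f2: (Long, Float)) => { (f1._1, f1._2 + f2._2) })

// Each level is written to its own sink
hourlyTips.addSink(...)
dailyTips.addSink(...)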

This basic approach should work whether the windowing is implemented using DataStream windows, or with windows you implement yourself using a KeyedProcessFunction.
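
For the KeyedProcessFunction case, a minimal sketch of one reusable window level might look like the following. It follows the pattern of the PseudoWindow example in the linked tutorial. The class name `RollupWindow`, the `(Long, Float)` record shape and the state name are assumptions for illustration, not part of the original question. It also assumes every input record carries an event-time timestamp. Records emitted from an event-time `onTimer` are stamped with the timer's timestamp (the last millisecond of the window), so they fall into the correct window at the next level.

```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

object RollupWindow {
  // Last millisecond of the tumbling window that contains `ts`.
  def windowEnd(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs) + sizeMs - 1
}

// Hypothetical helper: sums (key, value) pairs per key over tumbling
// event-time windows of `sizeMs` milliseconds.
class RollupWindow(sizeMs: Long)
    extends KeyedProcessFunction[Long, (Long, Float), (Long, Float)] {

  // Running sum for each open window, keyed by the window's end timestamp.
  @transient private var sums: MapState[java.lang.Long, java.lang.Float] = _

  override def open(parameters: Configuration): Unit = {
    sums = getRuntimeContext.getMapState(
      new MapStateDescriptor[java.lang.Long, java.lang.Float](
        "sums", classOf[java.lang.Long], classOf[java.lang.Float]))
  }

  override def processElement(
      in: (Long, Float),
      ctx: KeyedProcessFunction[Long, (Long, Float), (Long, Float)]#Context,
      out: Collector[(Long, Float)]): Unit = {
    // Assumes the record has an event-time timestamp (it is null otherwise).
    val end = RollupWindow.windowEnd(ctx.timestamp(), sizeMs)
    if (end > ctx.timerService().currentWatermark()) {
      val prev = sums.get(end)
      sums.put(end, (if (prev == null) 0f else prev.floatValue()) + in._2)
      // Registering the same timer repeatedly is harmless; it fires once.
      ctx.timerService().registerEventTimeTimer(end)
    }
    // Otherwise the window has already fired and the late record is dropped.
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, (Long, Float), (Long, Float)]#OnTimerContext,
      out: Collector[(Long, Float)]): Unit = {
    // Emit the finished window's sum, then release its state.
    out.collect((ctx.getCurrentKey, sums.get(timestamp).floatValue()))
    sums.remove(timestamp)
  }
}
```

Levels could then be chained in the same way as the DataStream windows above, for instance `val perMinute = per15s.keyBy(_._1).process(new RollupWindow(60 * 1000L))`, where `per15s` is the hypothetical output of the 15-second level, with each level's stream attached to its own sink.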
