如何使用Flink中的单个窗口进行多个汇总?

发布于 2025-01-22 14:24:18 字数 1251 浏览 1 评论 0原文

我是Flink的新手,我想做很多次在Spark中做的事情。

例如,在Spark中,我可以在

ds.groupByKey(???).mapGroups(???) // aggregate 1
  .groupByKey(???).mapGroups(???) // aggregate 2

第一个汇总交易之下进行类似的输入数据,而第二个汇总处理第一个汇总的输出。我需要的是第二个骨料的输出。

但是在flink中,似乎任何聚合物都应使用特定窗口执行,

ds.keyBy(???)
  .window(???) // window 1
  .aggregate(???) // aggregate 1
  .keyBy(???)
  .window(???) // window 2
  .aggregate(???) // aggregate 2

如我设置窗口2,那么第二个汇总的输入数据可能不是第一个汇总的输出,这将违反我的愿望。

我想使用相同的批处理数据进行多个连续汇总,可以在一个窗口中收集。如何在Flink中实现它?

感谢您的帮助。


更新以获取更多详细信息。

窗口必须具有自己的策略,例如,我可能会设定窗口策略,例如

ds.keyBy(key1)
  .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.HOURS))) // window 1, 1 hour tumbling window
  .aggregate(???) // aggregate 1
  .keyBy(key2)
  .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.MINUTES))) // window 2, 1 minute tumbling window
  .aggregate(???) // aggregate 2

1窗口下方的窗口策略,可能会在一个小时的时间窗口中收集10亿行,并且在汇总后,它会输出一百万行。

我想在总计2中对一百万行进行一些计算,但我不知道哪种窗口策略可以准确地收集一百万行。

如果我将窗口2设置为上面的翻滚时间窗口,则可能将一百万行分为两批,而汇总2的输出将不是我需要的。

I'm new to Flink, and I want to do something I have done in Spark many times.

For example, in Spark I can do something like below

ds.groupByKey(???).mapGroups(???) // aggregate 1
  .groupByKey(???).mapGroups(???) // aggregate 2

The first aggregate deals with a batch of input data, and the second aggregate deals with the output of the first aggregate. What I need is the output of the second aggregate.

But in Flink, it seems that any aggregate should execute with a specific window like below

ds.keyBy(???)
  .window(???) // window 1
  .aggregate(???) // aggregate 1
  .keyBy(???)
  .window(???) // window 2
  .aggregate(???) // aggregate 2

If I set the window 2, then the input data of the second aggregate may NOT be the output of the first aggregate, which will go against my wish.

I want to do multiple continuous aggregate with the same batch data, which can be gathered in a single window. How to realize it in Flink?

Thanks for your help.


Update for more details.

Window must have its own strategy, for example I may set window strategy like below

ds.keyBy(key1)
  .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.HOURS))) // window 1, 1 hour tumbling window
  .aggregate(???) // aggregate 1
  .keyBy(key2)
  .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.MINUTES))) // window 2, 1 minute tumbling window
  .aggregate(???) // aggregate 2

Window 1 may gather one billion rows in the one hour of tumbling time window, and after aggregate it outputs one million rows.

I want to do some calculation with those one million rows in aggregate 2, but I don't know which window strategy could gather exactly those one million rows.

If I set the window 2 with tumbling time window like above, it may split those one million rows into two batch, and the output of aggregate 2 will not be what I need.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

╰つ倒转 2025-01-29 14:24:18

您可以通过使用事件时间窗口而不是处理时间窗口来避​​免此问题。而且,如果您在要用作计时基础的事件中还没有时间戳,那么您可以执行此类操作以使用摄入时间时间戳:


WatermarkStrategy<MyType> watermarkStrategy =
        WatermarkStrategy
            .<MyType>forMonotonousTimestamps()
            .withTimestampAssigner(
                (event, streamRecordTimestamp) -> Instant.now());

DataStream<MyType> timestampedEvents = ds
        .assignTimestampsAndWatermarks(watermarkStrategy);

timestampedEvents.keyBy(...)
  .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.MINUTES)))
  .aggregate(...)
  .keyBy(...)
  .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.HOURS)))
  .aggregate(...)

这有效,因为第一窗口产生的事件将每个时间戳都用时间戳将其分配给窗口的末端。这就要求第二个窗口的持续时间与第一个窗口的持续时间相同或倍数。

同样,对窗口2使用的密钥分区进行任意更改(与窗口1相比)可能会产生荒谬的结果。

You can avoid this problem by using event-time windows rather than processing-time windows. And if you don't already have timestamps in the events that you want to use as the basis for timing, then you can do something like this in order to use ingestion-time timestamps:


WatermarkStrategy<MyType> watermarkStrategy =
        WatermarkStrategy
            .<MyType>forMonotonousTimestamps()
            .withTimestampAssigner(
                (event, streamRecordTimestamp) -> Instant.now());

DataStream<MyType> timestampedEvents = ds
        .assignTimestampsAndWatermarks(watermarkStrategy);

timestampedEvents.keyBy(...)
  .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.MINUTES)))
  .aggregate(...)
  .keyBy(...)
  .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.HOURS)))
  .aggregate(...)

This works because the events produced by the first window will each be timestamped with the timestamp for the end of the window they were assigned to. This requires then that the second window's duration be the same as, or a multiple of, the first window's duration.

Similarly, making arbitrary changes to the key-partitioning used by window 2 (compared to window 1) may produce nonsensical results.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文