Flink SQL interval join not triggering

Posted 2025-02-03 02:47:24


I have a simple interval join between two unbounded streams. This works with small workloads, but with a larger (production) workload it no longer works. From observing the output I can see that the Flink SQL job triggers/emits records only once the entire topic has been scanned (and consequently read into memory?), but I would want the job to emit a record as soon as a single match is found, since in my production environment the job cannot withstand reading the entire table into memory.

The interval join I'm making is very similar to the example provided here: https://github.com/ververica/flink-sql-cookbook/blob/main/joins/02_interval_joins/02_interval_joins.md

SELECT
  o.id AS order_id,
  o.order_time,
  s.shipment_time,
  TIMESTAMPDIFF(DAY, o.order_time, s.shipment_time) AS day_diff
FROM orders o
JOIN shipments s ON o.id = s.order_id
WHERE 
    o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time;

Except that my time interval is as small as possible (a couple of seconds). I also have a 5-second watermark on the Flink SQL source tables.
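For context, a 5-second watermark like the one described is typically declared in the source table's DDL. A minimal sketch, assuming a Kafka-backed orders table with the columns used in the query above (connector options omitted):

```sql
CREATE TABLE orders (
  id BIGINT,
  order_time TIMESTAMP(3),
  -- allow events to arrive up to 5 seconds out of order
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'  -- topic, format, and offset options omitted here
);
```

The watermark strategy matters for the question: results of an event-time interval join can only be finalized, and state cleaned up, as the watermark advances past the interval bound.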

How can I instruct Flink to emit/trigger records as soon as it has made a single 'match' in the join? Currently the job is trying to scan the entire table before emitting any records, which is not feasible at my data volumes. From my understanding it should only need to scan up to the interval (time window) and check it, and once the interval has passed, emit/trigger the record.

Also from observing the cluster I can see that the watermark is moving, but no records are being emitted.
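To make the expected behavior concrete, here is a toy sketch in plain Python (not Flink's actual operator) of the semantics described above: a joined pair is emitted as soon as the matching shipment arrives, and buffered orders are evicted once the watermark has passed their interval. The 3-second interval and 5-second out-of-orderness are taken from the question; everything else is illustrative.

```python
from collections import deque

INTERVAL = 3  # join window in seconds (the question uses "a couple of seconds")
LATENESS = 5  # bounded out-of-orderness, matching the 5-second watermark

def run(events):
    """events: list of (side, key, event_time) tuples, roughly time-ordered.
    side is 'orders' or 'shipments'. Returns emitted (key, order_t, ship_t)."""
    orders = deque()          # buffered (key, event_time) rows from the orders side
    out = []
    watermark = float("-inf")
    for side, key, t in events:
        watermark = max(watermark, t - LATENESS)
        if side == "orders":
            orders.append((key, t))
        else:
            # emit immediately on match -- no need to wait for the watermark
            for ok, ot in orders:
                if ok == key and t - INTERVAL <= ot <= t:
                    out.append((key, ot, t))
        # evict buffered orders the watermark has passed; they can never match again
        while orders and orders[0][1] + INTERVAL < watermark:
            orders.popleft()
    return out
```

The point of the sketch: emission is driven by matches, while the watermark only bounds how long unmatched rows must stay in state. If output stalls even though the watermark advances, the state or the watermark alignment across inputs is usually where to look.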


Comments (1)

够钟 2025-02-10 02:47:24


Some data may have been dropped; you can check whether your event times are reasonable. In this scenario, you can try a regular join and set a 3-day state TTL (table.exec.state.ttl = 3 days), which ensures output for every joined record.
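The workaround above can be sketched as follows (assuming the option meant is Flink's table.exec.state.ttl; a regular join keeps rows of both sides in state, which the TTL then bounds):

```sql
-- expire join state 3 days after it was last accessed
SET 'table.exec.state.ttl' = '3 d';

SELECT
  o.id AS order_id,
  o.order_time,
  s.shipment_time
FROM orders o
JOIN shipments s ON o.id = s.order_id;  -- regular join: no time predicate
```

Trade-off to note: unlike an interval join, a regular join's correctness is no longer tied to watermarks, but matches arriving after the TTL expires will be silently missed.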
