在Flink中连接多个表格时,结果不一致
我们有4个定义的CDC来源,我们需要将数据组合到一个结果表中。我们使用SQL API为每个源创建一个表:例如:
"CREATE TABLE IF NOT EXISTS PAA31 (\n" +
" WRK_SDL_DEF_NO STRING,\n" +
" HTR_FROM_DT BIGINT,\n" +
...
" update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n" +
" PRIMARY KEY (WRK_SDL_DEF_NO) NOT ENFORCED,\n" +
" WATERMARK FOR update_time AS update_time\n" +
") WITH ('value.format' = 'debezium-json' ... )";
在定义每个表之后,我们通过运行以下查询创建一个新表:
"SELECT PAA30.WRK_SDL_DEF_NO as id,\n" +
" PAA33.DSB_TX as description,\n" +
...
"FROM PAA30\n" +
"INNER JOIN PAA33 ON PAA30.WRK_SDL_DEF_NO = PAA33.WRK_SDL_DEF_NO AND PAA33.LGG_CD = 'NL' \n" +
"INNER JOIN PAA31 ON PAA30.WRK_SDL_DEF_NO = PAA31.WRK_SDL_DEF_NO \n" +
"INNER JOIN PAA32 ON PAA30.WRK_SDL_DEF_NO = PAA32.WRK_SDL_DEF_NO";
请注意,出于格式化原因,已将一些行被排除在外。
我们遇到的问题是,执行此确切的作业会导致不一致的结果,其中有时我们有1750个产生的行(正确),但是大多数情况下,最终的行较少和随机。
这是Flink工作的计划概述。从源发送的记录数量都是正确的,但是第一个加入语句发送的记录量不是:
flink flink opercution执行计划和数字
是什么原因以及我们如何保持一致的所有数据源?
We've 4 CDC sources defined of which we need to combine the data into one result table. We're creating a table for each source using the SQL API, eg:
"CREATE TABLE IF NOT EXISTS PAA31 (\n" +
" WRK_SDL_DEF_NO STRING,\n" +
" HTR_FROM_DT BIGINT,\n" +
...
" update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n" +
" PRIMARY KEY (WRK_SDL_DEF_NO) NOT ENFORCED,\n" +
" WATERMARK FOR update_time AS update_time\n" +
") WITH ('value.format' = 'debezium-json' ... )";
After we've defined each table, we create a new table by running the following query:
"SELECT PAA30.WRK_SDL_DEF_NO as id,\n" +
" PAA33.DSB_TX as description,\n" +
...
"FROM PAA30\n" +
"INNER JOIN PAA33 ON PAA30.WRK_SDL_DEF_NO = PAA33.WRK_SDL_DEF_NO AND PAA33.LGG_CD = 'NL' \n" +
"INNER JOIN PAA31 ON PAA30.WRK_SDL_DEF_NO = PAA31.WRK_SDL_DEF_NO \n" +
"INNER JOIN PAA32 ON PAA30.WRK_SDL_DEF_NO = PAA32.WRK_SDL_DEF_NO";
Note some rows have been left out for formatting reasons.
The issue we're running into is that executing this exact job results in inconsistent outcomes where sometimes we have 1750 resulting rows (correct), however most of the times the resulting rows is less and random.
This is the plan overview for the job in Flink. The amount of records sent from the sources are all correct, however the amount of records sent of the 1st join statement is not:
Flink Job Execution Plan and numbers
What could be the cause and how can we have consistent joining of all data sources?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
我看到您的管道包含一个带有处理时间触发器的事件时间窗口,并且对零顺序事件的零公差进行水印。这些可能导致问题。
Flink只能为流媒体工作负载产生完全正确的确定性结果,如果没有较晚事件,则涉及事件时间逻辑。每当处理时间逻辑干扰水印时,
,就猜测了。将需要查看更多细节,以提供更明智的答案。最小,可再现的例子是理想的。
更新:
流媒体作业也不会散发出他们的最后一组结果,除非采取了一些挑衅。在这种情况下,例如,您可以用来
迫使发出大型水印,以关闭最后一个窗口。
更新2:
常规加入不会产生时间属性或水印的结果。这是因为不可能保证将以任何特定的顺序排放结果,因此不可能有意义的水印。通常,在加入之后,不可能应用事件时间窗口。
更新3:
现在研究了最新代码,这显然与水印无关。
如果我正确理解的话,问题是,尽管结果始终包括应生产的内容,但额外的输出记录数量有所不同。我可以提出两个可能的原因:
(1)当flink与Debezium Server一起使用时,有可能重复的事件。我认为这不是解释,但这是值得注意的。
(2)联接的结果是非确定性的(随着运行而变化)。之所以发生这种情况,是因为各种输入流相互竞争,而摄入不同流的相关事件的确切顺序正在影响结果的产生方式。
联接的结果是一个变形流。我怀疑当结果完美时,不会发生缩回,而在其他情况下,产生了一些初步结果,这些结果后来进行了更新。
如果您检查输出流中的ROW_KIND信息,则应该能够确认此猜测是否正确。
我对Pulsar连接器并不熟悉,但我猜您应该使用UPSERT_PULSAR接收器。
I see that your pipeline includes an event time window with a processing time trigger, and does watermarking with zero tolerance for out-of-order events. These could be causing problems.
Flink can only produce completely correct, deterministic results for streaming workloads that involve event time logic if there are no late events. Late events can occur whenever processing time logic interferes with the watermarking, e.g.,
Just guessing, however. Would need to see more details to give a more informed answer. A minimal, reproducible example would be ideal.
Update:
It's also the case the streaming jobs won't emit their last set of results unless something is done to provoke them to do so. In this case you could, for example, use
to force a large watermark to be emitted that will close the last window.
Update 2:
Regular joins don't produce results with time attributes or watermarks. This is because it's impossible to guarantee that the results will be emitted in any particular order, so meaningful watermarking isn't possible. Normally it's not possible to apply event time windowing after such a join.
Update 3:
Having now studied the latest code, this obviously doesn't have anything to do with Watermarks.
If I understand correctly, the issue is that while the results always include what should be produced, there are varying numbers of additional output records. I can suggest two possible causes:
(1) When Flink is used with Debezium server there's the possibility of duplicate events. I don't think this is the explanation, but it is something to be aware of.
(2) The result of the join is non-deterministic (it varies from run to run). This is happening because the various input streams are racing against each other, and the exact order in which related events from different streams are ingested is affecting how the results are produced.
The result of the join is a changelog stream. I suspect that when the results are perfect, no retractions occurred, while in the other cases some preliminary results are produced that are later updated.
If you examine the ROW_KIND information in the output stream you should be able to confirm if this guess is correct.
I'm not very familiar with the pulsar connector, but I'm guessing you should be using the upsert_pulsar sink.
通过启用 minibatch聚合
这似乎可以解决本地文件系统连接器以及Flink Pulsar Connector的一致性问题。
从这些发现中,Flink似乎在我们的吞吐量中遇到了州管理的开销问题。我们仍然需要评估现实的CDC初始负载处理,但是到目前为止,Minibatch聚合似乎有望
感谢 @David-Anderson与我们一起思考并试图弄清楚这一点。
We've been able to get consistent results, even for bigger datasets, by enabling MiniBatch Aggregation
This seems to fix the consistency issue for both the local filesystem connector as well as for the Flink Pulsar connector.
From these findings, it seems Flink was having issues with the overhead of state management for our throughput. We'll still need to assess realistic CDC initial load processing, but so far enabling MiniBatch Aggregation seems promising
Thanks @david-anderson for thinking with us and trying to figure this out.