keyBy() vs reinterpretAsKeyedStream() when reading from Kafka



I have a simple Flink stream processing application (Flink version 1.13). The Flink app reads from Kafka, does stateful processing of the records, then writes the results back to Kafka.
After reading from the Kafka topic, I chose to use reinterpretAsKeyedStream() rather than keyBy() to avoid a shuffle, since the records are already partitioned in Kafka. The key used to partition in Kafka is a String field of the record (using the default Kafka partitioner). The Kafka topic has 24 partitions.
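
For context, kafkaSource is not defined in the post. A minimal Flink 1.13 source setup might look like the sketch below; the topic name, bootstrap servers, and use of a plain string schema are assumptions for illustration only (the real job deserializes to Envelope):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical connection settings; not taken from the question.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "envelope-consumer");

        // A plain string schema keeps the sketch self-contained.
        FlinkKafkaConsumer<String> kafkaSource =
                new FlinkKafkaConsumer<>("envelopes", new SimpleStringSchema(), props);

        DataStream<String> messageStream = env.addSource(kafkaSource);
        messageStream.print();

        env.execute("kafka-source-sketch");
    }
}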

The mapping class is defined as follows. It keeps track of the state of the record.

public class EnvelopeMapper extends
        KeyedProcessFunction<String, Envelope, Envelope> {
   ...
}
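
The body of EnvelopeMapper is elided above. Purely as an illustration of a stateful KeyedProcessFunction (not the author's actual implementation), a per-key counter could look like the following; the state name and logic are invented for the example, and Envelope is the domain type from the question:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch only; the real EnvelopeMapper differs.
// Envelope is the record type from the question, keyed by its String id.
public class EnvelopeMapperSketch extends KeyedProcessFunction<String, Envelope, Envelope> {

    private transient ValueState<Long> seenCount;

    @Override
    public void open(Configuration parameters) {
        seenCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen-count", Long.class));
    }

    @Override
    public void processElement(Envelope envelope, Context ctx, Collector<Envelope> out) throws Exception {
        // State is scoped to the current key (Envelope::getId), so each id gets its own counter.
        Long count = seenCount.value();
        seenCount.update(count == null ? 1L : count + 1);
        out.collect(envelope);
    }
}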

The processing of the record is as follows:

        DataStream<Envelope> messageStream =
                env.addSource(kafkaSource);

        DataStreamUtils.reinterpretAsKeyedStream(messageStream, Envelope::getId)
                .process(new EnvelopeMapper(parameters))
                .addSink(kafkaSink);

With parallelism of 1, the code runs fine. With parallelism greater than 1 (e.g. 4), I am running into the following error:

2022-06-12 21:06:30,720 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Map -> Flat Map -> KeyedProcess -> Map -> Sink: Unnamed (4/4) (7ca12ec043a45e1436f45d4b20976bd7) switched from RUNNING to FAILED on 100.101.231.222:44685-bd10d5 @ 100.101.231.222 (dataPort=37839).
java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=96, endKeyGroup=127} does not contain key group 85

Based on the stack trace, it seems the exception happens when the EnvelopeMapper class validates that the record was sent to the right replica of the mapper object.

When reinterpretAsKeyedStream() is used, how are the records distributed among the different replicas of the EnvelopeMapper?

Thank you in advance,
Ahmed.

Update

After feedback from @David Anderson, I replaced reinterpretAsKeyedStream() with keyBy(). The processing of the records is now as follows:

        DataStream<Envelope> messageStream =
                env.addSource(kafkaSource)      // Line x
                   .map(statelessMapper1)
                   .flatMap(statelessMapper2);

        messageStream.keyBy(Envelope::getId)
                     .process(new EnvelopeMapper(parameters))
                     .addSink(kafkaSink);

Is there any difference in performance if keyBy() is done right after reading from Kafka (marked with "Line x") vs. right before the stateful mapper (EnvelopeMapper)?


Comments (1)

仙女 2025-02-15 00:14:27


With

reinterpretAsKeyedStream(
    DataStream<T> stream,
    KeySelector<T, K> keySelector,
    TypeInformation<K> typeInfo)

you are asserting that the records are already distributed exactly as they would be if you had instead used keyBy(keySelector). This will not normally be the case with records coming straight out of Kafka. Even if they are partitioned by key in Kafka, the Kafka partitions won't be correctly associated with Flink's key groups.

reinterpretAsKeyedStream is only straightforwardly useful in cases such as handling the output of a window or process function, where you know that the output records are key-partitioned in a particular way. Using it successfully with Kafka can be very difficult: you must either be very careful in how the data is written to Kafka in the first place, or do something tricky with the keySelector so that the key groups it computes line up with how the keys are mapped to Kafka partitions.
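
To make the mismatch concrete, here is a sketch (not part of the original answer) comparing where Kafka's default partitioner would send a key versus which key group and subtask Flink's key-group assignment would give it. It assumes kafka-clients and flink-runtime are on the classpath; the key, partition count, and parallelism values are illustrative:

import java.nio.charset.StandardCharsets;

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.kafka.common.utils.Utils;

// Illustrative comparison: the two hash schemes generally route the same key differently.
public class PartitioningComparison {
    public static void main(String[] args) {
        String key = "order-42";          // hypothetical record id
        int kafkaPartitions = 24;         // as in the question
        int maxParallelism = 128;         // Flink's default max parallelism
        int flinkParallelism = 4;

        // Kafka's default partitioner: murmur2 over the serialized key bytes, modulo partition count.
        int kafkaPartition = Utils.toPositive(
                Utils.murmur2(key.getBytes(StandardCharsets.UTF_8))) % kafkaPartitions;

        // Flink: key -> key group (hash of key.hashCode()) -> operator subtask.
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        int flinkSubtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                key, maxParallelism, flinkParallelism);

        System.out.printf("Kafka partition=%d, Flink key group=%d, Flink subtask=%d%n",
                kafkaPartition, keyGroup, flinkSubtask);
    }
}

Because the two hash functions and modulo bases are unrelated, a record read from Kafka partition N will usually not belong to the key-group range owned by the subtask that happens to consume partition N, which is exactly what the KeyGroupRange exception above reports.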

One case where this isn't difficult is if the data is written to Kafka by a Flink job running with the same configuration as the downstream job that is reading the data and using reinterpretAsKeyedStream.
