从kafka阅读时,请使用keyby vs reinterpretaskeyedstream()
我有一个简单的Flink流处理应用程序(Flink版本1.13)。 Flink应用程序读取了Kakfa,对记录进行了陈述的处理,然后将结果写回Kafka。 从Kafka主题阅读后,我选择使用retinterpretaSkeyedStream()
而不是keyby()
避免进行混乱,因为记录已经在kakfa中进行了分区。用于分区的键是记录的字符串字段(使用默认的kafka分区器)。 Kafka主题有24个分区。
映射类定义如下。它跟踪记录的状态。
public class EnvelopeMapper extends
KeyedProcessFunction<String, Envelope, Envelope> {
...
}
记录的处理如下:
DataStream<Envelope> messageStream =
env.addSource(kafkaSource)
DataStreamUtils.reinterpretAsKeyedStream(messageStream, Envelope::getId)
.process(new EnvelopeMapper(parameters))
.addSink(kafkaSink);
平行性为1,代码运行正常。随着平行性大于1(例如4),我正在遇到以下错误:
2022-06-12 21:06:30,720 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Map -> Flat Map -> KeyedProcess -> Map -> Sink: Unnamed (4/4) (7ca12ec043a45e1436f45d4b20976bd7) switched from RUNNING to FAILED on 100.101.231.222:44685-bd10d5 @ 100.101.231.222 (dataPort=37839).
java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=96, endKeyGroup=127} does not contain key group 85
基于堆栈跟踪,当explovemapper
class class的验证记录已将记录发送到正确的复制品时,似乎发生了例外。映射器对象。
当使用reinterpretAskeyedStream()
时,记录如何在eventMapper
的不同副本中分布?
先感谢您, 艾哈迈德。
更新
在@david Anderson的反馈后 ,替换为reinterpretaSkeyedStream()
,用keyby()
。记录的处理如下如下:
DataStream<Envelope> messageStream =
env.addSource(kafkaSource) // Line x
.map(statelessMapper1)
.flatMap(statelessMapper2);
messageStream.keyBy(Envelope::getId)
.process(new EnvelopeMapper(parameters))
.addSink(kafkaSink);
如果keyby()
在从kakfa阅读后立即完成(用“行x”标记)而在状态映射之前立即完成性能((<)代码> emplowemapper )。
I have a simple Flink stream processing application (Flink version 1.13). The Flink app reads from Kakfa, does stateful processing of the record, then writes the result back to Kafka.
After reading from Kafka topic, I choose to use reinterpretAsKeyedStream()
and not keyBy()
to avoid a shuffle, since the records are already partitioned in Kakfa. The key used to partition in Kakfa is a String field of the record (using the default kafka partitioner). The Kafka topic has 24 partitions.
The mapping class is defined as follows. It keeps track of the state of the record.
public class EnvelopeMapper extends
KeyedProcessFunction<String, Envelope, Envelope> {
...
}
The processing of the record is as follows:
DataStream<Envelope> messageStream =
env.addSource(kafkaSource)
DataStreamUtils.reinterpretAsKeyedStream(messageStream, Envelope::getId)
.process(new EnvelopeMapper(parameters))
.addSink(kafkaSink);
With parallelism of 1, the code runs fine. With parallelism greater than 1 (e.g. 4), I am running into the follow error:
2022-06-12 21:06:30,720 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Map -> Flat Map -> KeyedProcess -> Map -> Sink: Unnamed (4/4) (7ca12ec043a45e1436f45d4b20976bd7) switched from RUNNING to FAILED on 100.101.231.222:44685-bd10d5 @ 100.101.231.222 (dataPort=37839).
java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=96, endKeyGroup=127} does not contain key group 85
Based on the stack trace, it seems the exception happens when EnvelopeMapper
class validates the record is sent to the right replica of the mapper object.
When reinterpretAsKeyedStream()
is used, how are the records distributed among the different replicas of the EventMapper
?
Thank you in advance,
Ahmed.
Update
After feedback from @David Anderson, replaced reinterpretAsKeyedStream()
with keyBy()
. The processing of the record is now as follows:
DataStream<Envelope> messageStream =
env.addSource(kafkaSource) // Line x
.map(statelessMapper1)
.flatMap(statelessMapper2);
messageStream.keyBy(Envelope::getId)
.process(new EnvelopeMapper(parameters))
.addSink(kafkaSink);
Is there any difference in performance if keyBy()
is done right after reading from Kakfa (marked with "Line x") vs right before the stateful Mapper (EnvelopeMapper
).
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
在
您的情况下,声称记录已经按照您使用的
keyby(keyselector)
完全分布。通常情况下,这种记录直接从卡夫卡出来。即使它们是由Kafka中的Key划分的,Kafka分区也不会与Flink的主要组正确相关。reinterpretaSkeyedStream
仅在处理窗口或过程函数的输出之类的情况下直接有用,在此情况下,您知道输出记录是以特定方式分区的。在Kafka上成功使用它可能非常困难:首先,您必须非常谨慎地将数据写入Kafka,或者使用KeySelector做一些棘手的事情,以使其计算的密钥组与键如何与键保持一致。被映射到卡夫卡分区。一种情况并不困难的一种情况是,是否通过与正在读取数据并使用
retinterpretaskeyedstream
的下游作业相同的配置来写入Kafka。With
you are asserting that the records are already distributed exactly as they would be if you had instead used
keyBy(keySelector)
. This will not normally be the case with records coming straight out of Kafka. Even if they are partitioned by key in Kafka, the Kafka partitions won't be correctly associated with Flink's key groups.reinterpretAsKeyedStream
is only straightforwardly useful in cases such as handling the output of a window or process function where you know that the output records are key partitioned in a particular way. To use it successfully with Kafka is can be very difficult: you must either be very careful in how the data is written to Kafka in the first place, or do something tricky with the keySelector so that the keyGroups it computes line up with how the keys are mapped to Kafka partitions.One case where this isn't difficult is if the data is written to Kafka by a Flink job running with the same configuration as the downstream job that is reading the data and using
reinterpretAsKeyedStream
.