Flink-Kafka connector exactly-once error
我正在使用Flink 1.15.0。 For Kafka integration I wrote
KafkaSource:
public static KafkaSource<String> kafkaSource(String bootstrapServers, String topic, String groupId) {
return KafkaSource.<String>builder()
.setBootstrapServers(bootstrapServers)
.setTopics(topic)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
and KafkaSink:
public static KafkaSink<WordCountPojo> kafkaSink(String brokers, String topic, Properties producerProperties) {
return KafkaSink.<WordCountPojo>builder()
.setKafkaProducerConfig(producerProperties)
.setBootstrapServers(brokers)
.setRecordSerializer(new WordCountPojoSerializer(topic))
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix(UUID.randomUUID().toString())
.build();
}
for completeness this is my custom serializer
public class WordCountPojoSerializer implements KafkaRecordSerializationSchema<WordCountPojo> {
private String topic;
private ObjectMapper mapper;
public WordCountPojoSerializer(String topic) {
this.topic = topic;
mapper = new ObjectMapper();
}
@Override
public ProducerRecord<byte[], byte[]> serialize(WordCountPojo wordCountPojo, KafkaSinkContext kafkaSinkContext, Long timestamp) {
try {
byte[] serializedValue = mapper.writeValueAsBytes(wordCountPojo);
return new ProducerRecord<>(topic, null, timestamp,null, serializedValue);
} catch (JsonProcessingException e) {
return null;
}
}
}
Property transaction.timeout.ms
is set to 60_000ms (1 minute)
My application run for a while, then, suddenly, stop除了
Caused by: org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
生产者似乎试图在两项交易之间产生记录,但我不确定。 我可以将Transaction.mms增加到最大900_000(15分钟),但我认为这可能无法解决问题。
你能帮我吗?
I'm using Flink 1.15.0. For the Kafka integration I wrote this KafkaSource:
public static KafkaSource<String> kafkaSource(String bootstrapServers, String topic, String groupId) {
    return KafkaSource.<String>builder()
            .setBootstrapServers(bootstrapServers)
            .setTopics(topic)
            .setGroupId(groupId)
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
}
and KafkaSink:
public static KafkaSink<WordCountPojo> kafkaSink(String brokers, String topic, Properties producerProperties) {
    return KafkaSink.<WordCountPojo>builder()
            .setKafkaProducerConfig(producerProperties)
            .setBootstrapServers(brokers)
            .setRecordSerializer(new WordCountPojoSerializer(topic))
            .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix(UUID.randomUUID().toString())
            .build();
}
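For context, the two factory methods are wired into the job roughly as in the sketch below; the topic names, group id, checkpoint interval, and the WordCountPojo constructor shown there are placeholders rather than my actual code:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class WordCountJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // EXACTLY_ONCE on the KafkaSink relies on checkpointing: the sink commits its
        // Kafka transaction when a checkpoint completes. Interval here is illustrative.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

        // Producer config, including transaction.timeout.ms (see the snippet further down).
        Properties producerProperties = new Properties();

        env.fromSource(
                kafkaSource("localhost:9092", "input-topic", "word-count-group"),
                WatermarkStrategy.noWatermarks(),
                "kafka-source")
           .map(line -> new WordCountPojo(line, 1L)) // stand-in for the real word-count logic
           .sinkTo(kafkaSink("localhost:9092", "output-topic", producerProperties));

        env.execute("word-count");
    }
}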
For completeness, this is my custom serializer:
public class WordCountPojoSerializer implements KafkaRecordSerializationSchema<WordCountPojo> {

    private String topic;
    private ObjectMapper mapper;

    public WordCountPojoSerializer(String topic) {
        this.topic = topic;
        mapper = new ObjectMapper();
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(WordCountPojo wordCountPojo, KafkaSinkContext kafkaSinkContext, Long timestamp) {
        try {
            byte[] serializedValue = mapper.writeValueAsBytes(wordCountPojo);
            return new ProducerRecord<>(topic, null, timestamp, null, serializedValue);
        } catch (JsonProcessingException e) {
            return null;
        }
    }
}
The property transaction.timeout.ms is set to 60_000 ms (1 minute).
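For reference, that value is set on the Properties object handed to kafkaSink; a minimal sketch (the rest of my producer config is omitted here):

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

// Minimal sketch of how producerProperties is built before being passed to kafkaSink(...).
Properties producerProperties = new Properties();
// 60 seconds, as stated above. Note that the broker caps client transaction timeouts via
// transaction.max.timeout.ms, which defaults to 900_000 ms (15 minutes).
producerProperties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000"); // transaction.timeout.ms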
My application runs for a while, then suddenly stops with the exception:
Caused by: org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
It seems that the producer tries to produce records between two transactions, but I'm not sure.
I could increase transaction.timeout.ms up to the maximum of 900_000 ms (15 minutes), but I don't think that would solve the problem.
Can you help me please?