kafka |确切的一方面消费者不止一次消耗消息
在我们的应用程序中,在生产者和消费者中都完全启用了。
生产者是python组件。我们已经启用了:
- iDempotence
- 使用交易(每次发送消息时每次使用新的TransActionID都会使用)
消费者是Spring Boot应用程序。我们已经启用了:
- read_comment隔离级别
- 消息
使用手册确认我们在ConfluentCloud上具有多分区Kafka主题(三个分区)的
。 我们的应用程序设计如下:
- 多个生产者应用程序实例
- ,我们有很多消费者应用程序实例(当前大约24个)
问题:
我们注意到有时在消费者中多次消耗相同的Kafka消息。我们通过使用以下消费者代码来检测到这一点。我们将先前消耗的KAFKA消息ID(带有偏移)保留在Redis中,并将其与新消费的消息进行比较。
消费者代码:
@KafkaListener(topics = "${datalake.datasetevents.topic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.OFFSET) String offset,
@Payload InputEvent inputEvent, Acknowledgment acknowledgment) {
//KafkaHeaders.
Event event = new Event();
event.setCorrId(inputEvent.getCorrId());
event.setQn(inputEvent.getQn());
event.setCreatedTs(new Date());
event.setEventTs(inputEvent.getEventTs());
event.setMeta(inputEvent.getMeta() != null ? inputEvent.getMeta(): new HashMap<>());
event.setType(inputEvent.getType());
event.setUlid(key);
//detect message duplications
try {
String eventRedisKey = "tg_e_d_" + key.toLowerCase();
String redisVal = offset;
String tmp = redisTemplateString.opsForValue().get(eventRedisKey);
if (tmp != null) {
dlkLogging.error("kafka_event_dup", "Event consumed more than once ulid:" + event.getUlid()+ " redis offset: "+tmp+ " event offset:"+offset);
redisTemplateString.delete(eventRedisKey);
}
redisTemplateString.opsForValue().set(eventRedisKey, redisVal, 30, TimeUnit.SECONDS);
} catch (Exception e) {
dlkLogging.error("kafka_consumer_redis","Redis error at kafka consumere ", e);
}
//process the message and ack
try {
eventService.saveEvent(persistEvent, event);
ack.acknowledge();
} catch (Exception ee) {
//Refer : https://stackoverflow.com/questions/62413270/kafka-what-is-the-point-of-using-acknowledgment-nack-if-i-can-simply-not-ack
ack.nack(1);
dlkLogging.error("event_sink_error","error sinking kafka event.Will retry", ee);
}
}
行为: 我们注意到“ Kafka_event_dup”每天发送几次。
错误消息:事件不止一次消耗 ULID:01G77G8KNTSM2Q01SB1MK60BTH REDIS偏移:659238事件 偏移:659238
问题: 即使我们已经在生产者和消费者中都完全配置了一致,为什么消费者也会阅读相同的消息?
Update :阅读了几篇SO帖子后,即使已配置了确切的一致性,我们仍然需要在消费者方面实现重复数据删除逻辑?
附加信息:
消费者配置:
public DefaultKafkaConsumerFactory kafkaDatasetEventConsumerFactory(KafkaProperties properties) {
Map<String, Object> props = properties.buildConsumerProperties();
props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, CustomJsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.fr.det.datalake.eventdriven.model.kafka.InputEvent");
return new DefaultKafkaConsumerFactory(props);
}
生产者代码(Python):
def __get_producer(self):
conf = {
'bootstrap.servers': self.server,
'enable.idempotence': True,
'acks': 'all',
'retry.backoff.ms': self.sleep_seconds * 100
}
if self.sasl_mechanism:
conf['sasl.mechanisms'] = self.sasl_mechanism
if self.security_protocol:
conf['security.protocol'] = self.security_protocol
if self.sasl_username:
conf['sasl.username'] = self.sasl_username
if self.sasl_username:
conf['sasl.password'] = self.sasl_password
if self.transaction_prefix:
conf['transactional.id'] = self.__get_transaction_id()
producer = Producer(conf)
return producer
@_retry_on_error
def send_messages(self, messages, *args, **kwargs):
ts = time.time()
producer = kwargs.get('producer', None)
if producer is not None:
for message in messages:
key = message.get('key', str(ulid.from_timestamp(ts)))
value = message.get('value', None)
topic = message.get('topic', self.topic)
producer.produce(topic=topic,
value=value,
key=key,
on_delivery=self.acked)
producer.commit_transaction(30)
def _retry_on_error(func, *args, **kwargs):
def inner(self, messages, *args, **kwargs):
attempts = 0
while True:
attempts += 1
sleep_time = attempts * self.sleep_seconds
try:
producer = self.__get_producer()
self.logger.info(f"Producer: {producer}, Attempt: {attempts}")
producer.init_transactions(30)
producer.begin_transaction()
res = func(self, messages, *args, producer=producer, **kwargs)
return res
except KafkaException as e:
if attempts <= self.retry_count:
if e.args[0].txn_requires_abort():
producer.abort_transaction(30)
time.sleep(sleep_time)
continue
self.logger.error(str(e), exc_info=True, extra=extra)
break
return inner
In our applications have enabled exactly-once in both Producer and Consumer.
Producer is a python component.We have enabled:
- idempotence
- use transactions (new transactionId is used every time when we send messages)
Consumer is a Spring Boot application. We have enabled:
- read_committed isolation level
- use manual acknowledgement for messages
We have multi-partition Kafka topic (lets say 3 partitions) on ConfluentCloud.
Our application design is as follows:
- multiple Producer app instances
- for performance ,we have lots of Consumer app instances (currently around 24)
Problem:
We noticed that sometimes the same Kafka message is consumed more than once in the Consumer.We detected this by using following consumer code. We keep the previously consumed kafka message Id (with offset) in Redis and compare them with newly consumed message.
Consumer code:
@KafkaListener(topics = "${datalake.datasetevents.topic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.OFFSET) String offset,
@Payload InputEvent inputEvent, Acknowledgment acknowledgment) {
//KafkaHeaders.
Event event = new Event();
event.setCorrId(inputEvent.getCorrId());
event.setQn(inputEvent.getQn());
event.setCreatedTs(new Date());
event.setEventTs(inputEvent.getEventTs());
event.setMeta(inputEvent.getMeta() != null ? inputEvent.getMeta(): new HashMap<>());
event.setType(inputEvent.getType());
event.setUlid(key);
//detect message duplications
try {
String eventRedisKey = "tg_e_d_" + key.toLowerCase();
String redisVal = offset;
String tmp = redisTemplateString.opsForValue().get(eventRedisKey);
if (tmp != null) {
dlkLogging.error("kafka_event_dup", "Event consumed more than once ulid:" + event.getUlid()+ " redis offset: "+tmp+ " event offset:"+offset);
redisTemplateString.delete(eventRedisKey);
}
redisTemplateString.opsForValue().set(eventRedisKey, redisVal, 30, TimeUnit.SECONDS);
} catch (Exception e) {
dlkLogging.error("kafka_consumer_redis","Redis error at kafka consumere ", e);
}
//process the message and ack
try {
eventService.saveEvent(persistEvent, event);
ack.acknowledge();
} catch (Exception ee) {
//Refer : https://stackoverflow.com/questions/62413270/kafka-what-is-the-point-of-using-acknowledgment-nack-if-i-can-simply-not-ack
ack.nack(1);
dlkLogging.error("event_sink_error","error sinking kafka event.Will retry", ee);
}
}
Behavior:
We notice the "kafka_event_dup" is sent several times per day.
Error Message: Event consumed more than once
ulid:01G77G8KNTSM2Q01SB1MK60BTH redis offset: 659238 event
offset:659238
Question:
Why consumer read the same message even though we have configured exactly-once in both Producer and Consumer?
Update : After reading several SO posts, seems we still need to implement deduplication logic in Consumer side even though exactly-once is configured?
Additional Info:
Consumer configuration:
public DefaultKafkaConsumerFactory kafkaDatasetEventConsumerFactory(KafkaProperties properties) {
Map<String, Object> props = properties.buildConsumerProperties();
props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, CustomJsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.fr.det.datalake.eventdriven.model.kafka.InputEvent");
return new DefaultKafkaConsumerFactory(props);
}
Producer code (python):
def __get_producer(self):
conf = {
'bootstrap.servers': self.server,
'enable.idempotence': True,
'acks': 'all',
'retry.backoff.ms': self.sleep_seconds * 100
}
if self.sasl_mechanism:
conf['sasl.mechanisms'] = self.sasl_mechanism
if self.security_protocol:
conf['security.protocol'] = self.security_protocol
if self.sasl_username:
conf['sasl.username'] = self.sasl_username
if self.sasl_username:
conf['sasl.password'] = self.sasl_password
if self.transaction_prefix:
conf['transactional.id'] = self.__get_transaction_id()
producer = Producer(conf)
return producer
@_retry_on_error
def send_messages(self, messages, *args, **kwargs):
ts = time.time()
producer = kwargs.get('producer', None)
if producer is not None:
for message in messages:
key = message.get('key', str(ulid.from_timestamp(ts)))
value = message.get('value', None)
topic = message.get('topic', self.topic)
producer.produce(topic=topic,
value=value,
key=key,
on_delivery=self.acked)
producer.commit_transaction(30)
def _retry_on_error(func, *args, **kwargs):
def inner(self, messages, *args, **kwargs):
attempts = 0
while True:
attempts += 1
sleep_time = attempts * self.sleep_seconds
try:
producer = self.__get_producer()
self.logger.info(f"Producer: {producer}, Attempt: {attempts}")
producer.init_transactions(30)
producer.begin_transaction()
res = func(self, messages, *args, producer=producer, **kwargs)
return res
except KafkaException as e:
if attempts <= self.retry_count:
if e.args[0].txn_requires_abort():
producer.abort_transaction(30)
time.sleep(sleep_time)
continue
self.logger.error(str(e), exc_info=True, extra=extra)
break
return inner
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
Kafka恰好是Kafka-streams功能,尽管它也可以与常规消费者和生产商一起使用。
完全只能在您的应用程序仅与Kafka交互的情况下实现:没有XA和其他类型的技术分布式交易,可以使Kafka消费者能够在确切的一方面与其他一些存储(例如REDIS)进行交互方式。
在分布式世界中,我们必须承认这是不可取的,因为它引入了锁定,争夺和指数下降的效果。如果我们不需要进入分布式世界,那么我们不需要卡夫卡,而且很多事情变得更加容易。
KAFKA中的交易本来可以在一个仅与Kafka交互的应用程序中使用,它可以保证您的应用程序将从某些主题分区中读取,2)在其他主题分区中写入一些结果,3)提交阅读与1相关的偏移,或者没有做任何事情。如果以这样的方式将几个应用程序背靠背并通过Kafka进行交互,那么如果您非常小心,您 可以准确地实现一次。如果您的消费者需要4)与REDIS互动5)与其他一些存储互动或在某处做一些副作用(例如发送电子邮件左右),那么通常无法执行步骤1,2,3,4, 5在原子上作为分布式应用程序的一部分。您 can 使用其他存储技术(是的,Kafka本质上是存储)实现此类方法,但是它们不能分发,并且您的应用程序也无法分发。从本质上讲,这就是帽定理告诉我们的。
这也是为什么确切的一致性本质上是Kafka-streams的内容:Kafka流只是Kafka消费者/生产者客户端的智能包装器,以免您构建仅与Kafka交互的应用程序。
您还可以通过其他数据处理框架(例如Spark Streaming或Flink)实现准确的流媒体流处理。
在实践中,通常不必为交易而不是在消费者中脱颖而出要简单得多。您可以保证,最大的消费者组的一个消费者在任何时间点都连接到每个分区,因此重复始终会在您的应用程序的同一实例中发生(直到它重新计算),并且取决于您的配置,复制通常仅在一个Kafka消费者缓冲区中发生,因此您无需在消费者中存储太多状态即可删除。如果您使用某种仅能增加的事件ID(本质上是Kafka偏移量是BTW的,这不是巧合),那么您只需要保持应用程序的每个实例的状态-ID您已成功处理的每个分区。
Kafka exactly-once is essentially a Kafka-Streams feature, although it can be used with regular consumer and producers as well.
Exactly once can only be achieved in a context where your applications are only interacting with Kafka: there is no XA nor other kind of distributed transactions across technologies that would enable a Kafka consumer interact with some other storage (like Redis) in an exactly-once manner.
In a distributed world, we have to acknowledge that is not desirable, since it introduce locking, contention, and exponentially degrading performance under load. If we don't need to be in a distributed world, then we don't need Kafka and many things become easier.
Transactions in Kafka are meant to be used within one application that is only interacting with Kafka, it lets you guarantee that the app will 1) read from some topic partitions, 2) write some result in some other topic partitions and 3) commit the read offsets related to 1, or do none of those things. If several apps are put back-to-back and interacting through Kafka in such manner, then you could achieve exactly once if you're very careful. If your consumer needs to 4) interact with Redis 5) interact with some other storage or do some side effect somewhere (like sending an email or so), then there is in general no way to perform steps 1,2,3,4,5 atomically as part of a distributed application. You can achieve this kind of things with other storage technologies (yes, Kafka is essentially a storage), but they cannot be distributed and your application cannot either. That's essentially what the CAP theorem tells us.
That's also why exactly-once is essentially a Kafka-streams stuff: Kafka Stream is just a smart wrapper around the Kafka consumer/producer clients that lest you build applications that interact only with Kafka.
You can also achieve exactly-once streaming processing with other data-processing framework, like Spark Streaming or Flink.
In practice it's often much simpler to not bother with transactions and just de-duplicate in the consumer. You have the guarantee that at max one consumer of the consumer group is connected to each partition at any point in time, so duplicates will always happen in the same instance of your app (until it re-scales), and, depending on your config, the duplication should typically only happen within one single Kafka consumer buffer, so you don't need to store much state in your consumer to de-duplicate. If you use some kind of event-ids that can only increase for example (which is essentially what the Kafka offset is BTW, and it's no coincidence), then you just need to keep in the state of each instance of your app the maximum event-id per partition that you've successfully processed.
我可以看到您已将ENABLE_AUTO_COMMIT_CONFIG设置为False,这意味着您已经准备了手动提交过程。如果我们不承担有效阅读的消息的偏移,那么我们将最终处理重复的消息。
从 https://wwwww.baeldung.com/kafka-exactly-once--cacely-once < /a>
另外,用于处理。保证:恰好_once以下参数,您无需明确设置。
I can see you have set ENABLE_AUTO_COMMIT_CONFIG to false, that means you are having a manual commit process in place. If we are not committing the offset of the messages read efficiently, then we will end up in processing duplicate messages.
Kindly refer session from 4.6 from https://www.baeldung.com/kafka-exactly-once
Also, for processing.guarantee : exactly_once the following parameters you no need to set explicitly.