Spring Kafka Consumer skips some offsets

Posted 2025-01-10 23:10:13

We have a very simple Kafka Consumer (v 2.6.2).
It is the only consumer in a consumer-group, and this group is the only one reading from a topic (with 6 partitions, and some 3 million events in it).
The broker is also on version 2.6.x.

As we need to fulfil an "exactly-once" scenario, we took a deep look at whether we really consume every event that is written to the topic exactly once.
Unfortunately, we found that the consumer sometimes skips an offset, and sometimes even a whole range of offsets within a partition.

The consumer does not do much more than logging.
An error handler is configured that also just logs.

@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${topicname}")
public void onMessage(ConsumerRecord<String, BlaBlaEvent> record) throws Exception {
  // mostly logging, essentially:
  log.info("consuming Offset={} Partition={}", record.offset(), record.partition());
}

And it logs:

consuming Offset=30 Partition=2
consuming Offset=31 Partition=2
consuming Offset=32 Partition=2
consuming Offset=67 Partition=2

(Did you see the jump to Offset=67?)

Here's our application.properties

spring.kafka.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS_URL}
spring.kafka.properties.security.protocol: SSL
spring.kafka.properties.schema.registry.url=${SCHEMA_REGISTRY_URL}
spring.kafka.properties.specific.avro.reader=true

spring.kafka.ssl.trust-store-type=PKCS12
spring.kafka.ssl.key-store-type=PKCS12
spring.kafka.ssl.key-store-password=${KEYSTORE_PASSWORD}
spring.kafka.ssl.trust-store-password=${TRUSTSTORE_PASSWORD}
spring.kafka.ssl.trust-store-location=${TRUSTSTORE_PATH}
spring.kafka.ssl.key-store-location=${KEYSTORE_PATH}

spring.kafka.consumer.client-id=BlablaNotificationReader
spring.kafka.consumer.group-id=blabla-group
spring.kafka.consumer.auto-offset-reset=none
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

logging.level.org.apache.kafka=DEBUG

Sidenotes:

  • The consumer runs in OpenShift v3.11 (Kubernetes 1.11). No pod restart or anything else happened at that time.
  • There are no errors in the logs on the broker side or on the consumer side; the offsets are just silently skipped.
  • It happens about once a week.

Does anyone have an idea how to find the root of this problem?


Comments (1)

很快妥协 2025-01-17 23:10:13

Since you're looking for exactly-once semantics, you probably have transaction-enabled producers - if so, it might be due to the partitions having transactional records.

The KafkaConsumer documentation states:

Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction. These markers are not returned to applications, yet have an offset in the log. As a result, applications reading from topics with transactional messages will see gaps in the consumed offsets. These missing messages would be the transaction markers, and they are filtered out for consumers in both isolation levels. Additionally, applications using read_committed consumers may also see gaps due to aborted transactions, since those messages would not be returned by the consumer and yet would have valid offsets.
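
If you want to verify that the gap really consists of transaction markers (or aborted records) rather than data your listener missed, you could point a throwaway consumer directly at the gap and see which offsets come back. Here is a minimal sketch; the bootstrap server, topic name, partition and starting offset are placeholders taken from the log excerpt above, and the SSL settings from your configuration are omitted:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class GapInspector {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");    // placeholder; SSL settings omitted
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "gap-inspector");           // throwaway group, nothing is committed
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted"); // also returns records from aborted transactions
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    TopicPartition tp = new TopicPartition("your-topic", 2);              // partition from the log excerpt

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(tp));
      consumer.seek(tp, 33L);                                             // first "missing" offset
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
      for (ConsumerRecord<byte[], byte[]> r : records) {
        System.out.printf("offset=%d timestamp=%d%n", r.offset(), r.timestamp());
      }
      // Offsets that never show up here even with read_uncommitted are control records
      // (transaction markers), not data that your listener missed.
    }
  }
}

If offsets 33 to 66 do show up here but never reached your listener, then something on the consuming side really is skipping records.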

The only way I can see to actually miss a record would be if the consumer committed an offset for, or after, a record (or records) that didn't get processed for some reason.
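
To see what the group has actually committed, and whether a commit ever jumped past records that were never processed, you could also read the committed offsets directly, for example with the AdminClient. A minimal sketch, with the group id taken from your properties and the bootstrap server as a placeholder (SSL settings again omitted):

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetInspector {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093"); // placeholder

    try (AdminClient admin = AdminClient.create(props)) {
      Map<TopicPartition, OffsetAndMetadata> committed =
          admin.listConsumerGroupOffsets("blabla-group")
               .partitionsToOffsetAndMetadata()
               .get();
      committed.forEach((tp, om) ->
          System.out.printf("%s -> committed offset %d%n", tp, om.offset()));
    }
  }
}

Comparing these committed offsets with the offsets your listener has logged should tell you whether a commit ever moved past an unprocessed record.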

If you want more information about the records being polled from the broker and the offsets being committed, you can set the logging level for the org.springframework.kafka.listener.KafkaMessageListenerContainer class to TRACE. That should give you good insight into the records being received and the offsets being committed. If all offsets before the one being committed have been processed, it's not likely you're missing any.
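
In a Spring Boot setup like yours that is just one more line in application.properties:

logging.level.org.springframework.kafka.listener.KafkaMessageListenerContainer=TRACE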

In order to check whether the exactly-once semantics hold, maybe you can try to correlate the logs from your producers with the logs from your consumers through some id, especially when this event happens.
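
For example, a variant of your listener could log such an id next to the partition and offset. This is only a sketch; the "correlation-id" header name is made up, so use whatever business id your producers actually set (in a header or in the record key):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class CorrelatingListener {

  private static final Logger log = LoggerFactory.getLogger(CorrelatingListener.class);

  @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${topicname}")
  public void onMessage(ConsumerRecord<?, ?> record) {
    // "correlation-id" is a made-up header name; fall back to the record key if it is absent
    Header header = record.headers().lastHeader("correlation-id");
    String correlationId = (header != null)
        ? new String(header.value(), StandardCharsets.UTF_8)
        : String.valueOf(record.key());
    log.info("consumed correlationId={} key={} partition={} offset={}",
        correlationId, record.key(), record.partition(), record.offset());
  }
}

If your producers log the same id when they send, a gap on the consumer side can then be traced back to a concrete produced (or aborted) record.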

You might also want to check the producers' logs from when that happens, since the gap might be due to aborted transactions.
