Consumer不会从topic中获取新记录,并且每次启动时总是指向相同的偏移位置

发布于 2025-01-11 07:10:09 字数 1486 浏览 0 评论 0原文

我正在使用 spring-kafka 2.3.7

问题 1: 系统中有两个消费者 (KafkaListeners)。当应用程序启动其中一个消费者时,总是指向相同的主题/分区/偏移量。这 可能是由于消费者重新平衡。

问题 2:(1) 之后,同一个 Consumer 也无法使用任何新的 Message发送到 Topic

LOG:

[tConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  [                                    ] : [Consumer clientId=consumer-1, groupId=enrollmentConsumer] Setting offset for partition bev3_enrollment_sync_topic_dev-1 to the committed offset 
FetchPosition{offset=79, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=b-1.cpsdmskclustertestenv.7hij1q.c2.kafka.us-east-2.amazonaws.com:9094 (id: 1 rack: use2-az1), epoch=3}}

[tConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  [                                    ] : [Consumer clientId=consumer-1, groupId=enrollmentConsumer] Setting offset for partition bev3_enrollment_sync_topic_dev-0 to the committed offset 
FetchPosition{offset=73, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=b-2.cpsdmskclustertestenv.7hij1q.c2.kafka.us-east-2.amazonaws.com:9094 (id: 2 rack: use2-az2), epoch=3}}

但是在更改 group-id 后问题就解决了。但我很好奇如何才能将旧的 Offset(s) 与旧的 group-id 一起提交。我正在使用 STCHACKMODE.MANUAL_IMMEDIATEoffset->LATEST 谢谢。

I am using spring-kafka 2.3.7

Problem 1: Two consumers (KafkaListeners) are in the system. When the application starts one of the Consumers always points to same Topic/Partition/OffSet(s). This
might be due to consumer re-balancing.

Problem 2: After (1) the same Consumer is also not being able to consume any new Message that is sent to the Topic

LOG:

[tConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  [                                    ] : [Consumer clientId=consumer-1, groupId=enrollmentConsumer] Setting offset for partition bev3_enrollment_sync_topic_dev-1 to the committed offset 
FetchPosition{offset=79, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=b-1.cpsdmskclustertestenv.7hij1q.c2.kafka.us-east-2.amazonaws.com:9094 (id: 1 rack: use2-az1), epoch=3}}

[tConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  [                                    ] : [Consumer clientId=consumer-1, groupId=enrollmentConsumer] Setting offset for partition bev3_enrollment_sync_topic_dev-0 to the committed offset 
FetchPosition{offset=73, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=b-2.cpsdmskclustertestenv.7hij1q.c2.kafka.us-east-2.amazonaws.com:9094 (id: 2 rack: use2-az2), epoch=3}}

However after changing the group-id it is solved. But i am curious to know how can the older Offset(s) be committed with the older group-id. I am using STCH, ACKMODE.MANUAL_IMMEDIATE and offset->LATESTThanks.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

稍尽春風 2025-01-18 07:10:09

将分配提交容器属性设置为 NEVER 默认

    /**
     * Set the assignment commit option. Default
     * {@link AssignmentCommitOption#LATEST_ONLY_NO_TX}.
     * @param assignmentCommitOption the option.
     * @since 2.3.6
     */
    public void setAssignmentCommitOption(AssignmentCommitOption assignmentCommitOption) {

情况下,如果自动偏移重置为最新,则容器会在分配期间提交当前位置。

Set the assignment commit container property to NEVER

    /**
     * Set the assignment commit option. Default
     * {@link AssignmentCommitOption#LATEST_ONLY_NO_TX}.
     * @param assignmentCommitOption the option.
     * @since 2.3.6
     */
    public void setAssignmentCommitOption(AssignmentCommitOption assignmentCommitOption) {

By default, the container commits the current position during assignment if the auto offset reset is latest.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文