spring kafka消费者中的无限重试@retryabletopic

发布于 2025-01-09 04:57:58 字数 19814 浏览 1 评论 0原文

我正在使用 @RetryableTopic 在 kafka 消费者中实现重试逻辑。 我给出的配置如下:

@RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 300000, multiplier = 10.0),
            autoCreateTopics = "false",
            topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
    )

但是,它不是重试 4 次,而是无限重试,而且也没有延迟时间。有人可以帮我解决代码吗? 我希望消息重试 4 次,第一次延迟 - 5 分钟后,然后是 10 分钟后第二次延迟,20 分钟后第三次延迟...

代码如下:

int i = 1;

@RetryableTopic(
        attempts = "4",
        backoff = @Backoff(delay = 300000, multiplier = 10.0),
        autoCreateTopics = "false",
        topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(topics = "topic_string_data", containerFactory = "default")
public void consume(@Payload String message , @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    String prachi = null;
    System.out.println("current time: " + new Date());
    System.out.println("retry method invoked -> " + i++ + " times from topic: " + topic);
    System.out.println("current time: " + new Date());
    prachi.equals("abc");
}

@DltHandler
public void listenDlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                      @Header(KafkaHeaders.OFFSET) long offset) {
    System.out.println("current time dlt: " + new Date());
    System.out.println("DLT Received: " + in + " from " + topic + " offset " + offset + " -> " + i++ + " times");
    System.out.println("current time dlt: " + new Date());
    //dump event to dlt queue
}

Kafka 配置:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "grp_STRING");
    return new DefaultKafkaConsumerFactory<>(config);

    //inject consumer factory to kafka listener consumer factory
}

@Bean(name = "default")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

运行应用程序时记录:这不是完整的日志:

32:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.186  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Setting offset for partition topic_string_data-retry-1-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.187  INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.187  INFO 96675 --- [3-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-retry-0-0]
    2022-02-23 13:58:40.187  INFO 96675 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-2, groupId=grp_STRING] Setting offset for partition topic_string_data-1 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.187  INFO 96675 --- [4-retry-1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-retry-1-0]
    2022-02-23 13:58:40.187  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Setting offset for partition topic_string_data-retry-2-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.188  INFO 96675 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-2, groupId=grp_STRING] Setting offset for partition topic_string_data-0 to the committed offset FetchPosition{offset=24, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.188  INFO 96675 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-4, groupId=grp_STRING] Setting offset for partition topic_string-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.188  INFO 96675 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-4, groupId=grp_STRING] Setting offset for partition topic_string-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.188  INFO 96675 --- [5-retry-2-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-retry-2-0]
    2022-02-23 13:58:40.188  INFO 96675 --- [ntainer#2-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-1, topic_string_data-0]
    2022-02-23 13:58:40.189  INFO 96675 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string-1, topic_string-0]
    2022-02-23 13:58:40.188  INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-2 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.190  INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-0 to the committed offset FetchPosition{offset=14, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.191  INFO 96675 --- [ner#6-dlt-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-dlt-0, topic_string_data-dlt-1, topic_string_data-dlt-2]
    2022-02-23 13:58:40.196  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 1 for partition topic_string_data-retry-1-0
    2022-02-23 13:58:40.196  WARN 96675 --- [4-retry-1-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic_string_data-retry-1 is not ready for consumption, backing off for approx. 26346 millis.
    current time: Wed Feb 23 13:58:40 IST 2022
    retry method invoked -> 3 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:40 IST 2022
    2022-02-23 13:58:40.713  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:40 IST 2022
    retry method invoked -> 4 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:40 IST 2022
    2022-02-23 13:58:41.228  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:41 IST 2022
    retry method invoked -> 5 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:41 IST 2022
    2022-02-23 13:58:41.740  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:41 IST 2022
    retry method invoked -> 6 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:41 IST 2022
    2022-02-23 13:58:42.254  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:42 IST 2022
    retry method invoked -> 7 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:42 IST 2022
    2022-02-23 13:58:42.777  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:42 IST 2022
    retry method invoked -> 8 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:42 IST 2022
    2022-02-23 13:58:43.298  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:43 IST 2022
    retry method invoked -> 9 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:43 IST 2022
    2022-02-23 13:58:43.809  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:43 IST 2022
    retry method invoked -> 10 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:43 IST 2022
    current time: Wed Feb 23 13:59:10 IST 2022
    retry method invoked -> 11 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:10 IST 2022
    2022-02-23 13:59:10.733  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-1-0
    2022-02-23 13:59:10.736  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 1 for partition topic_string_data-retry-2-0
    2022-02-23 13:59:10.737  WARN 96675 --- [5-retry-2-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic_string_data-retry-2 is not ready for consumption, backing off for approx. 29483 millis.
    current time: Wed Feb 23 13:59:10 IST 2022
    retry method invoked -> 12 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:10 IST 2022
    2022-02-23 13:59:11.249  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:11 IST 2022
    retry method invoked -> 13 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:11 IST 2022
    2022-02-23 13:59:11.769  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:11 IST 2022
    retry method invoked -> 14 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:11 IST 2022
    2022-02-23 13:59:12.286  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:12 IST 2022
    retry method invoked -> 15 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:12 IST 2022
    2022-02-23 13:59:12.805  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:12 IST 2022
    retry method invoked -> 16 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:12 IST 2022
    2022-02-23 13:59:13.339  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:13 IST 2022
    retry method invoked -> 17 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:13 IST 2022
    2022-02-23 13:59:13.856  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:13 IST 2022
    retry method invoked -> 18 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:13 IST 2022
    2022-02-23 13:59:14.372  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 9 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:14 IST 2022
    retry method invoked -> 19 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:14 IST 2022
    current time: Wed Feb 23 13:59:40 IST 2022
    retry method invoked -> 20 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:40 IST 2022
    2022-02-23 13:59:40.846  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:40 IST 2022
    DLT Received: prachi from topic_string_data-dlt offset 14 -> 21 times
    current time dlt: Wed Feb 23 13:59:46 IST 2022
    current time: Wed Feb 23 13:59:46 IST 2022
    retry method invoked -> 22 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:46 IST 2022
    2022-02-23 13:59:47.466  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:47 IST 2022
    DLT Received: prachi from topic_string_data-dlt offset 15 -> 23 times
    current time dlt: Wed Feb 23 13:59:47 IST 2022
    current time: Wed Feb 23 13:59:47 IST 2022
    retry method invoked -> 24 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:47 IST 2022
    2022-02-23 13:59:47.981  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:47 IST 2022
    DLT Received: prachisharma from topic_string_data-dlt offset 16 -> 25 times
    current time dlt: Wed Feb 23 13:59:47 IST 2022
    current time: Wed Feb 23 13:59:47 IST 2022
    retry method invoked -> 26 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:47 IST 2022
    2022-02-23 13:59:48.493  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:48 IST 2022
    DLT Received: hie from topic_string_data-dlt offset 17 -> 27 times
    current time dlt: Wed Feb 23 13:59:48 IST 2022
    current time: Wed Feb 23 13:59:48 IST 2022
    retry method invoked -> 28 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:48 IST 2022
    2022-02-23 13:59:49.011  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:49 IST 2022
    DLT Received: hieeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee from topic_string_data-dlt offset 18 -> 29 times
    current time dlt: Wed Feb 23 13:59:49 IST 2022
    current time: Wed Feb 23 13:59:49 IST 2022
    retry method invoked -> 30 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:49 IST 2022
    2022-02-23 13:59:49.527  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:49 IST 2022
    DLT Received: hie from topic_string_data-dlt offset 19 -> 31 times
    current time dlt: Wed Feb 23 13:59:49 IST 2022
    current time: Wed Feb 23 13:59:49 IST 2022
    retry method invoked -> 32 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:49 IST 2022
    current time dlt: Wed Feb 23 13:59:50 IST 2022
    DLT Received: hi from topic_string_data-dlt offset 20 -> 33 times
    current time dlt: Wed Feb 23 13:59:50 IST 2022
    2022-02-23 13:59:50.039  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-2-0
    current time: Wed Feb 23 13:59:50 IST 2022
    retry method invoked -> 34 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:50 IST 2022
    2022-02-23 13:59:50.545  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 9 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:50 IST 2022
    DLT Received: hi from topic_string_data-dlt offset 21 -> 35 times
    current time dlt: Wed Feb 23 13:59:50 IST 2022
    current time: Wed Feb 23 13:59:50 IST 2022
    retry method invoked -> 36 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:50 IST 2022
    current time dlt: Wed Feb 23 13:59:51 IST 2022
    DLT Received: hi from topic_string_data-dlt offset 22 -> 37 times
    current time dlt: Wed Feb 23 13:59:51 IST 2022

I am using @RetryableTopic to implement retry logic in kafka consumer.
I gave config as below:

@RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 300000, multiplier = 10.0),
            autoCreateTopics = "false",
            topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
    )

However, instead of retrying for 4 times, it retries infinitely, and that too in no delay time. Can someone please help me with the code?
I want the message to be retried 4 times with first time delay- after 5 mins, then 2nd delay after 10 mins, third delay after 20 mins...

Code is below:

int i = 1;

@RetryableTopic(
        attempts = "4",
        backoff = @Backoff(delay = 300000, multiplier = 10.0),
        autoCreateTopics = "false",
        topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(topics = "topic_string_data", containerFactory = "default")
public void consume(@Payload String message , @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    String prachi = null;
    System.out.println("current time: " + new Date());
    System.out.println("retry method invoked -> " + i++ + " times from topic: " + topic);
    System.out.println("current time: " + new Date());
    prachi.equals("abc");
}

@DltHandler
public void listenDlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                      @Header(KafkaHeaders.OFFSET) long offset) {
    System.out.println("current time dlt: " + new Date());
    System.out.println("DLT Received: " + in + " from " + topic + " offset " + offset + " -> " + i++ + " times");
    System.out.println("current time dlt: " + new Date());
    //dump event to dlt queue
}

Kafka config:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "grp_STRING");
    return new DefaultKafkaConsumerFactory<>(config);

    //inject consumer factory to kafka listener consumer factory
}

@Bean(name = "default")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

Logs when running app: this is not the complete logs:

32:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.186  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Setting offset for partition topic_string_data-retry-1-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.187  INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.187  INFO 96675 --- [3-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-retry-0-0]
    2022-02-23 13:58:40.187  INFO 96675 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-2, groupId=grp_STRING] Setting offset for partition topic_string_data-1 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.187  INFO 96675 --- [4-retry-1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-retry-1-0]
    2022-02-23 13:58:40.187  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Setting offset for partition topic_string_data-retry-2-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.188  INFO 96675 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-2, groupId=grp_STRING] Setting offset for partition topic_string_data-0 to the committed offset FetchPosition{offset=24, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.188  INFO 96675 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-4, groupId=grp_STRING] Setting offset for partition topic_string-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.188  INFO 96675 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-4, groupId=grp_STRING] Setting offset for partition topic_string-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.188  INFO 96675 --- [5-retry-2-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-retry-2-0]
    2022-02-23 13:58:40.188  INFO 96675 --- [ntainer#2-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-1, topic_string_data-0]
    2022-02-23 13:58:40.189  INFO 96675 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string-1, topic_string-0]
    2022-02-23 13:58:40.188  INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-2 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.190  INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-0 to the committed offset FetchPosition{offset=14, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.191  INFO 96675 --- [ner#6-dlt-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-dlt-0, topic_string_data-dlt-1, topic_string_data-dlt-2]
    2022-02-23 13:58:40.196  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 1 for partition topic_string_data-retry-1-0
    2022-02-23 13:58:40.196  WARN 96675 --- [4-retry-1-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic_string_data-retry-1 is not ready for consumption, backing off for approx. 26346 millis.
    current time: Wed Feb 23 13:58:40 IST 2022
    retry method invoked -> 3 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:40 IST 2022
    2022-02-23 13:58:40.713  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:40 IST 2022
    retry method invoked -> 4 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:40 IST 2022
    2022-02-23 13:58:41.228  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:41 IST 2022
    retry method invoked -> 5 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:41 IST 2022
    2022-02-23 13:58:41.740  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:41 IST 2022
    retry method invoked -> 6 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:41 IST 2022
    2022-02-23 13:58:42.254  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:42 IST 2022
    retry method invoked -> 7 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:42 IST 2022
    2022-02-23 13:58:42.777  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:42 IST 2022
    retry method invoked -> 8 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:42 IST 2022
    2022-02-23 13:58:43.298  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:43 IST 2022
    retry method invoked -> 9 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:43 IST 2022
    2022-02-23 13:58:43.809  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:43 IST 2022
    retry method invoked -> 10 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:43 IST 2022
    current time: Wed Feb 23 13:59:10 IST 2022
    retry method invoked -> 11 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:10 IST 2022
    2022-02-23 13:59:10.733  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-1-0
    2022-02-23 13:59:10.736  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 1 for partition topic_string_data-retry-2-0
    2022-02-23 13:59:10.737  WARN 96675 --- [5-retry-2-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic_string_data-retry-2 is not ready for consumption, backing off for approx. 29483 millis.
    current time: Wed Feb 23 13:59:10 IST 2022
    retry method invoked -> 12 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:10 IST 2022
    2022-02-23 13:59:11.249  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:11 IST 2022
    retry method invoked -> 13 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:11 IST 2022
    2022-02-23 13:59:11.769  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:11 IST 2022
    retry method invoked -> 14 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:11 IST 2022
    2022-02-23 13:59:12.286  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:12 IST 2022
    retry method invoked -> 15 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:12 IST 2022
    2022-02-23 13:59:12.805  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:12 IST 2022
    retry method invoked -> 16 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:12 IST 2022
    2022-02-23 13:59:13.339  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:13 IST 2022
    retry method invoked -> 17 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:13 IST 2022
    2022-02-23 13:59:13.856  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:13 IST 2022
    retry method invoked -> 18 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:13 IST 2022
    2022-02-23 13:59:14.372  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 9 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:14 IST 2022
    retry method invoked -> 19 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:14 IST 2022
    current time: Wed Feb 23 13:59:40 IST 2022
    retry method invoked -> 20 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:40 IST 2022
    2022-02-23 13:59:40.846  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:40 IST 2022
    DLT Received: prachi from topic_string_data-dlt offset 14 -> 21 times
    current time dlt: Wed Feb 23 13:59:46 IST 2022
    current time: Wed Feb 23 13:59:46 IST 2022
    retry method invoked -> 22 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:46 IST 2022
    2022-02-23 13:59:47.466  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:47 IST 2022
    DLT Received: prachi from topic_string_data-dlt offset 15 -> 23 times
    current time dlt: Wed Feb 23 13:59:47 IST 2022
    current time: Wed Feb 23 13:59:47 IST 2022
    retry method invoked -> 24 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:47 IST 2022
    2022-02-23 13:59:47.981  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:47 IST 2022
    DLT Received: prachisharma from topic_string_data-dlt offset 16 -> 25 times
    current time dlt: Wed Feb 23 13:59:47 IST 2022
    current time: Wed Feb 23 13:59:47 IST 2022
    retry method invoked -> 26 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:47 IST 2022
    2022-02-23 13:59:48.493  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:48 IST 2022
    DLT Received: hie from topic_string_data-dlt offset 17 -> 27 times
    current time dlt: Wed Feb 23 13:59:48 IST 2022
    current time: Wed Feb 23 13:59:48 IST 2022
    retry method invoked -> 28 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:48 IST 2022
    2022-02-23 13:59:49.011  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:49 IST 2022
    DLT Received: hieeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee from topic_string_data-dlt offset 18 -> 29 times
    current time dlt: Wed Feb 23 13:59:49 IST 2022
    current time: Wed Feb 23 13:59:49 IST 2022
    retry method invoked -> 30 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:49 IST 2022
    2022-02-23 13:59:49.527  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:49 IST 2022
    DLT Received: hie from topic_string_data-dlt offset 19 -> 31 times
    current time dlt: Wed Feb 23 13:59:49 IST 2022
    current time: Wed Feb 23 13:59:49 IST 2022
    retry method invoked -> 32 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:49 IST 2022
    current time dlt: Wed Feb 23 13:59:50 IST 2022
    DLT Received: hi from topic_string_data-dlt offset 20 -> 33 times
    current time dlt: Wed Feb 23 13:59:50 IST 2022
    2022-02-23 13:59:50.039  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-2-0
    current time: Wed Feb 23 13:59:50 IST 2022
    retry method invoked -> 34 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:50 IST 2022
    2022-02-23 13:59:50.545  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 9 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:50 IST 2022
    DLT Received: hi from topic_string_data-dlt offset 21 -> 35 times
    current time dlt: Wed Feb 23 13:59:50 IST 2022
    current time: Wed Feb 23 13:59:50 IST 2022
    retry method invoked -> 36 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:50 IST 2022
    current time dlt: Wed Feb 23 13:59:51 IST 2022
    DLT Received: hi from topic_string_data-dlt offset 22 -> 37 times
    current time dlt: Wed Feb 23 13:59:51 IST 2022

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

み青杉依旧 2025-01-16 04:57:58

似乎有两个不同的问题。

一是您似乎已经在主题中拥有记录,如果您将其配置为最早,应用程序将在启动时读取所有这些记录。您可以将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为 latest,或者,如果您在 docker 上本地运行,则可以停止 Kafka 容器并使用类似 的内容修剪卷>docker system prune --volumes (请注意,这将从所有已停止的容器中删除数据 - 明智地使用)。

您可以尝试其中之一并再次测试吗?

另一个问题是框架错误地将默认 maxDelay 设置为 30 秒,尽管注释指出默认值是忽略。我将为此打开一个问题并在此处添加链接。

现在您可以设置 maxDelay,例如 @Backoff(delay = 600000, multiplier = 3.0, maxDelay = 5400000),那么应用程序应该具有您想要的 10、30 和 90 分钟的正确延迟。

请告诉我这是否适合您,或者您是否还有与此问题相关的任何其他问题。

编辑:问题已打开,您可以关注那里的开发 https://github.com/ spring-projects/spring-kafka/issues/2137

应该在下一个版本中修复。

编辑2:实际上 @BackOff 注释中的措辞是相当含糊,但似乎行为是正确的,您应该显式设置更大的 maxDelay。

文档应在下一版本中澄清此行为。

编辑3:为了回答您在评论中的问题,可重试主题的工作方式是分区在延迟期间暂停,但消费者不断轮询代理,因此较长的延迟不会触发重新平衡。

从您的日志来看,重新平衡来自主要主题的分区,因此它不太可能与此功能有任何关系。

编辑 4:可重试主题功能在 Spring 中为 Apache Kafka 2.7.0 发布,它使用 kafka-clients 2.7.0。但是,该功能已进行了多项改进,因此我建议如果可能的话,使用最新的 Spring Kafka 版本(当前为 2.8.3)以从中受益。

It seems there are two separate problems.

One is that you seem to already have records in the topics, and if you have it configured to earliest the app will read all those records when it starts up. You can either set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to latest, or, if you're running locally on docker, you can stop the Kafka container and prune the volumes with something like docker system prune --volumes (note that this will erase data from all your stopped containers - use wisely).

Can you try one of these and test again?

The other problem is that the framework is wrongly setting the default maxDelay of 30s even though the annotation states the default is to ignore. I'll open an issue for that and add the link here.

For now You can set a maxDelay such as @Backoff(delay = 600000, multiplier = 3.0, maxDelay = 5400000), then the application should have the correct delays for 10, 30 and 90 minutes as you wanted.

Let me know if that works out for you, or if you have any other problems related to this issue.

EDIT: Issue opened, you can follow the development there https://github.com/spring-projects/spring-kafka/issues/2137

It should be fixed in the next release.

EDIT 2: Actually the phrasing in the @BackOff annotation is rather ambiguous, but seems like the behavior is correct and you should explicitly set a larger maxDelay.

The documentation should clarify this behavior in the next release.

EDIT 3: To answer your question in the comments, the way retryable topics work is the partition is paused for the duration of the delay, but the consumer keeps polling the broker, so longer delays don't trigger rebalancing.

From your logs the rebalancing is from the main topic's partitions, so it's unlikely it has anything to do with this feature.

EDIT 4: The Retryable Topics feature was released in Spring for Apache Kafka 2.7.0, which uses kafka-clients 2.7.0. However, there have been several improvements to the feature, so I recommend using the latest Spring Kafka version (currently 2.8.3) if possible to benefit from those.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文