spring kafka消费者中的无限重试@retryabletopic
我正在使用 @RetryableTopic 在 kafka 消费者中实现重试逻辑。 我给出的配置如下:
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 300000, multiplier = 10.0),
autoCreateTopics = "false",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
但是,它不是重试 4 次,而是无限重试,而且也没有延迟时间。有人可以帮我解决代码吗? 我希望消息重试 4 次,第一次延迟 - 5 分钟后,然后是 10 分钟后第二次延迟,20 分钟后第三次延迟...
代码如下:
int i = 1;
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 300000, multiplier = 10.0),
autoCreateTopics = "false",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(topics = "topic_string_data", containerFactory = "default")
public void consume(@Payload String message , @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
String prachi = null;
System.out.println("current time: " + new Date());
System.out.println("retry method invoked -> " + i++ + " times from topic: " + topic);
System.out.println("current time: " + new Date());
prachi.equals("abc");
}
@DltHandler
public void listenDlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.OFFSET) long offset) {
System.out.println("current time dlt: " + new Date());
System.out.println("DLT Received: " + in + " from " + topic + " offset " + offset + " -> " + i++ + " times");
System.out.println("current time dlt: " + new Date());
//dump event to dlt queue
}
Kafka 配置:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "grp_STRING");
return new DefaultKafkaConsumerFactory<>(config);
//inject consumer factory to kafka listener consumer factory
}
@Bean(name = "default")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
运行应用程序时记录:这不是完整的日志:
32:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.186 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Setting offset for partition topic_string_data-retry-1-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.187 INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.187 INFO 96675 --- [3-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-retry-0-0]
2022-02-23 13:58:40.187 INFO 96675 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-2, groupId=grp_STRING] Setting offset for partition topic_string_data-1 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.187 INFO 96675 --- [4-retry-1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-retry-1-0]
2022-02-23 13:58:40.187 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Setting offset for partition topic_string_data-retry-2-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.188 INFO 96675 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-2, groupId=grp_STRING] Setting offset for partition topic_string_data-0 to the committed offset FetchPosition{offset=24, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.188 INFO 96675 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-4, groupId=grp_STRING] Setting offset for partition topic_string-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.188 INFO 96675 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-4, groupId=grp_STRING] Setting offset for partition topic_string-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.188 INFO 96675 --- [5-retry-2-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-retry-2-0]
2022-02-23 13:58:40.188 INFO 96675 --- [ntainer#2-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-1, topic_string_data-0]
2022-02-23 13:58:40.189 INFO 96675 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string-1, topic_string-0]
2022-02-23 13:58:40.188 INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-2 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.190 INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-0 to the committed offset FetchPosition{offset=14, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.191 INFO 96675 --- [ner#6-dlt-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-dlt-0, topic_string_data-dlt-1, topic_string_data-dlt-2]
2022-02-23 13:58:40.196 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 1 for partition topic_string_data-retry-1-0
2022-02-23 13:58:40.196 WARN 96675 --- [4-retry-1-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic_string_data-retry-1 is not ready for consumption, backing off for approx. 26346 millis.
current time: Wed Feb 23 13:58:40 IST 2022
retry method invoked -> 3 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:40 IST 2022
2022-02-23 13:58:40.713 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:40 IST 2022
retry method invoked -> 4 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:40 IST 2022
2022-02-23 13:58:41.228 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:41 IST 2022
retry method invoked -> 5 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:41 IST 2022
2022-02-23 13:58:41.740 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:41 IST 2022
retry method invoked -> 6 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:41 IST 2022
2022-02-23 13:58:42.254 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:42 IST 2022
retry method invoked -> 7 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:42 IST 2022
2022-02-23 13:58:42.777 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:42 IST 2022
retry method invoked -> 8 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:42 IST 2022
2022-02-23 13:58:43.298 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:43 IST 2022
retry method invoked -> 9 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:43 IST 2022
2022-02-23 13:58:43.809 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:43 IST 2022
retry method invoked -> 10 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:43 IST 2022
current time: Wed Feb 23 13:59:10 IST 2022
retry method invoked -> 11 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:10 IST 2022
2022-02-23 13:59:10.733 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-1-0
2022-02-23 13:59:10.736 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 1 for partition topic_string_data-retry-2-0
2022-02-23 13:59:10.737 WARN 96675 --- [5-retry-2-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic_string_data-retry-2 is not ready for consumption, backing off for approx. 29483 millis.
current time: Wed Feb 23 13:59:10 IST 2022
retry method invoked -> 12 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:10 IST 2022
2022-02-23 13:59:11.249 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:11 IST 2022
retry method invoked -> 13 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:11 IST 2022
2022-02-23 13:59:11.769 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:11 IST 2022
retry method invoked -> 14 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:11 IST 2022
2022-02-23 13:59:12.286 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:12 IST 2022
retry method invoked -> 15 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:12 IST 2022
2022-02-23 13:59:12.805 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:12 IST 2022
retry method invoked -> 16 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:12 IST 2022
2022-02-23 13:59:13.339 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:13 IST 2022
retry method invoked -> 17 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:13 IST 2022
2022-02-23 13:59:13.856 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:13 IST 2022
retry method invoked -> 18 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:13 IST 2022
2022-02-23 13:59:14.372 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 9 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:14 IST 2022
retry method invoked -> 19 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:14 IST 2022
current time: Wed Feb 23 13:59:40 IST 2022
retry method invoked -> 20 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:40 IST 2022
2022-02-23 13:59:40.846 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:40 IST 2022
DLT Received: prachi from topic_string_data-dlt offset 14 -> 21 times
current time dlt: Wed Feb 23 13:59:46 IST 2022
current time: Wed Feb 23 13:59:46 IST 2022
retry method invoked -> 22 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:46 IST 2022
2022-02-23 13:59:47.466 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:47 IST 2022
DLT Received: prachi from topic_string_data-dlt offset 15 -> 23 times
current time dlt: Wed Feb 23 13:59:47 IST 2022
current time: Wed Feb 23 13:59:47 IST 2022
retry method invoked -> 24 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:47 IST 2022
2022-02-23 13:59:47.981 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:47 IST 2022
DLT Received: prachisharma from topic_string_data-dlt offset 16 -> 25 times
current time dlt: Wed Feb 23 13:59:47 IST 2022
current time: Wed Feb 23 13:59:47 IST 2022
retry method invoked -> 26 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:47 IST 2022
2022-02-23 13:59:48.493 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:48 IST 2022
DLT Received: hie from topic_string_data-dlt offset 17 -> 27 times
current time dlt: Wed Feb 23 13:59:48 IST 2022
current time: Wed Feb 23 13:59:48 IST 2022
retry method invoked -> 28 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:48 IST 2022
2022-02-23 13:59:49.011 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:49 IST 2022
DLT Received: hieeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee from topic_string_data-dlt offset 18 -> 29 times
current time dlt: Wed Feb 23 13:59:49 IST 2022
current time: Wed Feb 23 13:59:49 IST 2022
retry method invoked -> 30 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:49 IST 2022
2022-02-23 13:59:49.527 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:49 IST 2022
DLT Received: hie from topic_string_data-dlt offset 19 -> 31 times
current time dlt: Wed Feb 23 13:59:49 IST 2022
current time: Wed Feb 23 13:59:49 IST 2022
retry method invoked -> 32 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:49 IST 2022
current time dlt: Wed Feb 23 13:59:50 IST 2022
DLT Received: hi from topic_string_data-dlt offset 20 -> 33 times
current time dlt: Wed Feb 23 13:59:50 IST 2022
2022-02-23 13:59:50.039 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-2-0
current time: Wed Feb 23 13:59:50 IST 2022
retry method invoked -> 34 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:50 IST 2022
2022-02-23 13:59:50.545 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 9 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:50 IST 2022
DLT Received: hi from topic_string_data-dlt offset 21 -> 35 times
current time dlt: Wed Feb 23 13:59:50 IST 2022
current time: Wed Feb 23 13:59:50 IST 2022
retry method invoked -> 36 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:50 IST 2022
current time dlt: Wed Feb 23 13:59:51 IST 2022
DLT Received: hi from topic_string_data-dlt offset 22 -> 37 times
current time dlt: Wed Feb 23 13:59:51 IST 2022
I am using @RetryableTopic to implement retry logic in kafka consumer.
I gave config as below:
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 300000, multiplier = 10.0),
autoCreateTopics = "false",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
However, instead of retrying for 4 times, it retries infinitely, and that too in no delay time. Can someone please help me with the code?
I want the message to be retried 4 times with first time delay- after 5 mins, then 2nd delay after 10 mins, third delay after 20 mins...
Code is below:
int i = 1;
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 300000, multiplier = 10.0),
autoCreateTopics = "false",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(topics = "topic_string_data", containerFactory = "default")
public void consume(@Payload String message , @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
String prachi = null;
System.out.println("current time: " + new Date());
System.out.println("retry method invoked -> " + i++ + " times from topic: " + topic);
System.out.println("current time: " + new Date());
prachi.equals("abc");
}
@DltHandler
public void listenDlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.OFFSET) long offset) {
System.out.println("current time dlt: " + new Date());
System.out.println("DLT Received: " + in + " from " + topic + " offset " + offset + " -> " + i++ + " times");
System.out.println("current time dlt: " + new Date());
//dump event to dlt queue
}
Kafka config:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "grp_STRING");
return new DefaultKafkaConsumerFactory<>(config);
//inject consumer factory to kafka listener consumer factory
}
@Bean(name = "default")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
Logs when running app: this is not the complete logs:
32:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.186 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Setting offset for partition topic_string_data-retry-1-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.187 INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.187 INFO 96675 --- [3-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-retry-0-0]
2022-02-23 13:58:40.187 INFO 96675 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-2, groupId=grp_STRING] Setting offset for partition topic_string_data-1 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.187 INFO 96675 --- [4-retry-1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-retry-1-0]
2022-02-23 13:58:40.187 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Setting offset for partition topic_string_data-retry-2-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.188 INFO 96675 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-2, groupId=grp_STRING] Setting offset for partition topic_string_data-0 to the committed offset FetchPosition{offset=24, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.188 INFO 96675 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-4, groupId=grp_STRING] Setting offset for partition topic_string-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.188 INFO 96675 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-4, groupId=grp_STRING] Setting offset for partition topic_string-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.188 INFO 96675 --- [5-retry-2-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-retry-2-0]
2022-02-23 13:58:40.188 INFO 96675 --- [ntainer#2-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-1, topic_string_data-0]
2022-02-23 13:58:40.189 INFO 96675 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string-1, topic_string-0]
2022-02-23 13:58:40.188 INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-2 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.190 INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-0 to the committed offset FetchPosition{offset=14, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
2022-02-23 13:58:40.191 INFO 96675 --- [ner#6-dlt-0-C-1] o.s.k.l.KafkaMessageListenerContainer : grp_STRING: partitions assigned: [topic_string_data-dlt-0, topic_string_data-dlt-1, topic_string_data-dlt-2]
2022-02-23 13:58:40.196 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 1 for partition topic_string_data-retry-1-0
2022-02-23 13:58:40.196 WARN 96675 --- [4-retry-1-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic_string_data-retry-1 is not ready for consumption, backing off for approx. 26346 millis.
current time: Wed Feb 23 13:58:40 IST 2022
retry method invoked -> 3 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:40 IST 2022
2022-02-23 13:58:40.713 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:40 IST 2022
retry method invoked -> 4 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:40 IST 2022
2022-02-23 13:58:41.228 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:41 IST 2022
retry method invoked -> 5 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:41 IST 2022
2022-02-23 13:58:41.740 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:41 IST 2022
retry method invoked -> 6 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:41 IST 2022
2022-02-23 13:58:42.254 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:42 IST 2022
retry method invoked -> 7 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:42 IST 2022
2022-02-23 13:58:42.777 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:42 IST 2022
retry method invoked -> 8 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:42 IST 2022
2022-02-23 13:58:43.298 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:43 IST 2022
retry method invoked -> 9 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:43 IST 2022
2022-02-23 13:58:43.809 INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-0-0
current time: Wed Feb 23 13:58:43 IST 2022
retry method invoked -> 10 times from topic: topic_string_data-retry-0
current time: Wed Feb 23 13:58:43 IST 2022
current time: Wed Feb 23 13:59:10 IST 2022
retry method invoked -> 11 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:10 IST 2022
2022-02-23 13:59:10.733 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-1-0
2022-02-23 13:59:10.736 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 1 for partition topic_string_data-retry-2-0
2022-02-23 13:59:10.737 WARN 96675 --- [5-retry-2-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic_string_data-retry-2 is not ready for consumption, backing off for approx. 29483 millis.
current time: Wed Feb 23 13:59:10 IST 2022
retry method invoked -> 12 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:10 IST 2022
2022-02-23 13:59:11.249 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:11 IST 2022
retry method invoked -> 13 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:11 IST 2022
2022-02-23 13:59:11.769 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:11 IST 2022
retry method invoked -> 14 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:11 IST 2022
2022-02-23 13:59:12.286 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:12 IST 2022
retry method invoked -> 15 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:12 IST 2022
2022-02-23 13:59:12.805 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:12 IST 2022
retry method invoked -> 16 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:12 IST 2022
2022-02-23 13:59:13.339 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:13 IST 2022
retry method invoked -> 17 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:13 IST 2022
2022-02-23 13:59:13.856 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:13 IST 2022
retry method invoked -> 18 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:13 IST 2022
2022-02-23 13:59:14.372 INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 9 for partition topic_string_data-retry-1-0
current time: Wed Feb 23 13:59:14 IST 2022
retry method invoked -> 19 times from topic: topic_string_data-retry-1
current time: Wed Feb 23 13:59:14 IST 2022
current time: Wed Feb 23 13:59:40 IST 2022
retry method invoked -> 20 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:40 IST 2022
2022-02-23 13:59:40.846 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:40 IST 2022
DLT Received: prachi from topic_string_data-dlt offset 14 -> 21 times
current time dlt: Wed Feb 23 13:59:46 IST 2022
current time: Wed Feb 23 13:59:46 IST 2022
retry method invoked -> 22 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:46 IST 2022
2022-02-23 13:59:47.466 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:47 IST 2022
DLT Received: prachi from topic_string_data-dlt offset 15 -> 23 times
current time dlt: Wed Feb 23 13:59:47 IST 2022
current time: Wed Feb 23 13:59:47 IST 2022
retry method invoked -> 24 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:47 IST 2022
2022-02-23 13:59:47.981 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:47 IST 2022
DLT Received: prachisharma from topic_string_data-dlt offset 16 -> 25 times
current time dlt: Wed Feb 23 13:59:47 IST 2022
current time: Wed Feb 23 13:59:47 IST 2022
retry method invoked -> 26 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:47 IST 2022
2022-02-23 13:59:48.493 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:48 IST 2022
DLT Received: hie from topic_string_data-dlt offset 17 -> 27 times
current time dlt: Wed Feb 23 13:59:48 IST 2022
current time: Wed Feb 23 13:59:48 IST 2022
retry method invoked -> 28 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:48 IST 2022
2022-02-23 13:59:49.011 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:49 IST 2022
DLT Received: hieeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee from topic_string_data-dlt offset 18 -> 29 times
current time dlt: Wed Feb 23 13:59:49 IST 2022
current time: Wed Feb 23 13:59:49 IST 2022
retry method invoked -> 30 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:49 IST 2022
2022-02-23 13:59:49.527 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:49 IST 2022
DLT Received: hie from topic_string_data-dlt offset 19 -> 31 times
current time dlt: Wed Feb 23 13:59:49 IST 2022
current time: Wed Feb 23 13:59:49 IST 2022
retry method invoked -> 32 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:49 IST 2022
current time dlt: Wed Feb 23 13:59:50 IST 2022
DLT Received: hi from topic_string_data-dlt offset 20 -> 33 times
current time dlt: Wed Feb 23 13:59:50 IST 2022
2022-02-23 13:59:50.039 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-2-0
current time: Wed Feb 23 13:59:50 IST 2022
retry method invoked -> 34 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:50 IST 2022
2022-02-23 13:59:50.545 INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 9 for partition topic_string_data-retry-2-0
current time dlt: Wed Feb 23 13:59:50 IST 2022
DLT Received: hi from topic_string_data-dlt offset 21 -> 35 times
current time dlt: Wed Feb 23 13:59:50 IST 2022
current time: Wed Feb 23 13:59:50 IST 2022
retry method invoked -> 36 times from topic: topic_string_data-retry-2
current time: Wed Feb 23 13:59:50 IST 2022
current time dlt: Wed Feb 23 13:59:51 IST 2022
DLT Received: hi from topic_string_data-dlt offset 22 -> 37 times
current time dlt: Wed Feb 23 13:59:51 IST 2022
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
似乎有两个不同的问题。
一是您似乎已经在主题中拥有记录,如果您将其配置为
最早
,应用程序将在启动时读取所有这些记录。您可以将ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
设置为latest
,或者,如果您在 docker 上本地运行,则可以停止 Kafka 容器并使用类似的内容修剪卷>docker system prune --volumes
(请注意,这将从所有已停止的容器中删除数据 - 明智地使用)。您可以尝试其中之一并再次测试吗?
另一个问题是框架错误地将默认 maxDelay 设置为 30 秒,尽管注释指出默认值是忽略。我将为此打开一个问题并在此处添加链接。现在您可以设置 maxDelay,例如@Backoff(delay = 600000, multiplier = 3.0, maxDelay = 5400000)
,那么应用程序应该具有您想要的 10、30 和 90 分钟的正确延迟。请告诉我这是否适合您,或者您是否还有与此问题相关的任何其他问题。
编辑:问题已打开,您可以关注那里的开发 https://github.com/ spring-projects/spring-kafka/issues/2137
应该在下一个版本中修复。编辑2:实际上
@BackOff
注释中的措辞是相当含糊,但似乎行为是正确的,您应该显式设置更大的 maxDelay。文档应在下一版本中澄清此行为。
编辑3:为了回答您在评论中的问题,可重试主题的工作方式是分区在延迟期间暂停,但消费者不断轮询代理,因此较长的延迟不会触发重新平衡。
从您的日志来看,重新平衡来自主要主题的分区,因此它不太可能与此功能有任何关系。
编辑 4:可重试主题功能在 Spring 中为 Apache Kafka 2.7.0 发布,它使用 kafka-clients 2.7.0。但是,该功能已进行了多项改进,因此我建议如果可能的话,使用最新的 Spring Kafka 版本(当前为 2.8.3)以从中受益。
It seems there are two separate problems.
One is that you seem to already have records in the topics, and if you have it configured to
earliest
the app will read all those records when it starts up. You can either setConsumerConfig.AUTO_OFFSET_RESET_CONFIG
tolatest
, or, if you're running locally on docker, you can stop the Kafka container and prune the volumes with something likedocker system prune --volumes
(note that this will erase data from all your stopped containers - use wisely).Can you try one of these and test again?
The other problem is that the framework is wrongly setting the default maxDelay of 30s even though the annotation states the default is to ignore. I'll open an issue for that and add the link here.For nowYou can set a maxDelay such as@Backoff(delay = 600000, multiplier = 3.0, maxDelay = 5400000)
, then the application should have the correct delays for 10, 30 and 90 minutes as you wanted.Let me know if that works out for you, or if you have any other problems related to this issue.
EDIT: Issue opened, you can follow the development there https://github.com/spring-projects/spring-kafka/issues/2137
It should be fixed in the next release.EDIT 2: Actually the phrasing in the
@BackOff
annotation is rather ambiguous, but seems like the behavior is correct and you should explicitly set a larger maxDelay.The documentation should clarify this behavior in the next release.
EDIT 3: To answer your question in the comments, the way retryable topics work is the partition is paused for the duration of the delay, but the consumer keeps polling the broker, so longer delays don't trigger rebalancing.
From your logs the rebalancing is from the main topic's partitions, so it's unlikely it has anything to do with this feature.
EDIT 4: The Retryable Topics feature was released in Spring for Apache Kafka 2.7.0, which uses kafka-clients 2.7.0. However, there have been several improvements to the feature, so I recommend using the latest Spring Kafka version (currently 2.8.3) if possible to benefit from those.