与contrentMessageListenerContainer一起控制开始/停止容器并获取所有活跃的消费者信息的问题
但是,在使用另一个REST API执行开始时,停止,恢复,暂停并获取所有消费者信息,Kafkalistenerendpointregistry类也没有发现任何侦听器。 这对我来说很奇怪,因为在Kafkalistenerendpointregistry类中使用方法registerListenerContainer时,可以创建消费者,但工作正常。 在查看KafkalistenerendPoinTregistry class之后,此课程在MessageListenerContainer中工作,我认为这是问题所在。我试图铸造Messagelistenercontainer 同时介绍了,但它不起作用。
代码:
cutykafkacontainerregistration.java
@Component
public class CustomKafkaContainerRegistration {
@Value("${concurrent.consumer.kafka}")
private int concurrentConsumer;
@Autowired
public CustomConsumerFactory customConsumerFactory;
public void registerCustomKafkaContainer(Request request) {
CustomContainerProperties customContainerProperties = new CustomContainerProperties(request.getTopicName(),request.getConsumerId());
ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
customConsumerFactory.getCustomConsumerFactory(),
customContainerProperties.getContainerProperties());
container.setConcurrency(concurrentConsumer);
container.setAutoStartup(request.getConsumerActivation());
container.getContainers();
container.setBeanName(request.getConsumerId());
container.start();
}
kafkaconsumerregistrycontroller.java
@Slf4j
@RestController
@RequestMapping(path = "/api/kafka/registry")
public class KafkaConsumerRegistryController {
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private CustomKafkaContainerRegistration customKafkaContainerRegistration;
@GetMapping
public List<KafkaConsumerResponse> getConsumerIds() {
return kafkaListenerEndpointRegistry.getListenerContainerIds()
.stream()
.map(this::createKafkaConsumerResponse)
.collect(Collectors.toList());
}
@PostMapping(path = "/create")
@ResponseStatus(HttpStatus.CREATED)
public void createConsumer(@RequestBody Request request) {
customKafkaContainerRegistration.registerCustomKafkaContainer(request);
}
@PostMapping(path = "/activate")
@ResponseStatus(HttpStatus.ACCEPTED)
public void activateConsumer(@RequestParam String consumerId) {
ConcurrentMessageListenerContainer listenerContainer = (ConcurrentMessageListenerContainer) kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
if (Objects.isNull(listenerContainer)) {
throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
} else if (listenerContainer.isRunning()) {
throw new RuntimeException(String.format("Consumer with id %s is already running", consumerId));
} else {
log.info("Running a consumer with id " + consumerId);
listenerContainer.start();
}
}
@PostMapping(path = "/pause")
@ResponseStatus(HttpStatus.ACCEPTED)
public void pauseConsumer(@RequestParam String consumerId) {
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
if (Objects.isNull(listenerContainer)) {
throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
} else if (!listenerContainer.isRunning()) {
throw new RuntimeException(String.format("Consumer with id %s is not running", consumerId));
} else if (listenerContainer.isContainerPaused()) {
throw new RuntimeException(String.format("Consumer with id %s is already paused", consumerId));
} else if (listenerContainer.isPauseRequested()) {
throw new RuntimeException(String.format("Consumer with id %s is already requested to be paused", consumerId));
} else {
log.info("Pausing a consumer with id " + consumerId);
listenerContainer.pause();
}
}
@PostMapping(path = "/resume")
@ResponseStatus(HttpStatus.ACCEPTED)
public void resumeConsumer(@RequestParam String consumerId) {
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
if (Objects.isNull(listenerContainer)) {
throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
} else if (!listenerContainer.isRunning()) {
throw new RuntimeException(String.format("Consumer with id %s is not running", consumerId));
} else if (!listenerContainer.isContainerPaused()) {
throw new RuntimeException(String.format("Consumer with id %s is not paused", consumerId));
} else {
log.info("Resuming a consumer with id " + consumerId);
listenerContainer.resume();
}
}
@PostMapping(path = "/deactivate")
@ResponseStatus(HttpStatus.ACCEPTED)
public void deactivateConsumer(@RequestParam String consumerId) {
ConcurrentMessageListenerContainer listenerContainer = (ConcurrentMessageListenerContainer) kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
if (Objects.isNull(listenerContainer)) {
throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
} else if (!listenerContainer.isRunning()) {
throw new RuntimeException(String.format("Consumer with id %s is already stop", consumerId));
} else {
log.info("Stopping a consumer with id " + consumerId);
listenerContainer.stop();
}
}
private KafkaConsumerResponse createKafkaConsumerResponse(String consumerId) {
MessageListenerContainer listenerContainer =
kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
return KafkaConsumerResponse.builder()
.consumerId(consumerId)
.groupId(listenerContainer.getGroupId())
.listenerId(listenerContainer.getListenerId())
.active(listenerContainer.isRunning())
.assignments(Optional.ofNullable(listenerContainer.getAssignedPartitions())
.map(topicPartitions -> topicPartitions.stream()
.map(this::createKafkaConsumerAssignmentResponse)
.collect(Collectors.toList()))
.orElse(null))
.build();
}
private KafkaConsumerAssignmentResponse createKafkaConsumerAssignmentResponse(
TopicPartition topicPartition) {
return KafkaConsumerAssignmentResponse.builder()
.topic(topicPartition.topic())
.partition(topicPartition.partition())
.build();
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您需要跟踪您在
registercustomkafkacontainer中创建的容器
自己,例如,在简单的concurrentMap&lt;?,?&gt;
中。IE提供的功能与
kafkalistenercontainerregistry
提供的功能相似,为框架创建的容器提供了类似的功能。目前,您只是启动它们,但在任何地方都没有对它们进行引用。
编辑
You need to keep track of containers you create in
registerCustomKafkaContainer
yourself, e.g. in a simpleConcurrentMap<?, ?>
.i.e. provide similar functionality to that which the
KafkaListenerContainerRegistry
provides for containers created by the framework.Currently, you are just starting them but not keeping a reference to them anywhere.
EDIT