与contrentMessageListenerContainer一起控制开始/停止容器并获取所有活跃的消费者信息的问题

发布于 2025-02-07 07:55:43 字数 6866 浏览 3 评论 0 原文

@garry在中使用ACK模式手册的手动提交手册_immdediate。我使用contrentMessageListenerContainer侦听器容器,然后添加 确认符合其容器属性及其工作的媒介物类别。使用Create Rest API,我可以创建同时= 6的消费者,这对我来说是预期的结果。

但是,在使用另一个REST API执行开始时,停止,恢复,暂停并获取所有消费者信息,Kafkalistenerendpointregistry类也没有发现任何侦听器。 这对我来说很奇怪,因为在Kafkalistenerendpointregistry类中使用方法registerListenerContainer时,可以创建消费者,但工作正常。 在查看KafkalistenerendPoinTregistry class之后,此课程在MessageListenerContainer中工作,我认为这是问题所在。我试图铸造Messagelistenercontainer 同时介绍了,但它不起作用。

代码:

cutykafkacontainerregistration.java

@Component
public class CustomKafkaContainerRegistration {

@Value("${concurrent.consumer.kafka}")
private int concurrentConsumer;

@Autowired
public CustomConsumerFactory customConsumerFactory;

public void registerCustomKafkaContainer(Request request) {
    CustomContainerProperties customContainerProperties = new CustomContainerProperties(request.getTopicName(),request.getConsumerId());
    ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
            customConsumerFactory.getCustomConsumerFactory(),
            customContainerProperties.getContainerProperties());
    container.setConcurrency(concurrentConsumer);
    container.setAutoStartup(request.getConsumerActivation());
    container.getContainers();
    container.setBeanName(request.getConsumerId());
    container.start();
}

kafkaconsumerregistrycontroller.java

@Slf4j
@RestController
@RequestMapping(path = "/api/kafka/registry")
public class KafkaConsumerRegistryController {

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Autowired
private CustomKafkaContainerRegistration customKafkaContainerRegistration;



@GetMapping
public List<KafkaConsumerResponse> getConsumerIds() {
    return kafkaListenerEndpointRegistry.getListenerContainerIds()
            .stream()
            .map(this::createKafkaConsumerResponse)
            .collect(Collectors.toList());
}

@PostMapping(path = "/create")
@ResponseStatus(HttpStatus.CREATED)
public void createConsumer(@RequestBody Request request) {
    customKafkaContainerRegistration.registerCustomKafkaContainer(request);
}

@PostMapping(path = "/activate")
@ResponseStatus(HttpStatus.ACCEPTED)
public void activateConsumer(@RequestParam String consumerId) {
    ConcurrentMessageListenerContainer listenerContainer = (ConcurrentMessageListenerContainer) kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
    if (Objects.isNull(listenerContainer)) {
        throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
    } else if (listenerContainer.isRunning()) {
        throw new RuntimeException(String.format("Consumer with id %s is already running", consumerId));
    } else {
        log.info("Running a consumer with id " + consumerId);
        listenerContainer.start();
    }
}

@PostMapping(path = "/pause")
@ResponseStatus(HttpStatus.ACCEPTED)
public void pauseConsumer(@RequestParam String consumerId) {
    MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
    if (Objects.isNull(listenerContainer)) {
        throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
    } else if (!listenerContainer.isRunning()) {
        throw new RuntimeException(String.format("Consumer with id %s is not running", consumerId));
    } else if (listenerContainer.isContainerPaused()) {
        throw new RuntimeException(String.format("Consumer with id %s is already paused", consumerId));
    } else if (listenerContainer.isPauseRequested()) {
        throw new RuntimeException(String.format("Consumer with id %s is already requested to be paused", consumerId));
    } else {
        log.info("Pausing a consumer with id " + consumerId);
        listenerContainer.pause();
    }
}

@PostMapping(path = "/resume")
@ResponseStatus(HttpStatus.ACCEPTED)
public void resumeConsumer(@RequestParam String consumerId) {
    MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
    if (Objects.isNull(listenerContainer)) {
        throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
    } else if (!listenerContainer.isRunning()) {
        throw new RuntimeException(String.format("Consumer with id %s is not running", consumerId));
    } else if (!listenerContainer.isContainerPaused()) {
        throw new RuntimeException(String.format("Consumer with id %s is not paused", consumerId));
    } else {
        log.info("Resuming a consumer with id " + consumerId);
        listenerContainer.resume();
    }
}

@PostMapping(path = "/deactivate")
@ResponseStatus(HttpStatus.ACCEPTED)
public void deactivateConsumer(@RequestParam String consumerId) {
    ConcurrentMessageListenerContainer listenerContainer = (ConcurrentMessageListenerContainer) kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
    if (Objects.isNull(listenerContainer)) {
        throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
    } else if (!listenerContainer.isRunning()) {
        throw new RuntimeException(String.format("Consumer with id %s is already stop", consumerId));
    } else {
        log.info("Stopping a consumer with id " + consumerId);
        listenerContainer.stop();
    }
}

private KafkaConsumerResponse createKafkaConsumerResponse(String consumerId) {
    MessageListenerContainer listenerContainer =
            kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
    return KafkaConsumerResponse.builder()
            .consumerId(consumerId)
            .groupId(listenerContainer.getGroupId())
            .listenerId(listenerContainer.getListenerId())
            .active(listenerContainer.isRunning())
            .assignments(Optional.ofNullable(listenerContainer.getAssignedPartitions())
                    .map(topicPartitions -> topicPartitions.stream()
                            .map(this::createKafkaConsumerAssignmentResponse)
                            .collect(Collectors.toList()))
                    .orElse(null))
            .build();
}

private KafkaConsumerAssignmentResponse createKafkaConsumerAssignmentResponse(
        TopicPartition topicPartition) {
    return KafkaConsumerAssignmentResponse.builder()
            .topic(topicPartition.topic())
            .partition(topicPartition.partition())
            .build();
}
}

完整代码:

As @garry mention on Error while implement AcknowledgingMessageListener<String, String> , to create consumer which manual commit with ack mode MANUAL_IMMDEDIATE. I use ConcurrentMessageListenerContainer listener container and add
AcknowledgingMessageListener class to its container properties and its work. Using Create Rest Api , i can create consumer with concurrent = 6 and thats expected result for me.

But while using another Rest Api to perform start , stop , resume , pause and get all consumer information created , KafkaListenerEndpointRegistry class not found any listener that response.
That's strange for me, because while using method registerListenerContainer in kafkaListenerEndpointRegistry class to create consumer, its working fine.
After looking in kafkaListenerEndpointRegistry class, this class working in MessageListenerContainer and i thinks that the problem. I tried to cast MessageListenerContainer
to ConcurrentMessageListenerContainer but its not work.

CODE :

CustomKafkaContainerRegistration.java

@Component
public class CustomKafkaContainerRegistration {

@Value("${concurrent.consumer.kafka}")
private int concurrentConsumer;

@Autowired
public CustomConsumerFactory customConsumerFactory;

public void registerCustomKafkaContainer(Request request) {
    CustomContainerProperties customContainerProperties = new CustomContainerProperties(request.getTopicName(),request.getConsumerId());
    ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
            customConsumerFactory.getCustomConsumerFactory(),
            customContainerProperties.getContainerProperties());
    container.setConcurrency(concurrentConsumer);
    container.setAutoStartup(request.getConsumerActivation());
    container.getContainers();
    container.setBeanName(request.getConsumerId());
    container.start();
}

KafkaConsumerRegistryController.java

@Slf4j
@RestController
@RequestMapping(path = "/api/kafka/registry")
public class KafkaConsumerRegistryController {

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Autowired
private CustomKafkaContainerRegistration customKafkaContainerRegistration;



@GetMapping
public List<KafkaConsumerResponse> getConsumerIds() {
    return kafkaListenerEndpointRegistry.getListenerContainerIds()
            .stream()
            .map(this::createKafkaConsumerResponse)
            .collect(Collectors.toList());
}

@PostMapping(path = "/create")
@ResponseStatus(HttpStatus.CREATED)
public void createConsumer(@RequestBody Request request) {
    customKafkaContainerRegistration.registerCustomKafkaContainer(request);
}

@PostMapping(path = "/activate")
@ResponseStatus(HttpStatus.ACCEPTED)
public void activateConsumer(@RequestParam String consumerId) {
    ConcurrentMessageListenerContainer listenerContainer = (ConcurrentMessageListenerContainer) kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
    if (Objects.isNull(listenerContainer)) {
        throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
    } else if (listenerContainer.isRunning()) {
        throw new RuntimeException(String.format("Consumer with id %s is already running", consumerId));
    } else {
        log.info("Running a consumer with id " + consumerId);
        listenerContainer.start();
    }
}

@PostMapping(path = "/pause")
@ResponseStatus(HttpStatus.ACCEPTED)
public void pauseConsumer(@RequestParam String consumerId) {
    MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
    if (Objects.isNull(listenerContainer)) {
        throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
    } else if (!listenerContainer.isRunning()) {
        throw new RuntimeException(String.format("Consumer with id %s is not running", consumerId));
    } else if (listenerContainer.isContainerPaused()) {
        throw new RuntimeException(String.format("Consumer with id %s is already paused", consumerId));
    } else if (listenerContainer.isPauseRequested()) {
        throw new RuntimeException(String.format("Consumer with id %s is already requested to be paused", consumerId));
    } else {
        log.info("Pausing a consumer with id " + consumerId);
        listenerContainer.pause();
    }
}

@PostMapping(path = "/resume")
@ResponseStatus(HttpStatus.ACCEPTED)
public void resumeConsumer(@RequestParam String consumerId) {
    MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
    if (Objects.isNull(listenerContainer)) {
        throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
    } else if (!listenerContainer.isRunning()) {
        throw new RuntimeException(String.format("Consumer with id %s is not running", consumerId));
    } else if (!listenerContainer.isContainerPaused()) {
        throw new RuntimeException(String.format("Consumer with id %s is not paused", consumerId));
    } else {
        log.info("Resuming a consumer with id " + consumerId);
        listenerContainer.resume();
    }
}

@PostMapping(path = "/deactivate")
@ResponseStatus(HttpStatus.ACCEPTED)
public void deactivateConsumer(@RequestParam String consumerId) {
    ConcurrentMessageListenerContainer listenerContainer = (ConcurrentMessageListenerContainer) kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
    if (Objects.isNull(listenerContainer)) {
        throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
    } else if (!listenerContainer.isRunning()) {
        throw new RuntimeException(String.format("Consumer with id %s is already stop", consumerId));
    } else {
        log.info("Stopping a consumer with id " + consumerId);
        listenerContainer.stop();
    }
}

private KafkaConsumerResponse createKafkaConsumerResponse(String consumerId) {
    MessageListenerContainer listenerContainer =
            kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
    return KafkaConsumerResponse.builder()
            .consumerId(consumerId)
            .groupId(listenerContainer.getGroupId())
            .listenerId(listenerContainer.getListenerId())
            .active(listenerContainer.isRunning())
            .assignments(Optional.ofNullable(listenerContainer.getAssignedPartitions())
                    .map(topicPartitions -> topicPartitions.stream()
                            .map(this::createKafkaConsumerAssignmentResponse)
                            .collect(Collectors.toList()))
                    .orElse(null))
            .build();
}

private KafkaConsumerAssignmentResponse createKafkaConsumerAssignmentResponse(
        TopicPartition topicPartition) {
    return KafkaConsumerAssignmentResponse.builder()
            .topic(topicPartition.topic())
            .partition(topicPartition.partition())
            .build();
}
}

Full code : https://github.com/nonefornothing/DynamicKafkaConsumer/tree/java8-containerListener

listener container in kafkaListenerEndpointRegistry empty image

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

别念他 2025-02-14 07:55:43

您需要跟踪您在 registercustomkafkacontainer中创建的容器自己,例如,在简单的 concurrentMap&lt;?,?&gt; 中。

IE提供的功能与 kafkalistenercontainerregistry 提供的功能相似,为框架创建的容器提供了类似的功能。

目前,您只是启动它们,但在任何地方都没有对它们进行引用。

编辑

private final Map<String, MessageListenerContainer> registry = new ConcurrentHashMap<>();

public void registerCustomKafkaContainer(Request request) {
    CustomContainerProperties customContainerProperties = new CustomContainerProperties(request.getTopicName(),request.getConsumerId());
    ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
            customConsumerFactory.getCustomConsumerFactory(),
            customContainerProperties.getContainerProperties());
    container.setConcurrency(concurrentConsumer);
    container.setAutoStartup(request.getConsumerActivation());
    container.getContainers();
    container.setBeanName(request.getConsumerId());
    this.registry.put(request.getTopicName(), container);
    container.start();
}

public MessageListenerContainer getContainer(String topicName) {
    return this.registry.get(topicName);
}

public MessageListenerContainer remove(String topicName) {
    return this.registry.remove(topicName);
}

You need to keep track of containers you create in registerCustomKafkaContainer yourself, e.g. in a simple ConcurrentMap<?, ?>.

i.e. provide similar functionality to that which the KafkaListenerContainerRegistry provides for containers created by the framework.

Currently, you are just starting them but not keeping a reference to them anywhere.

EDIT

private final Map<String, MessageListenerContainer> registry = new ConcurrentHashMap<>();

public void registerCustomKafkaContainer(Request request) {
    CustomContainerProperties customContainerProperties = new CustomContainerProperties(request.getTopicName(),request.getConsumerId());
    ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
            customConsumerFactory.getCustomConsumerFactory(),
            customContainerProperties.getContainerProperties());
    container.setConcurrency(concurrentConsumer);
    container.setAutoStartup(request.getConsumerActivation());
    container.getContainers();
    container.setBeanName(request.getConsumerId());
    this.registry.put(request.getTopicName(), container);
    container.start();
}

public MessageListenerContainer getContainer(String topicName) {
    return this.registry.get(topicName);
}

public MessageListenerContainer remove(String topicName) {
    return this.registry.remove(topicName);
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文