Kafkareplytemplate投掷:支持答复需要Kafkatemplate?

发布于 2025-02-11 03:23:37 字数 2934 浏览 1 评论 0 原文

这是

现在,我得到 java.lang.illegalstateException:需要Kafkatemplate来支持答复。我的设置现在看起来像这样。

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingKafkaTemplate(ProducerFactory<Object, KafkaExampleRecord> producerFactory,
                                                                                                       ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc) {
        return new ReplyingKafkaTemplate<>(producerFactory, rc);
    }

    @Bean
    public ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> containerFactory) {

        ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc =
                containerFactory.createContainer("mytopic");
        rc.setAutoStartup(false);
        return rc;
    }
}

我是我的休息控制器,我现在有这个。

@RestController
public class TestController {

    @Autowired
    private KafkaTemplate<Object, KafkaExampleRecord> template;

    @Autowired
    private ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingTemplate;

    @PostMapping("/test/produce")
    public void produceToTopic(@RequestBody KafkaExampleRecord record) {
        ListenableFuture<SendResult<Object, KafkaExampleRecord>> future = template.send("mytopic", record);
    }

    @PostMapping("/test/request")
    public void requestReply(@RequestBody KafkaExampleRecord record) throws ExecutionException, InterruptedException, TimeoutException {
        ProducerRecord<Object, KafkaExampleRecord> producerRecord = new ProducerRecord<>("mytopic", record);
        RequestReplyFuture<Object, KafkaExampleRecord, KafkaExampleRecord> replyFuture = replyingTemplate.sendAndReceive(producerRecord);

        SendResult<Object, KafkaExampleRecord> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
  
}

该例外是在 springapplication.run(myClass.class,args)上抛出的。

来自 https://docs.spring.io/spring.io/spring-kafka/参考/html/#回复 - template https://www.techgalery.com/2021/08/spring-kafka-how-to-use-use.html 看来我现在拥有我现在需要的一切。什么仍然缺少?

This is a follow up to KafkaController required a bean of type 'org.springframework.kafka.requestreply.ReplyingKafkaTemplate' that could not be found?.

Now I am getting java.lang.IllegalStateException: a KafkaTemplate is required to support replies. My setup now looks like this.

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingKafkaTemplate(ProducerFactory<Object, KafkaExampleRecord> producerFactory,
                                                                                                       ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc) {
        return new ReplyingKafkaTemplate<>(producerFactory, rc);
    }

    @Bean
    public ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> containerFactory) {

        ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc =
                containerFactory.createContainer("mytopic");
        rc.setAutoStartup(false);
        return rc;
    }
}

Im my REST controller I now have this.

@RestController
public class TestController {

    @Autowired
    private KafkaTemplate<Object, KafkaExampleRecord> template;

    @Autowired
    private ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingTemplate;

    @PostMapping("/test/produce")
    public void produceToTopic(@RequestBody KafkaExampleRecord record) {
        ListenableFuture<SendResult<Object, KafkaExampleRecord>> future = template.send("mytopic", record);
    }

    @PostMapping("/test/request")
    public void requestReply(@RequestBody KafkaExampleRecord record) throws ExecutionException, InterruptedException, TimeoutException {
        ProducerRecord<Object, KafkaExampleRecord> producerRecord = new ProducerRecord<>("mytopic", record);
        RequestReplyFuture<Object, KafkaExampleRecord, KafkaExampleRecord> replyFuture = replyingTemplate.sendAndReceive(producerRecord);

        SendResult<Object, KafkaExampleRecord> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
  
}

The exception is thrown at SpringApplication.run(MyClass.class, args).

From https://docs.spring.io/spring-kafka/reference/html/#replying-template and https://www.techgalery.com/2021/08/spring-kafka-how-to-use.html it looks like I have everything I need now. What is still missing?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

独﹏钓一江月 2025-02-18 03:23:37

请参阅有关服务器端的文档接收n- reply语义: https://docs.spring.io/spring-kafka/docs/current/referent/reference/html/#annotation-send-send-to-to-to-to-to-

为了支持 @sendto ,必须使用 kafkatemplate (在其 ReplyTemplate 属性)中提供侦听器容器工厂,这是用于发送答复。

See documentation about server side receive-n-reply semantics: https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-send-to

In order to support @SendTo, the listener container factory must be provided with a KafkaTemplate (in its replyTemplate property), which is used to send the reply.

仅此而已 2025-02-18 03:23:37

由于ReplyingKafkateMplate是Kafkatemplate的扩展名,因此启动将不再配置默认设备。您将需要定义一个单独的kafkatemplate&lt;对象,对象&gt; bean以及ReplyingKafkatemplate。请请查看我的另一个问题,在该问题中,我将同时维护Kafkatemplate和ReplyingKafkatemplate与带有SetReplyplytemplate的侦听器工厂一起。

Since the replyingKafkaTemplate is an extension of KafkaTemplate, Boot will no longer configure a default one . You will need to define a separate KafkaTemplate<Object, Object> bean along with replyingKafkaTemplate .Please look into my another question where I maintain both kafkatemplate and replyingKafkaTemplate separately along with listener factory with setReplyTemplate.

ExecutionException:Due to: org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out using ReplyingKafkaTemplate

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文