Kafkareplytemplate投掷:支持答复需要Kafkatemplate?
这是。
现在,我得到 java.lang.illegalstateException:需要Kafkatemplate来支持答复
。我的设置现在看起来像这样。
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingKafkaTemplate(ProducerFactory<Object, KafkaExampleRecord> producerFactory,
ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc) {
return new ReplyingKafkaTemplate<>(producerFactory, rc);
}
@Bean
public ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> repliesContainer(
ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> containerFactory) {
ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc =
containerFactory.createContainer("mytopic");
rc.setAutoStartup(false);
return rc;
}
}
我是我的休息控制器,我现在有这个。
@RestController
public class TestController {
@Autowired
private KafkaTemplate<Object, KafkaExampleRecord> template;
@Autowired
private ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingTemplate;
@PostMapping("/test/produce")
public void produceToTopic(@RequestBody KafkaExampleRecord record) {
ListenableFuture<SendResult<Object, KafkaExampleRecord>> future = template.send("mytopic", record);
}
@PostMapping("/test/request")
public void requestReply(@RequestBody KafkaExampleRecord record) throws ExecutionException, InterruptedException, TimeoutException {
ProducerRecord<Object, KafkaExampleRecord> producerRecord = new ProducerRecord<>("mytopic", record);
RequestReplyFuture<Object, KafkaExampleRecord, KafkaExampleRecord> replyFuture = replyingTemplate.sendAndReceive(producerRecord);
SendResult<Object, KafkaExampleRecord> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
}
该例外是在 springapplication.run(myClass.class,args)
上抛出的。
来自 https://docs.spring.io/spring.io/spring-kafka/参考/html/#回复 - template 和 https://www.techgalery.com/2021/08/spring-kafka-how-to-use-use.html 看来我现在拥有我现在需要的一切。什么仍然缺少?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
请参阅有关服务器端的文档接收n- reply语义: https://docs.spring.io/spring-kafka/docs/current/referent/reference/html/#annotation-send-send-to-to-to-to-to-
See documentation about server side receive-n-reply semantics: https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-send-to
由于ReplyingKafkateMplate是Kafkatemplate的扩展名,因此启动将不再配置默认设备。您将需要定义一个单独的kafkatemplate&lt;对象,对象&gt; bean以及ReplyingKafkatemplate。请请查看我的另一个问题,在该问题中,我将同时维护Kafkatemplate和ReplyingKafkatemplate与带有SetReplyplytemplate的侦听器工厂一起。
Since the replyingKafkaTemplate is an extension of KafkaTemplate, Boot will no longer configure a default one . You will need to define a separate KafkaTemplate<Object, Object> bean along with replyingKafkaTemplate .Please look into my another question where I maintain both kafkatemplate and replyingKafkaTemplate separately along with listener factory with setReplyTemplate.
ExecutionException:Due to: org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out using ReplyingKafkaTemplate