Is it possible to use request-reply (InOut) with Camel and Kafka?
Is it possible to implement the request-reply enterprise integration pattern (EIP) with Camel and Kafka?
It works well with ActiveMQ, but I can't get it to work with Kafka. The ProducerTemplate receives a reply derived from exchange.getIn() before the consumer has even received the request.
Upon further inspection, it appears that KafkaProducer implements neither the reply-listening logic nor the replyTo headers that are present in JmsProducer.
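For context, the reply-listening logic mentioned above usually amounts to matching each incoming reply to the request that is still waiting for it. The following is a minimal sketch in plain Java, not Camel's or Kafka's actual implementation: `ReplyCorrelator` and its methods are hypothetical names invented for illustration, and the sketch assumes that a correlation ID travels with the request and comes back with the reply.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not a Camel or Kafka class): tracks in-flight requests by correlation ID. */
final class ReplyCorrelator {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request before it is sent; the returned future completes when its reply arrives. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    /** Called by whatever listens on the reply topic; returns false if no request is waiting for this ID. */
    boolean complete(String correlationId, String replyBody) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(replyBody);
    }

    /** Number of requests still waiting for a reply. */
    int pendingCount() {
        return pending.size();
    }
}
```

In a Kafka setup along these lines, the sending side would call `register` before publishing the request, and a consumer on the reply topic would call `complete` for each reply it reads. How the correlation ID is carried (for example, as a record header) is a design choice this sketch leaves open.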
I am using camel-spring-boot 3.11.0 (with Spring Boot 2.6.6).
Here is the sample code:
Producer test:
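import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = MyApplication.class)
@RunWith(SpringRunner.class)
@ActiveProfiles("integration-test")
public class RequestReplySampleRouteIntegrationTest {
    @Produce private ProducerTemplate producerTemplate;
    @Test
    public void shouldSendRequestAndReceiveReply() throws Exception {
        String topic = RequestReplySampleRoute.REQUEST_QUEUE + "?exchangePattern=" + ExchangePattern.InOut.name();
        String results = (String) producerTemplate.requestBody(topic, "a request");
        // Expected text must match the route's output exactly, including the capital "P" in "Processed".
        assertThat(results, equalTo("Post Processed: [a request]"));
    }
}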
Route in Consumer:
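import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class RequestReplySampleRoute extends RouteBuilder {
    private Logger logger = LoggerFactory.getLogger(RequestReplySampleRoute.class);
    public static final String REQUEST_QUEUE = "kafka:request-reply";
    public static final String REPLY_QUEUE = "kafka:request-reply.reply";

    @Override
    public void configure() throws Exception {
        // Consume requests from the request topic.
        from(REQUEST_QUEUE)
            .process(exchange -> {
                // Wrap the incoming body to form the reply payload.
                Message message = exchange.getMessage();
                Object body = message.getBody();
                message.setBody(String.format("Post Processed: [%s]", body));
            })
            // Publish the reply to the reply topic as a one-way (InOnly) send.
            .to(ExchangePattern.InOnly, REPLY_QUEUE)
            .end();
    }
}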
pom.xml:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.camel.springboot</groupId>
        <artifactId>camel-kafka-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.6.6</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-spring-boot-dependencies</artifactId>
            <version>3.11.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>