Is request-reply (InOut) possible with Camel and Kafka?

Posted on 2025-01-24 07:08:07

Is it possible to implement the request-reply enterprise integration pattern (EIP) with Camel and Kafka?
It works well with ActiveMQ, but I can't get it to work with Kafka. The ProducerTemplate receives a reply derived from exchange.getIn() before the consumer has even received the request.
On further inspection, KafkaProducer does not appear to implement any of the reply-listening logic or the replyTo header handling that is present in JmsProducer.
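
To make concrete what seems to be missing, here is a minimal plain-Java sketch of the bookkeeping a reply listener needs: each request gets a correlation ID, and a reply is matched to its pending request by that ID. It assumes the correlation ID travels with the request (for example as a Kafka record header) and is echoed back on the reply topic. `ReplyCorrelator` and its methods are hypothetical names used for illustration only; they are not part of Camel or Kafka.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not a Camel or Kafka class): tracks in-flight requests by correlation ID. */
class ReplyCorrelator {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a new request and returns the correlation ID to send along with it. */
    String register() {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, new CompletableFuture<>());
        return correlationId;
    }

    /** Blocks until the reply for the given ID arrives or the timeout elapses, then forgets the ID. */
    String awaitReply(String correlationId, long timeout, TimeUnit unit) throws Exception {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future == null) {
            throw new IllegalArgumentException("Unknown correlation ID: " + correlationId);
        }
        try {
            return future.get(timeout, unit);
        } finally {
            pending.remove(correlationId);
        }
    }

    /** Called by whatever consumes the reply topic; returns false for unknown or already-handled IDs. */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.get(correlationId);
        return future != null && future.complete(body);
    }
}
```

Under these assumptions, the sender would call `register()`, publish the request together with the returned ID, and then block in `awaitReply(...)`, while a separate consumer on the reply topic would call `onReply(...)` for each record it receives.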

I am using camel-spring-boot 3.11.0.

Here is the sample code:

Producer test:

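import org.apache.camel.ExchangePattern;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

@SpringBootTest(classes = MyApplication.class)
@RunWith(SpringRunner.class)
@ActiveProfiles("integration-test")
public class RequestReplySampleRouteIntegrationTest {

    @Produce
    private ProducerTemplate producerTemplate;

    @Test
    public void shouldSendRequestAndReceiveReply() throws Exception {
        // Send to the request topic with the InOut exchange pattern so that a reply is expected.
        String topic = RequestReplySampleRoute.REQUEST_QUEUE + "?exchangePattern=" + ExchangePattern.InOut.name();

        String results = (String) producerTemplate.requestBody(topic, "a request");

        // Expected text matches the format string used by the route.
        assertThat(results, equalTo("Post Processed: [a request]"));
    }
}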

Route in Consumer:

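import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class RequestReplySampleRoute extends RouteBuilder {
    private Logger logger = LoggerFactory.getLogger(RequestReplySampleRoute.class);

    public static final String REQUEST_QUEUE = "kafka:request-reply";
    public static final String REPLY_QUEUE = "kafka:request-reply.reply";

    @Override
    public void configure() throws Exception {
        // Consume requests, wrap the body, and publish the result to the reply topic.
        from(REQUEST_QUEUE)
                .process(exchange -> {
                    Message message = exchange.getMessage();
                    Object body = message.getBody();
                    message.setBody(String.format("Post Processed: [%s]", body));
                })
                .to(ExchangePattern.InOnly, REPLY_QUEUE);
    }
}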

pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.camel.springboot</groupId>
        <artifactId>camel-kafka-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
<dependencies>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-dependencies</artifactId>
        <version>2.6.6</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.camel.springboot</groupId>
        <artifactId>camel-spring-boot-dependencies</artifactId>
        <version>3.11.0</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
</dependencies>
</dependencyManagement>
