CamelCorrelationID/JMS CorrelationID 实现

发布于 2025-01-20 15:26:26 字数 9089 浏览 0 评论 0原文

我很困惑如何以每个应用程序使用消息传递系统监听来自服务器的响应的方式来实现 CamelCorrelationID/JMSCorrelationID。在这种情况下,队列是共享的。我已经实现了自己的 CorrelationID 来维护应用程序的状态。在这里,我使用两个客户端应用程序(客户端 A 和客户端 B)向服务器发送请求/从服务器接收请求

当前两个应用程序都在从服务器读取彼此的响应,而不是每个应用程序应该根据传入的 CorrelationID 监听自己的响应请求消息的标头。

需要帮助来解决这个问题。

以下是代码详细信息:

客户端 A 代码:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.12</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.camel.a</groupId>
    <artifactId>CorrelationId-A</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>CorrelationId-A</name>
    <description>Understanding CorrelationId</description>
    
    <properties>
        <java.version>1.8</java.version>
        <camel.version>3.11.0</camel.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-spring-boot-starter</artifactId>  
            <version>${camel.version}</version>     
        </dependency>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-amqp-starter</artifactId>  
            <version>${camel.version}</version>         
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

ClientARequestRoute.java

package com.camel.a.route;

import java.util.UUID;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class ClientARequestRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        from("timer://runOnce?repeatCount=1")
        .process(new Processor() {          
            @Override
            public void process(Exchange exchange) throws Exception {
                UUID uuid = UUID.randomUUID();
                String uniqueId = uuid.toString();
                String testMsg = "Sending message from Client A with Unique Id as - " + uniqueId;               
                exchange.getIn().setHeader(Exchange.CORRELATION_ID, uniqueId);
                exchange.getIn().setBody(testMsg, String.class);
            }           
        })
        .log("${body}")
        .to("amqp:queue:RequestQueue");     
    }
}

ResponseServerRoute.java

package com.camel.a.route;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class ResponseServerRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        from("amqp:queue:ResponseQueue")
        .process(new Processor() {          
            @Override
            public void process(Exchange exchange) throws Exception {
                                                
                String uniqueId = (String)exchange.getIn().getHeader(Exchange.CORRELATION_ID);
                System.out.println("Received Correlation Id - " + uniqueId);
                String msgReceived = (String) exchange.getIn().getBody(String.class);                               
                exchange.getIn().setBody(msgReceived);
            }           
        })
        .log("${body}")
        .end();     
    }
}

客户端 B 代码:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.12</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.camel.b</groupId>
    <artifactId>CorrelationId-B</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>CorrelationId-B</name>
    <description>Understanding CorrelationId</description>
    <properties>
        <java.version>1.8</java.version>
        <camel.version>3.11.0</camel.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-spring-boot-starter</artifactId>  
            <version>${camel.version}</version>     
        </dependency>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-amqp-starter</artifactId>  
            <version>${camel.version}</version>         
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

ClientBRequestRoute.java

package com.camel.b.route;

import java.util.UUID;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ClientBRequestRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        from("timer://runOnce?repeatCount=1")
        .process(new Processor() {          
            @Override
            public void process(Exchange exchange) throws Exception {
                UUID uuid = UUID.randomUUID();
                String uniqueId = uuid.toString();
                String testMsg = "Sending message from Client B with Unique Id as - " + uniqueId;               
                exchange.getIn().setHeader(Exchange.CORRELATION_ID, uniqueId);
                exchange.getIn().setBody(testMsg, String.class);
            }           
        })
        .log("${body}")
        .to("amqp:queue:RequestQueue"); 
    }
}

ResponseServerRoute.java

package com.camel.b.route;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class ResponseServerRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {      
        from("amqp:queue:ResponseQueue")
        .process(new Processor() {          
            @Override
            public void process(Exchange exchange) throws Exception {
                                                
                String uniqueId = (String)exchange.getIn().getHeader(Exchange.CORRELATION_ID);
                System.out.println("Received Correlation Id - " + uniqueId);
                String msgReceived = (String) exchange.getIn().getBody(String.class);                               
                exchange.getIn().setBody(msgReceived);
            }           
        })
        .log("${body}")
        .end();     
    }
}

I am confused how to implement CamelCorrelationID/JMSCorrelationID in a way where each application listen to its own response from server using messaging system. The queues are shared in this scenario. I have implemented my own CorrelationID to maintain the state of application. here I am using two client applications (Client-A and Client-B) sending/receiving the request to/from server

Currently both applications are reading each other responses from server, instead each application should listens to its own response based on CorrelationID passed in the header of request message.

Need help to resolve this.

Below are the code details:

Client-A code:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.12</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.camel.a</groupId>
    <artifactId>CorrelationId-A</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>CorrelationId-A</name>
    <description>Understanding CorrelationId</description>
    
    <properties>
        <java.version>1.8</java.version>
        <camel.version>3.11.0</camel.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-spring-boot-starter</artifactId>  
            <version>${camel.version}</version>     
        </dependency>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-amqp-starter</artifactId>  
            <version>${camel.version}</version>         
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

ClientARequestRoute.java

package com.camel.a.route;

import java.util.UUID;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class ClientARequestRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        from("timer://runOnce?repeatCount=1")
        .process(new Processor() {          
            @Override
            public void process(Exchange exchange) throws Exception {
                UUID uuid = UUID.randomUUID();
                String uniqueId = uuid.toString();
                String testMsg = "Sending message from Client A with Unique Id as - " + uniqueId;               
                exchange.getIn().setHeader(Exchange.CORRELATION_ID, uniqueId);
                exchange.getIn().setBody(testMsg, String.class);
            }           
        })
        .log("${body}")
        .to("amqp:queue:RequestQueue");     
    }
}

ResponseServerRoute.java

package com.camel.a.route;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class ResponseServerRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        from("amqp:queue:ResponseQueue")
        .process(new Processor() {          
            @Override
            public void process(Exchange exchange) throws Exception {
                                                
                String uniqueId = (String)exchange.getIn().getHeader(Exchange.CORRELATION_ID);
                System.out.println("Received Correlation Id - " + uniqueId);
                String msgReceived = (String) exchange.getIn().getBody(String.class);                               
                exchange.getIn().setBody(msgReceived);
            }           
        })
        .log("${body}")
        .end();     
    }
}

Client-B code:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.12</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.camel.b</groupId>
    <artifactId>CorrelationId-B</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>CorrelationId-B</name>
    <description>Understanding CorrelationId</description>
    <properties>
        <java.version>1.8</java.version>
        <camel.version>3.11.0</camel.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-spring-boot-starter</artifactId>  
            <version>${camel.version}</version>     
        </dependency>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-amqp-starter</artifactId>  
            <version>${camel.version}</version>         
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

ClientBRequestRoute.java

package com.camel.b.route;

import java.util.UUID;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ClientBRequestRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        
        from("timer://runOnce?repeatCount=1")
        .process(new Processor() {          
            @Override
            public void process(Exchange exchange) throws Exception {
                UUID uuid = UUID.randomUUID();
                String uniqueId = uuid.toString();
                String testMsg = "Sending message from Client B with Unique Id as - " + uniqueId;               
                exchange.getIn().setHeader(Exchange.CORRELATION_ID, uniqueId);
                exchange.getIn().setBody(testMsg, String.class);
            }           
        })
        .log("${body}")
        .to("amqp:queue:RequestQueue"); 
    }
}

ResponseServerRoute.java

package com.camel.b.route;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class ResponseServerRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {      
        from("amqp:queue:ResponseQueue")
        .process(new Processor() {          
            @Override
            public void process(Exchange exchange) throws Exception {
                                                
                String uniqueId = (String)exchange.getIn().getHeader(Exchange.CORRELATION_ID);
                System.out.println("Received Correlation Id - " + uniqueId);
                String msgReceived = (String) exchange.getIn().getBody(String.class);                               
                exchange.getIn().setBody(msgReceived);
            }           
        })
        .log("${body}")
        .end();     
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文