基于 EJB3 的应用程序的最佳通信模式是什么?

发布于 2024-08-25 15:35:29 字数 5400 浏览 5 评论 0原文

我正在启动一个需要高度可扩展的 Java EE 项目。到目前为止,概念是:

  • 几个消息驱动 Bean,负责架构的不同部分
  • 每个 MDB 都注入一个会话 Bean,处理业务逻辑
  • 几个实体 Bean,提供对
  • 架构不同部分之间持久层通信的 访问通过 JMS 消息请求/回复概念的架构:
    • MDB 收到包含活动请求的消息
    • 使用其会话 bean 执行必要的业务逻辑
    • 将消息中的响应对象返回给原始请求者

这个想法是,通过消息总线将体系结构的各个部分相互解耦,这样就不会出现任何问题。限制了可扩展性。只需启动更多组件 - 只要它们连接到同一总线,我们就可以不断成长。

不幸的是,我们在请求-回复概念方面遇到了很大的问题。交易管理似乎对我们来说有很多阻碍。看来会话 bean 不应该消费消息?!

阅读 http://blogs.oracle.com/fkieviet/entry/request_reply_from_an_ejb 和 < a href="http://forums.sun.com/message.jspa?messageID=10338789" rel="nofollow noreferrer">http://forums.sun.com/message.jspa?messageID=10338789 ,我感觉人们实际上反对 EJB 的请求/回复概念。

如果是这种情况,您的 EJB 之间如何通信? (请记住,可扩展性是我所追求的)

我当前设置的详细信息:

  • MDB 1 'TestController',使用(本地)SLSB 1 'TestService' 进行业务逻辑
  • TestController.onMessage() 使 TestService 向队列 XYZ 发送消息并请求回复
    • TestService 使用 Bean 管理事务
    • TestService 建立连接&初始化时通过联合连接工厂与 JMS 代理进行会话 (@PostConstruct)
    • TestService 发送后提交事务,然后开始另一个事务并等待 10 秒响应
  • 消息到达 MDB 2 'LocationController',它使用(本地)SLSB 2业务逻辑的“LocationService”
  • LocationController.onMessage() 使 LocationService 将消息发送到请求的 JMSReplyTo 队列
    • 相同的 BMT 概念、相同的 @PostConstruct 概念
  • 都使用相同的连接工厂来访问代理

问题:第一条消息成功发送(由 SLSB 1)并接收(由 MDB 2)。返回消息的发送(通过 SLSB 2)也很好。然而,SLSB 1 从未收到任何东西 - 它只是超时。

我尝试不使用 messageSelector,没有任何变化,仍然没有收到消息。

通过会话bean消费消息不好吗?

SLSB 1 - TestService.java

@Resource(name = "jms/mvs.MVSControllerFactory")
private javax.jms.ConnectionFactory connectionFactory;

@PostConstruct
public void initialize() {
    try {
      jmsConnection = connectionFactory.createConnection();
      session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      System.out.println("Connection to JMS Provider established");
    } catch (Exception e) { }
}

public Serializable sendMessageWithResponse(Destination reqDest, Destination respDest, Serializable request) {
    Serializable response = null;

    try {
        utx.begin();
        Random rand = new Random();
        String correlationId = rand.nextLong() + "-" + (new Date()).getTime();

        // prepare the sending message object
        ObjectMessage reqMsg = session.createObjectMessage();
        reqMsg.setObject(request);
        reqMsg.setJMSReplyTo(respDest);
        reqMsg.setJMSCorrelationID(correlationId);

        // prepare the publishers and subscribers
        MessageProducer producer = session.createProducer(reqDest);

        // send the message
        producer.send(reqMsg);
        System.out.println("Request Message has been sent!");
        utx.commit();

        // need to start second transaction, otherwise the first msg never gets sent
        utx.begin();
        MessageConsumer consumer = session.createConsumer(respDest, "JMSCorrelationID = '" + correlationId + "'");
        jmsConnection.start();
        ObjectMessage respMsg = (ObjectMessage) consumer.receive(10000L);
        utx.commit();

        if (respMsg != null) {
            response = respMsg.getObject();
            System.out.println("Response Message has been received!");
        } else {
            // timeout waiting for response
            System.out.println("Timeout waiting for response!");
        }

    } catch (Exception e) { }

    return response;
}

SLSB 2 - LocationService.Java(仅回复方法,其余同上)

public boolean reply(Message origMsg, Serializable o) {
    boolean rc = false;

    try {
        // check if we have necessary correlationID and replyTo destination
        if (!origMsg.getJMSCorrelationID().equals("") && (origMsg.getJMSReplyTo() != null)) {
            // prepare the payload
            utx.begin();
            ObjectMessage msg = session.createObjectMessage();
            msg.setObject(o);

            // make it a response
            msg.setJMSCorrelationID(origMsg.getJMSCorrelationID());
            Destination dest = origMsg.getJMSReplyTo();

            // send it
            MessageProducer producer = session.createProducer(dest);
            producer.send(msg);
            producer.close();
            System.out.println("Reply Message has been sent");
            utx.commit();

            rc = true;
        }

    } catch (Exception e) {}

    return rc;
}

sun-resources.xml

<admin-object-resource enabled="true" jndi-name="jms/mvs.LocationControllerRequest"  res-type="javax.jms.Queue"  res-adapter="jmsra">
    <property name="Name" value="mvs.LocationControllerRequestQueue"/>
</admin-object-resource>
<admin-object-resource enabled="true" jndi-name="jms/mvs.LocationControllerResponse"  res-type="javax.jms.Queue"  res-adapter="jmsra">
    <property name="Name" value="mvs.LocationControllerResponseQueue"/>
</admin-object-resource>

<connector-connection-pool name="jms/mvs.MVSControllerFactoryPool"  connection-definition-name="javax.jms.QueueConnectionFactory"  resource-adapter-name="jmsra"/>
<connector-resource enabled="true" jndi-name="jms/mvs.MVSControllerFactory" pool-name="jms/mvs.MVSControllerFactoryPool"  />

I'm starting a Java EE project that needs to be strongly scalable. So far, the concept was:

  • several Message Driven Beans, responsible for different parts of the architecture
  • each MDB has a Session Bean injected, handling the business logic
  • a couple of Entity Beans, providing access to the persistence layer
  • communication between the different parts of the architecture via Request/Reply concept via JMS messages:
    • MDB receives msg containing activity request
    • uses its session bean to execute necessary business logic
    • returns response object in msg to original requester

The idea was that by de-coupling parts of the architecture from each other via the message bus, there is no limit to the scalability. Simply start more components - as long as they are connected to the same bus, we can grow and grow.

Unfortunately, we're having massive problems with the request-reply concept. Transaction Mgmt seems to be in our way plenty. It seams that session beans are not supposed to consume messages?!

Reading http://blogs.oracle.com/fkieviet/entry/request_reply_from_an_ejb and http://forums.sun.com/message.jspa?messageID=10338789, I get the feeling that people actually recommend against the request/reply concept for EJBs.

If that is the case, how do you communicate between your EJBs? (Remember, scalability is what I'm after)

Details of my current setup:

  • MDB 1 'TestController', uses (local) SLSB 1 'TestService' for business logic
  • TestController.onMessage() makes TestService send a message to queue XYZ and requests a reply
    • TestService uses Bean Managed Transactions
    • TestService establishes a connection & session to the JMS broker via a joint connection factory upon initialization (@PostConstruct)
    • TestService commits the transaction after sending, then begins another transaction and waits 10 sec for the response
  • Message gets to MDB 2 'LocationController', which uses (local) SLSB 2 'LocationService' for business logic
  • LocationController.onMessage() makes LocationService send a message back to the requested JMSReplyTo queue
    • Same BMT concept, same @PostConstruct concept
  • all use the same connection factory to access the broker

Problem: The first message gets send (by SLSB 1) and received (by MDB 2) ok. The sending of the returning message (by SLSB 2) is fine as well. However, SLSB 1 never receives anything - it just times out.

I tried without the messageSelector, no change, still no receiving message.

Is it not ok to consume message by a session bean?

SLSB 1 - TestService.java

@Resource(name = "jms/mvs.MVSControllerFactory")
private javax.jms.ConnectionFactory connectionFactory;

@PostConstruct
public void initialize() {
    try {
      jmsConnection = connectionFactory.createConnection();
      session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      System.out.println("Connection to JMS Provider established");
    } catch (Exception e) { }
}

public Serializable sendMessageWithResponse(Destination reqDest, Destination respDest, Serializable request) {
    Serializable response = null;

    try {
        utx.begin();
        Random rand = new Random();
        String correlationId = rand.nextLong() + "-" + (new Date()).getTime();

        // prepare the sending message object
        ObjectMessage reqMsg = session.createObjectMessage();
        reqMsg.setObject(request);
        reqMsg.setJMSReplyTo(respDest);
        reqMsg.setJMSCorrelationID(correlationId);

        // prepare the publishers and subscribers
        MessageProducer producer = session.createProducer(reqDest);

        // send the message
        producer.send(reqMsg);
        System.out.println("Request Message has been sent!");
        utx.commit();

        // need to start second transaction, otherwise the first msg never gets sent
        utx.begin();
        MessageConsumer consumer = session.createConsumer(respDest, "JMSCorrelationID = '" + correlationId + "'");
        jmsConnection.start();
        ObjectMessage respMsg = (ObjectMessage) consumer.receive(10000L);
        utx.commit();

        if (respMsg != null) {
            response = respMsg.getObject();
            System.out.println("Response Message has been received!");
        } else {
            // timeout waiting for response
            System.out.println("Timeout waiting for response!");
        }

    } catch (Exception e) { }

    return response;
}

SLSB 2 - LocationService.Java (only the reply method, rest is same as above)

public boolean reply(Message origMsg, Serializable o) {
    boolean rc = false;

    try {
        // check if we have necessary correlationID and replyTo destination
        if (!origMsg.getJMSCorrelationID().equals("") && (origMsg.getJMSReplyTo() != null)) {
            // prepare the payload
            utx.begin();
            ObjectMessage msg = session.createObjectMessage();
            msg.setObject(o);

            // make it a response
            msg.setJMSCorrelationID(origMsg.getJMSCorrelationID());
            Destination dest = origMsg.getJMSReplyTo();

            // send it
            MessageProducer producer = session.createProducer(dest);
            producer.send(msg);
            producer.close();
            System.out.println("Reply Message has been sent");
            utx.commit();

            rc = true;
        }

    } catch (Exception e) {}

    return rc;
}

sun-resources.xml

<admin-object-resource enabled="true" jndi-name="jms/mvs.LocationControllerRequest"  res-type="javax.jms.Queue"  res-adapter="jmsra">
    <property name="Name" value="mvs.LocationControllerRequestQueue"/>
</admin-object-resource>
<admin-object-resource enabled="true" jndi-name="jms/mvs.LocationControllerResponse"  res-type="javax.jms.Queue"  res-adapter="jmsra">
    <property name="Name" value="mvs.LocationControllerResponseQueue"/>
</admin-object-resource>

<connector-connection-pool name="jms/mvs.MVSControllerFactoryPool"  connection-definition-name="javax.jms.QueueConnectionFactory"  resource-adapter-name="jmsra"/>
<connector-resource enabled="true" jndi-name="jms/mvs.MVSControllerFactory" pool-name="jms/mvs.MVSControllerFactoryPool"  />

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

软甜啾 2024-09-01 15:35:29

即使使用 JMS,请求/答复模式本质上仍然是同步。呼叫者发送一条消息,然后等待回复。这不仅因为分布式事务而变得复杂,而且还意味着在等待回复时,一个或多个资源(至少在本例中是线程)被分配和浪费。您无法以这种方式进行扩展:您本质上受到线程数量的限制。

要拥有真正可扩展的 JMS 架构,一切都必须异步。换句话说:你永远不应该等待。发送和接收的消息应传递必要的信息以触发下一个活动。

如果消息太大,可以只存储一个标识符,并将相应的数据存储在数据库中。但随后数据库再次成为争论的焦点。

如果不同的消息需要知道它们参与哪个长时间运行的进程,您还可以使用关联标识符。当接收到消息时,接收方可以使用相关标识符“恢复”长时间运行的活动。这是 BPEL 的传统模式。同步请求/回复与具有相关标识符的异步消息之间的主要区别在于,可以在每个步骤之间释放资源。您可以扩展后者,但不能扩展第一个。

老实说,我对您的长帖子感到困惑,并且不明白您的设计是否相当异步(并且正确),或者与请求/回复同步(并且有问题)。但我希望我提供了一些答案。

无论如何,请访问网站企业集成模式,它是一个有价值的信息来源。

The request/reply pattern, even if using JMS, is still synchronous in essence. The caller sends a message, and then waits for the reply. This is not only complicated because of the distributed transactions, but also means that while waiting for the reply, one or several resources (the thread at least in this case) are allocated and wasted. You can not scale this way: you are inherently limited by the number of threads.

To have a truly scalable JMS architecture everything must be asynchronous. In other term: you should never wait. The message sent and receive should pass the necessary information to trigger the next activity.

If the size of the message would be too big, you can store only an identifier and store the corresponding data in a database. But then the database becomes again a point of contention.

If different messages need to know in which long-running process they participate, you can also use correlation identifiers. When a message is received, the receive can "resume" the long-running activity using the correlation identifier. That's a traditional pattern with BPEL. The main difference between synchronous request/reply and asynchronous message with correlation identifier is that the resources can be freed between each step. You can scale with the later, but not with the first.

To be honest, I was confused with your long post and didn't understood if your design was rather asynchronous (and correct), or synchronous with request/reply (and problematic). But I hope I provided some element of answer.

In any case, go visit the website Enterprise Integration Patterns, it's a valuable source of information.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文