Spring Integration:将 Web 服务挂接到 FIFO 队列

发布于 2024-07-17 13:14:10 字数 274 浏览 6 评论 0原文

我仍在与 Spring Integration 作斗争 - 这是我的场景:

  1. Web 服务从客户端获取请求 Web
  2. 服务将请求放入队列
  3. 队列使用者处理消息 FIFO 并发送响应,该响应被路由回 Web 服务
  4. Web服务将响应发送回客户端 A

将有多个 Web 服务将消息全部馈送到此队列中,我需要确保它们真正按照接收顺序进行处理。

我需要将 Spring Integration 的哪些部分连接在一起?

I'm still struggling with Spring Integration- here's my scenario:

  1. A web service gets a request from client A
  2. web service puts the request onto a queue
  3. A queue consumer processes the messages FIFO and sends a response that gets routed back to the web service
  4. Web services sends response back to client A

There will be multiple web services all feeding messages onto this queue, and I need to ensure that they are truly processed in the order that they're received.

What pieces of Spring Integration do I need to wire together?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

叫嚣ゝ 2024-07-24 13:14:10

我无法帮助您使用 Spring Integration,但也许您需要重新考虑一下您的架构。 在 ESB 系统中,当您知道消息的处理将花费相当长的时间或者不确定远程端是否已准备好(另一个原因是桥接不兼容的组件)时,您通常会将消息放入队列中。 当您将消息添加到队列中时,您立即返回给请求者,指示消息已收到,但不提供操作结果。 然后,请求者需要轮询结果,或者您也可以提供某种“推送”功能。

因此,如果处理队列中的消息需要花费大量时间,我建议修改您的架构。 Web 客户端长时间等待回复的情况并不常见,而且许多请求也可能会超时。

另一方面,如果消息的处理快速且可靠,则不需要使用队列通道。 让所有消息与中心组件(Java EE 会话 Bean、Spring Bean、Web 服务)进行通信,并自行实现队列机制。 已经有一些答案涵盖了如何做到这一点。

I can't help you with Spring Integration, but perhaps you need to give your architecture a second thought. In ESB systems you usually place a message in a Queue, when you know that the processing of the message will take considerable time or if you aren't certain that the remote end is ready (another reason is to bridge incompatible components). When you add the message to the queue, you immediately return to the requester indicating that the message is received, but not providing the result of the operation. The requester would then need to poll for the result or you could alternatively provide some sort of "push" functionality.

So, if the processing of the messages in the Queue takes a lot of time, I recommend to modify your architecture. It is not common for a Web Client to wait long times for the reply and many requests could also timeout.

If on the other hand the processing of the messages is quick and reliable, then using Queue channels is not needed. Have all your messages communicate with a central component (Java EE Session Bean, Spring Bean, web service) and implement a queue mechanism yourself. There are already answers covering how you could do this.

芯好空 2024-07-24 13:14:10

基于 QueueChannel 这是我的尝试。 这不涉及 Web 服务配置,仅涉及 Web 服务后端实现中的代码。

这是将某些内容添加到队列(您的 Web 服务)中的代码。

public class TheWebService {

  // Could also use QueueChannel, or PollableChannel here instead
  // just picked the most general one
  private org.springframework.integration.channel.MessageChannel queue;

  public void yourWebServiceMethod(SomeArg arg) {
     SomeObjectToPassThatExtendsMessage passed = someInitialProcessing(arg);
     queue.send(passed);
  }
}

这是你的接收器/处理器/出队类中的代码

public class TheProcessor {

  // Could also use QueueChannel here instead
  // just picked the most general one
  private org.springframework.integration.channel.PollableChannel queue;

  // This method needs to be setup to be called by a separate thread.
  // See http://static.springframework.org/spring/docs/2.5.x/api/org/springframework/scheduling/package-summary.html
  // and it's sub-packages.
  public void someProcessingPoller() {
     SomeObjectToPassThatExtendsMessage passed = queue.receive();
     // Do some processing with the passed object.
  }

}

Spring 配置看起来像这样

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans">

  <bean id="webService" class="mypackage.TheWebService">
      <property name="queue" ref="queue" />
  </bean>

  <bean id="processor" class="mypackage.TheProcessor ">
      <property name="queue" ref="queue" />
  </bean>

  <bean id="queue" class="org.springframework.integration.channel.QueueChannel"/>
</beans>

Based on the Javadoc for the QueueChannel here's my attempt at it. This does not address the web service configuration, just the code that would go in the web service back end implementation.

This is the code that would add something to a queue (your web service).

public class TheWebService {

  // Could also use QueueChannel, or PollableChannel here instead
  // just picked the most general one
  private org.springframework.integration.channel.MessageChannel queue;

  public void yourWebServiceMethod(SomeArg arg) {
     SomeObjectToPassThatExtendsMessage passed = someInitialProcessing(arg);
     queue.send(passed);
  }
}

This is the code that would go in your receiver/processor/dequeue class

public class TheProcessor {

  // Could also use QueueChannel here instead
  // just picked the most general one
  private org.springframework.integration.channel.PollableChannel queue;

  // This method needs to be setup to be called by a separate thread.
  // See http://static.springframework.org/spring/docs/2.5.x/api/org/springframework/scheduling/package-summary.html
  // and it's sub-packages.
  public void someProcessingPoller() {
     SomeObjectToPassThatExtendsMessage passed = queue.receive();
     // Do some processing with the passed object.
  }

}

The Spring configuration for this would look something like

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans">

  <bean id="webService" class="mypackage.TheWebService">
      <property name="queue" ref="queue" />
  </bean>

  <bean id="processor" class="mypackage.TheProcessor ">
      <property name="queue" ref="queue" />
  </bean>

  <bean id="queue" class="org.springframework.integration.channel.QueueChannel"/>
</beans>
你在我安 2024-07-24 13:14:10

请注意 Spring Integration,但 Java 5 有许多可以处理 FIFO 的 BlockingQueue。

Note sure about Spring Integration but Java 5 has a number of BlockingQueues that can handle FIFO.

歌入人心 2024-07-24 13:14:10

您应该查看 Spring Integration 中的 http(对于 REST)或 ws(对于 POX/SOAP)“入站网关”元素。 任何一个都可以通过“请求通道”属性连接到共享的、队列支持的通道(可以在后台处理通过同一网关返回回复的路由)。 我建议首先浏览示例。 此博客应该可以帮助您启动并运行:http:// blog.springsource.com/2010/09/29/new-spring-integration-samples/

希望有所帮助。
-标记

You should look at either the http (for REST) or ws (for POX/SOAP) "inbound-gateway" elements in Spring Integration. Either one can be connected to a shared, Queue-backed channel via the "request-channel" attribute (the routing of a reply back through the same gateway can be handled behind the scenes). I would recommend starting by browsing through the samples. This blog should help you get up and running: http://blog.springsource.com/2010/09/29/new-spring-integration-samples/

Hope that helps.
-Mark

喜你已久 2024-07-24 13:14:10

问题不在于春天。 我认为您需要一个队列,其中的元素包含请求并提供响应。 但响应需要阻塞,直到元素被出列并处理。 因此队列元素如下所示:

public class BlockingPair {
  private final RequestBodyType request;
  private ResponseBodyType response;

  public BlockingPair(RequestBodyType request) {
    this.request = request;
  }

  public RequestBodyType getRequest() {
    return request;
  }

  public ResponseBodyType getResponse() {
    while (response == null) {
      Thread.currentThread().sleep(10);
    }
    return response;
  }

  public void setResponse(ResponseBodyType response) {
    this.response = response;
  }
}

Web 服务排队使用其请求正文创建 BlockingPair。 然后将 BlockingPair 元素推入队列。 然后,它创建响应并从 BlockingPair 获取响应正文,但会阻塞。

消费者将一个 BlockingPair 放入队列并设置响应正文。 从那里,网络服务继续写入响应。

您需要三个 bean:webservice、阻塞队列和消费者。 Web 服务和消费者都需要队列作为 bean 属性。

队列和消费者 bean 需要在应用程序上下文中进行规划(由 ContextLoaderListener 初始化)。 队列需要一个 bean id 才能被 Web 服务引用(它有自己的上下文,但应用程序上下文作为父级,因此可以引用队列引用):

web.xml 的一部分

<context-param>
  <param-name>contextConfigLocation</param-name>
  <param-value>classpath:/WEB-INF/applicationContext.xml</param-value>
</context-param>

<listener>
  <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>

<servlet>
  <servlet-name>service</servlet-name>
  <servlet-class>org.springframework.ws.transport.http.MessageDispatcherServlet</servlet-class>
</servlet>

<servlet-mapping>
  <servlet-name>service</servlet-name>
  <url-pattern>/*</url-pattern>
</servlet-mapping>

applicationContext.xml 包含两个 bean:

<bean id="queue" class="java.util.concurrent.LinkedBlockingQueue"/>

<bean id="consumer" class="...">
  <property name="queue" ref="queue"/>
</bean>

webservice 有自己的上下文定义,此处 service-servlet.xml

<bean id="endpoint" class="org.springframework.ws.server.endpoint....PayloadEndpoint">
  <property name="queue" ref="queue"/>
</bean>

有关定义 spring ws 端点的更多信息,请参阅Spring 教程

消费者需要成为后台任务,所以我更喜欢 quartz,对于 spring 集成,请参阅此处

The problem ist not spring. I think you will need a queue with elements containing the request and offering a response. But the response need to block until the element is dequed and processed. So the queue element looks like:

public class BlockingPair {
  private final RequestBodyType request;
  private ResponseBodyType response;

  public BlockingPair(RequestBodyType request) {
    this.request = request;
  }

  public RequestBodyType getRequest() {
    return request;
  }

  public ResponseBodyType getResponse() {
    while (response == null) {
      Thread.currentThread().sleep(10);
    }
    return response;
  }

  public void setResponse(ResponseBodyType response) {
    this.response = response;
  }
}

The webservice enqueing creates the BlockingPair with its request body. Than pushes the BlockingPair element to the queue. Afterwards it creates the response getting the response body from the BlockingPair, but blocks.

The consumer deques one BlockingPair and sets the response body. From there the webservice continues writing the response.

You need three beans: webservice, a blocking queue and the consumer. Both webservice and consumer need the queue as bean property.

The queue and the consumer beans need to be planed in the application context (as initilialized by the ContextLoaderListener). The queue needs a bean id to be references by the webservice (which has its own context, but the application context as a parent so the queue reference can be referenced):

Part of the web.xml:

<context-param>
  <param-name>contextConfigLocation</param-name>
  <param-value>classpath:/WEB-INF/applicationContext.xml</param-value>
</context-param>

<listener>
  <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>

<servlet>
  <servlet-name>service</servlet-name>
  <servlet-class>org.springframework.ws.transport.http.MessageDispatcherServlet</servlet-class>
</servlet>

<servlet-mapping>
  <servlet-name>service</servlet-name>
  <url-pattern>/*</url-pattern>
</servlet-mapping>

The applicationContext.xml contains two beans:

<bean id="queue" class="java.util.concurrent.LinkedBlockingQueue"/>

<bean id="consumer" class="...">
  <property name="queue" ref="queue"/>
</bean>

The webservice has its own context definition, here service-servlet.xml:

<bean id="endpoint" class="org.springframework.ws.server.endpoint....PayloadEndpoint">
  <property name="queue" ref="queue"/>
</bean>

For more information on defining a spring ws endpoint, see the spring tutorial.

The consumer need to be a background task, so i would prefer quartz, for spring integration see here.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文