JMS和ThreadPool问题?
我希望 jms 在一个线程处理消息(threadPool 提交可调用)时收到一条消息。 消息由主线程接收。 下面哪种方式更好:
我使用 spring 3.0.5 :
ApplicationContext context = new ClassPathXmlApplicationContext(
"application-context.xml");
jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
destination = (Destination) context.getBean("destination");
_log4j.debug("ThreadSize in xml\t"
+ appConfig.getThumbCreatorThreadSize());
在主线程中方式1:
while (countFlag < 0) {
try {
TextMessage msg = (TextMessage) jmsTemplate
.receive(destination);
// prehandle ,then give to sub workers.
if (msg != null) {
_log4j.debug("JMSMessageID:\t" + msg.getJMSMessageID()
+ "\t" + msg.getText());
IConsumer thumbConsumerImpl = null;
thumbConsumerImpl = new ThumbConsumerTaskImpl(msg);
Future<List<ThumbCreatorInfo>> result = threadPool
.submit((Callable<List<ThumbCreatorInfo>>) thumbConsumerImpl);
}
} catch (IllegalArgumentException e) {
_log4j.warn(e.getMessage(), e);
} catch (JMSException e) {
_log4j.error("Please check the queue server!JMSException!", e);
} catch (Exception e) {
_log4j.error("", e);
}
}
在主线程中方式2:
TextMessage msg = (TextMessage) jmsTemplate.receive(destination);
do {
try {
// prehandle ,then give to sub workers.
if (msg != null) {
_log4j.debug("JMSMessageID:\t" + msg.getJMSMessageID()
+ "\t" + msg.getText());
IConsumer thumbConsumerImpl = null;
thumbConsumerImpl = new ThumbConsumerTaskImpl(msg);
Future<List<ThumbCreatorInfo>> result = threadPool
.submit((Callable<List<ThumbCreatorInfo>>) thumbConsumerImpl);
}
msg = (TextMessage) jmsTemplate.receive(destination);
} catch (IllegalArgumentException e) {
_log4j.warn(e.getMessage(), e);
} catch (JMSException e) {
_log4j.error("Please check the queue server!JMSException!", e);
} catch (Exception e) {
_log4j.error("", e);
}
} while (countFlag < 0);
I want that jms receives a message when one thread has handled a message (threadPool submits a callable).
The messages are received by a master thread.
Which way is better below:
I use spring 3.0.5 :
ApplicationContext context = new ClassPathXmlApplicationContext(
"application-context.xml");
jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
destination = (Destination) context.getBean("destination");
_log4j.debug("ThreadSize in xml\t"
+ appConfig.getThumbCreatorThreadSize());
in master thread Way 1:
while (countFlag < 0) {
try {
TextMessage msg = (TextMessage) jmsTemplate
.receive(destination);
// prehandle ,then give to sub workers.
if (msg != null) {
_log4j.debug("JMSMessageID:\t" + msg.getJMSMessageID()
+ "\t" + msg.getText());
IConsumer thumbConsumerImpl = null;
thumbConsumerImpl = new ThumbConsumerTaskImpl(msg);
Future<List<ThumbCreatorInfo>> result = threadPool
.submit((Callable<List<ThumbCreatorInfo>>) thumbConsumerImpl);
}
} catch (IllegalArgumentException e) {
_log4j.warn(e.getMessage(), e);
} catch (JMSException e) {
_log4j.error("Please check the queue server!JMSException!", e);
} catch (Exception e) {
_log4j.error("", e);
}
}
in master thread Way 2:
TextMessage msg = (TextMessage) jmsTemplate.receive(destination);
do {
try {
// prehandle ,then give to sub workers.
if (msg != null) {
_log4j.debug("JMSMessageID:\t" + msg.getJMSMessageID()
+ "\t" + msg.getText());
IConsumer thumbConsumerImpl = null;
thumbConsumerImpl = new ThumbConsumerTaskImpl(msg);
Future<List<ThumbCreatorInfo>> result = threadPool
.submit((Callable<List<ThumbCreatorInfo>>) thumbConsumerImpl);
}
msg = (TextMessage) jmsTemplate.receive(destination);
} catch (IllegalArgumentException e) {
_log4j.warn(e.getMessage(), e);
} catch (JMSException e) {
_log4j.error("Please check the queue server!JMSException!", e);
} catch (Exception e) {
_log4j.error("", e);
}
} while (countFlag < 0);
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
我不确定我是否明白你想要做什么。如果您尝试同时处理多条消息,请远离 JmsTemplate 并使用 DefaultMessageListenerContainer 与 并发消费者。也可通过 JMS 获取命名空间。
例如,您似乎可以丢弃问题中显示的所有代码并使用以下代码:
这将自动生成最多 10 个线程以进行并发消息处理。当消息传入时,它将使用其中一个工作线程来调用 fooService.handleNewFoo(),其中 fooService 是 Spring 上下文中的一个 bean。
编辑:我在 github 上创建了一个示例项目,显示了基本的 Spring JMS 设置。您可以在 https://github.com/zzantozz/testbed 浏览源代码/tree/master/basic-spring-jms 或者只是克隆并运行它:
有一个主类启动 JMS 代理并启动 Spring。当 Spring 启动时,它会初始化一个开始发送 JMS 消息的 bean。正如我上面所描述的,还有一个 Spring 消息监听器,它使用消息并将它们传递到生成消息的同一个 bean,该 bean 将它们打印到 stdout。
I'm not sure I get what you're trying to do. If you're trying to have multiple messages processed concurrently, get away from the JmsTemplate and use a DefaultMessageListenerContainer with concurrentConsumers. Also available via the JMS namespace.
For example, it seems that you could throw away all the code that you're showing in your question and use this instead:
That will automatically spawn up to 10 threads for concurrent message processing. When a message comes in, it will use one of the worker threads to call fooService.handleNewFoo(), where fooService is a bean in your Spring context.
Edit: I've created a sample project on github showing a basic Spring JMS setup. You can browse the source at https://github.com/zzantozz/testbed/tree/master/basic-spring-jms or just clone and run it:
There's a main class that starts a JMS broker and starts Spring. When Spring starts, it inits a bean that begins sending JMS messages. There's also a Spring message listener as I described above that consumes messages and passes them to the same bean that produces messages, which prints them to stdout.
为什么不直接使用 MDP?看起来你正在重新创建 Spring 功能。
MDP 示例:
Why aren't you just using an MDP? Seems like you're recreating Spring functionality.
Example MDP: