Spring和RabbitMQ如何声明和使用队列?
我正在开发一个使用 Spring 和 RabbitMQ 的 Web 项目。这是一个基本系统,其中主节点将任务发送到从节点(使用直接交换),从节点使用简单的队列响应主节点。
新的奴隶可能随时来来去去,因此他们使用匿名队列。也可以有多个主节点,在这种情况下,每个主节点都有自己的回复队列。当 Master 向 Slave 发送任务时,该任务包含发送该任务的 Master 的回复队列名称。发送消息是使用 AmqpTemplate 完成的。
我正在使用 XML 配置所有这些,这是到目前为止我所得到的:
对于从站:
<!-- Message listener -->
<bean id="taskListener" class="tgi.docgen.amqp.TypedListener" />
<bean id="slave" class="tgi.docgen.node.Slave" />
<bean id="handler" class="tgi.docgen.task.TaskHandler" />
<!-- Rabbit infrastructure -->
<!-- Slave reading queue, bound to the exchange -->
<rabbit:queue id="receiveQueue" auto-delete="true" durable="false" />
<!-- Master -> Slave exchange -->
<rabbit:direct-exchange name="docgenExchange" auto-delete="true" durable="false">
<rabbit:bindings>
<rabbit:binding queue="receiveQueue" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- taskListener bean listens to the queue -->
<rabbit:listener-container connection-factory="rabbitFactory">
<rabbit:listener queues="receiveQueue" ref="taskListener" />
</rabbit:listener-container>
对于主站:
<!-- Message listener -->
<bean id="taskListener" class="tgi.docgen.amqp.TypedListener" />
<bean id="master" class="tgi.docgen.node.Master">
<property name="docgenExchange" value="???" />
<property name="receiveQueue" value="???" />
</bean>
<!-- Rabbit infrastructure -->
<!-- Anonymous reception queue -->
<rabbit:queue id="receiveQueue" auto-delete="true" durable="false" />
<!-- Master -> Slave exchange -->
<rabbit:direct-exchange name="docgenExchange" auto-delete="true" durable="false" />
<!-- taskListener listens to reception queue -->
<rabbit:listener-container connection-factory="rabbitFactory">
<rabbit:listener queues="receiveQueue" ref="taskListener" />
</rabbit:listener-container>
这是我想知道的
1) 如何避免在两个 XML 配置文件中重复交换的名称?主项目依赖于从项目,因此我可以轻松地在它们之间共享配置,但我不知道如何提取交换名称并在两个文件中使用它。
2)正如我所说,Master会向exchange发送任务,并且必须在任务中写入其回复队列的名称。如何在 Master bean 中注入队列和交换的名称?也就是说:我如何将两个 ???
分别替换为“docgenExchange”和上面配置文件中匿名回复队列的生成名称?
I am working on a web project that uses Spring and RabbitMQ. This is a basic system with a Master node sending tasks to Slave nodes (using a direct exchange) and Slave nodes responding to the Master using a simple queue.
New slaves may come and go at any time so they use anonymous queues. There can also be multiple Master nodes, in which case each of them will have their own reply queue. When a Master sends a task to slaves, the task contains the name of the reply queue of the master who sent it. Sending messages is done using AmqpTemplate.
I'm configuring all of this using XML, here's what I've got so far:
For the Slave:
<!-- Message listener -->
<bean id="taskListener" class="tgi.docgen.amqp.TypedListener" />
<bean id="slave" class="tgi.docgen.node.Slave" />
<bean id="handler" class="tgi.docgen.task.TaskHandler" />
<!-- Rabbit infrastructure -->
<!-- Slave reading queue, bound to the exchange -->
<rabbit:queue id="receiveQueue" auto-delete="true" durable="false" />
<!-- Master -> Slave exchange -->
<rabbit:direct-exchange name="docgenExchange" auto-delete="true" durable="false">
<rabbit:bindings>
<rabbit:binding queue="receiveQueue" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- taskListener bean listens to the queue -->
<rabbit:listener-container connection-factory="rabbitFactory">
<rabbit:listener queues="receiveQueue" ref="taskListener" />
</rabbit:listener-container>
For the Master:
<!-- Message listener -->
<bean id="taskListener" class="tgi.docgen.amqp.TypedListener" />
<bean id="master" class="tgi.docgen.node.Master">
<property name="docgenExchange" value="???" />
<property name="receiveQueue" value="???" />
</bean>
<!-- Rabbit infrastructure -->
<!-- Anonymous reception queue -->
<rabbit:queue id="receiveQueue" auto-delete="true" durable="false" />
<!-- Master -> Slave exchange -->
<rabbit:direct-exchange name="docgenExchange" auto-delete="true" durable="false" />
<!-- taskListener listens to reception queue -->
<rabbit:listener-container connection-factory="rabbitFactory">
<rabbit:listener queues="receiveQueue" ref="taskListener" />
</rabbit:listener-container>
Here's what I want to know
1) how do I avoid repeating the name of the exchange in both XML config files ? The Master project depends on the Slave project so I can easily share configuration between them, but I don't see how I can extract the exchange name and use it in both files.
2) As I've said, the Master will send tasks to the exchange, and must write the name of its reply queue in the tasks. How can I inject the name of the queue and the exchange in the Master bean ? That is: how can i replace the two ???
with respectively "docgenExchange" and the generated name of the anonymous reply queue in my config file above ?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
那不是 Spring Integration 2 吗?
不管怎样,几天前我已经为自己做了一个“概念验证”,做的事情正是你想要存档的。请参阅 https://github.com/knalli/task-worker
关于您的第一个问题:嗯,这取决于。这取决于你到底想要什么。我认为在两个项目(主项目和从项目)中配置中央队列的名称没有问题。当然,这不应该是硬编码的,而是一个配置选项。您可以为两个正在运行的程序提供相同的外部属性,这些属性将用作 Spring 配置上下文中的属性占位符 Bean。
第二个有点奇怪:也许你有一个旧的例子,或者这是 Spring Ingegration 1.x.. 我不知道。无论如何:您不需要提供回复名称。因为如果您使用 AMPQ Gateway Beans,Spring Integration 将自动为您处理此问题。
我引用了我的守护进程的入站 AMQP 网关配置的配置端口(这是您的“从属”):
每个提到的通道 - requestChannel、replyChannel、errorChannel - 仅通过一个简单的方法在 Spring Integration Context 中注册
对于 AMQP(即消息代理配置),除了
项之外,不需要更多配置部分。此外,我将使用 Java 注释将消息连接到服务接口 (@Gateway) 和服务类 (@ServiceActivator)。这样就减少了很多配置。
That's not Spring Integration 2?
Anyway, I've made a "proof of concept" for myself a few days ago doing stuff exactly what you want to archive. See at https://github.com/knalli/task-worker
Regarding to your first question: Well, it depends. It depends what do you exactly want. I see no problem to configure in both projects -- project master and project slave -- the name of the central queue. Of course, this should not be hardcoded but a configuration option. And you can provide both running programs the same external properties which will used as a Property Placeholder Bean within the Spring Configuration Context.
The second is a little bit weird: Perhaps you had an old example or this is Spring Ingegration 1.x.. I don't know. Anyway: You don't need to provide a reply name. Because this will be handled automatically by Spring Integration for you if you use the AMPQ Gateway Beans.
I cite a port of the configuration of the Inbound AMQP Gateway configuration of my daemon (this is your "slave"):
Each of the mentioned channels -- requestChannel, replyChannel, errorChannel -- are only registered in the Spring Integration Context via a simple
For AMQP -- meaning Message Broker configuration -- there is no need for more configuration parts except the
<rabbit:queue>
item.Furthermore, I'd use Java annotations to wire the messages to services interfaces (@Gateway) and service classes (@ServiceActivator). This reduces the configuration a lot.