根据标头属性(例如路由键)处理入站 AMQP 消息

发布于 2025-01-20 00:27:49 字数 611 浏览 2 评论 0 原文

我有一项服务,可以接收AMQP消息。该服务与队列绑定,该队列接收所有与一组路由密钥匹配的消息。

我的设置如下:

...

private SomeController controller;

@Autowired
private SimpleMessageListenerContainer receiverContainer;

@Bean
public IntegrationFlow inboundFlow(){
     var adpater = Amqp.inboundAdapter(receiverContainer);
     return IntegrationFlows.from(adapter)
               // some transformations
               .handle(controller, "processMessage")
               .get();
}


这已经很好。但是,现在我想根据标头属性处理不同控制器的消息。在这种情况下,我想为每个路由键提供一个控制器。使用一个带有多个路由键的单个队列只是为每个密钥以不同的方式处理它也是一个好主意吗?

I have a service, that receives AMQP messages. This service is bound to a queue, which receives all messages which match a set of routing keys.

My set up is as follows:

...

private SomeController controller;

@Autowired
private SimpleMessageListenerContainer receiverContainer;

@Bean
public IntegrationFlow inboundFlow(){
     var adpater = Amqp.inboundAdapter(receiverContainer);
     return IntegrationFlows.from(adapter)
               // some transformations
               .handle(controller, "processMessage")
               .get();
}


This already works fine. However, now I want to handle a message with different controllers, depending on a header attribute. In this case I'd like to have a controller for each routing key. Is it also a good idea to use a single queue with multiple routing keys only to handle it differently for each key?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

差↓一点笑了 2025-01-27 00:27:49

在交换和单个队列之间拥有多个绑定确实是合法的。

在本教程中查看更多信息: https://wwwww.rabbitmq.com /tutorials/tutorial-four-spring-amqp.html

amqp.inboundadapter()依赖于 defaultAmqpheaderMapper.inboundmapper() 默认情况下,该 by默认情况下为我们填充了 amqpheaders.received_routing_routing_key meesse header。因此,您确实可以使用 route(message.class,m-> m.getheaders()。路由键值。

It is really legit to have several bindings between an exchange and a single queue.

See more info in this tutorial: https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html.

The Amqp.inboundAdapter() relies on the DefaultAmqpHeaderMapper.inboundMapper() by default which populates for us an AmqpHeaders.RECEIVED_ROUTING_KEY message header before producing. So, you indeed can use a route(Message.class, m -> m.getHeaders().get(AmqpHeaders.RECEIVED_ROUTING_KEY)) with appropriate channelMapping() for the routing key value.

粉红×色少女 2025-01-27 00:27:49

我只是想添加一个代码示例,并入 artem bilan 's(正确)答案,因为此外,我必须合并一个网关(由Artem Bilan暗示了“适当的 channelmapping())。

更多有关您为什么需要网关或某些情况下的桥梁,请参阅

我的初始代码snippit变成了以下内容:


...

@Autowired
private FirstController firstController;

@Autowired
private SecondController secondController;

@Autowired
private SimpleMessageListenerContainer receiverContainer;

@Bean
public IntegrationFlow inboundFlow(){
     var adpater = Amqp.inboundAdapter(receiverContainer);
     return IntegrationFlows.from(adapter)
               // some transformations
               .route(Message.class, getMessageRoutingKey(m),
                        m -> m.subFlowMapping("routingKey1", firstFlow())
                                // after the first subFlow, all further integrationflows are wrapped in a gateway
                                .subFlowMapping("routingKey2", sf -> sf.gateway(secondFlow())))
               .get();
}


@Bean
public IntegrationFlow firstFlow() {
      return f -> f
               // e.g. additional transformations
               .handle(firstController, "processMessageInFirstFashion");
}


@Bean
public IntegrationFlow secondFlow() {
      return f -> f
               // e.g. additional transformations
               .handle(secondController, "processMessageInSecondFashion");
}


private static String getMessageRoutingKey(final Message<?> message) {
       return message.getHeaders().get(AmqpHeaders.RECEIVED_ROUTING_KEY).toString();
}

I just wanted to add a code example, incorporating Artem Bilan's (correct) answer, because additionally to that, I had to incorporate a gateway (hinted by Artem Bilan with "appropriate channelMapping()).

More about why you need a gateway or in some cases a bridge, refer to this part of the documentation.

My initial code snippit becomes something like the following:


...

@Autowired
private FirstController firstController;

@Autowired
private SecondController secondController;

@Autowired
private SimpleMessageListenerContainer receiverContainer;

@Bean
public IntegrationFlow inboundFlow(){
     var adpater = Amqp.inboundAdapter(receiverContainer);
     return IntegrationFlows.from(adapter)
               // some transformations
               .route(Message.class, getMessageRoutingKey(m),
                        m -> m.subFlowMapping("routingKey1", firstFlow())
                                // after the first subFlow, all further integrationflows are wrapped in a gateway
                                .subFlowMapping("routingKey2", sf -> sf.gateway(secondFlow())))
               .get();
}


@Bean
public IntegrationFlow firstFlow() {
      return f -> f
               // e.g. additional transformations
               .handle(firstController, "processMessageInFirstFashion");
}


@Bean
public IntegrationFlow secondFlow() {
      return f -> f
               // e.g. additional transformations
               .handle(secondController, "processMessageInSecondFashion");
}


private static String getMessageRoutingKey(final Message<?> message) {
       return message.getHeaders().get(AmqpHeaders.RECEIVED_ROUTING_KEY).toString();
}

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文