ActiveMQ:在没有代理的情况下启动消费者

发布于 2024-10-04 22:48:05 字数 470 浏览 0 评论 0原文

我正在编写一个从队列消费的 JMS 客户端。如果重要的话,我的经纪人是 activemq。

一项要求是,即使代理关闭,客户端也应该启动。在这种情况下,它的行为应该就像队列中没有消息一样,一旦代理启动并且消息开始出现,就会做出相应的行为。

问题是在我的代码中:

connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start()

如果代理关闭,那么它就会陷入connection.start()。虽然我想要的是 connection.start() 静静地返回,并继续尝试在后台连接并在可以的时候消耗消息,而在不能的时候保持沉默。

我怎样才能做到这一点。

I am writing a JMS client that consumes from a Queue. My broker is activemq, if it matters.

One requirement is that the client should start even if the broker is down. In that case it should behave as if there where no messages in the Queue, and once the broker is up and messages start coming behave accordingly.

The problem is that in my code:

connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start()

If the broker is down, then it gets stuck in connection.start(). While what I would like to have is connection.start() to return silently and to continue to try to connect in the background and consume messages while it can and be silent when it can't.

How can I achieve this.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

忱杏 2024-10-11 22:48:05

使用单独的线程从队列中消费并启动连接。您将需要使用并发队列实现。

线程 1:

  • 实例化队列
  • 启动线程 2
  • 尝试连接/阻止
  • 将消息添加到队列

线程 2(或某种池):

  • 启动客户端
  • 从队列/块读取
  • 处理消息

Use separate threads for consuming from the queue and starting the connection. You will need to use a concurrent queue implementation.

Thread 1:

  • Instantiate queue
  • Start Thread 2
  • Try to connect/block
  • Add messages to queue

Thread 2 (or a pool of some sort):

  • Start client
  • Read from queue/block
  • Handle messages
暗地喜欢 2024-10-11 22:48:05

有两种方法可以解决此类场景,AMQ-2114 中描述了其中一种方法:

  1. 通过使用 AMQ-2114 中建议的代理 URI。通过使用 AMQ-2114 中建议的代理 URI,创建一个嵌入式代理,并通过网络连接到远程代理。但是,正如所指出的,代理 URI 不使用故障转移连接器。

  2. 通过使用手动配置为使用代理网络的嵌入式代理。我过去曾使用过这种方法,其中 ActiveMQ 代理实例嵌入到 Java 应用程序中,并配置了与远程代理的网络连接。这将允许您的客户端连接到嵌入式代理而不会挂起,并将消息发送到嵌入式代理。消息将从嵌入式代理流向远程代理,但仅当消费者对远程代理上的消息有需求时(即,消费者连接到远程代理并请求消息)。与嵌入式代理的连接可以使用故障转移传输,并且与远程代理的网络连接也可以使用故障转移传输。

我为此解决方案给出的示例是销售人员笔记本电脑上的 Java 应用程序的用例。即使销售人员没有连接到他们的办公室网络,这样的应用程序也需要继续工作。这里的解决方案是在Java应用程序中嵌入一个代理,这样应用程序在发送消息时就不会挂起,即使它没有连接到办公网络。当应用程序再次连接到办公网络时,它将自动连接到另一个代理并且消息将流动。

布鲁斯

There are two ways to work around this type of scenario, one of which is described in AMQ-2114:

  1. By using the broker URI that was suggested in AMQ-2114. By using the broker URI that is suggested in AMQ-2114, an embedded broker is created with a network connection to a remote broker. However, as was pointed out, that broker URI does not use the failover connector.

  2. By using an embedded broker that is manually configured to use a network of brokers. I have used this in the past where an ActiveMQ broker instance is embedded inside your Java app and it is configured with a network connection to a remote broker. This will allow your client to connect to the embedded broker without hanging and will send messages to the embedded broker. Messages will flow from the embedded broker to the remote broker, but only when there is consumer demand for the messages on the remote broker (i.e., consumer connects to the remote broker and requests messages). The connection to the embedded broker can use the failover transport and the network connection to the remote broker can also use the failover transport.

The example I give for this solution is a use case is a Java app on a sales person's laptop. Such an app needs to continue working even if the sales person is not connected to their office network. The solution here is to embed a broker in the Java app so that the app will not hang when sending messages, even if it is not connected to the office network. When the app is again connected to the office network, it will automatically connect to another broker and the messages will flow.

Bruce

離人涙 2024-10-11 22:48:05

为了后代,我强烈建议使用 Spring 的 DefaultMessageListenerContainer 来消费消息,一旦使用以下配置 代码片段

<jms:listener-container>
    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>
</jms:listener-container>

它将负责根据您的特定要求连接到 jms 队列基础设施(如果 JMS 基础设施关闭,应用程序上下文仍会加载,但重试线程会每隔几秒尝试恢复连接)。

Just for posterity, I would highly recommend using Spring's DefaultMessageListenerContainer for consuming messages, once configured using the following code snippet:

<jms:listener-container>
    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>
</jms:listener-container>

it would take care of connecting to the jms queue infrastructure with your specific requirements(if the JMS infrastructure is down, the application context would still be loaded up, but a retry thread would attempt to recover the connection every few seconds).

流星番茄 2024-10-11 22:48:05

我认为 ActiveMQ 有协议(许多协议......)来改变连接行为。尝试在您的网址中使用“failover://”或类似内容,看看它是如何工作的。

I think ActiveMQ have protocols (many protocols....) to change connection behavior. Try using 'failover://' or something like that in your url and see how it works.

澜川若宁 2024-10-11 22:48:05

我现在也在解决这个问题。请参阅 AMQ-2114

我不太明白接受的答案,所以我建议赏金。

我正在使用 ActiveMQ 的 C++ 版本。但我相信这应该不会有太大差异。

我考虑过在单独的线程中调用connection.start(),并在TransportListener::transportResumed()中调用connection->createSession()。但是createSession挂掉了。

最重要的是,我没有可行的解决方案,我很高兴知道该问题的其他解决方案。

I'm to right now fighting this problem too. See AMQ-2114.

I don't quite understand accepted answer, so I propose bounty.

I'm using C++ version of ActiveMQ. But I believe this should not differ a lot.

I thought about calling connection.start() in separate thread and call and call connection->createSession() in TransportListener::transportResumed(). But createSession hangs up.

Bottom line, I don't have working solution and I happy to know other solution to the problem.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文