如何限制 C# 应用程序中来自 ActiveMQ 的消息量?
我在 .Net 程序中使用 ActiveMQ,并且充斥着消息事件。
简而言之,当我收到队列事件“onMessage(IMessage returnedMsg)”时,我将消息放入内部队列中,X 个线程从中执行其操作。
起初,我在创建会话时有:“AcknowledgementMode.AutoAcknowledge”,所以我猜测队列中的所有消息都被吸收并放入内存队列中(这是有风险的,因为崩溃后,所有内容都会丢失)。
因此,我在创建会话时使用了“AcknowledgementMode.ClientAcknowledge”,当工作人员准备好消息时,它会调用消息上的“commit()”方法。然而,所有消息仍然被从队列中吸走。
我如何配置它只处理 X 数量的消息或位于内部队列中,并且并非所有内容都立即“下载”?
I'm using ActiveMQ in a .Net program and I'm flooded with message-events.
In short when I get a queue-event 'onMessage(IMessage receivedMsg)' I put the message into an internal queue out of which X threads do their thing.
At first I had: 'AcknowledgementMode.AutoAcknowledge' when creating the session so I'm guessing that all the messages in the queue got sucked down and put into the memory queue (which is risky since with a crash, everything is lost).
So then I used: 'AcknowledgementMode.ClientAcknowledge' when creating the session, and when a worker was ready with the message it calls the 'commit()' method on the message. However, still all the messages get sucked down from the queue.
How can I configure it that ONLY an X amount of messages are being processed or are in an internal queue, and that not everything is being 'downloaded' right away?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
您使用的是 .NET 4.0 吗?您可以使用 BlockingCollection 。将其设置为可能包含的最大数量。一旦线程尝试放入多余的元素,Add 操作就会阻塞,直到集合再次低于阈值。
也许这可以起到节流的作用?
Rx框架中也有一个用于节流的API,但不知道它是如何实现的。如果您将 Queue 源实现为 Observable,则您可以使用此 API,但我不知道这是否满足您的需求。
Are you on .NET 4.0? You could use a BlockingCollection . Set it to the maximum amount it may contain. As soon as a thread tries to put in an excess element, the Add operation will block until the collection falls below the threshold again.
Maybe that would do it for throttling?
There is also an API for throttling in the Rx framework, but I do not know how it is implemented. If you implement your Queue source as Observable, this API would become available for you, but I don't know if this hits your needs.
您可以设置客户端预取来控制客户端将发送多少消息。当会话处于自动确认状态时,客户端只会在消息通过 onMessage 回调或通过同步接收传递到您的应用程序后才会确认消息。默认情况下,客户端将从代理预取 1000 条消息,如果客户端出现故障,这些消息将被重新传递到另一个客户端(如果这是一个队列),否则对于主题,它们将被丢弃,因为主题是基于广播的通道。如果您将预取设置为 1,那么您的客户端只会从服务器发送一条消息,然后每次您的 onMessage 回调完成时,都会分派一条新消息,因为客户端会确认该消息,也就是说,如果会话处于自动确认状态模式。
所有选项请参考 NMS 配置页面:
http://activemq.apache.org/nms/configuring.html
问候
蒂姆。
FuseSource.com
You can set the client prefetch to control how many messages the client will be sent. When the Session is in Auto Ack, the client will only ack a message once its been delivered to your app via the onMessage callback or through a synchronous receive. By default the client will prefetch 1000 messages from the broker, if the client goes down these messages would be redelivered to another client it this was a Queue, otherwise for a topic they are just discarded as a topic is a broadcast based channel. If you set the prefetch to one then you client would only be sent one message from the sever, then each time your onMessage callback completes a new message would be dispatched as the client would ack that message, that is if the session is in Auto Ack mode.
Refer to the NMS configuration page for all the options:
http://activemq.apache.org/nms/configuring.html
Regards
Tim.
FuseSource.com