ActiveMessaging with stomp and activemq.prefetchSize=1
I have a single ActiveMQ broker with two queues, Q1 and Q2, and two Ruby-based consumers using ActiveMessaging. Let's call them C1 and C2. Both consumers subscribe to both queues. I'm setting activemq.prefetchSize=1 when subscribing to each queue, and I'm also setting ack=client.
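For reference, here is a minimal sketch of what those subscription headers look like using the raw Ruby stomp gem (the actual setup uses ActiveMessaging on top of stomp, so this is only an illustration; the broker URL and the stand-in job handling are assumptions):

```ruby
require 'stomp'

# Minimal sketch, not the actual ActiveMessaging config; broker URL is assumed.
client = Stomp::Client.new("stomp://localhost:61613")

["/queue/Q1", "/queue/Q2"].each do |dest|
  # prefetchSize and the ack mode travel on each SUBSCRIBE frame, i.e. they are
  # per subscription, which is why each queue gets its own one-message window.
  client.subscribe(dest, "ack" => "client", "activemq.prefetchSize" => "1") do |msg|
    puts "processing #{msg.body}"  # stand-in for the real job
    client.acknowledge(msg)        # the message is only ack'd once the job is done
  end
end

client.join
```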
Consider the following sequence of events:
1) A message that triggers a long-running job is published to queue Q1. Call this M1.
2) M1 is dispatched to consumer C1, kicking off a long operation.
3) Two messages that trigger short jobs are published to queue Q2. Call these M2 and M3.
4) M2 is dispatched to C2 which quickly runs the short job.
5) M3 is dispatched to C1, even though C1 is still running M1. It's able to dispatch to C1 because prefetchSize=1 is set on the queue subscription, not on the connection. So the fact that a Q1 message has already been dispatched doesn't stop one Q2 message from being dispatched.
Since activemessaging consumers are single-threaded, the net result is that M3 sits and waits on C1 for a long time until C1 finishes processing M1. So, M3 is not processed for a long time, despite the fact that consumer C2 is sitting idle (since it quickly finishes with message M2).
Essentially, whenever a long Q1 job is run and then a whole bunch of short Q2 jobs are created, exactly one of the short Q2 jobs gets stuck on a consumer waiting for the long Q1 job to finish.
Is there a way to set prefetchSize at the connection level rather than at the subscription level? I really don't want any messages dispatched to C1 while it is processing M1. The alternative is that I could create a consumer dedicated to processing Q1 and have the other consumers dedicated to processing Q2. But I'd rather not do that, since Q1 messages are infrequent: a dedicated Q1 consumer would sit idle most of the day, tying up memory.
2 Answers
The activemq.prefetchSize header is only available on a SUBSCRIBE frame, not on CONNECT, according to the ActiveMQ docs for their extended STOMP headers (http://activemq.apache.org/stomp.html).
My reading and experience with this is that, since M1 has not been ack'd (because you have client ack turned on), M1 should be the one message allowed by the prefetchSize=1 set on the subscription. I am surprised to hear that it didn't work, but perhaps I need to run a more detailed test. Your settings should be correct for the behavior you want.
I have heard from others about flakiness in ActiveMQ's dispatching, so it is possible this is a bug in the version you are using.
One suggestion I would have is to either sniff the network traffic to see if M1 is getting ack'd for some reason, or drop some puts statements into the Ruby stomp gem to watch the communication (this is what I usually end up doing when debugging STOMP problems).
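For instance, a lighter-weight alternative to patching the gem is to log each frame from the consumer itself and watch when the ACK goes out; this is only a rough sketch, with a sleep standing in for the real job:

```ruby
require 'stomp'

# Rough debugging sketch: print message-ids as frames arrive and as they are
# ack'd, to check whether M1's ACK is sent earlier than expected.
client = Stomp::Client.new("stomp://localhost:61613")

client.subscribe("/queue/Q1", "ack" => "client", "activemq.prefetchSize" => "1") do |msg|
  puts "RECEIVED #{msg.command} message-id=#{msg.headers['message-id']}"
  sleep 30  # stand-in for the long-running job
  puts "ACKING   message-id=#{msg.headers['message-id']}"
  client.acknowledge(msg)
end

client.join
```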
If I get a chance to try this out, I'll update my comment with my own results.
One suggestion: it is very possible that multiple long-processing messages could be sent, and if the number of long-processing messages exceeds your number of processes, you'll be back in this same fix, with quick-processing messages waiting.
I tend to have at least one dedicated process that just does quick jobs, or, to put it another way, to dedicate a set number of processes to the longer jobs. Having every poller consumer process listen to both long and short queues can end up with sub-optimal results no matter what the dispatcher does. Process groups are the way to configure a consumer to listen to only a subset of destinations: http://code.google.com/p/activemessaging/wiki/Configuration
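As a sketch of what that split could look like in config/messaging.rb (the processor-group syntax should be double-checked against the linked Configuration page, and the destination, group, and processor names here are invented):

```ruby
# config/messaging.rb -- sketch only; verify processor_group against the
# ActiveMessaging Configuration wiki. All names below are invented.
ActiveMessaging::Gateway.define do |s|
  s.destination :long_jobs,  '/queue/Q1'
  s.destination :short_jobs, '/queue/Q2'

  # One group for the occasional long jobs, one for the quick ones, so each
  # poller process only listens to the destinations its own processors use.
  s.processor_group :long,  :long_job_processor
  s.processor_group :short, :short_job_processor
end
```

Each poller process is then started against a single group (something along the lines of script/poller run -- process-group=short; see the wiki for the exact invocation), so a long Q1 job can never tie up a consumer that is supposed to be draining Q2.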
I'm not sure if ActiveMessaging supports this, but you could unsubscribe your other consumers when the long-processing message arrives and then re-subscribe them after it gets processed.
It should give you the desired effect.
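Here is a rough sketch of that idea using the stomp gem directly, since ActiveMessaging may not expose unsubscribe/resubscribe from inside a processor; the broker URL, queue names, and the stand-in job bodies are all assumptions:

```ruby
require 'stomp'

# Sketch only: the consumer drops its short-job subscription while a long job
# runs, then re-subscribes afterwards.
SHORT_QUEUE = "/queue/Q2"
SUB_HEADERS = { "ack" => "client", "activemq.prefetchSize" => "1" }

def subscribe_short(client)
  client.subscribe(SHORT_QUEUE, SUB_HEADERS) do |msg|
    puts "short job: #{msg.body}"  # stand-in for the real quick handler
    client.acknowledge(msg)
  end
end

client = Stomp::Client.new("stomp://localhost:61613")
subscribe_short(client)

client.subscribe("/queue/Q1", SUB_HEADERS) do |msg|
  client.unsubscribe(SHORT_QUEUE)  # stop receiving short jobs while busy
  puts "long job: #{msg.body}"     # stand-in for the real long-running handler
  sleep 60                         # simulate the long-running work
  client.acknowledge(msg)
  subscribe_short(client)          # start accepting short jobs again
end

client.join
```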