使用 stomp 和 activemq.prefetchSize=1 进行主动消息传递

发布于 2024-09-04 03:35:38 字数 823 浏览 11 评论 0原文

我遇到的情况是,我有一个 activemq 代理,有 2 个队列:Q1 和 Q2。我有两个基于 Ruby 的消费者使用 activemessaging。我们称它们为 C1 和 C2。两个消费者都订阅每个队列。我在订阅每个队列时设置 activemq.prefetchSize=1 。我还设置了ack=client。

考虑以下事件序列:

1) 触发长时间运行作业的消息被发布到队列 Q1。将此称为 M1。

2) M1 被分派给消费者 C1,开始长时间操作。

3) 触发短作业的两条消息被发布到队列Q2。称这些为 M2 和 M3。

4) M2 被分派给 C2,后者快速运行短作业。

5) M3 被调度到 C1,即使 C1 仍在运行 M1。它能够分派到 C1,因为 prefetchSize=1 是在队列订阅上设置的,而不是在连接上设置的。因此,Q1 消息已被发送的事实并不会阻止 Q2 消息的发送。

由于 activemessaging 消费者是单线程的,因此最终结果是 M3 长时间等待 C1,直到 C1 处理完 M1。因此,尽管消费者 C2 处于空闲状态(因为它很快就完成了消息 M2),但 M3 很长时间都没有被处理。

本质上,每当运行一个长的 Q1 作业,然后创建一大堆短的 Q2 作业时,其中一个短的 Q2 作业就会陷入等待长的 Q1 作业完成的消费者身上。

有没有办法在连接级别而不是订阅级别设置 prefetchSize?我真的不希望在处理 M1 时向 C1 发送任何消息。另一种选择是,我可以创建一个专用于处理 Q1 的消费者,然后让其他消费者专用于处理 Q2。但是,我不想这样做,因为 Q1 消息很少出现——Q1 的专用消费者会在一天的大部分时间里闲置,占用内存。

I have a situation where I have a single activemq broker with 2 queues, Q1 and Q2. I have two ruby-based consumers using activemessaging. Let's call them C1 and C2. Both consumers subscribe to each queue. I'm setting activemq.prefetchSize=1 when subscribing to each queue. I'm also setting ack=client.

Consider the following sequence of events:

1) A message that triggers a long-running job is published to queue Q1. Call this M1.

2) M1 is dispatched to consumer C1, kicking off a long operation.

3) Two messages that trigger short jobs are published to queue Q2. Call these M2 and M3.

4) M2 is dispatched to C2 which quickly runs the short job.

5) M3 is dispatched to C1, even though C1 is still running M1. It's able to dispatch to C1 because prefetchSize=1 is set on the queue subscription, not on the connection. So the fact that a Q1 message has already been dispatched doesn't stop one Q2 message from being dispatched.

Since activemessaging consumers are single-threaded, the net result is that M3 sits and waits on C1 for a long time until C1 finishes processing M1. So, M3 is not processed for a long time, despite the fact that consumer C2 is sitting idle (since it quickly finishes with message M2).

Essentially, whenever a long Q1 job is run and then a whole bunch of short Q2 jobs are created, exactly one of the short Q2 jobs gets stuck on a consumer waiting for the long Q1 job to finish.

Is there a way to set prefetchSize at the connection level rather than at the subscription level? I really don't want any messages dispatched to C1 while it is processing M1. The other alternative is that I could create a consumer dedicated to processing Q1 and then have other consumers dedicated to processing Q2. But, I'd rather not do that since Q1 messages are infrequent--Q1's dedicated consumers would sit idle most of the day tying up memory.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

明月松间行 2024-09-11 03:35:38

根据 ActiveMQ 文档的扩展 stomp 标头,activemq.prefetchSize 仅适用于 SUBSCRIBE 消息,而不是 CONNECT 消息(http://activemq.apache.org/stomp.html)。以下是相关信息:

动词:订阅

标头:activemq.prefetchSize

类型:整数

描述:指定最大
待处理消息的数量
被发送给客户。一旦这个
已达到最大值,不再有消息
发送至客户
确认一条消息。设置为 1 的
消息的分配非常公平
跨消费者进行处理
消息可能会很慢。

我对此的阅读和经验是,由于 M1 尚未被确认(b/c 您已打开客户端确认),因此该 M1 应该是订阅上设置的 prefetchSize=1 允许的 1 条消息。我很惊讶地听到它不起作用,但也许我需要进行更详细的测试。您的设置应该适合您想要的行为。

我从其他人那里听说过有关 activemq 调度的不稳定情况,因此这可能是您正在使用的版本的错误。

我的一个建议是嗅探网络流量以查看 M1 是否因某种原因而被确认,或者将一些 put 语句放入 ruby​​ stomp gem 中以观察通信(这是我通常在调试 stop 问题)。

如果我有机会尝试一下,我会用自己的结果更新我的评论。

一个建议:很可能会发送多个长处理消息,如果长处理消息的数量超过了您的进程数,那么您将陷入此修复,其中快速处理消息正在等待。

我倾向于至少有一个专门的进程来完成快速的工作,或者换句话说,专用一组进程来完成更长的工作。无论调度做什么,让所有轮询器消费者进程同时监听长和短可能最终会得到次优结果。进程组是配置使用者侦听目标子集的方法:http:// /code.google.com/p/activemessaging/wiki/Configuration

处理器组名称,
*处理器列表

处理器组是一种运行轮询器以仅执行一部分的方法

通过传递名称来处理者
poller 命令行中的组
论据。

您将处理器的名称指定为带下划线的小写字母

版本。所以如果你有一个
FooBarProcessor 和 BarFooProcessor 中
一个处理器组,它看起来像
这个:

 ActiveMessaging::Gateway.define do |s|
      ...
      s.processor_group:my_group,:foo_bar_processor,:bar_foo_processor
    结尾

处理器组被传递到轮询器中,如下所示:

    ./script/poller 启动 --process-group=my_group

The activemq.prefetchSize is only available on a SUBSCRIBE message, not a CONNECT, according to the ActiveMQ docs for their extended stomp headers (http://activemq.apache.org/stomp.html). Here is the relevant info:

verb: SUBSCRIBE

header: activemq.prefetchSize

type: int

description: Specifies the maximum
number of pending messages that will
be dispatched to the client. Once this
maximum is reached no more messages
are dispatched until the client
acknowledges a message. Set to 1 for
very fair distribution of messages
across consumers where processing
messages can be slow.

My reading and experience with this, is that since M1 has not been ack'd (b/c you have client ack turned on), that this M1 should be the 1 message allowed by prefetchSize=1 set on the subscription. I am surprised to hear that it didn't work, but perhaps I need to run a more detailed test. Your settings should be correct for the behavior you want.

I have heard of flakiness from others about the activemq dispatch, so it is possible this is a bug with the version you are using.

One suggestion I would have is to either sniff the network traffic to see if the M1 is getting ack'd for some reason, or throw some puts statements into the ruby stomp gem to watch the communication (this is what I usually end up doing when debugging stomp problems).

If I get a chance to try this out, I'll update my comment with my own results.

One suggestion: It is very possible that multiple long processing messages could be sent, and if the number of long processing messages exceeds your number of processes, you'll be in this fix where quick processing messages are waiting.

I tend to have at least one dedicated process that just does quick jobs, or to put it another way, dedicate a set # of processes that just do longer jobs. Having all poller consumer processes listen to both long and short can end up with sub-optimal results no matter what dispatch does. Process groups are the way to configure a consumer to listen to a subset of destinations: http://code.google.com/p/activemessaging/wiki/Configuration

processor_group name,
*list_of_processors

A processor group is a way to run the poller to only execute a subset of

the processors by passing the name of
the group in the poller command line
arguments.

You specify the name of the processor as its underscored lowercase

version. So if you have a
FooBarProcessor and BarFooProcessor in
a processor group, it would look like
this:

    ActiveMessaging::Gateway.define do |s|
      ...
      s.processor_group :my_group, :foo_bar_processor, :bar_foo_processor
    end

The processor group is passed into the poller like the following:

    ./script/poller start -- process-group=my_group
无戏配角 2024-09-11 03:35:38

我不确定 ActiveMessaging 是否支持此功能,但您可以在长处理消息到达时取消订阅其他消费者,然后在处理后重新订阅它们。

它应该会给你带来想要的效果。

I'm not sure if ActiveMessaging supports this, but you could unsubscribe your other consumers when the long processing message arrives and then re-subscribe them after it get processed.

It should give you the desired effect.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文