如何在使用 apache qpid 时处理 jms 消息传递中的生产者流控制

发布于 2024-12-26 12:09:22 字数 1529 浏览 9 评论 0原文

我正在尝试处理生产者端的流量控制情况。 我在 qpid-broker 上有一个队列,并设置了最大队列大小。还在队列上设置 flow_stop_count 和 flow_resume_count。

现在生产者继续不断地生产消息,直到达到这个 flow_stop_count 。一旦违反此计数,就会引发异常,并由异常侦听器处理。 现在,稍后队列中的消费者将赶上并且将达到 flow_resume_count 。问题是生产者如何知道这个事件。

的示例代码

    connection connection = connectionFactory.createConnection();
    connection.setExceptionListenr(new MyExceptionListerner());
    connection.start();
    Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
    Queue queue = (Queue)context.lookup("Test");
    MessageProducer producer = session.createProducer(queue);
    while(notStopped){
        while(suspend){//---------------------------how to resume this flag???
            Thread.sleep(1000);
        }
        TextMessage message = session.createTextMessage();
        message.setText("TestMessage");
        producer.send(message);
    }
    session.close();
    connection.close();

下面是生产者和异常侦听器

    private class MyExceptionListener implements ExceptionListener {
    public void onException(JMSException e) {
        System.out.println("got exception:" + e.getMessage());
        suspend=true;
    }
}

现在,异常侦听器是异常的通用侦听器,因此通过它暂停生产者流不应该是一个好主意。

我需要的可能是生产者级别的某种方法,例如 produer.isFlowStopped() ,我可以在发送消息之前使用它进行检查。 qpid api.js 中是否存在这样的功能?

qpid 网站上有一些文档表明可以做到这一点。但我在任何地方都找不到任何这样做的例子。

是否有一些标准方法来处理这种情况。

I am trying to handle flow control situation on producer end.
I have a queue on a qpid-broker with a max queue-size set. Also have flow_stop_count and flow_resume_count set on the queue.

now at the producer keeps on continuously producing messages until this flow_stop_count is reached. Upon breach of this count, an exception is thrown which is handled by Exception listener.
Now sometime later the consumer on queue will catch up and the flow_resume_count will be reached. The question is how does the producer know of this event.

Here's a sample code of the producer

    connection connection = connectionFactory.createConnection();
    connection.setExceptionListenr(new MyExceptionListerner());
    connection.start();
    Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
    Queue queue = (Queue)context.lookup("Test");
    MessageProducer producer = session.createProducer(queue);
    while(notStopped){
        while(suspend){//---------------------------how to resume this flag???
            Thread.sleep(1000);
        }
        TextMessage message = session.createTextMessage();
        message.setText("TestMessage");
        producer.send(message);
    }
    session.close();
    connection.close();

and for the exception listener

    private class MyExceptionListener implements ExceptionListener {
    public void onException(JMSException e) {
        System.out.println("got exception:" + e.getMessage());
        suspend=true;
    }
}

Now the exceptionlistener is a generic listener for exceptions, so it should not be a good idea to suspend the producer flow through that.

What I need is perhaps some method on the producer level , something like produer.isFlowStopped() which I can use to check before sending a message. Does such a functionality exist in qpid api.

There is some documentation on the qpid website which suggest this can be done. But I couldn't find any examples of this being done anywhere.

Is there some standard way of handling this kind of scenario.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

月棠 2025-01-02 12:09:22

从我从 Apache QPid 文档中读到的内容来看,flow_resume_count 和 flow_stop_count 似乎会导致生产者开始被阻止。

因此,唯一的选择是软件明智地定期轮询,直到消息再次开始流动。

摘自此处

如果生产者发送到的队列已满,代理将通过指示客户端不再发送任何消息进行响应。这样做的影响是,任何未来的发送尝试都将被阻塞,直到代理撤销流量控制命令。

在阻塞时,客户端将定期记录它在等待流量控制时被阻塞的事实。

警告 AMQSession - 已强制执行代理强制流量控制
WARN AMQSession - 由于代理强制执行流量控制,消息发送延迟了 5 秒
WARN AMQSession - 由于代理强制执行流量控制,消息发送延迟了 10 秒
经过设定的时间后,发送将超时并向调用代码抛出 JMSException。

错误 AMQSession - 由于等待代理强制流量控制超时,消息发送失败。

从该文档中可以看出,管理生产者的软件必须进行自我管理。因此,基本上,当您收到队列已满的异常时,您将需要后退,并且很可能需要轮询并重新尝试发送消息。

From what I have read from the Apache QPid documentation it seems that the flow_resume_count and flow_stop_count will cause the producers to start getting blocked.

Therefore the only option would be to software wise to poll at regular intervals until the messages start flowing again.

Extract from here.

If a producer sends to a queue which is overfull, the broker will respond by instructing the client not to send any more messages. The impact of this is that any future attempts to send will block until the broker rescinds the flow control order.

While blocking the client will periodically log the fact that it is blocked waiting on flow control.

WARN AMQSession - Broker enforced flow control has been enforced
WARN AMQSession - Message send delayed by 5s due to broker enforced flow control
WARN AMQSession - Message send delayed by 10s due to broker enforced flow control
After a set period the send will timeout and throw a JMSException to the calling code.

ERROR AMQSession - Message send failed due to timeout waiting on broker enforced flow control.

From this documentation it implicates that the software managing the producer would then have to self manage. So basically when you receive an exception that the queue is overfull you will need to back off and most likely poll and reattempt to send your messages.

洛阳烟雨空心柳 2025-01-02 12:09:22

您可以尝试设置队列的容量(队列被认为已满的字节大小)和flowResumeCapacity(生产者不流动的队列大小)属性。

如果大小超过容量值,则 send() 将被阻塞。

你可以看看这个测试用例文件 在存储库中以获得一个想法。

You can try setting the capacity (size in bytes at which the queue is thought to be full ) and flowResumeCapacity (the queue size at which producers are unflowed) properties for a queue.

send() will then be blocked if the size exceeds the capacity value.

You can have a look at this test case file in the repo to get an idea.

黑寡妇 2025-01-02 12:09:22

JMS 客户端上尚未实现生产者流控制。
请参阅https://issues.apache.org/jira/browse/QPID-3388

Producer flow control is not yet implemented on the JMS client.
See https://issues.apache.org/jira/browse/QPID-3388

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文