如何防止ActiveMQ优先级队列上的低优先级消息饥饿?

发布于 2024-11-15 18:58:06 字数 237 浏览 3 评论 0原文

我正在开发一个需要实现优先队列的系统。我们有不同优先级的消息,我们需要根据优先级处理消息。现在,出于多种原因,我们希望使用 ActiveMQ 作为我们的队列技术,其中之一是支持优先级队列。

对于 ActiveMQ 中的优先级队列,处理饥饿问题的最佳方法是什么?具体来说,我们需要确保即使较高优先级的消息继续充斥队列,低优先级的消息最终也能得到处理。 ActiveMQ 有内置的东西吗?或者我们是否需要构建自己的东西来随着消息的老化而提高优先级?

I am working on a system where we need to implement a prioritized queue. We have messages with different priorities and we need to process messages based on priority. Right now, we are looking to use ActiveMQ as our queuing technology for many reasons, one of which is that is supports priority queues.

With a priority queue in ActiveMQ, what is the best way of dealing with starvation? To be specific, we need to ensure that even a low priority message eventually gets processed even if higher priority messages continue to flood the queue. Does ActiveMQ have something built-in? Or do we need to build something of our own to increase the priority as the message ages?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

情栀口红 2024-11-22 18:58:06

做到这一点的基本方法是当消息变旧时提高优先级,

这样,一小时前的低优先级消息的优先级高于

public class Message implements Comparable<Message>{

    private final long time;//timestamp from creation (can be altered to insertion in queue) in millis the lower this value the older the message (and more important that it needs to be handled)
    private final int pr;//priority the higher the value the higher the priority

    /**
     * the offset that the priority brings currently set for 3 hours 
     *
     * meaning a message with pr==1 has equal priority than a message with pr==0 from 3 hours ago
     */
    private static final long SHIFT=3*60*60*1000; 

    public Message(int priority){
        this.pr=priority;
        this.time = System.currentTimeMillis();
    }

    //I'm assuming here the priority sorting is done with natural ordering
    public boolean compareTo(Message other){
        long th = this.time-this.pr*SHIFT;
        long ot = other.time-other.pr*SHIFT;
        if(th<ot)return 1;
        if(th>ot)return -1;
        return 0;
    }

}

评论中指出的新的高优先级消息,但是来自低优先级消息的洪水几个小时前将暂时缺乏新的高优先级消息,并且将这些消息正确地隔开将需要更复杂的方法,


另一种方法是使用多个队列,每个优先级一个,并从低优先级队列中取出多个队列。优先级队列

最后一种方法仅适用于少量情况优先级,而我提供的第一个方法可以处理任意数量的优先级

a basic way to do this is to bump up the priority when the message gets older

this way a low priority message from say an hour ago is higher priority then a new high priority message

public class Message implements Comparable<Message>{

    private final long time;//timestamp from creation (can be altered to insertion in queue) in millis the lower this value the older the message (and more important that it needs to be handled)
    private final int pr;//priority the higher the value the higher the priority

    /**
     * the offset that the priority brings currently set for 3 hours 
     *
     * meaning a message with pr==1 has equal priority than a message with pr==0 from 3 hours ago
     */
    private static final long SHIFT=3*60*60*1000; 

    public Message(int priority){
        this.pr=priority;
        this.time = System.currentTimeMillis();
    }

    //I'm assuming here the priority sorting is done with natural ordering
    public boolean compareTo(Message other){
        long th = this.time-this.pr*SHIFT;
        long ot = other.time-other.pr*SHIFT;
        if(th<ot)return 1;
        if(th>ot)return -1;
        return 0;
    }

}

as noted in the comments however a flood from low prio messages from several hours ago will temporarily starve the new high prio messages and to space those properly out will require a more sophisticated method


another method is using multiple queues, one for each priority and taking several out of the higher priority queue for each taken out of the low priority queue

this last method is only really viable for a low amount of priorities while the first method I provided can handle an arbitrary amount of priorities

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文