How do I make a Java PriorityBlockingQueue that preserves FIFO behavior?

Posted on 2024-11-19 13:07:45

I'm trying to create a Priority Blocking Queue in Java that maintains FIFO order for elements with the same priority. The Oracle doc provides some help with that, but I'm still very tangled up.

I should note that the following topics are all very new to me: Generics, Interfaces as Types, and static nested classes. All of these come into play in the following class definition. Generics, especially, are confusing, and I'm sure I've totally messed up with them here.

I have included comments to identify the compiler errors I am currently getting.

Several specific questions:

  1. Is it okay to have the class represent the queued event object, with the actual queue being a static class member?

  2. Was it reasonable to have included Oracle's FIFO event "wrapper" as a static nested class?

  3. Am I at least on the right track here, doing it all in one outer class?

Here is the class that I've written:

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class FIFOPBQEvent {

/**
 * First we define a static nested class which, when instantiated,
 * encapsulates the "guts" of the event - a FIFOPBQEvent - along with
 * a sequence number that assures FIFO behavior of events of like priority.
 *  
 * The following is lifted ALMOST verbatim (I added "static" and some 
 * comments) from Oracle documentation on adding FIFO-ness to a Priority
 * Blocking Queue: 
 * http://download.oracle.com/javase/6/docs/api/java/util/concurrent/PriorityBlockingQueue.html
 * As the Oracle doc points out:
 * 
 * "A static nested class interacts with the instance members of its outer 
 * class (and other classes) just like any other top-level class. In 
 * effect, a static nested class is behaviorally a top-level class that 
 * has been nested in another top-level class for packaging convenience."
 *
 */
static class FIFOEntry<E extends Comparable<? super E>> implements
        Comparable<FIFOEntry<E>> {
    final static AtomicLong seq = new AtomicLong();
    final long seqNum;  // instance
    final E entry;

    public FIFOEntry(E entry) {
        seqNum = seq.getAndIncrement();
        this.entry = entry;
    }

    public E getEntry() {
        return entry;
    }
    /** Here is implementation of Comparable */
    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry);
        if (res == 0 && other.entry != this.entry)
            res = (seqNum < other.seqNum ? -1 : 1);
        return res;
    }
}

/**
 * Now we declare a single (static) PBQ of FIFO entries into which 
 * PBQFIFOEvents will be added and removed.
 */

/** FLAGGED AS ERROR BY COMPILER */
// Bound mismatch: The type FIFOPBQEvent is not a valid substitute for the
// bounded parameter <E extends Comparable<? super E>> of the type 
// FIFOPBQEvent.FIFOEntry<E>

private static PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>> theQueue =
    PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>>();

/** 
 * And here are the "guts" of our event: the i.d. and state of the GUI widget 
 */
private ConsoleObject obj = ConsoleObject.UNDEFINED_OBJ; // widget that was affected
private ObjectState state = ObjectState.UNDEFINED_STATE; // the widget's new state

/** 
 * Constructor specifying the class variables 
 */
public FIFOPBQEvent(ConsoleObject theObj, ObjectState theState) {
    obj = theObj;
    state = theState;
}

/**
 * Event queuing ("sending") and dequeuing ("receiving")
 */
public void sendEvent() {

    /** FLAGGED AS ERROR BY COMPILER */
    // The method put(FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>) in the type 
    // PriorityBlockingQueue<FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>> is not 
    // applicable for the arguments (FIFOPBQEvent)

    theQueue.put(this);
}

public static FIFOPBQEvent receiveEvent() {

    /** FLAGGED AS ERROR BY COMPILER */
    // Type mismatch: cannot convert from FIFOPBQEvent.FIFOEntry<FIFOPBQEvent> 
    // to FIFOPBQEvent

    FIFOPBQEvent event = theQueue.take();
    return event;
}

/**
 * ConsoleEvent accessors
 */
public ConsoleObject getObj() {
    return this.obj;
}

public ObjectState getState() {
    return this.state;
}

/**
 * And for the first time, enums instead of public static final ints.
 */
public enum ConsoleObject {
    UNDEFINED_OBJ,
    RESERVED,

    /** Console keys */
    RESET,
    DISPLAY_MAR,
    SAVE,
    INSERT,
    RELEASE,
    START,
    SIE,
    SCE,

    /** Console toggle switches */
    POWER,
    PARITY_CHECK,
    IO_CHECK,
    OVERFLOW_CHECK,
    SENSE_SWITCH_1,
    SENSE_SWITCH_2,
    SENSE_SWITCH_3,
    SENSE_SWITCH_4
}

public enum ObjectState {
    UNDEFINED_STATE,

    /** Toggle switches */
    OFF,
    ON,

    /** Console keys */
    PRESSED,
}
}


Comments (4)

浅浅淡淡 2024-11-26 13:07:46

The first error is the more significant error. It occurs because the FIFOPBQEvent class doesn't implement Comparable, which it must to be considered as the generic type for the FIFOEntry nested class. This is because you restrict E and say that it extends Comparable<...>. Basically, your FIFOPBQEvent class must be comparable to provide the priority for the queue (presumably based on the event type).

To fix the error, you need to:

  1. Change the header of your class to:

    public class FIFOPBQEvent implements Comparable<FIFOPBQEvent> {
    
  2. Add a compareTo method in the FIFOPBQEvent class; something like:

    public int compareTo (FIFOPBQEvent other) {
        // TODO - compare this and other for priority
        return 0;
    }
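
One way the TODO above might be filled in, assuming each event carries an integer priority where a lower number is dequeued first. The class name `PrioritizedEvent` and the `priority` field are hypothetical and do not appear in the original post; this is only a sketch of the idea.

```java
// Hypothetical event class: "priority" is an assumed field, not part of the original post.
final class PrioritizedEvent implements Comparable<PrioritizedEvent> {
    private final int priority; // assumption: lower value means "more urgent"
    private final String name;

    PrioritizedEvent(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }

    String getName() {
        return name;
    }

    @Override
    public int compareTo(PrioritizedEvent other) {
        // Integer.compare avoids the overflow risk of "this.priority - other.priority".
        return Integer.compare(this.priority, other.priority);
    }
}
```

Returning 0 for events of equal priority is intentional here: that is the case in which FIFOEntry falls back to its sequence number.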
    

Then you need to wrap your entry in your sendEvent method:

public void sendEvent () {
    theQueue.put(new FIFOEntry<FIFOPBQEvent> (this));
}

The last, minor, error is simply that you aren't unwrapping the FIFOEntry object. To fix this, change receiveEvent to:

public static FIFOPBQEvent receiveEvent () {
    FIFOPBQEvent event = theQueue.take ().getEntry ();
    return event;
}
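
Putting the three fixes together, a minimal end-to-end sketch could look like the following. `DemoEvent`, its `priority` and `label` fields, and `pollEvent()` are hypothetical stand-ins for the asker's FIFOPBQEvent; `poll()` is used instead of `take()` only to keep the demo non-blocking.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Wrapper adapted from the PriorityBlockingQueue javadoc: ties fall back to insertion order.
final class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> {
    private static final AtomicLong seq = new AtomicLong();
    private final long seqNum = seq.getAndIncrement();
    private final E entry;

    FIFOEntry(E entry) {
        this.entry = entry;
    }

    E getEntry() {
        return entry;
    }

    @Override
    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry);
        if (res == 0 && other.entry != this.entry) {
            res = (seqNum < other.seqNum) ? -1 : 1;
        }
        return res;
    }
}

// Hypothetical stand-in for FIFOPBQEvent, with an assumed integer priority.
final class DemoEvent implements Comparable<DemoEvent> {
    private static final PriorityBlockingQueue<FIFOEntry<DemoEvent>> QUEUE =
            new PriorityBlockingQueue<FIFOEntry<DemoEvent>>();

    final int priority; // assumption: lower value is dequeued first
    final String label;

    DemoEvent(int priority, String label) {
        this.priority = priority;
        this.label = label;
    }

    @Override
    public int compareTo(DemoEvent other) {
        return Integer.compare(priority, other.priority);
    }

    // Wrap on the way in...
    void sendEvent() {
        QUEUE.put(new FIFOEntry<DemoEvent>(this));
    }

    // ...and unwrap on the way out. Returns null if the queue is empty.
    static DemoEvent pollEvent() {
        FIFOEntry<DemoEvent> e = QUEUE.poll();
        return e == null ? null : e.getEntry();
    }

    public static void main(String[] args) {
        new DemoEvent(2, "low-1").sendEvent();
        new DemoEvent(1, "high-1").sendEvent();
        new DemoEvent(2, "low-2").sendEvent();
        new DemoEvent(1, "high-2").sendEvent();
        // Expected order: high-1, high-2, low-1, low-2
        for (DemoEvent e = pollEvent(); e != null; e = pollEvent()) {
            System.out.println(e.label);
        }
    }
}
```
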

惯饮孤独 2024-11-26 13:07:46

Let's step through your code.

static class FIFOEntry<E extends Comparable<? super E>> implements 
  Comparable<FIFOEntry<E>> {

This defines the class FIFOEntry, which takes a generic parameter. You have constrained that parameter to "any type that implements Comparable of itself" (or, because of the ? super E wildcard, Comparable of one of its supertypes).
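
To make the bound a little more concrete, here is a small sketch of what `E extends Comparable<? super E>` accepts. The `Base`, `Derived`, and `smallest` names are made up for illustration and are not from the question.

```java
import java.util.ArrayList;
import java.util.List;

class BoundDemo {
    // Comparable is implemented once, on the base type.
    static class Base implements Comparable<Base> {
        final int rank;

        Base(int rank) {
            this.rank = rank;
        }

        @Override
        public int compareTo(Base o) {
            return Integer.compare(rank, o.rank);
        }
    }

    // Derived is a Comparable<Base>, not a Comparable<Derived>.
    static class Derived extends Base {
        Derived(int rank) {
            super(rank);
        }
    }

    // Same bound as FIFOEntry: E must be comparable to itself or to a supertype.
    // With the stricter bound <E extends Comparable<E>>, List<Derived> would be rejected.
    static <E extends Comparable<? super E>> E smallest(List<E> items) {
        E best = items.get(0);
        for (E item : items) {
            if (item.compareTo(best) < 0) {
                best = item;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Derived> items = new ArrayList<Derived>();
        items.add(new Derived(3));
        items.add(new Derived(1));
        items.add(new Derived(2));
        System.out.println(smallest(items).rank); // prints 1
    }
}
```

A class with no compareTo at all, such as the asker's FIFOPBQEvent, satisfies neither form of the bound, which is what the "Bound mismatch" diagnostic is reporting.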

private static PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>> theQueue =
  PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>>();

Your declaration of PriorityBlockingQueue is not incorrect here, but your use of FIFOEntry<FIFOPBQEvent> is. This follows from the point above: you have restricted FIFOEntry's type parameter to types that implement Comparable of themselves, so FIFOPBQEvent should be declared as

class FIFOPBQEvent implements Comparable<FIFOPBQEvent> {

Your next problem is -

public void sendEvent() {
  theQueue.put(this);
}

The type of this is FIFOPBQEvent but the queue only accepts FIFOEntry objects. To match your Queue signature it should be:

public void sendEvent() {
  // Requires FIFOPBQEvent to be Comparable
  theQueue.put(new FIFOEntry<FIFOPBQEvent>(this));     
}

You have the same problem on receiveEvent() too - your Queue signature says that the Queue contains FIFOEntry objects and you are trying to pull out FIFOPBQEvents.
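
A sketch of what the receiving side might look like follows. Note that `take()` declares `InterruptedException`, so the caller has to handle or propagate it. `Wrapped` is a hypothetical minimal stand-in for FIFOEntry<FIFOPBQEvent>, and returning null on interruption is just one possible policy.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

final class ReceiveDemo {
    // Hypothetical minimal wrapper standing in for FIFOEntry<FIFOPBQEvent>.
    static final class Wrapped implements Comparable<Wrapped> {
        final String payload;

        Wrapped(String payload) {
            this.payload = payload;
        }

        String getEntry() {
            return payload;
        }

        @Override
        public int compareTo(Wrapped o) {
            return payload.compareTo(o.payload);
        }
    }

    static final BlockingQueue<Wrapped> QUEUE = new PriorityBlockingQueue<Wrapped>();

    // Blocks until an element is available, then unwraps it.
    // Assumed policy: return null if the thread is interrupted while waiting.
    static String receiveEvent() {
        try {
            return QUEUE.take().getEntry();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        QUEUE.add(new Wrapped("b"));
        QUEUE.add(new Wrapped("a"));
        System.out.println(receiveEvent()); // prints a
        System.out.println(receiveEvent()); // prints b
    }
}
```

Declaring `throws InterruptedException` on receiveEvent() instead would be an equally reasonable choice; the point is only that the queue hands back the wrapper, and the payload has to be pulled out of it.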

美煞众生 2024-11-26 13:07:46

Taking @101100's recommendation, I have reworked the design, decoupling the queue from the events. That seems to make it much simpler and easier to understand (and reuse), but sadly I'm still unclear on some concepts. What follows here is the PriorityFIFOEventQueue (I've omitted the Event class for brevity). And I've noted where I still need some help:

package ibm1620;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * This class represents a Priority Blocking Queue of FIFO entries.  Instantiating
 * it creates a queue.  It provides instance methods for sending and receiving
 * entries, a.k.a. events of type E, on the queue.
 */

The following is flagged with diagnostic: "The type PriorityFIFOEventQueue<E> must implement the inherited abstract method Comparable<PriorityFIFOEventQueue<E>>.compareTo(PriorityFIFOEventQueue<E>)"

I'm pretty sure I don't want to compare queues !! Still not sure what I need here.

public class PriorityFIFOEventQueue<E extends Comparable<? super E>> implements Comparable<PriorityFIFOEventQueue<E>> {

/**
 * FIFOEntry is a static nested class that wraps an Entry and provides support for
 * keeping the Entries in FIFO sequence.
 */
private static class FIFOEntry<E> implements Comparable<FIFOEntry<E>> {

    /**
     * There's going to be one atomic seq per ... queue?  runtime?
     */

    final static AtomicLong seq = new AtomicLong();

    /**
     * FIFOEntry is simply an entry of type E, and a sequence number
     */
    final long seqNum; 
    final E    entry;

    /**
     * FIFOEntry() constructor
     */
    FIFOEntry(E entry) {
        seqNum = seq.getAndIncrement();
        this.entry = entry;
    }

    /**
     * Accessor method for entry
     */
    E getEntry() {
        return entry;
    }

    /**
     * Here is implementation of Comparable that is called directly by PBQ.
     * We first invoke E's comparator which compares based on priority. 
     * If equal priority, order by sequence number (i.e. maintain FIFO).
     * */

In the following, the line containing "compareTo" is flagged, and the diagnostic is
"The method compareTo(E) is undefined for the type E". Apparently I haven't told the compiler
that the "other" FIFOEntry implements Comparable.

    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry); // priority-based compare
        if (res == 0 && other.entry != this.entry)
            res = (seqNum < other.seqNum ? -1 : 1); // FIFO compare
        return res;
    }
}

/**
 * Here is the PriorityBlockingQueue of FIFOEntries
 */

private PriorityBlockingQueue<FIFOEntry<E>> theQueue = 
    new PriorityBlockingQueue<FIFOEntry<E>>();

/**
 * Event queuing ("sending") and dequeuing ("receiving")
 */
public void sendEvent(E event) {
    theQueue.put(new FIFOEntry<E>(event));
}

/**
 * pollEvent() 
 * Will remove and return a ConsoleEvent from the queue, or return
 * null if queue is empty.
 */
public E pollEvent() {
    E event = null;
    FIFOEntry<E> aFIFOEvent = theQueue.poll();
    if (aFIFOEvent != null) {
        event = aFIFOEvent.getEntry();
        say("pollEvent() -> "+event);
    }
    else {
    }
    return event;
}

/**
 * receiveEvent() 
 * Will block if queue is empty.  Takes a FIFOEvent off the queue,
 * unwraps it and returns the 'E' payload.
 * 
 * @return
 */
public E receiveEvent() {
    E event = null;
    try {
        FIFOEntry<E> aFIFOEvent = theQueue.take();
        if (aFIFOEvent != null) {
            event = aFIFOEvent.getEntry();
            say("receiveEvent() -> "+event);
        }

    } catch (InterruptedException e) {
        say("InterruptedException in receiveEvent()");
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    return event;
}

// for console debugging
static void say(String s) {
    System.out.println("ConsoleEvent: " + s);
}

}
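
For what it's worth, here is one way the two flagged spots could be resolved. This is only a sketch, not a definitive fix: the queue class drops `implements Comparable` (only the entries need ordering), and the nested FIFOEntry repeats the `Comparable` bound on its own type parameter, since a static nested class does not share the outer class's E. On the "one atomic seq per ... queue? runtime?" comment: a static field is shared by every queue instance in the running program; the sketch uses a per-queue counter instead, which is a design choice and not something the original requires. `Job` and `QueueDemo` are hypothetical names used only to exercise the queue.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// The queue is a container, so it does not implement Comparable itself.
class PriorityFIFOEventQueue<E extends Comparable<? super E>> {

    // A static nested class does not see the outer E, so the bound is repeated here.
    private static final class FIFOEntry<T extends Comparable<? super T>>
            implements Comparable<FIFOEntry<T>> {
        final long seqNum;
        final T entry;

        FIFOEntry(T entry, long seqNum) {
            this.entry = entry;
            this.seqNum = seqNum;
        }

        @Override
        public int compareTo(FIFOEntry<T> other) {
            int res = entry.compareTo(other.entry);      // priority-based compare
            if (res == 0 && other.entry != this.entry) {
                res = (seqNum < other.seqNum) ? -1 : 1;  // FIFO compare
            }
            return res;
        }
    }

    // Design choice in this sketch: one counter per queue instance.
    private final AtomicLong seq = new AtomicLong();
    private final PriorityBlockingQueue<FIFOEntry<E>> theQueue =
            new PriorityBlockingQueue<FIFOEntry<E>>();

    public void sendEvent(E event) {
        theQueue.put(new FIFOEntry<E>(event, seq.getAndIncrement()));
    }

    // Returns null if the queue is empty.
    public E pollEvent() {
        FIFOEntry<E> e = theQueue.poll();
        return e == null ? null : e.entry;
    }

    // Blocks if the queue is empty; interruption is left to the caller.
    public E receiveEvent() throws InterruptedException {
        return theQueue.take().entry;
    }
}

// Hypothetical event type used only to exercise the queue.
class Job implements Comparable<Job> {
    final int priority; // assumption: lower value is dequeued first
    final String name;

    Job(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }

    @Override
    public int compareTo(Job other) {
        return Integer.compare(priority, other.priority);
    }
}

class QueueDemo {
    public static void main(String[] args) {
        PriorityFIFOEventQueue<Job> q = new PriorityFIFOEventQueue<Job>();
        q.sendEvent(new Job(5, "first"));
        q.sendEvent(new Job(1, "urgent"));
        q.sendEvent(new Job(5, "second"));
        q.sendEvent(new Job(5, "third"));
        // Expected order: urgent, first, second, third
        for (Job j = q.pollEvent(); j != null; j = q.pollEvent()) {
            System.out.println(j.name);
        }
    }
}
```
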

宫墨修音 2024-11-26 13:07:46

Here is an actual replacement for PriorityBlockingQueue which maintains FIFO ordering for items with equal priority. It does all the wrapping/unwrapping transparently for the user.

This code was written for a 1.4 JVM and uses the j.u.c. backport. Using it in a newer JVM and adding generics should be straightforward.
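
For readers on Java 5 or later, a rough sketch of what a generic variant could look like is given here. It is not the answer author's code: the class name `GenericFIFOPriorityBlockingQueue` and the inner `Entry` type are made up, and it leans on `java.util.AbstractQueue` for the bulk operations (`contains`, `remove(Object)`, `toArray`, and so on) instead of writing them by hand.

```java
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class GenericFIFOPriorityBlockingQueue<E extends Comparable<? super E>>
        extends AbstractQueue<E> implements BlockingQueue<E> {

    // Wrapper that adds insertion order as a tie-breaker.
    private static final class Entry<T extends Comparable<? super T>>
            implements Comparable<Entry<T>> {
        private static final AtomicLong SEQ = new AtomicLong();
        final long seqNum = SEQ.getAndIncrement();
        final T value;

        Entry(T value) {
            if (value == null) throw new NullPointerException();
            this.value = value;
        }

        @Override
        public int compareTo(Entry<T> o) {
            int res = value.compareTo(o.value);
            if (res == 0 && o.value != value) res = (seqNum < o.seqNum) ? -1 : 1;
            return res;
        }
    }

    private final PriorityBlockingQueue<Entry<E>> q = new PriorityBlockingQueue<Entry<E>>();

    // poll() and peek() return null on an empty queue, so unwrapping must be null-safe.
    private static <T extends Comparable<? super T>> T unwrap(Entry<T> e) {
        return e == null ? null : e.value;
    }

    @Override public boolean offer(E e) { return q.offer(new Entry<E>(e)); }
    @Override public void put(E e) { q.put(new Entry<E>(e)); }
    @Override public boolean offer(E e, long timeout, TimeUnit unit) {
        return q.offer(new Entry<E>(e), timeout, unit);
    }
    @Override public E poll() { return unwrap(q.poll()); }
    @Override public E peek() { return unwrap(q.peek()); }
    @Override public E take() throws InterruptedException { return q.take().value; }
    @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return unwrap(q.poll(timeout, unit));
    }
    @Override public int size() { return q.size(); }
    @Override public int remainingCapacity() { return q.remainingCapacity(); }

    @Override public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    @Override public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == this) throw new IllegalArgumentException();
        List<Entry<E>> tmp = new ArrayList<Entry<E>>();
        int n = q.drainTo(tmp, maxElements);
        for (Entry<E> e : tmp) c.add(e.value);
        return n;
    }

    // Like PriorityBlockingQueue's own iterator, this does not traverse in priority order.
    @Override public Iterator<E> iterator() {
        final Iterator<Entry<E>> it = q.iterator();
        return new Iterator<E>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public E next() { return it.next().value; }
            @Override public void remove() { it.remove(); }
        };
    }
}
```

The original 1.4-compatible listing follows.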

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;

import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.PriorityBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;

/**
 * A queue with all properties of a {@link PriorityBlockingQueue} but which additionally 
 * returns items with equal priority 
 * in a FIFO order.
 * In this respect, {@link PriorityBlockingQueue} explicitly gives no return order guarantees for equal priority elements.
 * 
 * This queue only accepts {@link Comparable} items. A custom {@link Comparator} cannot
 * be specified at construction time.
 * 
 *
 */
public final class FIFOPriorityBlockingQueue implements BlockingQueue {
    private final PriorityBlockingQueue q;

    public FIFOPriorityBlockingQueue() {
        q = new PriorityBlockingQueue();
    }

    public FIFOPriorityBlockingQueue(int initialCapacity) {
        q = new PriorityBlockingQueue(initialCapacity);
    }

    public boolean isEmpty() {
        return q.isEmpty();
    }

    public boolean addAll(Collection c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        boolean modified = false;
        Iterator e = c.iterator();
        while (e.hasNext()) {
            if (add(e.next()))
                modified = true;
        }
        return modified;
    }

    /**
     * Always returns <code>null</code> as this {@link BlockingQueue} only accepts
     * {@link Comparable} objects and doesn't allow setting an own {@link Comparator} 
     * @return
     */
    public Comparator comparator() {
        return null;
    }

    public boolean containsAll(Collection c) {
        Iterator e = c.iterator();
        while (e.hasNext())
            if (!contains(e.next()))
                return false;

        return true;
    }

    public int size() {
        return q.size();
    }

    public int remainingCapacity() {
        return q.remainingCapacity();
    }

    public boolean removeAll(Collection c) {
        boolean modified = false;
        Iterator e = iterator();
        while (e.hasNext()) {
            if (c.contains(e.next())) {
                e.remove();
                modified = true;
            }
        }
        return modified;
    }

    public boolean retainAll(Collection c) {
        boolean modified = false;
        Iterator e = iterator();
        while (e.hasNext()) {
            if (!c.contains(e.next())) {
                e.remove();
                modified = true;
            }
        }
        return modified;
    }

    public Object remove() {
        return ((FIFOEntry)q.remove()).entry;
    }

    public Object element() {
        return ((FIFOEntry)q.element()).entry;
    }

    public boolean add(Object e) {
        return q.add(new FIFOEntry((Comparable)e));
    }

    public boolean offer(Object e) {
        return q.offer(new FIFOEntry((Comparable)e));
    }

    public void put(Object e) {
        q.put(new FIFOEntry((Comparable)e));
    }

    public boolean offer(Object e, long timeout, TimeUnit unit) {
        return q.offer(new FIFOEntry((Comparable)e), timeout, unit);
    }

    public Object poll() {
        return ((FIFOEntry)q.poll()).entry;
    }

    public Object take() throws InterruptedException {
        return ((FIFOEntry)q.take()).entry;
    }

    public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
        FIFOEntry en = (FIFOEntry) q.poll(timeout, unit);
        return en == null ? null : en.entry; // null means the timeout elapsed
    }

    public Object peek() {
        FIFOEntry en = (FIFOEntry) q.peek();
        return en == null ? null : en.entry; // q.peek() returns null on an empty queue
    }

    /**
     * If the queue holds several equal objects, remove() removes one of them,
     * not necessarily the first or last inserted. Matching relies on
     * FIFOEntry.equals, which compares the wrapped entries only.
     */
    public boolean remove(Object o) {
        return q.remove(new FIFOEntry((Comparable)o));
    }

    public boolean contains(Object o) {
        return q.contains(new FIFOEntry((Comparable)o));
    }

    public Object[] toArray() {
        Object[] a = q.toArray();
        for (int i = 0; i < a.length; i++) { // unpacking
            a[i] = ((FIFOEntry)a[i]).entry;
        }
        return a;
    }

    public String toString() {
        return q.toString(); // ok, because each FIFOEntry.toString returns the toString of the entry 
    }

    public int drainTo(Collection c) {
        ArrayList tmp = new ArrayList(size());
        int n = q.drainTo(tmp);
        for (Iterator it = tmp.iterator(); it.hasNext();) {
            FIFOEntry en = (FIFOEntry) it.next();
            c.add(en.entry);
        }
        return n;
    }



    public int drainTo(Collection c, int maxElements) {
        ArrayList tmp = new ArrayList(size());
        int n = q.drainTo(tmp, maxElements);
        for (Iterator it = tmp.iterator(); it.hasNext();) {
            FIFOEntry en = (FIFOEntry) it.next();
            c.add(en.entry);
        }
        return n;
    }

    public void clear() {
        q.clear();
    }



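    public Object[] toArray(Object[] a) {
        // Snapshot as Object[] first: q.toArray(a) would try to store FIFOEntry
        // wrappers into a typed array (e.g. String[]) and throw ArrayStoreException.
        Object[] src = q.toArray();
        if (a.length < src.length)
            a = (Object[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), src.length);
        for (int i = 0; i < src.length; i++) { // unpacking
            a[i] = ((FIFOEntry)src[i]).entry;
        }
        if (a.length > src.length)
            a[src.length] = null; // null-terminate, as Collection.toArray(T[]) specifies
        return a;
    }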



    public Iterator iterator() {
        final Iterator it = q.iterator();
        return new Iterator() {
            public void remove() throws UnsupportedOperationException, IllegalStateException, ConcurrentModificationException {
                it.remove();
            }

            public Object next() throws NoSuchElementException, ConcurrentModificationException {
                return ((FIFOEntry)it.next()).entry;
            }

            public boolean hasNext() {
                return it.hasNext();
            }
        };
    }

    public int hashCode() {
        return q.hashCode();
    }

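    public boolean equals(Object obj) {
        // Identity equality, like PriorityBlockingQueue itself. Delegating to
        // q.equals(obj) would make even x.equals(x) return false.
        return this == obj;
    }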


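    /**
     * Wrapper class that adds creation ordering to a {@link Comparable} object.
     * Based on the code in the javadoc for {@link PriorityBlockingQueue}.
     * Note that equals() and hashCode() look only at the wrapped entry, while
     * compareTo() also uses the sequence number to break ties.
     */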
    private static class FIFOEntry implements Comparable {
        private final static AtomicLong seq = new AtomicLong();
        private final long seqNum;
        private final Comparable entry;
        public FIFOEntry(Comparable entry) {
            seqNum = seq.getAndIncrement();
            this.entry = entry;
        }

        public int compareTo(Object other) {
            FIFOEntry o = (FIFOEntry) other;
            int res = entry.compareTo(o.entry);
            if (res == 0 && o.entry != this.entry)
                res = (seqNum < o.seqNum ? -1 : 1);
            return res;
        }
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((entry == null) ? 0 : entry.hashCode());
            return result;
        }
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            FIFOEntry other = (FIFOEntry) obj;
            if (entry == null) {
                if (other.entry != null)
                    return false;
            } else if (!entry.equals(other.entry))
                return false;
            return true;
        }

        public String toString() {
            return entry.toString();
        }
    }

}
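To check that the sequence-number tie-break really gives FIFO order among equal priorities, a small standalone sketch can help. It does not use the wrapper class above; it puts a generic variant of `FIFOEntry` straight into a `PriorityBlockingQueue`. The `Job` class, the names `FifoTieBreakDemo` and `drainNames`, and the sample data are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class FifoTieBreakDemo {

    /** Hypothetical payload: a lower priority value is served first. */
    static final class Job implements Comparable<Job> {
        final int priority;
        final String name;

        Job(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }

        @Override
        public int compareTo(Job other) {
            return Integer.compare(priority, other.priority);
        }
    }

    /** Generic variant of the FIFOEntry wrapper from the PriorityBlockingQueue javadoc. */
    static final class FIFOEntry<E extends Comparable<? super E>>
            implements Comparable<FIFOEntry<E>> {
        private static final AtomicLong SEQ = new AtomicLong();
        final long seqNum = SEQ.getAndIncrement(); // creation order
        final E entry;

        FIFOEntry(E entry) {
            this.entry = entry;
        }

        @Override
        public int compareTo(FIFOEntry<E> other) {
            int res = entry.compareTo(other.entry);
            // Equal priority: the earlier-created wrapper wins.
            return res != 0 ? res : Long.compare(seqNum, other.seqNum);
        }
    }

    /** Adds jobs with interleaved priorities and returns the names in removal order. */
    static List<String> drainNames() {
        PriorityBlockingQueue<FIFOEntry<Job>> q = new PriorityBlockingQueue<>();
        q.add(new FIFOEntry<>(new Job(2, "b1")));
        q.add(new FIFOEntry<>(new Job(1, "a1")));
        q.add(new FIFOEntry<>(new Job(2, "b2")));
        q.add(new FIFOEntry<>(new Job(1, "a2")));
        q.add(new FIFOEntry<>(new Job(2, "b3")));

        List<String> out = new ArrayList<>();
        FIFOEntry<Job> e;
        while ((e = q.poll()) != null) { // poll() returns null once the queue is empty
            out.add(e.entry.name);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drainNames()); // prints [a1, a2, b1, b2, b3]
    }
}
```

Without the sequence number, the relative order of `b1`, `b2` and `b3` would be unspecified, since `PriorityBlockingQueue` makes no ordering guarantee for elements that compare as equal.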