Thread.yield() 有更好的解决方案吗?

发布于 2024-12-11 17:21:32 字数 2426 浏览 0 评论 0原文

我创建了一个扩展 ArrayBlockingQueue 的 CloseableBlockingQueue:

private static class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
// Flag indicates closed state.
private volatile boolean closed = false;

public CloseableBlockingQueue(int queueLength) {
  super(queueLength);
}

/***
 * Shortcut to do nothing if closed.
 */
@Override
public boolean offer(E e) {
  return closed ? true : super.offer(e);
}

/***
 * Shortcut to do nothing if closed.
 */
@Override
public void put(E e) throws InterruptedException {
  if (!closed) {
    super.put(e);
  }
}

/***
 * Shortcut to do nothing if closed.
 */
@Override
public E poll() {
  return closed ? null : super.poll();
}

/***
 * Shortcut to do nothing if closed.
 * @throws InterruptedException 
 */
@Override
public E poll(long l, TimeUnit tu) throws InterruptedException {
  return closed ? null : super.poll(l, tu);
}

/***
 * Mark me as closed and clear the queue.
 */
void close() {
  closed = true;
  // There could be more threads blocking on my queue than there are slots 
  // in the queue. We therefore MUST loop.
  do {
    // so keep clearing
    clear();
    /* Let others in ... more specifically, any collectors blocked on the 
     * queue should now unblock and finish their put.
     * 
     * Subsequent puts should shortcut but it is the waiting ones I need
     * to clear.
     */
    Thread.yield();
    /* Debug code.
    // Did yield achieve?
    if (!super.isEmpty()) {
     * This DOES report success.
      log("! Thread.yield() successful");
    }
     * 
     */
    // Until all are done.
  } while (!super.isEmpty());
}

/***
 * isClosed
 * 
 * @return true if closed.
 */
boolean isClosed() {
  return closed;
}
}

我关心的是 close 方法,它试图将队列上阻塞的任何线程重新启动。我使用 Thread.yield() 来尝试这样做,但我看到的参考资料表明这种技术可能并不总是有效,因为不能保证任何其他被阻塞的线程会在收益期间被唤醒。

队列用于将多个线程的输出集中到单个流中。喂给它的线程很容易比队列中的槽多得多,因此队列很可能已满,并且当队列关闭时有几个线程阻塞在队列上。

我欢迎你的想法。

添加

感谢下面 Tom 的建议,我已重构为:

  • 保存可能阻塞的所有线程的集合。
  • 关闭时,中断所有这些。

顺便说一句:由于线程集合主要用于添加对象并几乎立即删除相同的对象,因此我从 http://www.java2s.com/Code/Java/Collections-Data-Structure/ConcurrentDoublyLinkedList.htm 并添加了一些方法来允许我保留添加的节点。删除的时间应该是 O(1) 而不是 O(n)。

保罗

I have created a CloseableBlockingQueue extending ArrayBlockingQueue:

private static class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
// Flag indicates closed state.
private volatile boolean closed = false;

public CloseableBlockingQueue(int queueLength) {
  super(queueLength);
}

/***
 * Shortcut to do nothing if closed.
 */
@Override
public boolean offer(E e) {
  return closed ? true : super.offer(e);
}

/***
 * Shortcut to do nothing if closed.
 */
@Override
public void put(E e) throws InterruptedException {
  if (!closed) {
    super.put(e);
  }
}

/***
 * Shortcut to do nothing if closed.
 */
@Override
public E poll() {
  return closed ? null : super.poll();
}

/***
 * Shortcut to do nothing if closed.
 * @throws InterruptedException 
 */
@Override
public E poll(long l, TimeUnit tu) throws InterruptedException {
  return closed ? null : super.poll(l, tu);
}

/***
 * Mark me as closed and clear the queue.
 */
void close() {
  closed = true;
  // There could be more threads blocking on my queue than there are slots 
  // in the queue. We therefore MUST loop.
  do {
    // so keep clearing
    clear();
    /* Let others in ... more specifically, any collectors blocked on the 
     * queue should now unblock and finish their put.
     * 
     * Subsequent puts should shortcut but it is the waiting ones I need
     * to clear.
     */
    Thread.yield();
    /* Debug code.
    // Did yield achieve?
    if (!super.isEmpty()) {
     * This DOES report success.
      log("! Thread.yield() successful");
    }
     * 
     */
    // Until all are done.
  } while (!super.isEmpty());
}

/***
 * isClosed
 * 
 * @return true if closed.
 */
boolean isClosed() {
  return closed;
}
}

My concern is with the close method where it is trying to kick back into life any threads that are blocked on the queue. I use Thread.yield() to attempt that but I have seen references that suggest that this technique may not always work because there is no guarantee that any other blocked threads will be woken up during a yield.

The queue is used to concentrate the output of multiple threads into a single stream. There can easily be many more threads feeding it than there are slots in the queue so it is quite possible for the queue to be full AND several threads are blocking on it when it is closed.

I welcome your thoughts.

Added

Thanks to Tom's suggestion below I have refactored to:

  • Hold a collection of all threads that may be blocking.
  • On close, interrupt all of them.

BTW: Since the thread collection is used mostly for adding an object and almost immediately removing the same object I took a copy of Doug Lea's impressive ConcurrentDoublyLinkedList from http://www.java2s.com/Code/Java/Collections-Data-Structure/ConcurrentDoublyLinkedList.htm and added a couple of methods to allow me to keep hold of the added node. Removal should then be O(1) instead of O(n).

Paul

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

绝不服输 2024-12-18 17:21:32

我认为yield()根本不会影响队列上阻塞的线程。

如果您可以跟踪等待线程(考虑到您正在包装阻塞方法,应该很简单)。当你关闭时你可以调用它们的interrupt()。

请参阅此问题: ArrayBlockingQueue - 如何“中断”正在等待 .take() 方法的线程

I don't think yield() would affect the threads blocked on the queue at all.

If you can keep track of the waiting threads (should be straightforward given you're wrapping the blocking methods). You could call interrupt() on them when you close.

See this question: ArrayBlockingQueue - How to "interrupt" a thread that is wating on .take() method

鸢与 2024-12-18 17:21:32

使用 wait/notifyAll() 或最好使用 util.concurrent 包中的同步原语之一(例如 CountDownLatchCountDownLatch< /代码>。将一个对象放在队列末尾,处理时会触发通知。让调用线程(close 方法的)等待此通知。它将休眠直到队列被清空。

Instead of the yield-while-check loop, use wait/notifyAll() or preferably one of the synchronization primitives from the util.concurrent package such as CountDownLatch. Put an object at the end of the queue that, when processed, triggers the notification. Make the calling thread (of the close method) await this notification. It will sleep until the queue has been drained.

蓝天白云 2024-12-18 17:21:32

我会把一颗毒丸放入队列中。例如空。
当等待的线程获得药片时,它会将其放回队列中。

E pill = null;

void close() {
   closed = true;
   clear();
   while(!offer(pill)); // will wake one thread.
}

public E poll() {
   if (closed) return null;
   E e = super.poll();
   if (e == pill) add(e); // will wake one thread.
   return e;
}

I would place a poison pill into the queue. e.g. null.
When a waiting thread gets the pill, it places it back in the queue.

E pill = null;

void close() {
   closed = true;
   clear();
   while(!offer(pill)); // will wake one thread.
}

public E poll() {
   if (closed) return null;
   E e = super.poll();
   if (e == pill) add(e); // will wake one thread.
   return e;
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文