返回介绍

13. 同步队列 SynchronousQueue

发布于 2024-09-08 13:17:47 字数 4002 浏览 0 评论 0 收藏 0

  • SynchronousQueue 内部没有容量,但是由于一个插入操作总是对应一个移除操作,反过来同样需要满足那么一个元素就不会再 SynchronousQueue 里面长时间停留,一旦有了插入线程和移除线程,元素很快就从插入线程移交给移除线程。也就是说这更像是一种信道(管道),资源从一个方向快速传递到另一方 向。显然这是一种快速传递元素的方式, 这种情况下元素总是以最快的方式从插入着(生产者)传递给移除着(消费者),这在多任务队列中是最快处理任务的方式
  • 因为没有容量,所以对应 peek, contains, clear, isEmpty ... 等方法其实是无效的。例如 clear 是不执行任何操作的,contains 始终返回 false,peek 始终返回 null。
  • SynchronousQueue 分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为 true 即可)。
  • 在线程池里的一个典型应用是 Executors.newCachedThreadPool() 就使用了 SynchronousQueue, 这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了 60 秒后会被回收。

方法

iterator() 永远返回空,因为里面没东西。 
peek() 永远返回 null。 
put() 往 queue 放进去一个 element 以后就一直 wait 直到有其他 thread 进来把这个 element 取走。 
offer() 往 queue 里放一个 element 后立即返回,如果碰巧这个 element 被另一个 thread 取走了,offer 方法返回 true,认为 offer 成功;否则返回 false。 
offer(2000, TimeUnit.SECONDS) 往 queue 里放一个 element 但是等待指定的时间后才返回,返回的逻辑和 offer() 方法一样。 
take() 取出并且 remove 掉 queue 里的 element(认为是在 queue 里的。。。),取不到东西他会一直等。 
poll() 取出并且 remove 掉 queue 里的 element(认为是在 queue 里的。。。),只有到碰巧另外一个线程正在往 queue 里 offer 数据或者 put 数据的时候,该方法才会取到东西。否则立即返回 null。 
poll(2000, TimeUnit.SECONDS) 等待指定的时间然后取出并且 remove 掉 queue 里的 element,其实就是再等其他的 thread 来往里塞。 
isEmpty() 永远是 true。 
remainingCapacity() 永远是 0。 
remove() 和 removeAll() 永远是 false。 

使用示例

public class SynchronousQueueTest {
 
    public static void main(String[] args) throws InterruptedException {
         final SynchronousQueue<String> queue = new SynchronousQueue<String>(true);
         //put 线程
          ExecutorService exec=Executors.newCachedThreadPool();
          exec.execute(new Runnable() {
            @Override
            public void run(){
                for (int i = 0; i < 5; i++) {
                    try {
                         System.out.println(Thread.currentThread().getName()+"   put 开始");
                          queue.put("put:"+i);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }
            }
        });
          exec.shutdown();
          Thread.sleep(1000);
          //消费线程
          ExecutorService exec2=Executors.newCachedThreadPool();
          exec2.execute(new Runnable() {
            @Override
            public void run(){
                for (int i = 0; i < 5; i++) {
                    try {       
                          System.out.println(Thread.currentThread().getName()+"  take 消费开始");
                          System.out.println(Thread.currentThread().getName() +"  take 值: : " +queue.take());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                     System.out.println(Thread.currentThread().getName()+"  take 消费结束");
                }
            }
        });
          exec2.shutdown();
          
    }

}

原理解析

与其他 BlockingQueue 一样,SynchronousQueue 同样继承 AbstractQueue 和实现 BlockingQueue 接口:

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable

SynchronousQueue 提供了两个构造函数:

 public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        // 通过 fair 值来决定公平性和非公平性
        // 公平性使用 TransferQueue,非公平性采用 TransferStack
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

TransferQueue、TransferStack 继承 Transferer,Transferer 为 SynchronousQueue 的内部类,它提供了一个方法 transfer(),该方法定义了转移数据的规范,如下:

 abstract static class Transferer<E> {
        abstract E transfer(E e, boolean timed, long nanos);
    }

transfer() 方法主要用来完成转移数据的,如果 e != null,相当于将一个数据交给消费者,如果 e == null,则相当于从一个生产者接收一个消费者交出的数据。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文