文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
13. 同步队列 SynchronousQueue
- SynchronousQueue 内部没有容量,但是由于一个插入操作总是对应一个移除操作,反过来同样需要满足那么一个元素就不会再 SynchronousQueue 里面长时间停留,一旦有了插入线程和移除线程,元素很快就从插入线程移交给移除线程。也就是说这更像是一种信道(管道),资源从一个方向快速传递到另一方 向。显然这是一种快速传递元素的方式, 这种情况下元素总是以最快的方式从插入着(生产者)传递给移除着(消费者),这在多任务队列中是最快处理任务的方式
- 因为没有容量,所以对应 peek, contains, clear, isEmpty ... 等方法其实是无效的。例如 clear 是不执行任何操作的,contains 始终返回 false,peek 始终返回 null。
- SynchronousQueue 分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为 true 即可)。
- 在线程池里的一个典型应用是 Executors.newCachedThreadPool() 就使用了 SynchronousQueue, 这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了 60 秒后会被回收。
方法
iterator() 永远返回空,因为里面没东西。
peek() 永远返回 null。
put() 往 queue 放进去一个 element 以后就一直 wait 直到有其他 thread 进来把这个 element 取走。
offer() 往 queue 里放一个 element 后立即返回,如果碰巧这个 element 被另一个 thread 取走了,offer 方法返回 true,认为 offer 成功;否则返回 false。
offer(2000, TimeUnit.SECONDS) 往 queue 里放一个 element 但是等待指定的时间后才返回,返回的逻辑和 offer() 方法一样。
take() 取出并且 remove 掉 queue 里的 element(认为是在 queue 里的。。。),取不到东西他会一直等。
poll() 取出并且 remove 掉 queue 里的 element(认为是在 queue 里的。。。),只有到碰巧另外一个线程正在往 queue 里 offer 数据或者 put 数据的时候,该方法才会取到东西。否则立即返回 null。
poll(2000, TimeUnit.SECONDS) 等待指定的时间然后取出并且 remove 掉 queue 里的 element,其实就是再等其他的 thread 来往里塞。
isEmpty() 永远是 true。
remainingCapacity() 永远是 0。
remove() 和 removeAll() 永远是 false。
使用示例
public class SynchronousQueueTest {
public static void main(String[] args) throws InterruptedException {
final SynchronousQueue<String> queue = new SynchronousQueue<String>(true);
//put 线程
ExecutorService exec=Executors.newCachedThreadPool();
exec.execute(new Runnable() {
@Override
public void run(){
for (int i = 0; i < 5; i++) {
try {
System.out.println(Thread.currentThread().getName()+" put 开始");
queue.put("put:"+i);
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
exec.shutdown();
Thread.sleep(1000);
//消费线程
ExecutorService exec2=Executors.newCachedThreadPool();
exec2.execute(new Runnable() {
@Override
public void run(){
for (int i = 0; i < 5; i++) {
try {
System.out.println(Thread.currentThread().getName()+" take 消费开始");
System.out.println(Thread.currentThread().getName() +" take 值: : " +queue.take());
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" take 消费结束");
}
}
});
exec2.shutdown();
}
}
原理解析
与其他 BlockingQueue 一样,SynchronousQueue 同样继承 AbstractQueue 和实现 BlockingQueue 接口:
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
SynchronousQueue 提供了两个构造函数:
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
// 通过 fair 值来决定公平性和非公平性
// 公平性使用 TransferQueue,非公平性采用 TransferStack
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
TransferQueue、TransferStack 继承 Transferer,Transferer 为 SynchronousQueue 的内部类,它提供了一个方法 transfer(),该方法定义了转移数据的规范,如下:
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
transfer() 方法主要用来完成转移数据的,如果 e != null,相当于将一个数据交给消费者,如果 e == null,则相当于从一个生产者接收一个消费者交出的数据。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论