Executors 介绍和使用
Executors 是一个工厂,里面定义了很多的静态方法。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
Executors
创建具体线程的实例
1. newFixedThreadPool 保持固定的活跃线程的数量
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
//线程池的饱和策略
System.out.println(service.getRejectedExecutionHandler());
for (int i = 0; i < 10; i++) {
service.execute(new Task(i));
System.out.println("ActiveCount: " + service.getActiveCount());
System.out.println("CompletedTaskCount: " + service.getCompletedTaskCount());
System.out.println("QueueSize: " + service.getQueue().size());
System.out.println("*****************************************");
}
}
static class Task implements Runnable {
int id;
public Task(int id) {
this.id = id;
}
@Override
public void run() {
try {
Thread.sleep(50L);
System.out.println("完成:" + id);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果:
java.util.concurrent.ThreadPoolExecutor$AbortPolicy@568db2f2
ActiveCount: 1
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 1
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 2
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 3
*****************************************
完成:1
完成:0
完成:3
完成:2
完成:4
结论:
- 它是一个固定大小的线程池,线程数量始终维持在 nThread。
- 阻塞队列采用了 LinkedBlockingQueue,它是一个无界队列。
- 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务。
- 由于采用了无界队列,实际线程数量将永远维持在 nThreads,因此 maximumPoolSize 和 keepAliveTime 将无效。
- 活跃线程的数量不超过初始值。
- 当活跃线程达到初始值时,新的任务会放到 LinkedBlockingDeque 中,LinkedBlockingDeque 并未初始化大小,所以是无界的队列。
- 核心线程数等于最大线程数。
- newFixedThreadPool 适用于线程数量基本稳定的情况,当线程数量不稳定时,比如大量的线程会不断地进入队列中,导致内存空间被占用。(此处引用一次 血案 )
设想当我们固定 LinkedBlockingDeque 的长度时,会发生什么变化
LinkedBlockingQueue
LinkedBlockingQueue 在入队列和出队列时使用的是不同的 Lock,意味着他们之间不存在互斥关系,在多 CPU 情况下,他们能正在在同一时刻既消费,又生产,真正做到并行。
现在回过头来思考 FixedThreadPool,前面说过他的特点在于这种线程池的线程个数是固定的,而且阻塞队列是可以存储任务的,因此这种线程池不会拒绝任务,而且不会开辟新的线程,也不会因为线程的长时间不使用而销毁线程。这是典型的生产者----消费者问题,这种线程池适合用在稳定且固定的并发场景,
//创建一个固定线程的线程池
ThreadPoolExecutor service = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(1));
//饱和策略
System.out.println(service.getRejectedExecutionHandler());
for (int i = 0; i < 5; i++) {
service.execute(new Task(i));
System.out.println("ActiveCount: " + service.getActiveCount());
System.out.println("CompletedTaskCount: " + service.getCompletedTaskCount());
System.out.println("QueueSize: " + service.getQueue().size());
System.out.println("*****************************************");
}
}
结果:
java.util.concurrent.ThreadPoolExecutor$AbortPolicy@568db2f2
ActiveCount: 1
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 1
*****************************************
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.boot.example.guava.OptionT$Task@2d98a335 rejected from java.util.concurrent.ThreadPoolExecutor@16b98e56[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.boot.example.guava.OptionT.main(OptionT.java:31)
完成:0
完成:1
完成:2
通过上面的输出结果,我们发现当队列已经满时采用 AbortPolicy 这种饱和策略会抛出异常信息!
2. CachedThreadPool
public static void main(String[] args) {
//创建一个固定线程的线程池
ThreadPoolExecutor service = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30L, TimeUnit.SECONDS,
new SynchronousQueue<>());
//线程池的饱和策略
System.out.println(service.getRejectedExecutionHandler());
for (int i = 0; i < 10; i++) {
service.execute(new Task(i));
System.out.println("ActiveCount: " + service.getActiveCount());
System.out.println("CompletedTaskCount: " + service.getCompletedTaskCount());
System.out.println("QueueSize: " + service.getQueue().size());
System.out.println("*****************************************");
}
}
java.util.concurrent.ThreadPoolExecutor$AbortPolicy@378bf509
ActiveCount: 1
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 3
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 4
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 5
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 6
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 7
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 8
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 9
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 10
CompletedTaskCount: 0
QueueSize: 0
*****************************************
完成:2
完成:1
完成:0
完成:9
完成:8
完成:7
完成:6
完成:5
完成:4
完成:3
结论:
- 它是一个可以无限扩大的线程池,不断的创建的新的线程;
- 使用 SynchronousQueue,它是一个没有容量的队列,入队操作必须要等到另一个线程移出。所以,如果主线程提交任务的速度高于线程池中处理任务的速度时,会创建大量的新的线程。
- 适合耗时较小的任务。
- corePoolSize 为 0,maximumPoolSize 为无限大,意味着线程数量可以无限大;
对比 CachedThreadPool、FixedThreadPool
- FixedThreadPool 设置了核心线程数和最大线程数并且使用 LinkedBlockingDeque(无界的队列),它的特点是:活跃的线程数量不超过设置值,新的任务不断的加入队列,可能会导致 LinkedBlockingDeque 的容量不断增大。corePoolSize 和 maximumPoolSize 的大小是一样的。
- CachedThreadPool 可以无限的创建新的线程,使用 SynchronousQueue 没有容量的阻塞队列,新的任务提交时,需要等到 SynchronousQueue 中的线程移出,当没有可用线程时,则会创建新的线程。当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是 60 秒无执行)的线程。首先是无界的线程池,所以我们可以发现 maximumPoolSize 为 big big
为什么要用线程池
1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约 1MB 内存,线程开的越多,消耗的内存也就越大,最后死机)。
线程池主要参数
- corePoolSize - 池中所保存的线程数,包括空闲线程。
- maximumPoolSize-池中允许的最大线程数。
- keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
- unit - keepAliveTime 参数的时间单位。
- workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(如果当前运行的线程小于 corePoolSize,则任务根本不会存放,添加到 queue 中,而是直接抄家伙(thread)开始运行)
如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
任务拒绝策略
- 直接丢弃(DiscardPolicy)
- 丢弃队列中最老的任务(DiscardOldestPolicy)。
- 抛异常(AbortPolicy)
- 将任务分给调用线程来执行(CallerRunsPolicy)。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
下一篇: 不要相信一个熬夜的人说的每一句话
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论