Executors 介绍和使用

发布于 2024-10-02 17:10:50 字数 8380 浏览 15 评论 0

Executors 是一个工厂,里面定义了很多的静态方法。

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

Executors

创建具体线程的实例

1. newFixedThreadPool 保持固定的活跃线程的数量

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);

//线程池的饱和策略
System.out.println(service.getRejectedExecutionHandler());

for (int i = 0; i < 10; i++) {
    service.execute(new Task(i));
    System.out.println("ActiveCount: " + service.getActiveCount());
    System.out.println("CompletedTaskCount: " + service.getCompletedTaskCount());
    System.out.println("QueueSize: " + service.getQueue().size());
    System.out.println("*****************************************");
}
}

static class Task implements Runnable {

int id;

public Task(int id) {
    this.id = id;
}

@Override
public void run() {
    try {
        Thread.sleep(50L);
        System.out.println("完成:" + id);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
}

结果:

java.util.concurrent.ThreadPoolExecutor$AbortPolicy@568db2f2
ActiveCount: 1
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 1
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 2
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 3
*****************************************
完成:1
完成:0
完成:3
完成:2
完成:4

结论:

  • 它是一个固定大小的线程池,线程数量始终维持在 nThread。
  • 阻塞队列采用了 LinkedBlockingQueue,它是一个无界队列。
  • 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务。
  • 由于采用了无界队列,实际线程数量将永远维持在 nThreads,因此 maximumPoolSize 和 keepAliveTime 将无效。
  • 活跃线程的数量不超过初始值。
  • 当活跃线程达到初始值时,新的任务会放到 LinkedBlockingDeque 中,LinkedBlockingDeque 并未初始化大小,所以是无界的队列。
  • 核心线程数等于最大线程数。
  • newFixedThreadPool 适用于线程数量基本稳定的情况,当线程数量不稳定时,比如大量的线程会不断地进入队列中,导致内存空间被占用。(此处引用一次 血案

设想当我们固定 LinkedBlockingDeque 的长度时,会发生什么变化

LinkedBlockingQueue

LinkedBlockingQueue 在入队列和出队列时使用的是不同的 Lock,意味着他们之间不存在互斥关系,在多 CPU 情况下,他们能正在在同一时刻既消费,又生产,真正做到并行。

现在回过头来思考 FixedThreadPool,前面说过他的特点在于这种线程池的线程个数是固定的,而且阻塞队列是可以存储任务的,因此这种线程池不会拒绝任务,而且不会开辟新的线程,也不会因为线程的长时间不使用而销毁线程。这是典型的生产者----消费者问题,这种线程池适合用在稳定且固定的并发场景,

//创建一个固定线程的线程池
ThreadPoolExecutor service = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(1));
//饱和策略
System.out.println(service.getRejectedExecutionHandler());

for (int i = 0; i < 5; i++) {
    service.execute(new Task(i));
    System.out.println("ActiveCount: " + service.getActiveCount());
    System.out.println("CompletedTaskCount: " + service.getCompletedTaskCount());
    System.out.println("QueueSize: " + service.getQueue().size());
    System.out.println("*****************************************");
}
}

结果:

java.util.concurrent.ThreadPoolExecutor$AbortPolicy@568db2f2
ActiveCount: 1
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 1
*****************************************
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.boot.example.guava.OptionT$Task@2d98a335 rejected from java.util.concurrent.ThreadPoolExecutor@16b98e56[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.boot.example.guava.OptionT.main(OptionT.java:31)
完成:0
完成:1
完成:2

通过上面的输出结果,我们发现当队列已经满时采用 AbortPolicy 这种饱和策略会抛出异常信息!

2. CachedThreadPool

public static void main(String[] args) {
//创建一个固定线程的线程池
ThreadPoolExecutor service =  new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30L, TimeUnit.SECONDS,
        new SynchronousQueue<>());

//线程池的饱和策略
System.out.println(service.getRejectedExecutionHandler());


for (int i = 0; i < 10; i++) {
    service.execute(new Task(i));
    System.out.println("ActiveCount: " + service.getActiveCount());
    System.out.println("CompletedTaskCount: " + service.getCompletedTaskCount());
    System.out.println("QueueSize: " + service.getQueue().size());
    System.out.println("*****************************************");
}
}
java.util.concurrent.ThreadPoolExecutor$AbortPolicy@378bf509
ActiveCount: 1
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 3
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 4
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 5
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 6
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 7
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 8
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 9
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 10
CompletedTaskCount: 0
QueueSize: 0
*****************************************
完成:2
完成:1
完成:0
完成:9
完成:8
完成:7
完成:6
完成:5
完成:4
完成:3

结论:

  • 它是一个可以无限扩大的线程池,不断的创建的新的线程;
  • 使用 SynchronousQueue,它是一个没有容量的队列,入队操作必须要等到另一个线程移出。所以,如果主线程提交任务的速度高于线程池中处理任务的速度时,会创建大量的新的线程。
  • 适合耗时较小的任务。
  • corePoolSize 为 0,maximumPoolSize 为无限大,意味着线程数量可以无限大;

对比 CachedThreadPool、FixedThreadPool

  • FixedThreadPool 设置了核心线程数和最大线程数并且使用 LinkedBlockingDeque(无界的队列),它的特点是:活跃的线程数量不超过设置值,新的任务不断的加入队列,可能会导致 LinkedBlockingDeque 的容量不断增大。corePoolSize 和 maximumPoolSize 的大小是一样的。
  • CachedThreadPool 可以无限的创建新的线程,使用 SynchronousQueue 没有容量的阻塞队列,新的任务提交时,需要等到 SynchronousQueue 中的线程移出,当没有可用线程时,则会创建新的线程。当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是 60 秒无执行)的线程。首先是无界的线程池,所以我们可以发现 maximumPoolSize 为 big big

为什么要用线程池

1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约 1MB 内存,线程开的越多,消耗的内存也就越大,最后死机)。

线程池主要参数

  • corePoolSize - 池中所保存的线程数,包括空闲线程。
  • maximumPoolSize-池中允许的最大线程数。
  • keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
  • unit - keepAliveTime 参数的时间单位。
  • workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。

如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(如果当前运行的线程小于 corePoolSize,则任务根本不会存放,添加到 queue 中,而是直接抄家伙(thread)开始运行)

如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

任务拒绝策略

  • 直接丢弃(DiscardPolicy)
  • 丢弃队列中最老的任务(DiscardOldestPolicy)。
  • 抛异常(AbortPolicy)
  • 将任务分给调用线程来执行(CallerRunsPolicy)。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

独木成林

暂无简介

0 文章
0 评论
23 人气
更多

推荐作者

Gabu-gabumon

文章 0 评论 0

qq_CgiN62

文章 0 评论 0

荔枝明

文章 0 评论 0

¤→小豸慧

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文