返回介绍

3.2 线程复用:线程池

发布于 2024-08-21 22:20:21 字数 29492 浏览 0 评论 0 收藏 0

多线程的软件设计方法确实可以最大限度地发挥现代多核处理器的计算能力,提高生产系统的吞吐量和性能。但是,若不加控制和管理的随意使用线程,对系统的性能反而会产生不利的影响。

一种最为简单的线程创建和回收的方法类似如下代码:

new Thread(new Runnable(){
  @Override
  public void run() {
    //do sth.
  }
}).start();

以上代码创建了一个线程,并在run()方法结束后,自动回收该线程。在简单的应用系统中,这段代码并没有太多问题。但是在真实的生产环境中,系统由于真实环境的需要,可能会开启很多线程来支撑其应用。而当线程数量过大时,反而会耗尽CPU和内存资源。

首先,虽然与进程相比,线程是一种轻量级的工具,但其创建和关闭依然需要花费时间,如果为每一个小的任务都创建一个线程,很有可能出现创建和销毁线程所占用的时间大于该线程真实工作所消耗的时间的情况,反而会得不偿失。

其次,线程本身也是要占用内存空间的,大量的线程会抢占宝贵的内存资源,如果处理不当,可能会导致Out of Memory异常。即便没有,大量的线程回收也会给GC带来很大的压力,延长GC的停顿时间。

因此,对线程的使用必须掌握一个度,在有限的范围内,增加线程的数量可以明显提高系统的吞吐量,但一旦超出了这个范围,大量的线程只会拖垮应用系统。因此,在生产环境中使用线程,必须对其加以控制和管理。

注意:在实际生产环境中,线程的数量必须得到控制。盲目的大量创建线程对系统性能是有伤害的。

3.2.1 什么是线程池

为了避免系统频繁地创建和销毁线程,我们可以让创建的线程进行复用。如果大家进行过数据库开发,对数据库连接池应该不会陌生。为了避免每次数据库查询都重新建立和销毁数据库连接,我们可以使用数据库连接池维护一些数据库连接,让他们长期保持在一个激活状态。当系统需要使用数据库时,并不是创建一个新的连接,而是从连接池中获得一个可用的连接即可。反之,当需要关闭连接时,并不真的把连接关闭,而是将这个连接“还”给连接池即可。通过这种方式,可以节约不少创建和销毁对象的时间。

线程池也是类似的概念。线程池中,总有那么几个活跃线程。当你需要使用线程时,可以从池子中随便拿一个空闲线程,当完成工作时,并不急着关闭线程,而是将这个线程退回到池子,方便其他人使用。

简而言之,在使用线程池后,创建线程变成了从线程池获得空闲线程,关闭线程变成了向池子归还线程,如图3.3所示。

图3-3 线程池的作用

3.2.2 不要重复发明轮子:JDK对线程池的支持

为了能够更好地控制多线程,JDK提供了一套Executor框架,帮助开发人员有效地进行线程控制,其本质就是一个线程池。它的核心成员如图3.4所示。

图3-4 Executor框架结构图

以上成员均在java.util.concurrent包中,是JDK并发包的核心类。其中ThreadPoolExecutor表示一个线程池。Executors类则扮演着线程池工厂的角色,通过Executors可以取得一个拥有特定功能的线程池。从UML图中亦可知,ThreadPoolExecutor类实现了Executor接口,因此通过这个接口,任何Runnable的对象都可以被ThreadPoolExecutor线程池调度。

Executor框架提供了各种类型的线程池,主要有以下工厂方法:

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

以上工厂方法分别返回具有不同工作特性的线程池。这些线程池工厂方法的具体说明如下。

· newFixedThreadPool()方法:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。

· newSingleThreadExecutor()方法:该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。

· newCachedThreadPool()方法:该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

· newSingleThreadScheduledExecutor()方法:该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能,如在某个固定的延时之后执行,或者周期性执行某个任务。

· newScheduledThreadPool()方法:该方法也返回一个ScheduledExecutorService对象,但该线程池可以指定线程数量。

1. 固定大小的线程池

这里,我们以newFixedThreadPool()为例,简单地展示线程池的使用:

01 public class ThreadPoolDemo {
02   public static class MyTask implements Runnable {
03     @Override
04     public void run() {
05       System.out.println(System.currentTimeMillis() + ":Thread ID:"
06           + Thread.currentThread().getId());
07       try {
08         Thread.sleep(1000);
09       } catch (InterruptedException e) {
10         e.printStackTrace();
11       }
12     }
13   }
14
15   public static void main(String[] args) {
16     MyTask task = new MyTask();
17     ExecutorService es = Executors.newFixedThreadPool(5);
18     for (int i = 0; i < 10; i++) {
19       es.submit(task);
20     }
21   }
22 }

上述代码中,第17行创建了固定大小的线程池,内有5个线程。在第19行,依次向线程池提交了10个任务。此后,线程池就会安排调度这10个任务。每个任务都会将自己的执行时间和执行这个线程的ID打印出来,并且在这里,安排每个任务要执行1秒钟。

执行上述代码,可以得到类似以下输出:

1426510293450:Thread ID:8
1426510293450:Thread ID:9
1426510293450:Thread ID:12
1426510293450:Thread ID:10
1426510293450:Thread ID:11
1426510294450:Thread ID:12
1426510294450:Thread ID:11
1426510294450:Thread ID:8
1426510294450:Thread ID:10
1426510294450:Thread ID:9

这个输出就表示这10个线程的执行情况。很显然,前5个任务和后5个任务的执行时间正好相差1秒钟(注意时间戳的单位是毫秒),并且前5个任务的线程ID和后5个任务也是完全一致的(都是8、9、10、11、12)。这说明在这10个任务中,是分成2批次执行的。这也完全符合一个只有5个线程的线程池的行为。

有兴趣的读者可以将其改造成newCachedThreadPool(),看看任务的分配情况会有何变化?

2. 计划任务

另外一个值得注意的方法是newScheduledThreadPool()。它返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。它的一些主要方法如下:

public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                         long initialDelay,
                         long period,
                         TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                          long initialDelay,
                          long delay,
                          TimeUnit unit);

与其他几个线程池不同,ScheduledExecutorService并不一定会立即安排执行任务。它其实是起到了计划任务的作用。它会在指定的时间,对任务进行调度。如果大家使用过Linux下的crontab工具应该就能很容易地理解它了。

作为说明,这里给出了三个方法。方法schedule()会在给定时间,对任务进行一次调度。方法scheduleAtFixedRate()和scheduleWithFixedDelay()会对任务进行周期性的调度。但是两者有一点小小的区别,如图3.5所示。

图3.5 FixedRate和FixDelay区别

对于FixedRate方式来说,任务调度的频率是一定的。它是以上一个任务开始执行时间为起点,之后的period时间,调度下一次任务。而FixDelay则是在上一个任务结束后,再经过delay时间进行任务调度。

由于担心我的解释不够周全,我也很乐意将官方文档中的描述贴出来供大家参考,从而可以更精确地理解两者的差别:

· scheduleAtFixedRate

o Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on.

o 翻译:创建一个周期性任务。任务开始于给定的初始延时。后续的任务按照给定的周期进行:后续第一个任务将会在initialDelay+period时执行,后续第二个任务将在initialDelay+2*period时进行,依此类推。

· scheduleWithFixedDelay

o Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next.

o 翻译:创建并执行一个周期性任务。任务开始于初始延时时间,后续任务将会按照给定的延时进行,即上一个任务的结束时间到下一个任务的开始时间的时间差。

下面的例子使用scheduleAtFixedRate()方法调度一个任务。这个任务会执行1秒钟时间,调度周期是2秒。也就是说每2秒钟,任务就会被执行一次。

01 public class ScheduledExecutorServiceDemo {
02   public static void main(String[] args) {
03     ScheduledExecutorService ses=Executors.newScheduledThreadPool(10);
04     //如果前面的任务没有完成,则调度也不会启动
05     ses.scheduleAtFixedRate(new Runnable() {
06       @Override
07       public void run() {
08         try {
09           Thread.sleep(1000);
10           System.out.println(System.currentTimeMillis()/1000);
11         } catch (InterruptedException e) {
12           e.printStackTrace();
13         }
14       }
15     }, 0, 2, TimeUnit.SECONDS);
16   }
17 }

执行上述代码,一种输出的可能如下:

1426515345
1426515347
1426515349
1426515351

上述输出的单位是秒。可以看到,时间间隔是2秒。

这里还想说一个有意思的事情,如果任务的执行时间超过调度时间,会发生什么情况呢?比如,这里调度周期是2秒,如果任务的执行时间是8秒,是不是会出现多个任务堆叠在一起呢?

实际上,ScheduledExecutorService不会让任务堆叠出现。我们将第9行的代码改为:

Thread.sleep(8000);

再执行上述代码,你就会发现任务的执行周期不再是2秒,而是变成了8秒。如下所示,是一种可能的结果。

1426516323
1426516331
1426516339
1426516347
1426516355

也就是说,周期如果太短,那么任务就会在上一个任务结束后,立即被调用。可以想象,如果采用scheduleWithFixedDelay(),并且按照修改8秒,调度周期2秒计,那么任务的实际间隔将是10秒,大家可以自行尝试。

另外一个值得注意的问题是,调度程序实际上并不保证任务会无限期的持续调用。如果任务本身抛出了异常,那么后续的所有执行都会被中断,因此,如果你想让你的任务持续稳定的执行,那么做好异常处理就非常重要,否则,你很有可能观察到你的调度器无疾而终。

注意:如果任务遇到异常,那么后续的所有子任务都会停止调度,因此,必须保证异常被及时处理,为周期性任务的稳定调度提供条件。

3.2.3 刨根究底:核心线程池的内部实现

对于核心的几个线程池,无论是newFixedThreadPool()方法、newSingleThreadExecutor()还是newCachedThreadPool()方法,虽然看起来创建的线程有着完全不同的功能特点,但其内部实现均使用了ThreadPoolExecutor实现。下面给出了这三个线程池的实现方式:

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                  0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                  60L, TimeUnit.SECONDS,
                  new SynchronousQueue<Runnable>());
}

由以上线程池的实现代码可以看到,它们都只是ThreadPoolExecutor类的封装。为何ThreadPoolExecutor有如此强大的功能呢?来看一下ThreadPoolExecutor最重要的构造函数:

public ThreadPoolExecutor(int corePoolSize,
              int maximumPoolSize,
              long keepAliveTime,
              TimeUnit unit,
              BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory,
              RejectedExecutionHandler handler)

函数的参数含义如下。

· corePoolSize:指定了线程池中的线程数量。

· maximumPoolSize:指定了线程池中的最大线程数量。

· keepAliveTime:当线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间。即,超过corePoolSize的空闲线程,在多长时间内,会被销毁。

· unit:keepAliveTime的单位。

· workQueue:任务队列,被提交但尚未被执行的任务。

· threadFactory:线程工厂,用于创建线程,一般用默认的即可。

· handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。

以上参数中,大部分都很简单,只有workQueue和handler需要进行详细说明。

参数workQueue指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象。根据队列功能分类,在ThreadPoolExecutor的构造函数中可使用以下几种BlockingQueue。

· 直接提交的队列:该功能由SynchronousQueue对象提供。SynchronousQueue是一个特殊的BlockingQueue。SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。如果使用SynchronousQueue,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲的进程,则尝试创建新的进程,如果进程数量已经达到最大值,则执行拒绝策略。因此,使用SynchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。

· 有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现。ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量,如下所示。

public ArrayBlockingQueue(int capacity)

当使用有界的任务队列时,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程,若大于corePoolSize,则会将新任务加入等待队列。若等待队列已满,无法加入,则在总线程数不大于maximumPoolSize的前提下,创建新的进程执行任务。若大于maximumPoolSize,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提升到corePoolSize以上,换言之,除非系统非常繁忙,否则确保核心线程数维持在在corePoolSize。

· 无界的任务队列:无界任务队列可以通过LinkedBlockingQueue类实现。与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续仍有新的任务加入,而又没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。

· 优先任务队列:优先任务队列是带有执行优先级的队列。它通过PriorityBlockingQueue实现,可以控制任务的执行先后顺序。它是一个特殊的无界队列。无论是有界队列ArrayBlockingQueue,还是未指定大小的无界队列LinkedBlockingQueue都是按照先进先出算法处理任务的。而PriorityBlockingQueue则可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证(总是确保高优先级的任务先执行)。

回顾newFixedThreadPool()方法的实现。它返回了一个corePoolSize和maximumPoolSize大小一样的,并且使用了LinkedBlockingQueue任务队列的线程池。因为对于固定大小的线程池而言,不存在线程数量的动态变化,因此corePoolSize和maximumPoolSize可以相等。同时,它使用无界队列存放无法立即执行的任务,当任务提交非常频繁的时候,该队列可能迅速膨胀,从而耗尽系统资源。

newSingleThreadExecutor()返回的单线程线程池,是newFixedThreadPool()方法的一种退化,只是简单的将线程池线程数量设置为1。

newCachedThreadPool()方法返回corePoolSize为0,maximumPoolSize无穷大的线程池,这意味着在没有任务时,该线程池内无线程,而当任务被提交时,该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入SynchronousQueue队列,而SynchronousQueue队列是一种直接提交的队列,它总会迫使线程池增加新的线程执行任务。当任务执行完毕后,由于corePoolSize为0,因此空闲线程又会在指定时间内(60秒)被回收。

对于newCachedThreadPool(),如果同时有大量任务被提交,而任务的执行又不那么快时,那么系统便会开启等量的线程处理,这样做法可能会很快耗尽系统的资源。

注意:使用自定义线程池时,要根据应用的具体情况,选择合适的并发队列作为任务的缓冲。当线程资源紧张时,不同的并发队列对系统行为和性能的影响均不同。

这里给出ThreadPoolExecutor线程池的核心调度代码,这段代码也充分体现了上述线程池的工作逻辑:

01 public void execute(Runnable command) {
02   if (command == null)
03     throw new NullPointerException();
04   int c = ctl.get();
05   if (workerCountOf(c) < corePoolSize) {
06     if (addWorker(command, true))
07       return;
08     c = ctl.get();
09   }
10   if (isRunning(c) && workQueue.offer(command)) {
11     int recheck = ctl.get();
12     if (! isRunning(recheck) && remove(command))
13       reject(command);
14     else if (workerCountOf(recheck) == 0)
15       addWorker(null, false);
16   }
17   else if (!addWorker(command, false))
18     reject(command);
19 }

代码第5行的workerCountOf()函数取得了当前线程池的线程总数。当线程总数小于corePoolSize核心线程数时,会将任务通过addWorker()方法直接调度执行。否则,则在第10行代码处(workQueue.offer())进入等待队列。如果进入等待队列失败(比如有界队列到达了上限,或者使用了SynchronousQueue),则会执行第17行,将任务直接提交给线程池。如果当前线程数已经达到maximumPoolSize,则提交失败,就执行第18行的拒绝策略。

调度逻辑可以总结为如图3.6所示。

图3.6 ThreadPoolExecutor的任务调度逻辑

3.2.4 超负载了怎么办:拒绝策略

ThreadPoolExecutor的最后一个参数指定了拒绝策略。也就是当任务数量超过系统实际承载能力时,该如何处理呢?这时就要用到拒绝策略了。拒绝策略可以说是系统超负荷运行时的补救措施,通常由于压力太大而引起的,也就是线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列中也已经排满了,再也塞不下新任务了。这时,我们就需要有一套机制,合理地处理这个问题。

JDK内置提供了四种拒绝策略,如图3.7所示。

图3.7 JDK内置的拒绝策略

JDK内置的拒绝策略如下。

· AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。

· CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。

· DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

· DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,我觉得这可能是最好的一种方案了吧!

以上内置的策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际应用需要,完全可以自己扩展RejectedExecutionHandler接口。RejectedExecutionHandler的定义如下:

public interface RejectedExecutionHandler {
  void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

其中r为请求执行的任务,executor为当前的线程池。

下面的代码简单地演示了自定义线程池和拒绝策略的使用:

01 public class RejectThreadPoolDemo {
02   public static class MyTask implements Runnable {
03     @Override
04     public void run() {
05       System.out.println(System.currentTimeMillis() + ":Thread ID:"
06           + Thread.currentThread().getId());
07       try {
08         Thread.sleep(100);
09       } catch (InterruptedException e) {
10         e.printStackTrace();
11       }
12     }
13   }
14
15   public static void main(String[] args) throws InterruptedException  {
16     MyTask task = new MyTask();
17     ExecutorService es = new ThreadPoolExecutor(5, 5,
18         0L, TimeUnit.MILLISECONDS,
19         new LinkedBlockingQueue<Runnable>(10),
20         Executors.defaultThreadFactory(),
21         new RejectedExecutionHandler(){
22           @Override
23           public void rejectedExecution(Runnable r,
24               ThreadPoolExecutor executor) {
25             System.out.println(r.toString()+" is discard");
26           }
27     });
28     for (int i = 0; i < Integer.MAX_VALUE; i++) {
29       es.submit(task);
30      Thread.sleep(10);
31     }
32   }
33 }

上述代码的第17~27行自定义了一个线程池。该线程池有5个常驻线程,并且最大线程数量也是5个。这和固定大小的线程池是一样的。但是它却拥有一个只有10个容量的等待队列。因为使用无界队列很可能并不是最佳解决方案,如果任务量极大,很有可能会把内存撑爆。给出一个合理的队列大小,也是合乎常理的选择。同时,这里自定义了拒绝策略,我们不抛出异常,因为万一在任务提交端没有进行异常处理,则有可能使得整个系统都崩溃,这极有可能不是我们希望遇到的。但作为必要的信息记录,我们将任务丢弃的信息进行打印,当然,这只比内置的DiscardPolicy策略高级那么一点点。

由于在这个案例中,MyTask执行需要花费100毫秒,因此,必然会导致大量的任务被直接丢弃。执行上述代码,可能的部分输出如下:

1426597264669:Thread ID:11
1426597264679:Thread ID:12
java.util.concurrent.FutureTask@a57993 is discard
java.util.concurrent.FutureTask@1b84c92 is discard

可以看到,在执行几个任务后,拒绝策略就开始生效了。在实际应用中,我们可以将更详细的信息记录到日志中,来分析系统的负载和任务丢失的情况。

3.2.5 自定义线程创建:ThreadFactory

看了那么多有关线程池的介绍,不知道大家有没有思考过一个基本的问题:那就是线程池中的线程是从哪里来的呢?

之前我们介绍过,线程池的主要作用是为了线程复用,也就是避免了线程的频繁创建。但是,最开始的那些线程从何而来呢?答案就是ThreadFactory。

ThreadFactory是一个接口,它只有一个方法,用来创建线程:

Thread newThread(Runnable r);

当线程池需要新建线程时,就会调用这个方法。

自定义线程池可以帮助我们做不少事。比如,我们可以跟踪线程池究竟在何时创建了多少线程,也可以自定义线程的名称、组以及优先级等信息,甚至可以任性地将所有的线程设置为守护线程。总之,使用自定义线程池可以让我们更加自由地设置池子中所有线程的状态。下面的案例使用自定义的ThreadFactory,一方面记录了线程的创建,另一方面将所有的线程都设置为守护线程,这样,当主线程退出后,将会强制销毁线程池。

01 public static void main(String[] args) throws InterruptedException {
02   MyTask task = new MyTask();
03   ExecutorService es = new ThreadPoolExecutor(5, 5,
04       0L, TimeUnit.MILLISECONDS,
05       new SynchronousQueue<Runnable>(),
06       new ThreadFactory(){
07         @Override
08         public Thread newThread(Runnable r) {
09           Thread t= new Thread(r);
10           t.setDaemon(true);
11           System.out.println("create "+t);
12           return t;
13         }
14       }
15      );
16   for (int i = 0; i < 5; i++) {
17     es.submit(task);
18   }
19   Thread.sleep(2000);
20 }

3.2.6 我的应用我做主:扩展线程池

虽然JDK已经帮我们实现了这个稳定的高性能线程池。但如果我们需要对这个线程池做一些扩展,比如,我们想监控每个任务执行的开始和结束时间,或者其他一些自定义的增强功能,这时候应该怎么办呢?

一个好消息是:ThreadPoolExecutor也是一个可以扩展的线程池。它提供了beforeExecute()、afterExecute()和terminated()三个接口对线程池进行控制。

以beforeExecute()、afterExecute()为例,在ThreadPoolExecutor.Worker. runTask()方法内部提供了这样的实现:

boolean ran = false;
beforeExecute(thread, task);              //运行前
try {
  task.run();                     //运行任务
  ran = true;
  afterExecute(task, null);               //运行结束后
  ++completedTasks;
} catch (RuntimeException ex) {
  if (!ran)
    afterExecute(task, ex);             //运行结束
  throw ex;
}

ThreadPoolExecutor.Worker是ThreadPoolExecutor的内部类,它是一个实现了Runnable接口的类。ThreadPoolExecutor线程池中的工作线程也正是Worker实例。Worker.runTask()方法会被线程池以多线程模式异步调用,即Worker.runTask()会同时被多个线程访问。因此其beforeExecute()、afterExecute()接口也将同时多线程访问。

在默认的ThreadPoolExecutor实现中,提供了空的beforeExecute()和afterExecute()实现。在实际应用中,可以对其进行扩展来实现对线程池运行状态的跟踪,输出一些有用的调试信息,以帮助系统故障诊断,这对于多线程程序错误排查是很有帮助的。下面演示了对线程池的扩展,在这个扩展中,我们将记录每一个任务的执行日志。

01 public class ExtThreadPool {
02   public static class MyTask implements Runnable {
03     public String name;
04
05     public MyTask(String name) {
06       this.name = name;
07     }
08
09     @Override
10     public void run() {
11       System.out.println("正在执行" + ":Thread ID:" + Thread. currentThread().getId()
12           + ",Task Name=" + name);
13       try {
14         Thread.sleep(100);
15       } catch (InterruptedException e) {
16         e.printStackTrace();
17       }
18     }
19   }
20
21   public static void main(String[] args) throws InterruptedException {
22
23     ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
24         new LinkedBlockingQueue<Runnable>()) {
25       @Override
26       protected void beforeExecute(Thread t, Runnable r) {
27         System.out.println("准备执行:" + ((MyTask) r).name);
28       }
29
30       @Override
31       protected void afterExecute(Runnable r, Throwable t) {
32         System.out.println("执行完成:" + ((MyTask) r).name);
33       }
34
35       @Override
36       protected void terminated() {
37         System.out.println("线程池退出");
38       }
39
40     };
41     for (int i = 0; i < 5; i++) {
42       MyTask task = new MyTask("TASK-GEYM-" + i);
43       es.execute(task);
44       Thread.sleep(10);
45     }
46     es.shutdown();
47   }
48 }

上述代码在第23~40行,扩展了原有的线程池,实现了beforeExecute()、afterExecute()和terminiated()三个方法。这三个方法分别用于记录一个任务的开始、结束和整个线程池的退出。在第42~43行,向线程池提交5个任务,为了有更清晰的日志,我们为每个任务都取了一个不同的名字。第43行使用execute()方法提交任务,细心的读者一定发现,在之前代码中,我们都使用了submit()方法提交,有关两者的区别,我们将在“5.5节Future模式”中详细介绍。

在提交完成后,调用shutdown()方法关闭线程池。这是一个比较安全的方法,如果当前正有线程在执行,shutdown()方法并不会立即暴力地终止所有任务,它会等待所有任务执行完成后,再关闭线程池,但它并不会等待所有线程执行完成后再返回,因此,可以简单地理解成shutdown()只是发送了一个关闭信号而已。但在shutdown()方法执行后,这个线程池就不能再接受其他新的任务了。

执行上述代码,可以得到类似以下的输出:

准备执行:TASK-GEYM-0
正在执行:Thread ID:8,Task Name=TASK-GEYM-0
准备执行:TASK-GEYM-1
正在执行:Thread ID:9,Task Name=TASK-GEYM-1
准备执行:TASK-GEYM-2
正在执行:Thread ID:10,Task Name=TASK-GEYM-2
准备执行:TASK-GEYM-3
正在执行:Thread ID:11,Task Name=TASK-GEYM-3
准备执行:TASK-GEYM-4
正在执行:Thread ID:12,Task Name=TASK-GEYM-4
执行完成:TASK-GEYM-0
执行完成:TASK-GEYM-1
执行完成:TASK-GEYM-2
执行完成:TASK-GEYM-3
执行完成:TASK-GEYM-4
线程池退出

可以看到,所有任务的执行前、执行后的时间点以及任务的名字都已经可以捕获了。这对于应用程序的调试和诊断是非常有帮助的。

3.2.7 合理的选择:优化线程池线程数量

线程池的大小对系统的性能有一定的影响。过大或者过小的线程数量都无法发挥最优的系统性能,但是线程池大小的确定也不需要做得非常精确,因为只要避免极大和极小两种情况,线程池的大小对系统的性能并不会影响太大。一般来说,确定线程池的大小需要考虑CPU数量、内存大小等因素。在《Java Concurrency in Practice》一书中给出了一个估算线程池大小的经验公式:

Ncpu = CPU的数量
Ucpu = 目标CPU的使用率,0 ≤ Ucpu≤ 1
W/C = 等待时间与计算时间的比率

为保持处理器达到期望的使用率,最优的池的大小等于:

Nthreads = Ncpu * Ucpu * ( 1 + W/C )

在Java中,可以通过:

Runtime.getRuntime().availableProcessors()

取得可用的CPU数量。

3.2.8 堆栈去哪里了:在线程池中寻找堆栈

大家一定还记得在上一章中,我们详解介绍了一些幽灵般的错误。我想,码农的痛苦也莫过于此了。多线程本身就是非常容易引起这类错误的。如果你使用了线程池,那么这种幽灵错误可能会变得更加常见。

下面来看一个简单的案例,首先,我们有一个Runnable接口,它用来计算两个数的商:

public class DivTask implements Runnable {
  int a,b;
  public DivTask(int a,int b){
    this.a=a;
    this.b=b;
  }
  @Override
  public void run() {
    double re=a/b;
    System.out.println(re);
  }
}

如果程序运行了这个任务,那么我们期望它可以打印出给定两个数的商。现在我们构造几个这样的任务,希望程序可以为我们计算一组给定数组的商:

public static void main(String[] args) throws InterruptedException, ExecutionException {
  ThreadPoolExecutor pools=new ThreadPoolExecutor(0, Integer.MAX_VALUE,
      0L, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>());

  for(int i=0;i<5;i++){
    pools.submit(new DivTask(100,i));
  }
}

上述代码将DivTask提交到线程池,从这个for循环来看,我们应该会得到5个结果,分别是100除以给定的i后的商。但如果你真的运行程序,你得到的全部结果是:

33.0
50.0
100.0
25.0

你没有看错!只有4个输出。也就说是程序漏算了一组数据!但更不幸的是,程序没有任何日志,没有任何错误提示,就好像一切都正常一样。在这个简单的案例中,只要你稍有经验,你就能发现,作为除数的i取到了0,这个缺失的值很可能是由于除以0导致的。但在稍复杂的业务场景中,这种错误足可以让你几天萎靡不振。

因此,使用线程池虽然是件好事,但是还是得处处留意这些“坑”。线程池很有可能会“吃”掉程序抛出的异常,导致我们对程序的错误一无所知。

异常堆栈对于程序员的重要性就好像指南针对于茫茫大海上的船只。没有指南针,船只只能更艰难地寻找方向,没有异常堆栈,排查问题时,也只能像大海捞针那样,慢慢琢磨了。我的一个领导曾经说过:最鄙视那些出错不打印异常堆栈的行为!我相信,任何一个得益于异常堆栈而快速定位问题的程序员来说,一定对这句话深有体会。所以,这里我们将和大家讨论向线程池讨回异常堆栈的方法。

一种最简单的方法,就是放弃submit(),改用execute()。将上述的任务提交代码改成:

pools.execute(new DivTask(100,i));

或者你使用下面的方法改造你的submit():

Future re=pools.submit(new DivTask(100,i));
re.get();

上面两种方法都可以得到部分堆栈信息,如下所示:

Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
   at geym.conc.ch3.trace.DivTask.run(DivTask.java:11)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor. java:617)
   at java.lang.Thread.run(Thread.java:745)
33.0
100.0
50.0
25.0

注意了,我这里说的是部分。这是因为从这两个异常堆栈中我们只能知道异常是在哪里抛出的(这里是DivTask的第11行)。但是我们还希望得到另外一个更重要的信息,那就是这个任务到底是在哪里提交的?而任务的具体提交位置已经被线程池完全淹没了。顺着堆栈,我们最多只能找到线程池中的调度流程,而这对于我们几乎是没有价值的。

既然这样,我们只能自己动手,丰衣足食啦!为了今后少加几天班,我们还是非常有必要将堆栈的信息彻底挖出来!扩展我们的ThreadPoolExecutor线程池,让它在调度任务之前,先保存一下提交任务线程的堆栈信息。如下所示:

01 public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
02   public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
03       long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
04     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
05   }
06
07   @Override
08   public void execute(Runnable task) {
09     super.execute(wrap(task, clientTrace(), Thread.currentThread()
10         .getName()));
11   }
12
13   @Override
14   public Future<?> submit(Runnable task) {
15     return super.submit(wrap(task, clientTrace(), Thread.currentThread()
16         .getName()));
17   }
18
19   private Exception clientTrace() {
20     return new Exception("Client stack trace");
21   }
22
23   private Runnable wrap(final Runnable task, final Exception clientStack,
24       String clientThreadName) {
25     return new Runnable() {
26       @Override
27       public void run() {
28         try {
29           task.run();
30         } catch (Exception e) {
31           clientStack.printStackTrace();
32           throw e;
33         }
34       }
35     };
36   }
37 }

在第23行代码中,wrap()方法的第2个参数为一个异常,里面保存着提交任务的线程的堆栈信息。该方法将我们传入的Runnable任务进行一层包装,使之能处理异常信息。当任务发生异常时,这个异常会被打印。

好了,现在可以使用我们的新成员(TraceThreadPoolExecutor)来尝试执行这段代码了:

14 public static void main(String[] args) {
15   ThreadPoolExecutor pools=new TraceThreadPoolExecutor(0, Integer.MAX_VALUE,
16     0L, TimeUnit.SECONDS,
17     new SynchronousQueue<Runnable>());
18
19   /**
20    * 错误堆栈中可以看到是在哪里提交的任务
21    */
22   for(int i=0;i<5;i++){
23     pools.execute(new DivTask(100,i));
24   }
25 }

执行上述代码,就可以得到以下信息:

java.lang.Exception: Client stack trace
   at geym.conc.ch3.trace.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:28)
   at geym.conc.ch3.trace.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:17)
   at geym.conc.ch3.trace.TraceMain.main(TraceMain.java:23)
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
   at geym.conc.ch3.trace.DivTask.run(DivTask.java:11)
   at geym.conc.ch3.trace.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:37)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)
33.0
100.0
25.0
50.0

熟悉的异常又回来了!现在,我们不仅可以得到异常发生的Runnable实现内的信息,我们也知道了这个任务是在哪里提交的。如此丰富的信息,我相信可以帮助我们瞬间定位问题!

3.2.9 分而治之:Fork/Join框架

“分而治之”一直是一个非常有效地处理大量数据的方法。著名的MapReduce也是采取了分而治之的思想。简单来说,就是如果你要处理1000个数据,但是你并不具备处理1000个数据的能力,那么你可以只处理其中的10个,然后,分阶段处理100次,将100次的结果进行合成,那就是最终想要的对原始1000个数据的处理结果。

Fork一词的原始含义是吃饭用的叉子,也有分叉的意思。在Linux平台中,函数fork()用来创建子进程,使得系统进程可以多一个执行分支。在Java中也沿用了类似的命名方式。

而join()的含义在之前的章节中已经解释过,这里也是相同的意思,表示等待。也就是使用fork()后系统多了一个执行分支(线程),所以需要等待这个执行分支执行完毕,才有可能得到最终的结果,因此join()就表示等待。

在实际使用中,如果毫无顾忌地使用fork()开启线程进行处理,那么很有可能导致系统开启过多的线程而严重影响性能。所以,在JDK中,给出了一个ForkJoinPool线程池,对于fork()方法并不急着开启线程,而是提交给ForkJoinPool线程池进行处理,以节省系统资源。使用Fork/Join进行数据处理时的总体结构如图3.8所示。

图3.8 Fork/Join执行逻辑

由于线程池的优化,提交的任务和线程数量并不是一对一的关系。在绝大多数情况下,一个物理线程实际上是需要处理多个逻辑任务的。因此,每个线程必然需要拥有一个任务队列。因此,在实际执行过程中,可能遇到这么一种情况:线程A已经把自己的任务都执行完成了,而线程B还有一堆任务等着处理,此时,线程A就会“帮助”线程B,从线程B的任务队列中拿一个任务过来处理,尽可能地达到平衡。如图3.9所示,显示了这种互相帮助的精神。一个值得注意的地方是,当线程试图帮助别人时,总是从任务队列的底部开始拿数据,而线程试图执行自己的任务时,则是从相反的顶部开始拿。因此这种行为也十分有利于避免数据竞争。

图3.9 互相帮助的线程

下面我们来看一下ForkJoinPool的一个重要的接口:

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

你可以向ForkJoinPool线程池提交一个ForkJoinTask任务。所谓ForkJoinTask任务就是支持fork()分解以及join()等待的任务。ForkJoinTask有两个重要的子类,RecursiveAction和RecursiveTask。它们分别表示没有返回值的任务和可以携带返回值的任务。图3.10显示了这两个类的作用和区别。

图3.10 RecursiveAction和RecursiveTask

下面我们简单地展示Fork/Join框架的使用,这里用来计算数列求和。

01 public class CountTask extends RecursiveTask<Long>{
02   private static final int THRESHOLD = 10000;
03   private long start;
04   private long end;
05
06   public CountTask(long start,long end){
07     this.start=start;
08     this.end=end;
09   }
10
11   public Long compute(){
12     long sum=0;
13     boolean canCompute = (end-start)<THRESHOLD;
14     if(canCompute){
15       for(long i=start;i<=end;i++){
16         sum +=i;
17       }
18     }else{
19       //分成100个小任务
20       long step=(start+end)/100;
21       ArrayList<CountTask> subTasks=new ArrayList<CountTask>();
22       long pos=start;
23       for(int i=0;i<100;i++){
24         long lastOne=pos+step;
25         if(lastOne>end)lastOne=end;
26         CountTask subTask=new CountTask(pos,lastOne);
27         pos+=step+1;
28         subTasks.add(subTask);
29         subTask.fork();
30       }
31       for(CountTask  t:subTasks){
32         sum+=t.join();
33       }
34     }
35     return sum;
36   }
37
38   public static void main(String[]args){
39     ForkJoinPool forkJoinPool = new ForkJoinPool();
40     CountTask task = new CountTask(0,200000L);
41     ForkJoinTask<Long> result = forkJoinPool.submit(task);
42     try{
43       long res = result.get();
44       System.out.println("sum="+res);
45     }catch(InterruptedException e){
46       e.printStackTrace();
47     }catch(ExecutionException e){
48       e.printStackTrace();
49     }
50   }
51 }

由于计算数列的和必然是需要函数返回值的,因此选择RecursiveTask作为任务的模型。上述代码第39行,建立ForkJoinPool线程池。在第40行,构造一个计算1到200000求和的任务。在第41行将任务提交给线程池,线程池会返回一个携带结果的任务,通过get()方法可以得到最终结果(第43行)。如果在执行get()方法时,任务没有结束,那么主线程就会在get()方法时等待。

下面来看一下CountTask的实现。首先CountTask继承自RecursiveTask,可以携带返回值,这里的返回值类型设置为long。第2行定义的THRESHOLD设置了任务分解的规模,也就是如果需要求和的总数大于THRESHOLD个,那么任务就需要再次分解,否则就可以直接执行。这个判断逻辑在第14行有体现。如果任务可以直接执行,那么直接进行求和,返回结果。否则,就对任务再次分解。每次分解时,简单地将原有任务划分成100个等规模的小任务,并使用fork()提交子任务。之后,等待所有的子任务结束,并将结果再次求和(第31~33行)。

在使用ForkJoin时需要注意,如果任务的划分层次很深,一直得不到返回,那么可能出现两种情况:第一,系统内的线程数量越积越多,导致性能严重下降。第二,函数的调用层次变得很深,最终导致栈溢出。不同版本的JDK内部实现机制可能有差异,从而导致其表现不同。

下面的StackOverflowError异常就是加深本例的调用层次,在JDK 8上得到的错误。

java.util.concurrent.ExecutionException: java.lang.StackOverflowError at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1000) at geym.conc.ch3.fork.CountTask.main(CountTask.java:51) Caused by: java.lang.StackOverflowError

此外,ForkJoin线程池使用一个无锁的栈来管理空闲线程。如果一个工作线程暂时取不到可用的任务,则可能会被挂起,挂起的线程将会被压入由线程池维护的栈中。待将来有任务可用时,再从栈中唤醒这些线程。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文