返回介绍

3.1 多线程的团队协作:同步控制

发布于 2024-08-21 22:20:21 字数 28923 浏览 0 评论 0 收藏 0

同步控制是并发程序必不可少的重要手段。之前介绍的synchronized关键字就是一种最简单的控制方法。它决定了一个线程是否可以访问临界区资源。同时,Object.wait()和Object.notify()方法起到了线程等待和通知的作用。这些工具对于实现复杂的多线程协作起到了重要的作用。在本节中,我们首先将介绍synchronized、Object.wait()和Object.notify()方法的替代品(或者说是增强版)——重入锁。

3.1.1 synchronized的功能扩展:重入锁

重入锁可以完全替代synchronized关键字。在JDK 5.0的早期版本中,重入锁的性能远远好于synchronized,但从JDK 6.0开始,JDK在synchronized上做了大量的优化,使得两者的性能差距并不大。

重入锁使用java.util.concurrent.locks.ReentrantLock类来实现。下面是一段最简单的重入锁使用案例:

01 public class ReenterLock implements Runnable{
02   public static ReentrantLock lock=new ReentrantLock();
03   public static int i=0;
04   @Override
05   public void run() {
06     for(int j=0;j<10000000;j++){
07       lock.lock();
08       try{
09         i++;
10       }finally{
11         lock.unlock();
12       }
13     }
14   }
15   public static void main(String[] args) throws InterruptedException {
16     ReenterLock tl=new ReenterLock();
17     Thread t1=new Thread(tl);
18     Thread t2=new Thread(tl);
19     t1.start();t2.start();
20     t1.join();t2.join();
21     System.out.println(i);
22   }
23 }

上述代码第7~12行,使用重入锁保护临界区资源i,确保多线程对i操作的安全性。从这段代码可以看到,与synchronized相比,重入锁有着显示的操作过程。开发人员必须手动指定何时加锁,何时释放锁。也正因为这样,重入锁对逻辑控制的灵活性要远远好于synchronized。但值得注意的是,在退出临界区时,必须记得释放锁(代码第11行),否则,其他线程就没有机会再访问临界区了。

有些同学可能会对重入锁的名字感到奇怪。锁就叫锁呗,为什么要加上“重入”两个字呢?从类的命名上看,Re- Entrant-Lock翻译成重入锁也是非常贴切的。之所以这么叫,那是因为这种锁是可以反复进入的。当然,这里的反复仅仅局限于一个线程。上述代码的第7~12行,可以写成下面的形式:

lock.lock();
lock.lock();
try{
  i++;
}finally{
  lock.unlock();
  lock.unlock();
}

在这种情况下,一个线程连续两次获得同一把锁。这是允许的!如果不允许这么操作,那么同一个线程在第2次获得锁时,将会和自己产生死锁。程序就会“卡死”在第2次申请锁的过程中。但需要注意的是,如果同一个线程多次获得锁,那么在释放锁的时候,也必须释放相同次数。如果释放锁的次数多,那么会得到一个java.lang.IllegalMonitorStateException异常,反之,如果释放锁的次数少了,那么相当于线程还持有这个锁,因此,其他线程也无法进入临界区。

除了使用上的灵活性外,重入锁还提供了一些高级功能。比如,重入锁可以提供中断处理的能力。

• 中断响应

对于synchronized来说,如果一个线程在等待锁,那么结果只有两种情况,要么它获得这把锁继续执行,要么它就保持等待。而使用重入锁,则提供另外一种可能,那就是线程可以被中断。也就是在等待锁的过程中,程序可以根据需要取消对锁的请求。有些时候,这么做是非常有必要的。比如,如果你和朋友约好一起去打球,如果你等了半小时,朋友还没有到,突然接到一个电话,说由于突发情况,不能如约了。那么你一定就扫兴地打道回府了。中断正式提供了一套类似的机制。如果一个线程正在等待锁,那么它依然可以收到一个通知,被告知无须再等待,可以停止工作了。这种情况对于处理死锁是有一定帮助的。

下面的代码产生了一个死锁,但得益于锁中断,我们可以很轻易地解决这个死锁。

01 public class IntLock implements Runnable {
02   public static ReentrantLock lock1 = new ReentrantLock();
03   public static ReentrantLock lock2 = new ReentrantLock();
04   int lock;
05   /**
06    * 控制加锁顺序,方便构造死锁
07    * @param lock
08    */
09   public IntLock(int lock) {
10     this.lock = lock;
11   }
12
13   @Override
14   public void run() {
15     try {
16       if (lock == 1) {
17         lock1.lockInterruptibly();
18         try{
19           Thread.sleep(500);
20         }catch(InterruptedException e){}
21         lock2.lockInterruptibly();
22       } else {
23         lock2.lockInterruptibly();
24         try{
25           Thread.sleep(500);
26         }catch(InterruptedException e){}
27         lock1.lockInterruptibly();
28       }
29
30     } catch (InterruptedException e) {
31       e.printStackTrace();
32     } finally {
33       if (lock1.isHeldByCurrentThread())
34         lock1.unlock();
35       if (lock2.isHeldByCurrentThread())
36         lock2.unlock();
37       System.out.println(Thread.currentThread().getId()+":线程退出");
38     }
39   }
40
41   public static void main(String[] args) throws InterruptedException {
42     IntLock r1 = new IntLock(1);
43     IntLock r2 = new IntLock(2);
44     Thread t1 = new Thread(r1);
45     Thread t2 = new Thread(r2);
46     t1.start();t2.start();
47     Thread.sleep(1000);
48     //中断其中一个线程
49     t2.interrupt();
50   }
51 }

线程t1和t2启动后,t1先占用lock1,再占用lock2;t2先占用lock2,再请求lock1。因此,很容易形成t1和t2之间的相互等待。在这里,对锁的请求,统一使用lockInterruptibly()方法。这是一个可以对中断进行响应的锁申请动作,即在等待锁的过程中,可以响应中断。

在代码第47行,主线程main处于休眠,此时,这两个线程处于死锁的状态,在代码第49行,由于t2线程被中断,故t2会放弃对lock1的申请,同时释放已获得lock2。这个操作导致t1线程可以顺利得到lock2而继续执行下去。

执行上述代码,将输出:

java.lang.InterruptedException
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.
doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.
acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
  at java.util.concurrent.locks.ReentrantLock.lockInterruptibly
(ReentrantLock.java:335)
  at geym.conc.ch3.synctrl.IntLock.run(IntLock.java:31)
  at java.lang.Thread.run(Thread.java:745)
9:线程退出
8:线程退出

可以看到,中断后,两个线程双双退出。但真正完成工作的只有t1。而t2线程则放弃其任务直接退出,释放资源。

• 锁申请等待限时

除了等待外部通知之外,要避免死锁还有另外一种方法,那就是限时等待。依然以约朋友打球为例,如果朋友迟迟不来,又无法联系到他。那么,在等待1~2个小时后,我想大部分人都会扫兴离去。对线程来说也是这样。通常,我们无法判断为什么一个线程迟迟拿不到锁。也许是因为死锁了,也许是因为产生了饥饿。但如果给定一个等待时间,让线程自动放弃,那么对系统来说是有意义的。我们可以使用tryLock()方法进行一次限时的等待。

下面这段代码展示了限时等待锁的使用。

01 public class TimeLock implements Runnable{
02   public static ReentrantLock lock=new ReentrantLock();
03   @Override
04   public void run() {
05     try {
06       if(lock.tryLock(5, TimeUnit.SECONDS)){
07         Thread.sleep(6000);
08       }else{
09         System.out.println("get lock failed");
10       }
11     } catch (InterruptedException e) {
12       e.printStackTrace();
13     }finally{if(lock.isHeldByCurrentThread()) lock.unlock();}
14   }
15   public static void main(String[] args) {
16     TimeLock tl=new TimeLock();
17     Thread t1=new Thread(tl);
18     Thread t2=new Thread(tl);
19     t1.start();
20     t2.start();
21   }
22 }

在这里,tryLock()方法接收两个参数,一个表示等待时长,另外一个表示计时单位。这里的单位设置为秒,时长为5,表示线程在这个锁请求中,最多等待5秒。如果超过5秒还没有得到锁,就会返回false。如果成功获得锁,则返回true。

在本例中,由于占用锁的线程会持有锁长达6秒,故另一个线程无法在5秒的等待时间内获得锁,因此,请求锁会失败。

ReentrantLock.tryLock()方法也可以不带参数直接运行。在这种情况下,当前线程会尝试获得锁,如果锁并未被其他线程占用,则申请锁会成功,并立即返回true。如果锁被其他线程占用,则当前线程不会进行等待,而是立即返回false。这种模式不会引起线程等待,因此也不会产生死锁。下面演示了这种使用方式:

01 public class TryLock implements Runnable {
02   public static ReentrantLock lock1 = new ReentrantLock();
03   public static ReentrantLock lock2 = new ReentrantLock();
04   int lock;
05
06   public TryLock(int lock) {
07     this.lock = lock;
08   }
09
10   @Override
11   public void run() {
12     if (lock == 1) {
13       while (true) {
14         if (lock1.tryLock()) {
15           try {
16             try {
17               Thread.sleep(500);
18             } catch (InterruptedException e) {
19             }
20             if (lock2.tryLock()) {
21               try {
22                 System.out.println(Thread.currentThread()
23                     .getId() + ":My Job done");
24                 return;
25               } finally {
26                 lock2.unlock();
27               }
28             }
29           } finally {
30             lock1.unlock();
31           }
32         }
33       }
34     } else {
35       while (true) {
36         if (lock2.tryLock()) {
37           try {
38             try {
39               Thread.sleep(500);
40             } catch (InterruptedException e) {
41             }
42             if (lock1.tryLock()) {
43               try {
44                 System.out.println(Thread.currentThread()
45                     .getId() + ":My Job done");
46                 return;
47               } finally {
48                 lock1.unlock();
49               }
50             }
51           } finally {
52             lock2.unlock();
53           }
54         }
55       }
56     }
57   }
58
59   public static void main(String[] args) throws InterruptedException {
60     TryLock r1 = new TryLock(1);
61     TryLock r2 = new TryLock(2);
62     Thread t1 = new Thread(r1);
63     Thread t2 = new Thread(r2);
64     t1.start();
65     t2.start();
66   }
67 }

上述代码中,采用了非常容易死锁的加锁顺序。也就是先让t1获得lock1,再让t2获得lock2,接着做反向请求,让t1申请lock2,t2申请lock1。在一般情况下,这会导致t1和t2相互等待,从而引起死锁。

但是使用tryLock()后,这种情况就大大改善了。由于线程不会傻傻地等待,而是不停地尝试,因此,只要执行足够长的时间,线程总是会得到所有需要的资源,从而正常执行(这里以线程同时获得lock1和lock2两把锁,作为其可以正常执行的条件)。在同时获得lock1和lock2后,线程就打印出标志着任务完成的信息“My Job done”。

执行上述代码,等待一会儿(由于线程中包含休眠500毫秒的代码)。最终你还是可以欣喜地看到程序执行完毕,并产生如下输出,表示两个线程双双正常执行。

9:My Job done
8:My Job done

• 公平锁

在大多数情况下,锁的申请都是非公平的。也就是说,线程1首先请求了锁A,接着线程2也请求了锁A。那么当锁A可用时,是线程1可以获得锁还是线程2可以获得锁呢?这是不一定的。系统只是会从这个锁的等待队列中随机挑选一个。因此不能保证其公平性。这就好比买票不排队,大家都乱哄哄得围在售票窗口前,售票员忙得焦头烂额,也顾不及谁先谁后,随便找个人出票就完事了。而公平的锁,则不是这样,它会按照时间的先后顺序,保证先到者先得,后到者后得。公平锁的一大特点是:它不会产生饥饿现象。只要你排队,最终还是可以等到资源的。如果我们使用synchronized关键字进行锁控制,那么产生的锁就是非公平的。而重入锁允许我们对其公平性进行设置。它有一个如下的构造函数:

public ReentrantLock(boolean fair)

当参数fair为true时,表示锁是公平的。公平锁看起来很优美,但是要实现公平锁必然要求系统维护一个有序队列,因此公平锁的实现成本比较高,性能相对也非常低下,因此,默认情况下,锁是非公平的。如果没有特别的需求,也不需要使用公平锁。公平锁和非公平锁在线程调度表现上也是非常不一样的。下面的代码可以很好地突出公平锁的特点:

01 public class FairLock implements Runnable {
02   public static ReentrantLock fairLock = new ReentrantLock(true);
03
04   @Override
05   public void run() {
06     while(true){
07     try{
08       fairLock.lock();
09       System.out.println(Thread.currentThread().getName()+" 获得锁");
10     }finally{
11       fairLock.unlock();
12     }
13     }
14   }
15
16   public static void main(String[] args) throws InterruptedException {
17     FairLock r1 = new FairLock();
18     Thread t1=new Thread(r1,"Thread_t1");
19     Thread t2=new Thread(r1,"Thread_t2");
20     t1.start();t2.start();
21   }
22 }

上述代码第2行,指定锁是公平的。接着,由两个线程t1和t2分别请求这把锁,并且在得到锁后,进行一个控制台的输出,表示自己得到了锁。在公平锁的情况下,得到输出通常如下所示:

Thread_t1 获得锁
Thread_t2 获得锁
Thread_t1 获得锁
Thread_t2 获得锁
Thread_t1 获得锁
Thread_t2 获得锁
Thread_t1 获得锁
Thread_t2 获得锁
Thread_t1 获得锁

由于代码会产生大量输出,这里只截取部分进行说明。在这个输出中,很明显可以看到,两个线程基本上是交替获得锁的,几乎不会发生同一个线程连续多次获得锁的可能,从而公平性也得到了保证。如果不使用公平锁,那么情况会完全不一样,下面是使用非公平锁时的部分输出:

前面还有一大段t1连续获得锁的输出
Thread_t1 获得锁
Thread_t1 获得锁
Thread_t1 获得锁
Thread_t1 获得锁
Thread_t2 获得锁
Thread_t2 获得锁
Thread_t2 获得锁
Thread_t2 获得锁
Thread_t2 获得锁
后面还有一大段t2连续获得锁的输出

可以看到,根据系统的调度,一个线程会倾向于再次获取已经持有的锁,这种分配方式是高效的,但是无公平性可言。

对上面ReentrantLock的几个重要方法整理如下。

· lock():获得锁,如果锁已经被占用,则等待。

· lockInterruptibly():获得锁,但优先响应中断。

· tryLock():尝试获得锁,如果成功,返回true,失败返回false。该方法不等待,立即返回。

· tryLock(long time, TimeUnit unit):在给定时间内尝试获得锁。

· unlock():释放锁。

就重入锁的实现来看,它主要集中在Java层面。在重入锁的实现中,主要包含三个要素:

第一,是原子状态。原子状态使用CAS操作(在第4章进行详细讨论)来存储当前锁的状态,判断锁是否已经被别的线程持有。

第二,是等待队列。所有没有请求到锁的线程,会进入等待队列进行等待。待有线程释放锁后,系统就能从等待队列中唤醒一个线程,继续工作。

第三,是阻塞原语park()和unpark(),用来挂起和恢复线程。没有得到锁的线程将会被挂起。有关park()和unpark()的详细介绍,可以参考3.1.7线程阻塞工具类:LockSupport。

3.1.2 重入锁的好搭档:Condition条件

如果大家理解了Object.wait()和Object.notify()方法的话,那么就能很容易地理解Condition对象了。它和wait()和notify()方法的作用是大致相同的。但是wait()和notify()方法是和synchronized关键字合作使用的,而Condtion是与重入锁相关联的。通过Lock接口(重入锁就实现了这一接口)的Condition newCondition()方法可以生成一个与当前重入锁绑定的Condition实例。利用Condition对象,我们就可以让线程在合适的时间等待,或者在某一个特定的时刻得到通知,继续执行。

Condition接口提供的基本方法如下:

void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();

以上方法的含义如下:

· await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()或者signalAll()方法时,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。

· awaitUninterruptibly()方法与await()方法基本相同,但是它并不会在等待过程中响应中断。

· singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。这和Obejct.notify()方法很类似。

下面的代码简单地演示了Condition的功能:

01 public class ReenterLockCondition implements Runnable{
02   public static ReentrantLock lock=new ReentrantLock();
03   public static Condition condition = lock.newCondition();
04   @Override
05   public void run() {
06     try {
07       lock.lock();
08       condition.await();
09       System.out.println("Thread is going on");
10     } catch (InterruptedException e) {
11       e.printStackTrace();
12     }finally{
13       lock.unlock();
14     }
15   }
16   public static void main(String[] args) throws InterruptedException {
17     ReenterLockCondition tl=new ReenterLockCondition();
18     Thread t1=new Thread(tl);
19     t1.start();
20     Thread.sleep(2000);
21     //通知线程t1继续执行
22     lock.lock();
23     condition.signal();
24     lock.unlock();
25   }
26 }

代码第3行,通过lock生成一个与之绑定的Condition对象。代码第8行,要求线程在Condition对象上进行等待。代码第23行,由主线程main发出通知,告知等待在Condition上的线程可以继续执行了。

和Object.wait()和notify()方法一样,当线程使用Condition.await()时,要求线程持有相关的重入锁,在Condition.await()调用后,这个线程会释放这把锁。同理,在Condition.signal()方法调用时,也要求线程先获得相关的锁。在signal()方法调用后,系统会从当前Condition对象的等待队列中,唤醒一个线程。一旦线程被唤醒,它会重新尝试获得与之绑定的重入锁,一旦成功获取,就可以继续执行了。因此,在signal()方法调用之后,一般需要释放相关的锁,谦让给被唤醒的线程,让它可以继续执行。比如,在本例中,第24行代码就释放了重入锁,如果省略第24行,那么,虽然已经唤醒了线程t1,但是由于它无法重新获得锁,因而也就无法真正的继续执行。

在JDK内部,重入锁和Condition对象被广泛地使用,以ArrayBlockingQueue为例(可以参阅“3.3 JDK并发容器”一节),它的put()方法实现如下:

//在ArrayBlockingQueue中的一些定义
private final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();        //生成一个与lock绑定的Condition
notFull =  lock.newCondition();

//put()方法的实现
public void put(E e) throws InterruptedException {
  if (e == null) throw new NullPointerException();
  final E[] items = this.items;
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();        //对put()方法做同步
  try {
    try {
      while (count == items.length)  //如果当前队列已满
        notFull.await();       //等待队列有足够的空间
    } catch (InterruptedException ie) {
      notFull.signal();
      throw ie;
    }
    insert(e);               //当notFull被通知时,说明有足够空间
  } finally {
    lock.unlock();
  }
}
private void insert(E x) {
  items[putIndex] = x;
  putIndex = inc(putIndex);
  ++count;
  notEmpty.signal();             //通知需要take()的线程,队列已有数据
}

同理,对应take()方法实现如下:

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();        //对take()方法做同步
  try {
    try {
      while (count == 0)         //如果队列为空
        notEmpty.await();      //则消费者队列要等待一个非空的信号
    } catch (InterruptedException ie) {
      notEmpty.signal();
      throw ie;
    }
    E x = extract();
    return x;
  } finally {
    lock.unlock();
  }
}
private E extract() {
  final E[] items = this.items;
  E x = items[takeIndex];
  items[takeIndex] = null;
  takeIndex = inc(takeIndex);
  --count;
  notFull.signal();            //通知put()线程队列已有空闲空间
  return x;
}

3.1.3 允许多个线程同时访问:信号量(Semaphore)

信号量为多线程协作提供了更为强大的控制方法。广义上说,信号量是对锁的扩展。无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。信号量主要提供了以下构造函数:

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)  //第二个参数可以指定是否公平

在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。信号量的主要逻辑方法有:

public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()

acquire()方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。acquireUninterruptibly()方法和acquire()方法类似,但是不响应中断。tryAcquire()尝试获得一个许可,如果成功返回true,失败则返回false,它不会进行等待,立即返回。release()用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。

在JDK的官方Javadoc中,就有一个有关信号量使用的简单实例,有兴趣的读者可以自行翻阅,这里我给出一个更加傻瓜化的例子:

01 public class SemapDemo implements Runnable{
02   final Semaphore semp = new Semaphore(5);
03   @Override
04   public void run() {
05     try {
06       semp.acquire();
07       //模拟耗时操作
08       Thread.sleep(2000);
09       System.out.println(Thread.currentThread().getId()+":done!");
10       semp.release();
11     } catch (InterruptedException e) {
12       e.printStackTrace();
13     }
14   }
15
16   public static void main(String[] args) {
17     ExecutorService exec = Executors.newFixedThreadPool(20);
18     final SemapDemo demo=new SemapDemo();
19     for(int i=0;i<20;i++){
20       exec.submit(demo);
21     }
22   }
23 }

上述代码中,第7~9行为临界区管理代码,程序会限制执行这段代码的线程数。这里在第2行,申明了一个包含5个许可的信号量。这就意味着同时可以有5个线程进入代码段第7~9行。申请信号量使用acquire()操作,在离开时,务必使用release()释放信号量(代码第10行)。这就和释放锁是一个道理。如果不幸发生了信号量的泄露(申请了但没有释放),那么可以进入临界区的线程数量就会越来越少,直到所有的线程均不可访问。在本例中,同时开启20个线程。观察这段程序的输出,你就会发现系统以5个线程一组为单位,依次输出带有线程ID的提示文本。

3.1.4 ReadWriteLock读写锁

ReadWriteLock是JDK5中提供的读写分离锁。读写分离锁可以有效地帮助减少锁竞争,以提升系统性能。用锁分离的机制来提升性能非常容易理解,比如线程A1、A2、A3进行写操作,B1、B2、B3进行读操作,如果使用重入锁或者内部锁,则理论上说所有读之间、读与写之间、写和写之间都是串行操作。当B1进行读取时,B2、B3则需要等待锁。由于读操作并不对数据的完整性造成破坏,这种等待显然是不合理。因此,读写锁就有了发挥功能的余地。

在这种情况下,读写锁允许多个线程同时读,使得B1、B2、B3之间真正并行。但是,考虑到数据完整性,写写操作和读写操作间依然是需要相互等待和持有锁的。总的来说,读写锁的访问约束如表3.1所示。

表3.1 读写锁的访问约束情况

非阻塞

阻塞

阻塞

阻塞

· 读-读不互斥:读读之间不阻塞。

· 读-写互斥:读阻塞写,写也会阻塞读。

· 写-写互斥:写写阻塞。

如果在系统中,读操作次数远远大于写操作,则读写锁就可以发挥最大的功效,提升系统的性能。这里我给出一个稍微夸张点的案例,来说明读写锁对性能的帮助。

01 public class ReadWriteLockDemo {
02   private static Lock lock=new ReentrantLock();
03   private static ReentrantReadWriteLock readWriteLock=new
ReentrantReadWriteLock();
04   private static Lock readLock = readWriteLock.readLock();
05   private static Lock writeLock = readWriteLock.writeLock();
06   private int value;
07
08   public Object handleRead(Lock lock) throws InterruptedException{
09     try{
10       lock.lock();         //模拟读操作
11       Thread.sleep(1000);      //读操作的耗时越多,读写锁的优势就越明显
12       return value;
13     }finally{
14     lock.unlock();
15     }
16   }
17
18   public void handleWrite(Lock lock,int index) throws InterruptedException{
19     try{
20       lock.lock();        //模拟写操作
21       Thread.sleep(1000);
22       value=index;
23     }finally{
24     lock.unlock();
25     }
26   }
27
28   public static void main(String[] args) {
29     final ReadWriteLockDemo demo=new ReadWriteLockDemo();
30     Runnable readRunnale=new Runnable() {
31       @Override
32       public void run() {
33         try {
34           demo.handleRead(readLock);
35 //          demo.handleRead(lock);
36         } catch (InterruptedException e) {
37           e.printStackTrace();
38         }
39       }
40     };
41     Runnable writeRunnale=new Runnable() {
42       @Override
43       public void run() {
44         try {
45           demo.handleWrite(writeLock,new Random().nextInt());
46 //          demo.handleWrite(lock,new Random().nextInt());
47         } catch (InterruptedException e) {
48           e.printStackTrace();
49         }
50       }
51     };
52
53     for(int i=0;i<18;i++){
54       new Thread(readRunnale).start();
55     }
56
57     for(int i=18;i<20;i++){
58       new Thread(writeRunnale).start();
59     }
60   }
61 }

上述代码中,第11行和第21行分别模拟了一个非常耗时的操作,让线程耗时1秒钟。它们分别对应读耗时和写耗时。代码第34和45行,分别是读线程和写线程。在这里,第34行使用读锁,第35行使用写锁。第53~55行开启18个读线程,第57~59行,开启两个写线程。由于这里使用了读写分离,因此,读线程完全并行,而写会阻塞读,因此,实际上这段代码运行大约2秒多就能结束(写线程之间是实际串行的)。而如果使用第35行代替第34行,使用第46行代替第45行执行上述代码,即,使用普通的重入锁代替读写锁。那么所有的读和写线程之间都必须相互等待,因此整个程序的执行时间将长达20余秒。

3.1.5 倒计时器:CountDownLatch

CountDownLatch是一个非常实用的多线程控制工具类。“Count Down”在英文中意为倒计数,Latch为门闩的意思。如果翻译成为倒计数门闩,我想大家都会觉得不知所云吧!因此,这里简单地称之为倒计数器。在这里,门闩的含义是:把门锁起来,不让里面的线程跑出来。因此,这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。

对于倒计时器,一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。只有等所有的检查完毕后,引擎才能点火。这种场景就非常适合使用CountDownLatch。它可以使得点火线程等待所有检查线程全部完工后,再执行。

CountDownLatch的构造函数接收一个整数作为参数,即当前这个计数器的计数个数。

public CountDownLatch(int count)

下面这个简单的示例,演示了CountDownLatch的使用。

01 public class CountDownLatchDemo implements Runnable {
02   static final CountDownLatch end = new CountDownLatch(10);
03   static final CountDownLatchDemo demo=new CountDownLatchDemo();
04   @Override
05   public void run() {
06     try {
07       //模拟检查任务
08       Thread.sleep(new Random().nextInt(10)*1000);
09       System.out.println("check complete");
10       end.countDown();
11     } catch (InterruptedException e) {
12       e.printStackTrace();
13     }
14   }
15   public static void main(String[] args) throws InterruptedException {
16     ExecutorService exec = Executors.newFixedThreadPool(10);
17     for(int i=0;i<10;i++){
18       exec.submit(demo);
19     }
20     //等待检查
21     end.await();
22     //发射火箭
23     System.out.println("Fire!");
24     exec.shutdown();
25   }
26 }

上述代码第2行,生成一个CountDownLatch实例。计数数量为10。这表示需要有10个线程完成任务,等待在CountDownLatch上的线程才能继续执行。代码第10行,使用了CountDownLatch.countdown()方法,也就是通知CountDownLatch,一个线程已经完成了任务,倒计时器可以减1啦。第21行,使用CountDownLatch.await()方法,要求主线程等待所有10个检查任务全部完成。待10个任务全部完成后,主线程才能继续执行。

上述案例的执行逻辑可以用图3.1简单表示。

图3.1 CountDownLatch示意图

主线程在CountDownLatch上等待,当所有检查任务全部完成后,主线程方能继续执行。

3.1.6 循环栅栏:CyclicBarrier

CyclicBarrier是另外一种多线程并发控制实用工具。和CountDownLatch非常类似,它也可以实现线程间的计数等待,但它的功能比CountDownLatch更加复杂且强大。

CyclicBarrier可以理解为循环栅栏。栅栏就是一种障碍物,比如,通常在私人宅邸的周围就可以围上一圈栅栏,阻止闲杂人等入内。这里当然就是用来阻止线程继续执行,要求线程在栅栏处等待。前面Cyclic意为循环,也就是说这个计数器可以反复使用。比如,假设我们将计数器设置为10,那么凑齐第一批10个线程后,计数器就会归零,然后接着凑齐下一批10个线程,这就是循环栅栏内在的含义。

CyclicBarrier的使用场景也很丰富。比如,司令下达命令,要求10个士兵一起去完成一项任务。这时,就会要求10个士兵先集合报道,接着,一起雄赳赳气昂昂地去执行任务。当10个士兵把自己手头的任务都执行完成了,那么司令才能对外宣布,任务完成!

比CountDownLatch略微强大一些,CyclicBarrier可以接收一个参数作为barrierAction。所谓barrierAction就是当计数器一次计数完成后,系统会执行的动作。如下构造函数,其中,parties表示计数总数,也就是参与的线程总数。

public CyclicBarrier(int parties, Runnable barrierAction)

下面的示例使用CyclicBarrier演示了上述司令命令士兵完成任务的场景。

01 public class CyclicBarrierDemo {
02   public static class Soldier implements Runnable {
03     private String soldier;
04     private final CyclicBarrier cyclic;
05
06     Soldier(CyclicBarrier cyclic, String soldierName) {
07       this.cyclic = cyclic;
08       this.soldier = soldierName;
09     }
10
11     public void run() {
12       try {
13         //等待所有士兵到齐
14         cyclic.await();
15         doWork();
16         //等待所有士兵完成工作
17         cyclic.await();
18       } catch (InterruptedException e) {
19         e.printStackTrace();
20       } catch (BrokenBarrierException e) {
21         e.printStackTrace();
22       }
23     }
24
25     void doWork() {
26       try {
27         Thread.sleep(Math.abs(new Random().nextInt()%10000));
28       } catch (InterruptedException e) {
29         e.printStackTrace();
30       }
31       System.out.println(soldier + ":任务完成");
32     }
33   }
34
35   public static class BarrierRun implements Runnable {
36     boolean flag;
37     int N;
38     public BarrierRun(boolean flag, int N) {
39       this.flag = flag;
40       this.N = N;
41     }
42
43     public void run() {
44       if (flag) {
45         System.out.println("司令:[士兵" + N + "个,任务完成!]");
46       } else {
47         System.out.println("司令:[士兵" + N + "个,集合完毕!]");
48         flag = true;
49       }
50     }
51   }
52
53   public static void main(String args[]) throws InterruptedException {
54     final int N = 10;
55     Thread[] allSoldier=new Thread[N];
56     boolean flag = false;
57     CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
58     //设置屏障点,主要是为了执行这个方法
59     System.out.println("集合队伍!");
60     for (int i = 0; i < N; ++i) {
61       System.out.println("士兵 "+i+" 报道!");
62       allSoldier[i]=new Thread(new Soldier(cyclic, "士兵 " + i));
63       allSoldier[i].start();
64     }
65   }
66 }

上述代码第57行,创建了CyclicBarrier实例,并将计数器设置为10,并要求在计数器达到指标时,执行第43行的run()方法。每一个士兵线程会执行第11行定义的run()方法。在第14行,每一个士兵线程都会等待,直到所有的士兵都集合完毕。集合完毕后,意味着CyclicBarrier的一次计数完成,当再一次调用CyclicBarrier.await()时,会进行下一次计数。第15行,模拟了士兵的任务。当一个士兵任务执行完毕后,他就会要求CyclicBarrier开始下一次计数,这次计数主要目的是监控是否所有的士兵都已经完成了任务。一旦任务全部完成,第35行定义的BarrierRun就会被调用,打印相关信息。

上述代码的执行输出如下:

集合队伍!
士兵 0 报道!
//篇幅有限,省略其他几个士兵
士兵 9 报道!
司令:[士兵10个,集合完毕!]
士兵 0:任务完成
//篇幅有限,省略其他几个士兵
士兵 4:任务完成
司令:[士兵10个,任务完成!]

整个工作过程的图示如图3.2所示。

图3.2 CyclicBarrier工作示意图

CyclicBarrier.await()方法可能会抛出两个异常。一个是InterruptedException,也就是在等待过程中,线程被中断,应该说这是一个非常通用的异常。大部分迫使线程等待的方法都可能会抛出这个异常,使得线程在等待时依然可以响应外部紧急事件。另外一个异常则是CyclicBarrier特有的BrokenBarrierException。一旦遇到这个异常,则表示当前的CyclicBarrier已经破损了,可能系统已经没有办法等待所有线程到齐了。如果继续等待,可能就是徒劳无功的,因此,还是就地散货,打道回府吧!上述代码第18~22行处理了这2种异常。

如果我们在上述代码的第63行后,插入以下代码,使得第5个士兵线程产生中断:

if(i==5){
  allSoldier[0].interrupt();
}

如果这样做,我们很可能就会得到1个InterruptedException和9个BrokenBarrierException。这个InterruptedException就是被中断线程抛出的。而其他9个BrokenBarrierException,则是等待在当前CyclicBarrier上的线程抛出的。这个异常可以避免其他9个线程进行永久的、无谓的等待(因为其中一个线程已经被中断,等待是没有结果的)。

3.1.7 线程阻塞工具类:LockSupport

LockSupport是一个非常方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞。和Thread.suspend()相比,它弥补了由于resume()在前发生,导致线程无法继续执行的情况。和Object.wait()相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。

LockSupport的静态方法park()可以阻塞当前线程,类似的还有parkNanos()、parkUntil()等方法。它们实现了一个限时的等待。

大家应该还记得,我们在第2章中提到的那个有关suspend()永久卡死线程的例子吧!现在,我们可以用LockSupport重写这个程序:

01 public class LockSupportDemo {
02   public static Object u = new Object();
03   static ChangeObjectThread t1 = new ChangeObjectThread("t1");
04   static ChangeObjectThread t2 = new ChangeObjectThread("t2");
05
06   public static class ChangeObjectThread extends Thread {
07     public ChangeObjectThread(String name){
08       super.setName(name);
09     }
10     @Override
11     public void run() {
12       synchronized (u) {
13         System.out.println("in "+getName());
14         LockSupport.park();
15       }
16     }
17   }
18
19   public static void main(String[] args) throws InterruptedException {
20     t1.start();
21     Thread.sleep(100);
22     t2.start();
23     LockSupport.unpark(t1);
24     LockSupport.unpark(t2);
25     t1.join();
26     t2.join();
27   }
28 }

注意,这里只是将原来的suspend()和resume()方法用park()和unpark()方法做了替换。当然,我们依然无法保证unpark()方法发生在park()方法之后。但是执行这段代码,你会发现,它自始至终都可以正常的结束,不会因为park()方法而导致线程永久性的挂起。

这是因为LockSupport类使用类似信号量的机制。它为每一个线程准备了一个许可,如果许可可用,那么park()函数会立即返回,并且消费这个许可(也就是将许可变为不可用),如果许可不可用,就会阻塞。而unpark()则使得一个许可变为可用(但是和信号量不同的是,许可不能累加,你不可能拥有超过一个许可,它永远只有一个)。

这个特点使得:即使unpark()操作发生在park()之前,它也可以使下一次的park()操作立即返回。这也就是上述代码可顺利结束的主要原因。

同时,处于park()挂起状态的线程不会像suspend()那样还给出一个令人费解的Runnable的状态。它会非常明确地给出一个WAITING状态,甚至还会标注是park()引起的:

"t1" #8 prio=5 os_prio=0 tid=0x00b1a400 nid=0x1994 waiting on condition [0x1619f000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
    at geym.conc.ch3.ls.LockSupportDemo$ChangeObjectThread.run(LockSupportDemo.java:18)
    - locked <0x048b2680> (a java.lang.Object)

这使得分析问题时格外方便。此外,如果你使用park(Object)函数,还可以为当前线程设置一个阻塞对象。这个阻塞对象会出现在线程Dump中。这样在分析问题时,就更加方便了。

比如,如果我们将上述代码第14行的park()改为:

LockSupport.park(this);

那么在线程Dump时,你可能会看到如下信息:

"t1" #8 prio=5 os_prio=0 tid=0x0117ac00 nid=0x2034 waiting on condition [0x15d0f000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x048b4738> (a geym.conc.ch3.ls.LockSupport-
Demo$ChangeObjectThread)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at geym.conc.ch3.ls.LockSupportDemo$ChangeObjectThread.run
(LockSupportDemo.java:18)
    - locked <0x048b2808> (a java.lang.Object)

注意,在堆栈中,我们甚至还看到了当前线程等待的对象,这里就是ChangeObjectThread实例。

除了有定时阻塞的功能外,LockSupport.park()还能支持中断影响。但是和其他接收中断的函数很不一样,LockSupport.park()不会抛出InterruptedException异常。它只是会默默的返回,但是我们可以从Thread.interrupted()等方法获得中断标记。

01 public class LockSupportIntDemo {
02   public static Object u = new Object();
03   static ChangeObjectThread t1 = new ChangeObjectThread("t1");
04   static ChangeObjectThread t2 = new ChangeObjectThread("t2");
05
06   public static class ChangeObjectThread extends Thread {
07     public ChangeObjectThread(String name){
08       super.setName(name);
09     }
10     @Override
11     public void run() {
12       synchronized (u) {
13         System.out.println("in "+getName());
14         LockSupport.park();
15         if(Thread.interrupted()){
16           System.out.println(getName()+" 被中断了");
17         }
18       }
19       System.out.println(getName()+"执行结束");
20     }
21   }
22
23   public static void main(String[] args) throws InterruptedException {
24     t1.start();
25     Thread.sleep(100);
26     t2.start();
27     t1.interrupt();
28     LockSupport.unpark(t2);
29   }
30 }

注意上述代码在第27行,中断了处于park()状态的t1。之后,t1可以马上响应这个中断,并且返回。之后在外面等待的t2才可以进入临界区,并最终由LockSupport.unpark(t2)操作使其运行结束。

in t1
t1 被中断了
t1 执行结束
in t2
t2执行结束

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文