返回介绍

4.4 无锁

发布于 2024-08-21 22:20:21 字数 26558 浏览 0 评论 0 收藏 0

就人的性格而言,我们可以分为乐天派和悲观派。对于乐天派来说,总是会把事情往好的方面想。他们认为所有事情总是不太容易发生问题,出错是小概率的,所以我们可以肆无忌惮地做事。如果真的不幸遇到了问题,则有则改之无则加勉。而对于悲观的人群来说,他们总是担惊受怕,认为出错是一种常态,所以无论巨细,都考虑得面面俱到,滴水不漏,确保为人处世,万无一失。

对于并发控制而言,锁是一种悲观的策略。它总是假设每一次的临界区操作会产生冲突,因此,必须对每次操作都小心翼翼。如果有多个线程同时需要访问临界区资源,就宁可牺牲性能让线程进行等待,所以说锁会阻塞线程执行。而无锁是一种乐观的策略,它会假设对资源的访问是没有冲突的。既然没有冲突,自然不需要等待,所以所有的线程都可以在不停顿的状态下持续执行。那遇到冲突怎么办呢?无锁的策略使用一种叫做比较交换的技术(CAS Compare And Swap)来鉴别线程冲突,一旦检测到冲突产生,就重试当前操作直到没有冲突为止。

4.4.1 与众不同的并发策略:比较交换(CAS)

与锁相比,使用比较交换(下文简称CAS)会使程序看起来更加复杂一些。但由于其非阻塞性,它对死锁问题天生免疫,并且,线程间的相互影响也远远比基于锁的方式要小。更为重要的是,使用无锁的方式完全没有锁竞争带来的系统开销,也没有线程间频繁调度带来的开销,因此,它要比基于锁的方式拥有更优越的性能。

CAS算法的过程是这样:它包含三个参数CAS(V,E,N)。V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。最后,CAS返回当前V的真实值。CAS操作是抱着乐观的态度进行的,它总是认为自己可以成功完成操作。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即使没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。

简单地说,CAS需要你额外给出一个期望值,也就是你认为这个变量现在应该是什么样子的。如果变量不是你想象的那样,那说明它已经被别人修改过了。你就重新读取,再次尝试修改就好了。

在硬件层面,大部分的现代处理器都已经支持原子化的CAS指令。在JDK 5.0以后,虚拟机便可以使用这个指令来实现并发操作和并发数据结构,并且,这种操作在虚拟机中可以说是无处不在。

4.4.2 无锁的线程安全整数:AtomicInteger

为了让Java程序员能够受益于CAS等CPU指令,JDK并发包中有一个atomic包,里面实现了一些直接使用CAS操作的线程安全的类型。

其中,最常用的一个类,应该就是AtomicInteger。你可以把它看做是一个整数。但是与Integer不同,它是可变的,并且是线程安全的。对其进行修改等任何操作,都是用CAS指令进行的。这里简单列举一下AtomicInteger的一些主要方法,对于其他原子类,操作也是非常类似的:

public final int get()                 //取得当前值
public final void set(int newValue)          //设置当前值
public final int getAndSet(int newValue)         //设置新值,并返回旧值
public final boolean compareAndSet(int expect, int u)  //如果当前值为expect,则设置为u
public final int getAndIncrement()           //当前值加1,返回旧值
public final int getAndDecrement()           //当前值减1,返回旧值
public final int getAndAdd(int delta)          //当前值增加delta,返回旧值
public final int incrementAndGet()           //当前值加1,返回新值
public final int decrementAndGet()           //当前值减1,返回新值
public final int addAndGet(int delta)          //当前值增加delta,返回新值

就内部实现上来说,AtomicInteger中保存一个核心字段:

private volatile int value;

它就代表了AtomicInteger的当前实际取值。此外还有一个:

private static final long valueOffset;

它保存着value字段在AtomicInteger对象中的偏移量。后面你会看到,这个偏移量是实现AtomicInteger的关键。

AtomicInteger的使用非常简单,这里给出一个示例:

01 public class AtomicIntegerDemo {
02   static AtomicInteger i=new AtomicInteger();
03   public static class AddThread implements Runnable{
04     public void run(){
05      for(int k=0;k<10000;k++)
06        i.incrementAndGet();
07     }
08   }
09   public static void main(String[] args) throws InterruptedException {
10     Thread[] ts=new Thread[10];
11     for(int k=0;k<10;k++){
12       ts[k]=new Thread(new AddThread());
13     }
14     for(int k=0;k<10;k++){ts[k].start();}
15     for(int k=0;k<10;k++){ts[k].join();}
16     System.out.println(i);
17   }
18 }

第6行的AtomicInteger.incrementAndGet()方法会使用CAS操作将自己加1,同时也会返回当前值(这里忽略了当前值)。如果你执行这段代码,你会看到程序输出了100000。这说明程序正常执行,没有错误。如果不是线程安全,i的值应该会小于100000才对。

使用AtomicInteger会比使用锁具有更好的性能。出于篇幅限制,这里不再给出AtomicInteger和锁的性能对比的测试代码,相信写一段简单的小代码测试两者的性能应该不是难事。这里让我们关注一下incrementAndGet()的内部实现(我们基于JDK 1.7分析,JDK 1.8与1.7的实现有所不同)。

1 public final int incrementAndGet() {
2   for (;;) {
3     int current = get();
4     int next = current + 1;
5     if (compareAndSet(current, next))
6       return next;
7   }
8 }

其中get()方法非常简单,就是返回内部数据value。

public final int get() {
  return value;
}

这里让人映像深刻的,应该是incrementAndGet()方法的第2行for循环吧!如果你是初次看到这样的代码,可能会觉得很奇怪,为什么连设置一个值那么简单的操作都需要一个死循环呢?原因就是:CAS操作未必是成功的,因此对于不成功的情况,我们就需要进行不断的尝试。第3行的get()取得当前值,接着加1后得到新值next。这里,我们就得到了CAS必需的两个参数:期望值以及新值。使用compareAndSet()方法将新值next写入,成功的条件是在写入的时刻,当前的值应该要等于刚刚取得的current。如果不是这样,就说明AtomicInteger的值在第3行到第5行代码之间,又被其他线程修改过了。当前线程看到的状态就是一个过期状态。因此,compareAndSet返回失败,需要进行下一次重试,直到成功。

以上就是CAS操作的基本思想。在后面我们会看到,无论程序多么复杂,其基本原理总是不变的。

和AtomicInteger类似的类还有AtomicLong用来代表long型,AtomicBoolean表示boolean型,AtomicReference表示对象引用。

4.4.3 Java中的指针:Unsafe类

如果你对技术有着不折不挠的追求,应该还会特别在意incrementAndGet()方法中compareAndSet()的实现。现在,就让我们更进一步看一下它吧!

public final boolean compareAndSet(int expect, int update) {
  return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

在这里,我们看到一个特殊的变量unsafe,它是sun.misc.Unsafe类型。从名字看,这个类应该是封装了一些不安全的操作。那什么操作是不安全的呢?学习过C或者C++的话,大家应该知道,指针是不安全的,这也是在Java中把指针去除的重要原因。如果指针指错了位置,或者计算指针偏移量时出错,结果可能是灾难性的,你很有可能会覆盖别人的内存,导致系统崩溃。

而这里的Unsafe就是封装了一些类似指针的操作。compareAndSwapInt()方法是一个navtive方法,它的几个参数含义如下:

public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);

第一个参数o为给定的对象,offset为对象内的偏移量(其实就是一个字段到对象头部的偏移量,通过这个偏移量可以快速定位字段),expected表示期望值,x表示要设置的值。如果指定的字段的值等于expected,那么就会把它设置为x。

不难看出,compareAndSwapInt()方法的内部,必然是使用CAS原子指令来完成的。此外,Unsafe类还提供了一些方法,主要有以下几个(以Int操作为例,其他数据类型是类似的):

//获得给定对象偏移量上的int值
public native int getInt(Object o, long offset);
//设置给定对象偏移量上的int值
public native void putInt(Object o, long offset, int x);
//获得字段在对象中的偏移量
public native long objectFieldOffset(Field f);
//设置给定对象的int值,使用volatile语义
public native void putIntVolatile(Object o, long offset, int x);
//获得给定对象对象的int值,使用volatile语义
public native int   getIntVolatile(Object o, long offset);
//和putIntVolatile()一样,但是它要求被操作字段就是volatile类型的
public native void putOrderedInt(Object o, long offset, int x);

如果大家还记得“3.3.4深度剖析ConcurrentLinkedQueue”一节中描述的ConcurrentLinkedQueue实现,应该对ConcurrentLinkedQueue中的Node还有些印象。Node的一些CAS操作也都是使用Unsafe类来实现的。大家可以回顾一下,以加深对Unsafe类的印象。

这里就可以看到,虽然Java抛弃了指针。但是在关键时刻,类似指针的技术还是必不可少的。这里底层的Unsafe实现就是最好的例子。但是很不幸,JDK的开发人员并不希望大家使用这个类。获得Unsafe实例的方法是调动其工厂方法getUnsafe()。但是,它的实现却是这样:

public static Unsafe getUnsafe() { Class cc = Reflection.getCallerClass(); if (cc.getClassLoader() != null) throw new SecurityException("Unsafe"); return theUnsafe; }

注意加粗部分的代码,它会检查调用getUnsafe()函数的类,如果这个类的ClassLoader不为null,就直接抛出异常,拒绝工作。因此,这也使得我们自己的应用程序无法直接使用Unsafe类。它是一个JDK内部使用的专属类。

注意:根据Java类加载器的工作原理,应用程序的类由App Loader加载。而系统核心类,如rt.jar中的类由Bootstrap类加载器加载。Bootstrap加载器没有Java对象的对象,因此试图获得这个类加载器会返回null。所以,当一个类的类加载器为null时,说明它是由Bootstrap加载的,而这个类也极有可能是rt.jar中的类。

4.4.4 无锁的对象引用:AtomicReference

AtomicReference和AtomicInteger非常类似,不同之处就在于AtomicInteger是对整数的封装,而AtomicReference则对应普通的对象引用。也就是它可以保证你在修改对象引用时的线程安全性。在介绍AtomicReference的同时,我希望同时提出一个有关原子操作的逻辑上的不足。

之前我们说过,线程判断被修改对象是否可以正确写入的条件是对象的当前值和期望值是否一致。这个逻辑从一般意义上来说是正确的。但有可能出现一个小小的例外,就是当你获得对象当前数据后,在准备修改为新值前,对象的值被其他线程连续修改了两次,而经过这两次修改后,对象的值又恢复为旧值。这样,当前线程就无法正确判断这个对象究竟是否被修改过。如图4.2所示,显示了这种情况。

图4.2 对象值被反复修改回原数据

一般来说,发生这种情况的概率很小。而且即使发生了,可能也不是什么大问题。比如,我们只是简单地要做一个数值加法,即使在我取得期望值后,这个数字被不断的修改,只要它最终改回了我的期望值,我的加法计算就不会出错。也就是说,当你修改的对象没有过程的状态信息,所有的信息都只保存于对象的数值本身。

但是,在现实中,还可能存在另外一种场景,就是我们是否能修改对象的值,不仅取决于当前值,还和对象的过程变化有关,这时,AtomicReference就无能为力了。

打一个比方,如果有一家蛋糕店,为了挽留客户,决定为贵宾卡里余额小于20元的客户一次性赠送20元,刺激消费者充值和消费。但条件是,每一位客户只能被赠送一次。

现在,我们就来模拟这个场景,为了演示AtomicReference,我在这里使用AtomicReference实现这个功能。首先,我们模拟用户账户余额。

定义用户账户余额:

static AtomicReference<Integer> money=new AtomicReference<Integer>();
// 设置账户初始值小于20,显然这是一个需要被充值的账户
money.set(19);

接着,我们需要若干个后台线程,它们不断扫描数据,并为满足条件的客户充值。

01 //模拟多个线程同时更新后台数据库,为用户充值
02 for(int i = 0 ; i < 3 ; i++) {
03   new Thread() {
04     public void run() {
05       while(true){
06         while(true){
07           Integer m=money.get();
08           if(m<20){
09             if(money.compareAndSet(m, m+20)){
10           System.out.println("余额小于20元,充值成功,余额:"+money.get()+"元");
11               break;
12             }
13           }else{
14             //System.out.println("余额大于20元,无须充值");
15             break ;
16           }
17         }
18       }
19     }
20   }.start();
21 }

上述代码第8行,判断用户余额并给予赠送金额。如果已经被其他用户处理,那么当前线程就会失败。因此,可以确保用户只会被充值一次。

此时,如果很不幸,用户正好正在进行消费,就在赠予金额到账的同时,他进行了一次消费,使得总金额又小于20元,并且正好累计消费了20元。使得消费、赠予后的金额等于消费前、赠予前的金额。这时,后台的赠予进程就会误以为这个账户还没有赠予,所以,存在被多次赠予的可能。下面模拟了这个消费线程:

01 //用户消费线程,模拟消费行为
02 new Thread() {
03   public void run() {
04     for(int i=0;i<100;i++){
05       while(true){
06         Integer m=money.get();
07         if(m>10){
08           System.out.println("大于10元");
09           if(money.compareAndSet(m, m-10)){
10             System.out.println("成功消费10元,余额:"+money.get());
11             break;
12           }
13         }else{
14           System.out.println("没有足够的金额");
15           break;
16         }
17       }
18       try {Thread.sleep(100);} catch (InterruptedException e) {}
19     }
20   }
21 }.start();

上述代码中,消费者只要贵宾卡里的钱大于10元,就会立即进行一次10元的消费。执行上述程序,得到的输出如下:

余额小于20元,充值成功,余额:39元
大于10元
成功消费10元,余额:29
大于10元
成功消费10元,余额:19
余额小于20元,充值成功,余额:39元
大于10元
成功消费10元,余额:29
大于10元
成功消费10元,余额:39
余额小于20元,充值成功,余额:39元

从这一段输出中,可以看到,这个账户被先后反复多次充值。其原因正是因为账户余额被反复修改,修改后的值等于原有的数值,使得CAS操作无法正确判断当前数据状态。

虽然说这种情况出现的概率不大,但是依然是有可能出现的。因此,当业务上确实可能出现这种情况时,我们也必须多加防范。体贴的JDK也已经为我们考虑到了这种情况,使用AtomicStampedReference就可以很好地解决这个问题。

4.4.5 带有时间戳的对象引用:AtomicStampedReference

AtomicReference无法解决上述问题的根本因为是对象在修改过程中,丢失了状态信息。对象值本身与状态被画上了等号。因此,我们只要能够记录对象在修改过程中的状态值,就可以很好地解决对象被反复修改导致线程无法正确判断对象状态的问题。

AtomicStampedReference正是这么做的。它内部不仅维护了对象值,还维护了一个时间戳(我这里把它称为时间戳,实际上它可以使任何一个整数来表示状态值)。当AtomicStampedReference对应的数值被修改时,除了更新数据本身外,还必须要更新时间戳。当AtomicStampedReference设置对象值时,对象值以及时间戳都必须满足期望值,写入才会成功。因此,即使对象值被反复读写,写回原值,只要时间戳发生变化,就能防止不恰当的写入。

AtomicStampedReference的几个API在AtomicReference的基础上新增了有关时间戳的信息:

//比较设置 参数依次为:期望值 写入新值 期望时间戳 新时间戳
public boolean compareAndSet(V expectedReference,V
newReference,int expectedStamp,int newStamp)
//获得当前对象引用
public V getReference()
//获得当前时间戳
public int getStamp()
//设置当前对象引用和时间戳
public void set(V newReference, int newStamp)

有了AtomicStampedReference这个法宝,我们就再也不用担心对象被写坏啦!现在,就让我们使用AtomicStampedReference来修正那个贵宾卡充值的问题:

01 public class AtomicStampedReferenceDemo {
02 static AtomicStampedReference<Integer> money=new AtomicStampedReference<Integer>(19,0);
03   public static void main(String[] args) {
04     //模拟多个线程同时更新后台数据库,为用户充值
05     for(int i = 0 ; i < 3 ; i++) {
06       final int timestamp=money.getStamp();
07       new Thread() {
08         public void run() {
09           while(true){
10             while(true){
11               Integer m=money.getReference();
12               if(m<20){
13              if(money.compareAndSet(m, m+20,timestamp,timestamp+1)){
14       System.out.println("余额小于20元,充值成功,余额:"+money.getReference()+"元");
15                   break;
16                 }
17               }else{
18                 //System.out.println("余额大于20元,无须充值");
19                 break ;
20               }
21             }
22           }
23         }
24       }.start();
25     }
26
27     //用户消费线程,模拟消费行为
28     new Thread() {
29       public void run() {
30         for(int i=0;i<100;i++){
31           while(true){
32             int timestamp=money.getStamp();
33             Integer m=money.getReference();
34             if(m>10){
35               System.out.println("大于10元");
36               if(money.compareAndSet(m, m-10,timestamp,timestamp+1)){
37            System.out.println("成功消费10元,余额:"+money.getReference());
38                 break;
39               }
40             }else{
41               System.out.println("没有足够的金额");
42               break;
43             }
44           }
45           try {Thread.sleep(100);} catch (InterruptedException e) {}
46         }
47       }
48     }.start();
49   }
50 }

第2行,我们使用AtomicStampedReference代替原来的AtomicReference。第6行获得账户的时间戳,后续的赠予操作以这个时间戳为依据。如果赠予成功(第13行),则修改时间戳,使得系统不可能发生二次赠予的情况。消费线程也是类似,每次操作,都使得时间戳加1(第36行),使之不可能重复。

执行上述代码,可以得到以下输出:

余额小于20元,充值成功,余额:39元
大于10元
成功消费10元,余额:29
大于10元
成功消费10元,余额:19
大于10元
成功消费10元,余额:9
没有足够的金额

可以看到,账户只被赠予了一次。

4.4.6 数组也能无锁:AtomicIntegerArray

除了提供基本数据类型外,JDK还为我们准备了数组等复合结构。当前可用的原子数组有:AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray,分别表示整数数组、long型数组和普通的对象数组。

这里以AtomicIntegerArray为例,展示原子数组的使用方式。

AtomicIntegerArray本质上是对int[]类型的封装,使用Unsafe类通过CAS的方式控制int[]在多线程下的安全性。它提供了以下几个核心API:

//获得数组第i个下标的元素
public final int get(int i)
//获得数组的长度
public final int length()
//将数组第i个下标设置为newValue,并返回旧的值
public final int getAndSet(int i, int newValue)
//进行CAS操作,如果第i个下标的元素等于expect,则设置为update,设置成功返回true
public final boolean compareAndSet(int i, int expect, int update)
//将第i个下标的元素加1
public final int getAndIncrement(int i)
//将第i个下标的元素减1
public final int getAndDecrement(int i)
//将第i个下标的元素增加delta(delta可以是负数)
public final int getAndAdd(int i, int delta)

下面给出一个简单的示例,展示AtomicIntegerArray的使用:

01 public class AtomicIntegerArrayDemo {
02   static AtomicIntegerArray arr = new AtomicIntegerArray(10);
03   public static class AddThread implements Runnable{
04     public void run(){
05      for(int k=0;k<10000;k++)
06        arr.getAndIncrement(k%arr.length());
07     }
08   }
09   public static void main(String[] args) throws InterruptedException {
10     Thread[] ts=new Thread[10];
11     for(int k=0;k<10;k++){
12       ts[k]=new Thread(new AddThread());
13     }
14     for(int k=0;k<10;k++){ts[k].start();}
15     for(int k=0;k<10;k++){ts[k].join();}
16     System.out.println(arr);
17   }
18 }

上述代码第2行,申明了一个内含10个元素的数组。第3行定义的线程对数组内10个元素进行累加操作,每个元素各加1000次。第11行,开启10个这样的线程。因此,可以预测,如果线程安全,数组内10个元素的值必然都是10000。反之,如果线程不安全,则部分或者全部数值会小于10000。

程序的输出结果如下:

[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

这说明AtomicIntegerArray确实合理地保证了数组的线程安全性。

4.4.7 让普通变量也享受原子操作:AtomicIntegerFieldUpdater

有时候,由于初期考虑不周,或者后期的需求变化,一些普通变量可能也会有线程安全的需求。如果改动不大,我们可以简单地修改程序中每一个使用或者读取这个变量的地方。但显然,这样并不符合软件设计中的一条重要原则——开闭原则。也就是系统对功能的增加应该是开放的,而对修改应该是相对保守的。而且,如果系统里使用到这个变量的地方特别多,一个一个修改也是一件令人厌烦的事情(况且很多使用场景下可能只是只读的,并无线程安全的强烈要求,完全可以保持原样)。

如果你有这种困扰,在这里根本不需要担心,因为在原子包里还有一个实用的工具类AtomicIntegerFieldUpdater。它可以让你在不改动(或者极少改动)原有代码的基础上,让普通的变量也享受CAS操作带来的线程安全性,这样你可以修改极少的代码,来获得线程安全的保证。这听起来是不是让人很激动呢?

根据数据类型不同,这个Updater有三种,分别是AtomicIntegerFieldUpdater、AtomicLong- FieldUpdater和AtomicReferenceFieldUpdater。顾名思义,它们分别可以对int、long和普通对象进行CAS修改。

现在来思考这么一个场景。假设某地要进行一次选举。现在模拟这个投票场景,如果选民投了候选人一票,就记为1,否则记为0。最终的选票显然就是所有数据的简单求和。

01 public class AtomicIntegerFieldUpdaterDemo {
02   public static class Candidate{
03     int id;
04     volatile int score;
05   }
06   public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater
07     = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
08   //检查Updater是否工作正确
09   public static AtomicInteger allScore=new AtomicInteger(0);
10   public static void main(String[] args) throws InterruptedException {
11     final Candidate stu=new Candidate();
12     Thread[] t=new Thread[10000];
13     for(int i = 0 ; i < 10000 ; i++) {
14       t[i]=new Thread() {
15         public void run() {
16           if(Math.random()>0.4){
17             scoreUpdater.incrementAndGet(stu);
18             allScore.incrementAndGet();
19           }
20         }
21       };
22       t[i].start();
23     }
24     for(int i = 0 ; i < 10000 ; i++) {  t[i].join();}
25     System.out.println("score="+stu.score);
26     System.out.println("allScore="+allScore);
27   }
28 }

上述代码模拟了这个计票场景,候选人的得票数量记录在Candidate.score中。注意,它是一个普通的volatile变量。而volatile变量并不是线程安全的。第6~7行定义了AtomicIntegerFieldUpdater实例,用来对Candidate.score进行写入。而后续的allScore我们用来检查AtomicIntegerFieldUpdater的正确性。如果AtomicIntegerFieldUpdater真的保证了线程安全,那么最终Candidate.score和allScore的值必然是相等的。否则,就说明AtomicIntegerFieldUpdater根本没有确保线程安全的写入。第12~21行模拟了计票过程,这里假设有大约60%的人投赞成票,并且投票是随机进行的。第17行使用Updater修改Candidate.score(这里应该是线程安全的),第18行使用AtomicInteger计数,作为参考基准。

大家如果运行这段程序,不难发现,最终的Candidate.score总是和allScore绝对相等。这说明AtomicIntegerFieldUpdater很好地保证了Candidate.score的线程安全。

虽然AtomicIntegerFieldUpdater很好用,但是还是有几个注意事项:

第一,Updater只能修改它可见范围内的变量。因为Updater使用反射得到这个变量。如果变量不可见,就会出错。比如如果score申明为private,就是不可行的。

第二,为了确保变量被正确的读取,它必须是volatile类型的。如果我们原有代码中未申明这个类型,那么简单地申明一下就行,这不会引起什么问题。

第三,由于CAS操作会通过对象实例中的偏移量直接进行赋值,因此,它不支持static字段(Unsafe. objectFieldOffset()不支持静态变量)。

好了,通过AtomicIntegerFieldUpdater,是不是让我们可以更加随心所欲地对系统关键数据进行线程安全的保护呢?

4.4.8 挑战无锁算法:无锁的Vector实现

我们已经比较完整地介绍了有关无锁的概念和使用方法。相对于有锁的方法,使用无锁的方式编程更加考验一个程序员的耐心和智力。但是,无锁带来的好处也是显而易见的,第一,在高并发的情况下,它比有锁的程序拥有更好的性能;第二,它天生就是死锁免疫的。就凭借这两个优势,就值得我们冒险尝试使用无锁的并发。

这里,我想向大家介绍一种使用无锁方式实现的Vector。通过这个案例,我们可以更加深刻地认识无锁的算法,同时也可以学习一下有关Vector实现的细节和算法技巧(在本例中,讲述的无锁Vector来自于amino并发包)。

我们将这个无锁的Vector称为LockFreeVector。它的特点是可以根据需求动态扩展其内部空间。在这里,我们使用二维数组来表示LockFreeVector的内部存储,如下:

private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;

变量buckets存放所有的内部元素。从定义上看,它是一个保存着数组的数组,也就是通常的二维数组。特别之处在于这些数组都是使用CAS的原子数组。为什么使用二维数组去实现一个一维的Vector呢?这是为了将来Vector进行动态扩展时可以更加方便。我们知道,AtomicReferenceArray内部使用Object[]来进行实际数据的存储,这使得动态空间增加特别的麻烦,因此使用二维数组的好处就是为了将来可以方便地增加新的元素。

此外,为了更有序的读写数组,定义一个称为Descriptor的元素。它的作用是使用CAS操作写入新数据。

01 static class Descriptor<E> {
02   public int size;
03   volatile WriteDescriptor<E> writeop;
04   public Descriptor(int size, WriteDescriptor<E> writeop) {
05     this.size = size;
06     this.writeop = writeop;
07   }
08   public void completeWrite() {
09     WriteDescriptor<E> tmpOp = writeop;
10     if (tmpOp != null) {
11       tmpOp.doIt();
12       writeop = null; // this is safe since all write to writeop use
13       // null as r_value.
14     }
15   }
16 }
17
18 static class WriteDescriptor<E> {
19   public E oldV;
20   public E newV;
21   public AtomicReferenceArray<E> addr;
22   public int addr_ind;
23
24   public WriteDescriptor(AtomicReferenceArray<E> addr, int addr_ind,
25       E oldV, E newV) {
26     this.addr = addr;
27     this.addr_ind = addr_ind;
28     this.oldV = oldV;
29     this.newV = newV;
30   }
31
32   public void doIt() {
33     addr.compareAndSet(addr_ind, oldV, newV);
34   }
35 }

上述代码第4行定义的Descriptor构造函数接收两个参数,第一个为整个Vector的长度,第2个为一个writer。最终,写入数据是通过writer进行的(通过completeWrite()方法)。

第24行,WriteDescriptor的构造函数接收四个参数。第一个参数addr表示要修改的原子数组,第二个参数为要写入的数组索引位置,第三个oldV为期望值,第四个newV为需要写入的值。

在构造LockFreeVector时,显然需要将buckets和descriptor进行初始化。

public LockFreeVector() {
  buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET);
  buckets.set(0, new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE));
  descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0,
      null));
}

在这里N_BUCKET为30,也就是说这个buckets里面可以存放一共30个数组(由于数组无法动态增长,因此数组总数也就不能超过30个)。并且将第一个数组的大小FIRST_BUCKET_SIZE设为8。到这里,大家可能会有一个疑问,如果每个数组8个元素,一共30个数组,那岂不是一共只能存放240个元素吗?

如果大家了解JDK内的Vector实现,应该知道,Vector在进行空间增长时,默认情况下,每次都会将总容量翻倍。因此,这里也借鉴类似的思想,每次空间扩张,新的数组的大小为原来的两倍(即每次空间扩展都启用一个新的数组),因此,第一个数组为8,第二个就是16,第三个就是32。依此类推,因此30个数组可以支持的总元素达到

这数值已经超过了2^33,即在80亿以上。因此,可以满足一般的应用。

当有元素需要加入LockFreeVector时,使用一个名为push_back()的方法,将元素压入Vector最后一个位置。这个操作显然就是LockFreeVector的最为核心的方法,也是最能体现CAS使用特点的方法,它的实现如下:

01 public void push_back(E e) {
02   Descriptor<E> desc;
03   Descriptor<E> newd;
04   do {
05     desc = descriptor.get();
06     desc.completeWrite();
07
08     int pos = desc.size + FIRST_BUCKET_SIZE;
09     int zeroNumPos = Integer.numberOfLeadingZeros(pos);
10     int bucketInd = zeroNumFirst - zeroNumPos;
11     if (buckets.get(bucketInd) == null) {
12       int newLen = 2 * buckets.get(bucketInd - 1).length();
13       if (debug)
14         System.out.println("New Length is:" + newLen);
15       buckets.compareAndSet(bucketInd, null,
16           new AtomicReferenceArray<E>(newLen));
17     }
18
19     int idx = (0x80000000>>>zeroNumPos) ^ pos;
20     newd = new Descriptor<E>(desc.size + 1, new WriteDescriptor<E>(
21         buckets.get(bucketInd), idx, null, e));
22   } while (!descriptor.compareAndSet(desc, newd));
23   descriptor.get().completeWrite();
24 }

可以看到,这个方法主体部分是一个do-while循环,用来不断尝试对descriptor的设置。也就是通过CAS保证了descriptor的一致性和安全性。在第23行,使用descriptor将数据真正地写入数组中。这个descriptor写入的数据由第20~21行构造的WriteDescriptor决定。

在循环最开始(第5行),使用descriptor先将数据写入数组,是为了防止上一个线程设置完descriptor后(第22行),还没来得及执行第23行的写入,因此,做一次预防性的操作。

因为限制要将元素e压入Vector,因此,我们必须首先知道这个e应该放在哪个位置。由于目前使用了二维数组,因此我们自然需要知道e所在哪个数组(buckets中的下标位置)和数组中的下标。

第8~10行通过当前Vector的大小(desc.size),计算新的元素应该落入哪个数组。这里使用了位运算进行计算。

之前说过,LockFreeVector每次都会成倍的扩容。它的第1个数组长度为8,第2个就是16,第3个就是32,依此类推。它们的二进制表示如下。

· 00000000 00000000 00000000 00001000:第一个数组大小,28个前导零。

· 00000000 00000000 00000000 00010000:第二个数组大小,27个前导零。

· 00000000 00000000 00000000 00100000:第三个数组大小,26个前导零。

· 00000000 00000000 00000000 01000000:第四个数组大小,25个前导零。

它们之和就是整个LockFreeVector的总大小,因此,如果每一个数组都恰好填满,那么总大小应该类似如下的数值(以4个数组填满为例)。

· 00000000 00000000 00000000 01111000:4个数组都恰好填满时的大小。

导致这个数字进位的最小条件,就是加上二进制的1000。而这个数字正好是8(FIRST_BUCKET_SIZE就是8)。这就是第8行代码的意义。它可以使得数组大小发生一次二进制的进位(如果不进位说明还在第一个数组中),进位后前导零的数量就会发生变化。而元素所在的数组,和pos(第8行定义的变量)的前导零直接相关。每进行一次数组扩容,它的前导零就会减1。如果从来没有扩容过,它的前导零就是28个。以后,逐级减1。这就是第9行获得pos前导零的原因。第10行,通过pos的前导零可以立即定位使用哪个数组(也就是得到了bucketInd的值)。

第11行,判断这个数组是否存在。如果不存在,则创建这个数组,大小为前一个数组的两倍,并把它设置到buckets中。

接着再看一下元素没有恰好填满的情况。

· 00000000 00000000 00000000 00001000:第一个数组大小,28个前导零。

· 00000000 00000000 00000000 00010000:第二个数组大小,27个前导零。

· 00000000 00000000 00000000 00100000:第三个数组大小,26个前导零。

· 00000000 00000000 00000000 00000001:第四个数组大小,只有一个元素。

那么总大小如下。

· 00000000 00000000 00000000 00111001:元素总个数。

总个数加上二进制1000后,得到:

· 00000000 00000000 00000000 01000001

显然,通过前导零可以定位到第4个数组。而剩余位,显然就表示元素在当前数组内的偏移量(也就是数组下标)。根据这个理论,我们就可以通过pos计算这个元素应该放在给定数组的哪个位置。通过第19行代码,获得pos的除了第一位数字1以外的其他位的数值。因此,pos的前导零可以表示元素所在的数组,而pos的后面几位,则表示元素所在这个数组中的位置。由此,第19行代码就取得了元素的所在位置idx。

到此,我们就已经得到新元素位置的全部信息,剩下的就是将这些信息传递给Descriptor让它在给定的位置把元素e安置上去即可。这里,就通过CAS操作,保证写入正确性。

下面来看一下get()操作的实现:

1 @Override
2 public E get(int index) {
3   int pos = index + FIRST_BUCKET_SIZE;
4   int zeroNumPos = Integer.numberOfLeadingZeros(pos);
5   int bucketInd = zeroNumFirst - zeroNumPos;
6   int idx = (0x80000000>>>zeroNumPos) ^ pos;
7   return buckets.get(bucketInd).get(idx);
8 }

在get()的实现中,第3~6行使用了相同的算法获得所需元素的数组以及数组中的索引下标。这里简单地通过buckets定位到对应的元素即可。

这样,对于Vector来说两个重要的方法就已经实现了。其他方法也是非常类似的,这里就不再详细讨论了。

4.4.9 让线程之间互相帮助:细看SynchronousQueue的实现

在对线程池的介绍中,提到了一个非常特殊的等待队列SynchronousQueue。SynchronousQueue的容量为0,任何一个对SynchronousQueue的写需要等待一个对SynchronousQueue的读,反之亦然。因此,SynchronousQueue与其说是一个队列,不如说是一个数据交换通道。那SynchronousQueue的奇妙功能是如何实现的呢?

既然我打算在这一节中介绍它,那么SynchronousQueue就和无锁的操作脱离不了关系。实际上SynchronousQueue内部也正是大量使用了无锁工具。

对SynchronousQueue来说,它将put()和take()两个功能截然不同的操作抽象为一个共通的方法Transferer.transfer()。从字面上看,这就是数据传递的意思。它的完整签名如下:

Object transfer(Object e, boolean timed, long nanos)

当参数e为非空时,表示当前操作传递给一个消费者,如果为空,则表示当前操作需要请求一个数据。timed参数决定是否存在timeout时间,nanos决定了timeout的时长。如果返回值非空,则表示数据已经接受或者正常提供,如果为空,则表示失败(超时或者中断)。

SynchronousQueue内部会维护一个线程等待队列。等待队列中会保存等待线程以及相关数据的信息。比如,生产者将数据放入SynchronousQueue时,如果没有消费者接收,那么数据本身和线程对象都会打包在队列中等待(因为SynchronousQueue容积为0,没有数据可以正常放入)。

Transferer.transfer()函数的实现是SynchronousQueue的核心,它大体上分为三个步骤:

1. 如果等待队列为空,或者队列中节点的类型和本次操作是一致的,那么将当前操作压入队列等待。比如,等待队列中是读线程等待,本次操作也是读,因此这两个读都需要等待。进入等待队列的线程可能会被挂起,它们会等待一个“匹配”操作。

2. 如果等待队列中的元素和本次操作是互补的(比如等待操作是读,而本次操作是写),那么就插入一个“完成”状态的节点,并且让他“匹配”到一个等待节点上。接着弹出这两个节点,并且使得对应的两个线程继续执行。

3. 如果线程发现等待队列的节点就是“完成”节点,那么帮助这个节点完成任务。其流程和步骤2是一致的。

步骤1的实现如下(代码参考JDK 7u60):

01 SNode h = head;
02 if (h == null || h.mode == mode) {           // 如果队列为空,或者模式相同
03   if (timed && nanos <= 0) {            // 不进行等待
04     if (h != null && h.isCancelled())
05       casHead(h, h.next);            // 处理取消行为
06     else
07       return null;
08   } else if (casHead(h, s = snode(s, e, h, mode))) {
09     SNode m = awaitFulfill(s, timed, nanos);   //等待,直到有匹配操作出现
10     if (m == s) {                // 等待被取消
11       clean(s);
12       return null;
13     }
14     if ((h = head) != null && h.next == s)
15       casHead(h, s.next);            // 帮助s的 fulfiller
16     return (mode == REQUEST) ? m.item : s.item;
17   }
18 }

上述代码中,第1行SNode表示等待队列中的节点。内部封装了当前线程、next节点、匹配节点、数据内容等信息。第2行,判断当前等待队列为空,或者队列中元素的模式与本次操作相同(比如,都是读操作,那么都必须要等待)。第8行,生成一个新的节点并置于队列头部,这个节点就代表当前线程。如果入队成功,则执行第9行awaitFulfill()函数。该函数会进行自旋等待,并最终挂起当前线程。直到一个与之对应的操作产生,将其唤醒。线程被唤醒后(表示已经读取到数据或者自己产生的数据已经被别的线程读取),在第14~15行尝试帮助对应的线程完成两个头部节点的出队操作(这仅仅是友情帮助)。并在最后,返回读取或者写入的数据(第16行)。

步骤2的实现如下:

01 } else if (!isFulfilling(h.mode)) {     //是否处于fulfill状态
02   if (h.isCancelled())          // 如果以前取消了
03     casHead(h, h.next);         // 弹出并重试
04   else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
05     for (;;) {              // 一直循环直到匹配(match)或者没有等待者了
06       SNode m = s.next;         // m 是 s的匹配者(match)
07       if (m == null) {        // 已经没有等待者了
08         casHead(s, null);       // 弹出fulfill节点
09         s = null;           // 下一次使用新的节点
10         break;            // 重新开始主循环
11       }
12       SNode mn = m.next;
13       if (m.tryMatch(s)) {
14         casHead(s, mn);       // 弹出s 和 m
15         return (mode == REQUEST) ? m.item : s.item;
16       } else              // match 失败
17         s.casNext(m, mn);       // 帮助删除节点
18     }
19   }
20 }

上述代码中,首先判断头部节点是否处于fulfill模式。如果是,则需要进入步骤3。否则,将视自己为对应的fulfill线程。第4行,生成一个SNode元素,设置为fulfill模式并将其压入队列头部。接着,设置m(原始的队列头部)为s的匹配节点(第13行),这个tryMatch()操作将会激活一个等待线程,并将m传递给那个线程。如果设置成功,则表示数据投递完成,将s和m两个节点弹出即可(第14行)。如果tryMatch()失败,则表示已经有其他线程帮我完成了操作,那么简单得删除m节点即可(第17行),因为这个节点的数据已经被投递,不需要再次处理,然后,再次跳转到第5行的循环体,进行下一个等待线程的匹配和数据投递,直到队列中没有等待线程为止。

步骤3的实现(如果线程在执行时,发现头部元素恰好是fulfill模式,它就会帮助这个fulfill节点尽快被执行):

} else {                    // 帮助一个 fulfiller
  SNode m = h.next;             // m 是 h的 match
  if (m == null)              // 没有等待者
    casHead(h, null);           // 弹出fulfill节点
  else {
    SNode mn = m.next;
    if (m.tryMatch(h))          // 尝试 match
      casHead(h, mn);           // 弹出 h 和 m
    else                  // match失败
      h.casNext(m, mn);         // 帮助删除节点
  }
}

上述代码的执行原理和步骤2是完全一致的。唯一的不同是步骤3不会返回,因为步骤3所进行的工作是帮助其他线程尽快投递它们的数据,而自己并没有完成对应的操作。因此,线程进入步骤3后,再次进入大循环体(代码中没有给出),从步骤1开始重新判断条件和投递数据。

从整个数据投递的过程中可以看到,在SynchronousQueue中,参与工作的所有线程不仅仅是竞争资源的关系。更重要的是,它们彼此之间还会互相帮助。在一个线程内部,可能会帮助其他线程完成它们的工作。这种模式可以更大程度上减少饥饿的可能,提高系统整体的并行度。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文