返回介绍

6.7 原子类的增强

发布于 2024-08-21 22:20:21 字数 8683 浏览 0 评论 0 收藏 0

在之前的章节中已经提到了原子类的使用,无锁的原子类操作使用系统的CAS指令,有着远远超越锁的性能。那是否有可能在性能上更上一层楼呢?答案是肯定的。在Java 8中引入了LongAdder类,这个类也在java.util.concurrent.atomic包下,因此,可以推测,它也是使用了CAS指令。

6.7.1 更快的原子类:LongAdder

大家对AtomicInteger的基本实现机制应该比较了解。它们都是在一个死循环内,不断尝试修改目标值,直到修改成功。如果竞争不激烈,那么修改成功的概率就很高,否则,修改失败的概率就很高。在大量修改失败时,这些原子操作就会进行多次循环尝试,因此性能就会受到影响。

那么当竞争激烈的时候,我们应该如何进一步提高系统的性能呢?一种基本方案就是可以使用热点分离,将竞争的数据进行分解,基于这个思路,大家应该可以想到一种对传统AtomicInteger等原子类的改进方法。虽然在CAS操作中没有锁,但是像减小锁粒度这种分离热点的思想依然可以使用。一种可行的方案就是仿造ConcurrentHashMap,将热点数据分离。比如,可以将AtomicInteger的内部核心数据value分离成一个数组,每个线程访问时,通过哈希等算法映射到其中一个数字进行计数,而最终的计数结果,则为这个数组的求和累加,如图6.5所示,显示了这种优化思路。其中,热点数据value被分离成多个单元cell,每个cell独自维护内部的值,当前对象的实际值由所有的cell累计合成,这样,热点就进行了有效的分离,提高了并行度。LongAdder正是使用了这种思想。

图6.5 原子类的优化思路

在实际的操作中,LongAdder并不会一开始就动用数组进行处理,而是将所有数据都先记录在一个称为base的变量中。如果在多线程条件下,大家修改base都没有冲突,那么也没有必要扩展为cell数组。但是,一旦base修改发生冲突,就会初始化cell数组,使用新的策略。如果使用cell数组更新后,发现在某一个cell上的更新依然发生冲突,那么系统就会尝试创建新的cell,或者将cell的数量加倍,以减少冲突的可能。

下面我们简单分析一下increment()方法(该方法会将LongAdder自增1)的内部实现:

01 public void increment() {
02   add(1L);
03 }
04 public void add(long x) {
05   Cell[] as; long b, v; int m; Cell a;
06   if ((as = cells) != null || !casBase(b = base, b + x)) {
07     boolean uncontended = true;
08     if (as == null || (m = as.length - 1) < 0 ||
09       (a = as[getProbe() & m]) == null ||
10       !(uncontended = a.cas(v = a.value, v + x)))
11       longAccumulate(x, null, uncontended);
12   }
13 }

它的核心是第4行的add()方法。最开始cells为null,因此数据会向base增加(第6行)。但是如果对base的操作冲突,则会进入第7行,并设置冲突标记uncontended为true。接着,如果判断cells数组不可用,或者当前线程对应的cell为null,则直接进入longAccumulate()方法。否则会尝试使用CAS方法更新对应的cell数据,如果成功,则退出,失败则进入longAccumulate()方法。

由于longAccumulate()方法比较复杂,限于篇幅,这里不再展开讨论,其大致内容是根据需要创建新的cell或者对cell数组进行扩容,以减少冲突。

下面,让我们简单地对LongAddr、原子类以及同步锁进行性能测试。测试方法是使用多个线程对同一个整数进行累加,观察使用3种不同方法时所消耗的时间。

首先,我们定义一些辅助变量:

private static final int MAX_THREADS = 3;        //线程数
private static final int TASK_COUNT = 3;         //任务数
private static final int TARGET_COUNT = 10000000;    //目标总数

private AtomicLong acount =new AtomicLong(0L);       //无锁的原子操作
private LongAdder lacount=new LongAdder();
private long count=0;

static CountDownLatch cdlsync=new CountDownLatch(TASK_COUNT);
static CountDownLatch cdlatomic=new CountDownLatch(TASK_COUNT);
static CountDownLatch cdladdr=new CountDownLatch(TASK_COUNT);

上述代码中,指定了测试线程数量、目标总数以及3个初始值为0的整型变量acount、lacount和count。它们分别表示使用AtomicLong、LongAdder和锁进行同步时的操作对象。

下面是使用同步锁时的测试代码:

01 protected synchronized long inc(){           //有锁的加法
02   return ++count;
03 }
04
05 protected synchronized long getCount(){        //有锁的操作
06   return count;
07 }
08
09
10 public class SyncThread implements Runnable{
11   protected String name;
12   protected long starttime;
13   LongAdderDemo out;
14   public SyncThread(LongAdderDemo o,long starttime){
15     out=o;
16     this.starttime=starttime;
17   }
18   @Override
19   public void run() {
20     long v=out.getCount();
21     while(v<TARGET_COUNT){            //在到达目标值前,不停循环
22       v=out.inc();
23     }
24     long endtime=System.currentTimeMillis();
25     System.out.println("SyncThread spend:"+(endtime-starttime)+"ms"+" v="+v);
26     cdlsync.countDown();
27   }
28 }
29
30 public void testSync() throws InterruptedException{
31   ExecutorService exe=Executors.newFixedThreadPool(MAX_THREADS);
32   long starttime=System.currentTimeMillis();
33   SyncThread sync=new SyncThread(this,starttime);
34   for(int i=0;i<TASK_COUNT;i++){
35     exe.submit(sync);                //提交线程开始计算
36   }
37   cdlsync.await();
38   exe.shutdown();
39 }

上述代码第10行,定义线程SyncThread,它使用加锁方式增加count的值。在第30行定义的testSync()方法中,使用线程池控制多线程进行累加操作。

使用类似的方法实现原子类累加计时统计:

01 public class AtomicThread implements Runnable{
02   protected String name;
03   protected long starttime;
04   public AtomicThread(long starttime){
05     this.starttime=starttime;
06   }
07   @Override
08   public void run() {                   //在到达目标值前,不停循环
09     long v=acount.get();
10     while(v<TARGET_COUNT){
11       v=acount.incrementAndGet();           //无锁的加法
12     }
13     long endtime=System.currentTimeMillis();
14     System.out.println("AtomicThread spend:"+(endtime-starttime)+"ms"+" v="+v);
15     cdlatomic.countDown();
16   }
17 }
18
19 public void testAtomic() throws InterruptedException{
20   ExecutorService exe=Executors.newFixedThreadPool(MAX_THREADS);
21   long starttime=System.currentTimeMillis();
22   AtomicThread atomic=new AtomicThread(starttime);
23   for(int i=0;i<TASK_COUNT;i++){
24     exe.submit(atomic);                 //提交线程开始计算
25   }
26   cdlatomic.await();
27   exe.shutdown();
28 }

同理,以下代码使用LongAddr实现类似的功能:

01 public class LongAddrThread implements Runnable{
02   protected String name;
03   protected long starttime;
04   public LongAddrThread(long starttime){
05     this.starttime=starttime;
06   }
07   @Override
08   public void run() {
09     long v=lacount.sum();
10     while(v<TARGET_COUNT){
11       lacount.increment();
12       v=lacount.sum();
13     }
14     long endtime=System.currentTimeMillis();
15     System.out.println("LongAdder spend:"+(endtime-starttime)+"ms"+" v="+v);
16     cdladdr.countDown();
17   }
18 }
19
20 public void testAtomicLong() throws InterruptedException{
21   ExecutorService exe=Executors.newFixedThreadPool(MAX_THREADS);
22   long starttime=System.currentTimeMillis();
23   LongAddrThread atomic=new LongAddrThread(starttime);
24   for(int i=0;i<TASK_COUNT;i++){
25     exe.submit(atomic);                 //提交线程开始计算
26   }
27   cdladdr.await();
28   exe.shutdown();
29 }

注意,由于LongAddr中,将单个数值分解为多个不同的段。因此,在进行累加后,上述代码中第11行的increment()函数并不能返回当前的数值。要取得当前的实际值,需要使用第12行的sum()函数重新计算。这个计算是需要有额外的成本的,但即使加上这个额外成本,LongAddr的表现还是比AtomicLong要好。

执行这些代码,就可以得到锁、原子类和LongAddr三者的性能比较数据,如下所示:

SyncThread spend:1784ms v=10000002
SyncThread spend:1784ms v=10000000
SyncThread spend:1784ms v=10000001
AtomicThread spend:695ms v=10000001
AtomicThread spend:695ms v=10000000
AtomicThread spend:695ms v=10000002
LongAdder spend:227ms v=10000002
LongAdder spend:227ms v=10000002
LongAdder spend:227ms v=10000002

可以看到,就计数性能而言,LongAdder已经超越了普通的原子操作了。其中,锁操作耗时约1784ms,普通原子操作耗时约695ms,而LongAddr仅需要227ms左右。

LongAddr的另外一个优化手段是避免了伪共享。大家可以先回顾一下第5章中有关伪共享的问题。但是,需要注意的是,LongAddr中并不是直接使用padding这种看起来比较碍眼的做法,而是引入了一种新的注释“@sun.misc.Contended”。

对于LongAddr中的每一个Cell,它的定义如下所示:

@sun.misc.Contended
static final class Cell {
  volatile long value;
  Cell(long x) { value = x; }
  final boolean cas(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
  }
  省略其他不必要的信息

可以看到,在上述代码第1行申明了Cell类为sun.misc.Contended。这将会使得Java虚拟机自动为Cell解决伪共享问题。

当然,在我们自己的代码中也可以使用sun.misc.Contended来解决伪共享问题,但是需要额外使用虚拟机参数-XX:-RestrictContended,否则,这个注释将被忽略。

大家应该还记得第5章中有关伪共享的案例吧!限于篇幅,这里不再贴出完整代码,只给出关键部分的改动。我们将VolatileLong修改如下:

@sun.misc.Contended
public final static class VolatileLong {
  public volatile long value = 0L;
}

在这里,我们去除了那些看起来不太雅观的padding,同时增加了sun.misc.Contended申明,这个就告诉虚拟机我们希望在这个类上解决伪共享问题。然后,我们就可以测试这段代码了。当然了,千万不要忘记指定虚拟机参数-XX:-RestrictContended,否则,你的这个优化将被无视。

跑一下优化后的程序,是不是比传统的方式快很多呢?

6.7.2 LongAdder的功能增强版:LongAccumulator

LongAccumulator是LongAdder的亲兄弟,它们有公共的父类Striped64。因此,LongAccumulator内部的优化方式和LongAdder是一样的。它们都将一个long型整数进行分割,存储在不同的变量中,以防止多线程竞争。两者的主要逻辑是类似的,但是LongAccumulator是LongAdder的功能扩展,对于LongAdder来说,它只是每次对给定的整数执行一次加法,而LongAccumulator则可以实现任意函数操作。

可以使用下面的构造函数创建一个LongAccumulator实例:

public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity)

第1个参数accumulatorFunction就是需要执行的二元函数(接收两个long形参数并返回long),第2个参数是初始值。

下面这个例子展示了LongAccumulator的使用,它将通过多线程访问若干个整数,并返回遇到的最大的那个数字。

01 public static void main(String[] args) throws Exception {
02   LongAccumulator accumulator = new LongAccumulator(Long::max, Long.MIN_VALUE);
03   Thread[] ts = new Thread[1000];
04
05   for (int i = 0; i < 1000; i++) {
06     ts[i] = new Thread(() -> {
07       Random random = new Random();
08       long value = random.nextLong();
09       accumulator.accumulate(value);
10     });
11     ts[i].start();
12   }
13   for (int i = 0; i < 1000; i++) {
14     ts[i].join();
15   }
16   System.out.println(accumulator.longValue());
17 }

上述代码第2行,构造了LongAccumulator实例。因为我们要过滤最大值,因此传入Long::max函数句柄。当有数据通过accumulate()方法传入LongAccumulator后(第9行),LongAccumulator会通过Long::max识别最大值并且保存在内部(很可能是cell数组内,也可能是base)。在代码第16行,通过longValue()函数对所有的cell进行Long::max操作,得到最大值。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文