返回介绍

6.6 读写锁的改进:StampedLock

发布于 2024-08-21 22:20:21 字数 8066 浏览 0 评论 0 收藏 0

StampedLock是Java 8中引入的一种新的锁机制。简单的理解,可以认为它是读写锁的一个改进版本。读写锁虽然分离了读和写的功能,使得读与读之间可以完全并发。但是,读和写之间依然是冲突的。读锁会完全阻塞写锁,它使用的依然是悲观的锁策略,如果有大量的读线程,它也有可能引起写线程的“饥饿”。

而StampedLock则提供了一种乐观的读策略。这种乐观的锁非常类似无锁的操作,使得乐观锁完全不会阻塞写线程。

6.6.1 StampedLock使用示例

StampedLock的使用并不困难,下面是StampedLock的使用示例:

01 public class Point {
02   private double x, y;
03   private final StampedLock sl = new StampedLock();
04
05   void move(double deltaX, double deltaY) {      // 这是一个排它锁
06     long stamp = sl.writeLock();
07     try {
08       x += deltaX;
09       y += deltaY;
10     } finally {
11       sl.unlockWrite(stamp);
12     }
13   }
14
15   double distanceFromOrigin() {           // 只读方法
16     long stamp = sl.tryOptimisticRead();
17     double currentX = x, currentY = y;
18     if (!sl.validate(stamp)) {
19       stamp = sl.readLock();
20       try {
21         currentX = x;
22         currentY = y;
23       } finally {
24         sl.unlockRead(stamp);
25       }
26     }
27     return Math.sqrt(currentX * currentX + currentY * currentY);
28   }
29 }

上述代码出自JDK的官方文档。它定义了一个点Point类,内部有两个元素x和y,表示点的坐标。第3行,定义了StampedLock锁。第15行定义的distanceFromOrigin()方法是一个只读方法,它只会读取Point的x和y坐标。在读取时,首先使用了StampedLock.tryOptimisticRead()方法。这个方法表示试图尝试一次乐观读。它会返回一个类似于时间戳的邮戳整数stamp。这个stamp就可以作为这一次锁获取的凭证。

接着,在第17行,读取x和y的值。当然,这时我们并不确定这个x和y是否是一致的(在读取x的时候,可能其他线程改写了y的值,使得currentX和currentY处于不一致的状态),因此,我们必须在第18行,使用validate()方法,判断这个stamp是否在读过程发生期间被修改过。如果stamp没有被修改过,则认为这次读取是有效的,因此就可以跳转到第27行,进行数据处理。反之,如果stamp是不可用的,则意味着在读取的过程中,可能被其他线程改写了数据,因此,有可能出现了脏读。如果出现这种情况,我们可以像处理CAS操作那样在一个死循环中一直使用乐观读,直到成功为止。

也可以升级锁的级别。在本例中,我们升级乐观锁的级别,将乐观锁变为悲观锁。在第19行,当判断乐观读失败后,使用readLock()获得悲观的读锁,并进一步读取数据。如果当前对象正在被修改,则读锁的申请可能导致线程挂起。

写入的情况可以参考第5行定义的move()函数。使用writeLock()函数可以申请写锁。这里的含义和读写锁是类似的。

在退出临界区时,不要忘记释放写锁(第11行)或者读锁(第24行)。

可以看到,StampedLock通过引入乐观读来增加系统的并行度。

6.6.2 StampedLock的小陷阱

StampedLock内部实现时,使用类似于CAS操作的死循环反复尝试的策略。在它挂起线程时,使用的是Unsafe.park()函数,而park()函数在遇到线程中断时,会直接返回(注意,不同于Thread.sleep(),它不会抛出异常)。而在StampedLock的死循环逻辑中,没有处理有关中断的逻辑。因此,这就会导致阻塞在park()上的线程被中断后,会再次进入循环。而当退出条件得不到满足时,就会发生疯狂占用CPU的情况。这一点值得我们注意,下面演示了这个问题:

01 public class StampedLockCPUDemo {
02   static Thread[] holdCpuThreads = new Thread[3];
03   static final StampedLock lock = new StampedLock();
04   public static void main(String[] args) throws InterruptedException {
05     new Thread() {
06       public void run() {
07         long readLong = lock.writeLock();
08         LockSupport.parkNanos(600000000000L);
09         lock.unlockWrite(readLong);
10       }
11     }.start();
12     Thread.sleep(100);
13     for (int i = 0; i < 3; ++i) {
14       holdCpuThreads[i] = new Thread(new HoldCPUReadThread());
15       holdCpuThreads[i].start();
16     }
17     Thread.sleep(10000);
18     //线程中断后,会占用CPU
19     for (int i = 0; i < 3; ++i) {
20       holdCpuThreads[i].interrupt();
21     }
22   }
23
24   private static class HoldCPUReadThread implements Runnable {
25     public void run() {
26       long lockr = lock.readLock();
27       System.out.println(Thread.currentThread().getName()+ " 获得读锁");
28       lock.unlockRead(lockr);
29     }
30   }
31 }

在上述代码中,首先开启线程占用写锁(第7行),注意,为了演示效果,这里使写线程不释放锁而一直等待。接着,开启3个读线程,让它们请求读锁。此时,由于写锁的存在,所有读线程都会被最终挂起。

下面是其中一个读线程在挂起时的信息:

"Thread-2" #10 prio=5 os_prio=0 tid=0x14b1d800 nid=0xafc waiting on condition [0x153ef000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x046b54c8> (a java.util.concurrent.locks.StampedLock)
    at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215)
    at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428)
    at geym.conc.ch6.stamped.StampedLockCPUDemo$HoldCPUReadThread.run
(StampedLockCPUDemo.java:35)
    at java.lang.Thread.run(Thread.java:745)

可以看到,这个线程因为park()的操作而进入了等待状态,这种情况是正常的。

而在10秒以后(代码第17行执行了10秒等待),系统中断了这3个读线程,之后,你就会发现,你的CPU占用率极有可能会飙升。这是因为中断导致park()函数返回,使线程再次进入运行状态,下面是同一个线程在中断后的信息:

"Thread-2" #10 prio=5 os_prio=0 tid=0x14b1d800 nid=0xafc runnable [0x153ef000]
   java.lang.Thread.State: RUNNABLE
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x046b54c8> (a java.util.concurrent.locks.StampedLock)
    at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215)
    at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428)
    at geym.conc.ch6.stamped.StampedLockCPUDemo$HoldCPUReadThread.run
(StampedLockCPUDemo.java:35)
    at java.lang.Thread.run(Thread.java:745)

此时,这个线程的状态是RUNNABLE,这是我们不愿意看到的。它会一直存在并耗尽CPU资源,直到自己抢占到了锁。

6.6.3 有关StampedLock的实现思想

StampedLock的内部实现是基于CLH锁的。CLH锁是一种自旋锁,它保证没有饥饿发生,并且可以保证FIFO(First-In-First-Out)的服务顺序。

CLH锁的基本思想如下:锁维护一个等待线程队列,所有申请锁,但是没有成功的线程都记录在这个队列中。每一个节点(一个节点代表一个线程),保存一个标记位(locked),用于判断当前线程是否已经释放锁。

当一个线程试图获得锁时,取得当前等待队列的尾部节点作为其前序节点,并使用类似如下代码判断前序节点是否已经成功释放锁:

while (pred.locked) {
}

只要前序节点(pred)没有释放锁,则表示当前线程还不能继续执行,因此会自旋等待。

反之,如果前序线程已经释放锁,则当前线程可以继续执行。

释放锁时,也遵循这个逻辑,线程会将自身节点的locked位置标记为false,那么后续等待的线程就能继续执行了。

如图6.4所示,显示了CLH队列锁的基本思想。

图6-4 CLH队列锁

StampedLock正是基于这种思想,但是实现上更为复杂。

在StampedLock内部,会维护一个等待链表队列:

01 /** Wait nodes */
02 static final class WNode {
03   volatile WNode prev;
04   volatile WNode next;
05   volatile WNode cowait;   // 读节点链表
06   volatile Thread thread;  // 当可能被暂停时非空
07   volatile int status;     // 0, WAITING, or CANCELLED
08   final int mode;      // RMODE or WMODE
09   WNode(int m, WNode p) { mode = m; prev = p; }
10 }
11
12 /** CLH 队列头部 */
13 private transient volatile WNode whead;
14 /** CLH 队列尾部 */
15 private transient volatile WNode wtail;

上述代码中,WNode为链表的基本元素,每一个WNode表示一个等待线程。字段whead和wtail分别指向等待链表的头部和尾部。

另外一个重要的字段为state:

private transient volatile long state;

字段state表示当前锁的状态。它是一个long型,有64位,其中,倒数第8位表示写锁状态,如果该位为1,表示当前由写锁占用。

对于一次乐观读的操作,它会执行如下操作:

public long tryOptimisticRead() {
  long s;
  return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

一次成功的乐观读必须保证当前锁没有写锁占用。其中WBIT用来获取写锁状态位,值为0x80。如果成功,则返回当前state的值(末尾7位清零,末尾7位表示当前正在读取的线程数量)。

如果在乐观读后,有线程申请了写锁,那么state的状态就会改变:

1 public long writeLock() {
2   long s, next;  // bypass acquireWrite in fully unlocked case only
3   return ((((s = state) & ABITS) == 0L &&
4        U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
5       next : acquireWrite(false, 0L));
6 }

上述代码中第4行,设置写锁位为1(通过加上WBIT(0x80))。这样,就会改变state的取值。那么在乐观锁确认(validate)时,就会发现这个改动,而导致乐观锁失效。

public boolean validate(long stamp) {
  U.loadFence();
  return (stamp & SBITS) == (state & SBITS);
}

上述validate()函数比较当前stamp和发生乐观锁时取得的stamp,如果不一致,则宣告乐观锁失败。

乐观锁失败后,则可以提升锁级别,使用悲观读锁。

1 public long readLock() {
2   long s = state, next;  // bypass acquireRead on common uncontended case
3   return ((whead == wtail && (s & ABITS) < RFULL &&
4        U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
5       next : acquireRead(false, 0L));
6 }

悲观读会尝试设置state状态(第4行),它会将state加1(前提是读线程数量没有溢出,对于读线程数量溢出的情况,会使用辅助的readerOverflow进行统计,我们在这里不做过于烦琐的讨论),用于统计读线程的数量。如果失败,则进入acquireRead()二次尝试锁获取。

在acquireRead()中,线程会在不同条件下进行若干次自旋,试图通过CAS操作获得锁。如果自旋宣告失败,则会启用CLH队列,将自己加到队列中。之后再进行自旋,如果发现自己成功获得了读锁,则会进一步把自己cowait队列中的读线程全部激活(使用Unsafe.unpark()方法)。如果最终依然无法成功获得读锁,则会使用Unsafe.park()方法挂起当前线程。

方法acquireWrite()和acquireRead()也非常类似,也是通过自旋尝试、加入等待队列、直至最终Unsafe.park()挂起线程的逻辑进行的。释放锁时与加锁动作相反,以unlockWrite()为例:

1 public void unlockWrite(long stamp) {
2   WNode h;
3   if (state != stamp || (stamp & WBIT) == 0L)
4     throw new IllegalMonitorStateException();
5   state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
6   if ((h = whead) != null && h.status != 0)
7     release(h);
8 }

上述代码第5行,将写标记位清零,如果state发生溢出,则退回到初始值。

接着,如果等待队列不为空,则从等待队列中激活一个线程(绝大部分情况下是第1个等待线程)继续执行(第7行)。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文