返回介绍

5. 什么是 AQS

发布于 2024-09-08 13:17:47 字数 5462 浏览 0 评论 0 收藏 0

Java 并发包(JUC)中提供了很多并发工具,这其中,很多我们耳熟能详的并发工具,譬如 ReentrangLock、Semaphore,它们 的实现都用到了一个共同的基类--AbstractQueuedSynchronizer,简称 AQS。AQS 是一个用来构建锁和同步器的框架

AQS 的基本实现原理

AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。

  private volatile int state;//共享变量,使用 volatile 修饰保证线程可见性

状态信息通过 procted 类型的 getState,setState,compareAndSetState 进行操作

AQS 支持两种同步方式

  1. 独占式
  2. 共享式
  • 独占式如 ReentrantLock,共享式如 Semaphore,CountDownLatch,组合式的如 ReentrantReadWriteLock。总之,AQS 为使用提供了底层支撑,如何组装实现,使用者可以自由发挥。

同步器的设计是基于模板方法模式的,一般的使用方式是这样:

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放)
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

AQS 定义的这些可重写的方法:

  1. protected boolean tryAcquire(int arg) : 独占式获取同步状态,试着获取,成功返回 true,反之为 false
    1. protected boolean tryRelease(int arg) :独占式释放同步状态,等待中的其他线程此时将有机会获取到同步状态;
  2. protected int tryAcquireShared(int arg) :共享式获取同步状态,返回值大于等于 0,代表获取成功;反之获取失败;
  3. protected boolean tryReleaseShared(int arg) :共享式释放同步状态,成功为 true,失败为 false
  4. protected boolean isHeldExclusively() : 是否在独占模式下被线程占用。

如何使用 AQS

  • 首先,我们需要去继承 AbstractQueuedSynchronizer 这个类,然后我们根据我们的需求去重写相应的方法,比如要实现一个独 占锁,那就去重写 tryAcquire,tryRelease 方法,要实现共享锁,就去重写 tryAcquireShared, tryReleaseShared;最后,在我们的组件中调用 AQS 中的模板方法就可以了,而这些模板方法是会调用到我们之前重写的那些方法的。也就是 说,我们只需要很小的工作量就可以实现自己的同步组件,重写的那些方法,仅仅是一些简单的对于共享资源 state 的获取和释放操作,至于像是获取资源失 败,线程需要阻塞之类的操作,自然是 AQS 帮我们完成了。

自定义同步器代码实现

package com.example.AQS;

import java.io.Serializable;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * 基于 AQS 实现自定义同步器<br>
 * @author qinxuewu
 * @create 18/7/14 下午 1:23
 * @since 1.0.0
 */


public class Mutex implements Serializable {

    //静态内部类 继承 AQS
    private  static class Sync extends AbstractQueuedSynchronizer{
        //是否处于占用状态
        @Override
        protected boolean isHeldExclusively(){
            return getState()==1;
        }

        //当状态为 0 的时候获取锁,CAS 操作成功,则 state 状态为 1,
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                 setExclusiveOwnerThread(Thread.currentThread());
                 return true;
             }
                return false;
        }

        //释放锁,将同步状态置为 0
        @Override
       protected boolean tryRelease(int releases) {
               if (getState() == 0) throw new IllegalMonitorStateException();
                  setExclusiveOwnerThread(null);
                   setState(0);
                   return true;
        }
    }

    //同步对象完成一系列复杂的操作,我们仅需指向它即可
    private final Sync sync = new Sync();
    //加锁操作,代理到 acquire(模板方法)上就行,acquire 会调用我们重写的 tryAcquire 方法
    public void lock() {
        sync.acquire(1);
    }
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    //释放锁,代理到 release(模板方法)上就行,release 会调用我们重写的 tryRelease 方法。
    public void unlock() {
        sync.release(1);
    }
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

package com.example.AQS;

import java.util.concurrent.CyclicBarrier;

/**
 * @author qinxuewu
 * @create 18/7/14 下午 1:44
 * @since 1.0.0
 */


public class TestMutex {
    //CyclicBarrier 是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点
    private static CyclicBarrier barrier = new CyclicBarrier(31);
    private static int a = 0;
    private static  Mutex mutex = new Mutex();


    public static void main(String []args) throws Exception {
        //说明:我们启用 30 个线程,每个线程对 i 自加 10000 次,同步正常的话,最终结果应为 300000;
        //未加锁前
        for(int i=0;i<30;i++){
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int i=0;i<10000;i++){
                        increment1();//没有同步措施的 a++;
                    }
                    try {
                        // barrier 的 await 方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
                        barrier.await();//等 30 个线程累加完毕
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
        }
        barrier.await();
        System.out.println("加锁前,a="+a);

        //加锁后
        barrier.reset();//重置 CyclicBarrier
        a=0;
        for(int i=0;i<30;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int i=0;i<10000;i++){
                        increment2();//a++采用 Mutex 进行同步处理
                    }
                    try {
                        barrier.await();//等 30 个线程累加完毕
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        barrier.await();
        System.out.println("加锁后,a="+a);
    }
    /**
     * 没有同步措施的 a++
     * @return
     */
    public static void increment1(){
        a++;
    }
    /**
     * 使用自定义的 Mutex 进行同步处理的 a++
     */
    public static void increment2(){
        mutex.lock();
        a++;
        mutex.unlock();
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文