Welcome everyone

AQS 真的难顶!

java 汪明鑫 879浏览 2评论

谈AQS 首先需要聊到多线程并发

多线程并发操作内存数据,会出现线程安全的问题

因此引入了锁的概念,只允许一个线程争抢到锁,进行后续操作,其他线程阻塞

 

 

 

就类比高速公路收费站,车辆都是有序排队的通过收费站,一辆车过去缴费时,后面的车是排队等待的

再比如在火车上蹲坑,一个进去占住了厕所,后面的人等待

 

 

因此我们需要考虑的是需要一个互斥变量来记录锁的状态

还需要一个队列来保存没有抢到锁的线程

另外锁一旦抢占,后面的线程如果不断的去轮询尝试获取锁,将会耗费大量cpu资源

因此没有抢到锁的线程阻塞并放在一个容器里记录下来

 

占用锁的线程操作完后释放锁并唤醒队列中线程

这大概是AQS的核心思想

 

 

 

java.util.concurrent.locks.AbstractQueuedSynchronizer

AQS  同步发生器  是JUC同步组件的基础

内置FIFO  链式双向队列,完成线程争夺资源的管理工作
AQS继承体系
FairSync (公平锁  排队)  NonfairSync(非公平锁,插队)

 

 

写JUC的真是大神。。。牛逼

 

AQS核心静态内部类 Node 组成双向队列

static final class Node {
        //共享
        static final Node SHARED = new Node();
        //独占
        static final Node EXCLUSIVE = null;

        //因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态
        static final int CANCELLED =  1;
        //后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
        static final int SIGNAL    = -1;
        //节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
        static final int CONDITION = -2;
        //表示下一次共享式同步状态获取,将会无条件地传播下去
        static final int PROPAGATE = -3;

        
        volatile int waitStatus;

     
        volatile Node prev;

        
        volatile Node next;

        
        volatile Thread thread;

        
        Node nextWaiter;

waitStatus 等待状态  用来控制线程的阻塞和唤醒

prev 前一个节点

next 后一个节点

thread 占有该节点的线程

 

简单来说,AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。

 

锁分为独占式、共享式
共享式与独占式的最主要区别在于同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻可以有多个线程获取同步状态。例如读操作可以有多个线程同时进行,而写操作同一时刻只能有一个线程进行写操作,其他操作都会被阻塞。
Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)

 

AQS 通过内置的 FIFO 同步队列来完成资源获取线程的排队工作

  • 如果当前线程获取同步状态失败(锁)时,AQS 则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程
  • 当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

 

【入队列与出队列】
AQS通过死循环往队列尾部添加节点
首节点的线程释放同步状态后,将会唤醒它的后继节点(next),
而后继节点将会在获取同步状态成功时将自己设置为首节点,
head执行该节点并断开原首节点的next和当前节点的prev即可

 

 

AQS中其他关键字段

   //指向双向队列的头节点
    private transient volatile Node head;

    //指向双向队列的尾节点
    private transient volatile Node tail;

    //同步状态
    private volatile int state;

 

state 来表示同步状态【锁的状态】

  • 当 state > 0 时,表示已经获取了锁。 ( >1 表示重入锁)
  • 当 state = 0 时,表示释放了锁。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作

 

 

通过上面的我们了解AQS有一个互斥变量state,加一个内部的双向队列,还有线程的阻塞
AQS里都是抽象的逻辑

然后我们通过java.util.concurrent.locks.ReentrantLock 为入口,进一步探究一波AQS

ReentrantLock 里的lock方法其实就是调用NonfairSync / FairSync 的lock方法

 

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            //CAS 实现对state的修改  
            if (compareAndSetState(0, 1))
                //设置锁的拥有者
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

【锁的获取成功与失败就是通过state,期望是0,即无锁状态,通过CAS来修改成1,则表明获取锁】

我们再看一下 setExclusiveOwnerThread

我们看到了AQS的父类 java.util.concurrent.locks.AbstractOwnableSynchronizer

记录了获得锁的线程  owner

如果争抢锁失败,进入acquire

public final void acquire(int arg) {
        //尝试获取锁
        if (!tryAcquire(arg) &&
           
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            //产生一个中断
            selfInterrupt();

    }

tryAcquire 把CAS更新state的逻辑又走了一遍

addWaiter 如果tryAcquire失败,则将当前线程加入到CLH同步队列尾部,并标记为独占模式

acquireQueued  当前线程进入同步队列后,就会自旋,每个节点都会观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。

 

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;

           //自旋
            for (;;) {

                //当前线程的前驱节点
                final Node p = node.predecessor();

                // 前驱节点是头节点  自己就有资格去尝试获取锁  然后获取锁成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }

               //获取失败  线程等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 

acquire 的流程如下

 

if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())  interrupted = true;

看这一段代码

主要用于检查状态,看看自己是否真的可以去休息了

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        
        //前驱节点状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //前驱节点已经为SIGNAL,释放会通知自己,自己可以安心去休息了
            return true;
        
        //前驱节点取消  说明需要重置当前节点的前驱节点 (干掉那些无效节点 状态大于0的)
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //cas 设置前驱节点为 SIGNAL 
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

如果前驱节点为SIGNAL  (后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行)

根据shouldParkAfterFailedAcquire 判断当前线程可以休息了

进入 parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() {
        //休息
        LockSupport.park(this);  
        return Thread.interrupted();
 }

parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。其内部则是调用LockSupport工具类的park()方法来阻塞该方法。

 

那阻塞的线程如何被唤醒?

当有线程释放同步状态后,则需要唤醒该线程的后继节点

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
private void unparkSuccessor(Node node) {
        //当前节点状态
        int ws = node.waitStatus;
        //当前状态 < 0 则设置为 0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //当前节点的后继节点
        Node s = node.next;
        
        //后继节点为null或者其状态 > 0 (超时或者被中断了)
        if (s == null || s.waitStatus > 0) {
            s = null;
            
            //从tail节点像前遍历,找到不为空,且waitStatus <= 0 的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }

        //唤醒后继节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }

 

可能会存在当前线程的后继节点为null,超时、被中断的情况,如果遇到这种情况了,则需要跳过该节点,但是为何是从tail尾节点开始,而不是从node.next开始呢?原因在于node.next仍然可能会存在null或者取消了,所以采用tail回溯办法找第一个可用的线程。最后调用LockSupport的unpark(Thread thread)方法唤醒该线程。

 

park 和 unpark 是两个native方法 通过Unsafe这个后门调用的,我们直接写代码没法直接调用

 

 AQS小结一下:

1,维护一个互斥变量state 【通过cas更新state

2,队列存放没有争抢到锁的线程,自旋,由于消耗cpu,需要被挂起 LockSupport.park

3,释放锁后,通知第一个有效的后继节点,解阻塞,LockSupport.unpark

 

 

画些图帮助我们更好的理解上文

 

 

 

 

 

 

参考文章:
http://ifeve.com/java%e5%b9%b6%e5%8f%91%e4%b9%8baqs%e8%af%a6%e8%a7%a3/
http://cmsblogs.com/?p=2197

 

如有错误,敬请指正

 

 

转载请注明:汪明鑫的个人博客 » AQS 真的难顶!

喜欢 (0)

说点什么

2 评论 在 "AQS 真的难顶!"

提醒
avatar
排序:   最新 | 最旧 | 得票最多
The Shy
游客

鑫爷好强啊,膜拜鑫爷

wpDiscuz