AQS的概述 体系架构 源码解读(非公平锁) 源码总结


JUC并发编程目录

1.AQS是什么?

1.是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的队列的变种来完成资源获取线程的排队工作,将每条将要去抢占资源的线程封装成一个Node节点来实现锁的分配,有一个int类变量表示持有锁的状态(private volatile int state),通过CAS完成对status值的修改(0表示没有,1表示阻塞)

2.AQS为什么是JUC内容中最重要的基石

锁:使用者直接调用API,隐藏了实现细节

同步器:面向锁的是实现者

Java并发大神DqugLee,提出统一规范并简化了锁的实现,将其抽象出来。屏蔽了同步状态管理、同步队列的管理和维护、阻塞线程排队和通知、唤醒机制等,是一切锁和同步组件实现的。

加锁会导致阻塞。有阻塞就需要排队,实现排队必然需要队列,排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS同步队列的抽象表现。它将要请求共享资源的线程及自身的等待状态封装成队列的结点对象(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。

3.AQS同步队列的基本结构

2.AQS内部体系结构

内部类Node(Node类在AQS类内部)
Node的int变量
Node的等待状态waitState成员变量:volatile int waitStatus;
等候区其它顾客(其它线程)的等待状态。队列中每个排队的个体就是一个Node

Node类的详细方法

AQS总结

有阻塞就需要排队,实现排队必然需要队列
state变量+双端队列

AQS的int变量
AQS的同步状态State成员变量 private volatile int state;

通过自旋等待
state变量判断是否阻塞
从尾部入队,从头部出队

2..从ReentrantLock开始解读AQS

ReentrantLock与AQS的关系

ReentrantLock的抽象类Sync继承了AQS的方法,Sync的两个子类非公平锁,公平锁实现了Sync的抽象方法。

公平锁

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        final void lock() {
            acquire(1);
        }
    }

非公平锁

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

非公平锁比公平锁多加了一个if判断,每次都尝试去获得锁

if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());

compareAndSetState是调用AQS类的compareAndSetState,底层是调用Unsafe类的CAS方法

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

如果当前锁没有被占用,即state还是0,那就把state设为1,并且调用setExclusiveOwnerThread方法,该方法最终调用了AOS的setExclusiveOwnerThread方法,设置该线程拥有当前独占访问权。

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

如果锁已经被占用了,那就进入acquire方法了。此时公平锁与非公平锁代码相同。acquire方法在AQS中,主要有三个流向(tryAcquire,addWaiter,acquireQueued),不成功执行cancelAcquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
2.1 acquire()

以非公平锁作为案例突破,acquire方法三步骤:

  • 尝试加锁;
  • 加锁失败,线程入队列;
  • 线程入队列后,进入阻塞状态。
2.2 tryAcquire()

tryAcquire是AQS类的方法,有四个实现类重写了该方法。

FairSync

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

NonFairSync

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

可以明显看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors(),它是公平锁加锁时判断等待队列中是否存在有效节点的方法。

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

查询是否有其他线程已经等待了更长时间来获取锁

hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下:

公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;

非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程苏醒后,不一定就是排头的这个线程获得锁,它还是需要参加竞争锁(存在线程竞争的情况下),后来的线程可能不讲武德插队夺锁了。

队列里面先进先出(公平),队列外面不讲武德(非公平)

hasQueuedPredecessors()方法返回true,那么tryAcquire方法就会返回false,然后才会有后面的addWaiter和acquireQueued

主线是非公平锁,公平锁只是连带着讲一下,现在看回非公平锁。非公平锁的tryAcquire调用的是nonfairTryAcquire方法

nonfairTryAcquire()

nonfairTryAcquire中主要做了两个判断:

  • state是否为0?非公平锁刚进来的时候已经判断过一次了,这里进行再次判断是为了防止这个过程中锁已经被释放了。如果这里执行成功,那么后面的两大流程就不用走了,成功获取到锁。
  • 当前线程是否是持有锁的线程?这里主要是为了重入锁做判断的。ReentrantLock是重入锁。c + acquires就代表重入的次数,acquires是1,c是当前锁重入的次数。
  • 返回false获取锁失败才会执行下面两大流程(addWaiter,acquireQueued)。
2.2.2 addWaiter()
private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;
     if (pred != null) {
         node.prev = pred;
         if (compareAndSetTail(pred, node)) {
             pred.next = node;
             return node;
         }
     }
     enq(node);
     return node;
 }

Node.EXCLUSIVE表示独占,Node.SHARED表示共享。非公平锁和公平锁都是独占锁。
以当前线程和mode(Node.EXCLUSIVE)创建一个Node
定义Node节点pred,并把尾结点赋值给pred

线程B进来时队列还未初始化,此时tail是null,即走enq(node)方法,这个只有尾结点等于null才会走该方法.

线程C进来时pred是B节点,不为null,然后把C节点的前驱节点设为B,并把C设为尾结点,同时把B的后继节点设为C.

//addWaiter方法返回的是以当前线程构造的节点
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

使用for (;;)开启死循环,第一次循环时,tail还是null,新建一个空节点并通过cas把它设为虚拟头结点,尾结点也指向它。退出第一次循环。

第二次循环时,tail指向了刚才创建的新节点记为n(new Node()),不等于null了,则将以B线程基础构造的node节点的前驱指针指向t,此时t等于tail,tail又指向了n,即将node挂在了新的n后面,node的前驱节点是n,然后通过CAS操作将node作为尾结点,并将n的后继节点设为node,然后返回头结点n。

acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

假如再抢抢失败就会进入shouldParkAfterFailedAcquire和 parkAndChecklnterrupt方法中

shouldParkAfterFailedAcquire

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)          
        return true;
    if (ws > 0) {
    
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

如果前驱节点的 waitStatus是 SIGNAL状态(-1),即 shouldParkAfterFailedAcquire方法会返回 true。程序会继续向下执行parkAndCheckInterrupt方法,用于将当前线程挂起.

parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() {
    //线程挂起,程序不会继续向下执行
    LockSupport.park(this);
    //根据park方法API描述,程序在下述三种情视会继续向下执行
    //1.被unpark
    //2.被中断(interrupt)
    //3.其他不合逻每的返回才会继续向下执行
    
    //因上述三种情泥程序执行至此,返回当前线程的中断状态,并清空中断状态     
    //如果由于被中断,该方法会返回true
    return Thread.interrupted();
}
2.2.3 acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

定义两个标志位failed 和interrupted,当前是B线程,开启第一次循环,然后获取B节点的前驱节点n

predecessor()

final Node predecessor() throws NullPointerException {
   Node p = prev;
   if (p == null)
       throw new NullPointerException();
   else
       return p;
}

n是头结点但是tryAcquire尝试获取锁一般会失败,然后执行shouldParkAfterFailedAcquire方法
头结点n的waitStatus默认值为0,通过cas方法将头结点的waitStatus设置为1,并返回false,不会执行parkAndCheckInterrupt

开启第二次循环,头节点还是n,tryAcquire尝试获取锁依旧失败,然后执行shouldParkAfterFailedAcquire方法,此时头结点n的waitStatus是1,返回true,继续执行parkAndCheckInterrupt方法,B线程被挂起。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 该节点已设置状态,要求释放信号,以便安全停车。
        return true;
    if (ws > 0) {
        // 前置任务已取消。跳过前置任务并指示重试
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // waitStatus必须为0或PROPAGATE。表明我们需要信号,但不要停车。来电者需要重试以确保在停车前无法获取
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

然后C线程进来,它的前驱节点为B,执行两次shouldParkAfterFailedAcquire方法终于返回true,然后继续执行parkAndCheckInterrupt方法,C线程被挂起。

2.3 unlock()方法源码

unlock–> sync.release(1) –> tryRelease(arg) –> unparkSuccessor

unlock()方法分析

public void unlock() {
    sync.release(1);
}
2.3.1 release()方法

unlock底层调用的release方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
2.3.2 tryRelease()方法

tryRelease是AQS类的方法,实现类方法主要有三个。
tryRelease方法若抛出异常,则说明线程释放锁失败,返回false
tryRelease方法返回true,则继续执行。定义节点h,令h等于头结点,如果h不等于null且h的waitStatus不等于0,说明有线程在等待唤醒。执行

释放锁时公平锁和非公平锁共用一个方法

protected final boolean tryRelease(int releases) {
          int c = getState() - releases;
          if (Thread.currentThread() != getExclusiveOwnerThread())
              throw new IllegalMonitorStateException();
          boolean free = false;
          if (c == 0) {
              free = true;
              setExclusiveOwnerThread(null);
          }
          setState(c);
          return free;
      }

因为当前锁被持有了,所以state一定是1。千万不要把state和waitStatus搞混了。
c=1-1=0。如果当前线程和持有锁的线程不同,则抛出异常。
定义锁空闲等于false,如果c等于0,将锁空闲改为true,将当前持有锁的线程设置为null,并把state置为0,返回true。释放锁成功

2.3.3 unparkSuccessor()方法
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
            s = t;
}
if (s != null)
    LockSupport.unpark(s.thread);
}

拿到头结点的waitStatus,因为此时等待队列有线程在等待,所以头结点的waitStatus一定不等于0,用cas将头结点的waitStatus置为0。然后获取头结点的后继节点s,如果等于null或者s的waitStatus(s已取消获取锁的等待)。
如果不为null,唤醒后继节点B。

唤醒节点B,镜头切回到acquireQueued的parkAndCheckInterrupt方法中,之前线程B就是在这里被挂起的,哪里跌到,就在哪里爬起。
返回return Thread.interrupted();因为线程B从未中断过,所以返回false。

复习一下interrupted()方法和isInterrupted()的区别: interrupted()执行后具有清除状态标志值为false的功能,但isInterrupted()不会。
acquireQueued方法继续讲解,因为返回false,所以继续循环,获取B节点前驱节点p,p就是头结点,然后执行tryAcquire尝试获取锁,因为
刚才已经把锁释放了,所以state等于0,tryAcquire能够获取锁成功。执行setHead方法。将头结点p的后继节点置为null,此时已经没有节点指向p,p已经成了无根之木,垃圾对象,随时等待命运的铁扫帚将自己扫入历史的垃圾堆。将failed 置为false,则不用执行finally中的cancelAcquire方法了。然后acquireQueued方法返回false,也不用执行acquire方法中的selfInterrupt方法了。

final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
     boolean interrupted = false;
     for (;;) {
         final Node p = node.predecessor();
         if (p == head && tryAcquire(arg)) {
             setHead(node);
             p.next = null; // help GC
             failed = false;
             return interrupted;
         }
         if (shouldParkAfterFailedAcquire(p, node) &&
             parkAndCheckInterrupt())
             interrupted = true;
     }
 } finally {
     if (failed)
         cancelAcquire(node);
 }
}

将B节点设为头结点,将B节点持有线程改为null,把B节点的前驱节点置为null

private void setHead(Node node) {
     head = node;
     node.thread = null;
     node.prev = null;
 }
2.3.4 cancelAcquire(node)

acquireQueued方法发生异常 撤销请求

private void cancelAcquire(Node node) {
   
    if (node == null)
        return;

    node.thread = null;
   
    Node pred = node.prev;
    //一直循环找,直到找到前面的状态大于0,即非取消状态。
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;

    node.waitStatus = Node.CANCELLED;

    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
      
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

  目录