基础介绍
AQS代码概览
Node类解析
通过ReentrantLock窥探AQS独占锁
最简单的实例
重入锁实例
锁竞争实例
结尾
基础介绍
JUC中的许多并发类都继承了AbstractQueuedSynchronizer(AQS),如CountDownLatch、ReentrantLock、ThreadLocalExcutor等。
它主要实现了对同步状态的管理以及对阻塞线程进行排队、等待通知,就拿ReetrantLock为例,它有以下的功能
获取锁
争抢这把锁却没有成功的这些线程要被存放到一个集合中
释放锁,集合中的线程会被唤醒重现来争抢锁
使用锁来创建Condition对象
.....
上述这写功能都是依赖于AQS实现的,因为ReetrantLock是只能被一个线程获取,所以它是一把独占锁,而像ReentrantReadWriteLock中的ReadLock是可以被多个线程共享的,也就是说它是一把共享锁。AQS中既提供了独占锁的一些底层的实现,也提供了共享锁的实现。
所以AQS中的内容主要可以分为四部分
CLH队列:存储等待线程,其主要是通过双向链表的方式实现,CLH是它的发明者的三个大佬的名字的首字母。
独占锁
共享锁
Condition实现
AQS代码概览
AbstractQueuedSynchronizer这个类当中包含两个内部类,其中ConditionObject就是Condition功能的主要实现,一般创建Condition的方式就是Lock.newCondition(),而我们通过查看ReentrantLock源码可以发现,其实际创建的Condition就是一个ConditionObject实例。
Node是等待线程的载体,也就是等待线程所在的双向链表上的节点。
AbstractQueuedSynchronizer中有大量的方法,其中类似于tryAcquire和tryAcquireShared就是"类似方法"在独占锁和共享锁中的不同实现。
下图中是AbstractQueuedSynchronizer中的一些成员变量,其中head和tail都是一个Node变量分别用于表示队头和队尾节点,state表示同步状态,stateOffset表示state变量相对于java对象的偏移量,也就是相对于AbstractQueuedSynchronizer.class的偏移量(class也是一个对象,java中万物皆对象),主要是用于后面使用CAS的方式给相应变量设置值、修改值等操作,headOffset、tailOffset同理,waitStatusOffset和nextOffset是相对于Node.class的偏移量。另外在AbstractQueuedSynchronizer 的父类AbstractOwnableSynchronizer中还有一个重要的变量exclusiveOwnerThread表示独占模式下拥有当前锁的线程。
Node类解析
static final class Node { //用于标记共享模式 static final Node SHARED = new Node(); //用于标记独占模式 static final Node EXCLUSIVE = null; // waitStatus 为这个值的时候 表示线程已经被取消 static final int CANCELLED = 1; // waitStatus 为这个值的时候 表示后继线程需要取消阻塞 static final int SIGNAL = -1; // waitStatus 为这个值的时候 表示线程处于Condition下的等待状态 static final int CONDITION = -2; //waitStatus 为这个值的时候 表示下个acquireShared操作将被允许 static final int PROPAGATE = -3; /** * 这个字段可能有以下状态 * SIGNAL: 该节点的后继节点被(或即将)阻塞(通过停放),因此当前节点在 * 释放或取消时必须解除其后继节点的停放。为了避免竞争,获取方法 * 必须首先表明它们需要一个信号,然后重试原子获取,然后在失败时 * 阻塞。 * CANCELLED: 节点因超时或中断而被取消 * CONDITION: 这个节点被用于condition队列,在装个状态下这个节点不会被用于 * 同步队列。 * PROPAGATE: 这个节点是被共享的 * 0: 以上都不是 * * 这个字段的初始值为0,且是通过cas的方式对他进行安全写操作 */ volatile int waitStatus; //前置节点 volatile Node prev; //继承节点 volatile Node next; //该节点拥有的线程 volatile Thread thread; //可能有两种作用 //1.独占模式下的condition条件下的等待节点 //2.用于判断是共享模式 Node nextWaiter; //是否是共享模式 final boolean isShared() { return nextWaiter == SHARED; } //返回前置节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; }}
通过ReentrantLock窥探AQS独占锁
下面我们通过几个实例来探究AQS中的一些方法的实现以及在ReentrantLock中起到的作用。
最简单的实例
下面我们就通过一个简单的lock & unLock实例来入手
通过断点进入,lock的具体实现在ReentrantLock的NonfairSync内部类中,这是由于我们为lock对象设置的非公平锁。
然后我们会进入AQS中的compareAndSetState方法,它主要是通过cas的方式判断state是否为0,是-就将其更改为1并返回true、否-不修改直接返回false,若为0就意味着这个锁现在是没有被任何线程占有的,然后我们将它的状态更改为1表示将其占用。
返回是后,将AQS中的独占线程的字段赋值为当前线程,然后就加锁成功了。
然后我们进入ReentrantLock的unlock方法,这个方法的主要实现就是在AQS的release方法中
然后进入tryRelease方法,会获取当前锁的状态,然后用c表示要被更改成的目标状态,校验后,将锁的独占线程置为空并修改其状态字段state为目标状态,到这里锁已经解除占用了。
tryRelease返回为true后会判断AQS中存不存在等待节点,如果存在则就将其唤醒(后面会看这里的源码)
重入锁实例
我们使用同一把ReentrantLock进行两次lock操作,由于第一次和上面的简单实例流程是一样的所以我们只关注第二次lock和unLock
此时,由于已经lock过一次,即state=1,所以compareAndSetState(0,1)不会赋值成功,所以会进入到acquire方法,进而会首先进入到tryAcquire方法
我们会在TryAcquire中再次判断锁的状态(因为在此过程中上一次lock可能被释放),然后由于当前线程就是这把锁的独占线程,所以我们是可重入这把锁的,最后将state的值改为2代表这把锁被当前线程重入了两次。
由于tryAcquire(1)返回的是true所以!tryAcquire(1)为false导致程序不会进入acquire方法中的后续执行流程,到此,意味着第二次lock已经完成。
和简单实例中的unlock一样,程序会先进入release方法然后进入tryRelease方法,再这里面因为更改后的state为1所以不会讲当前锁的独占线程设置为null(会在最后一次unlock中设置)
锁竞争实例
锁竞争就会涉及到等待队列以及等待节点的阻塞与唤醒,所以它的一系列操作的复杂度相对于上面的例子要更高一些。使用以下实例来体验一下多线程竞争锁的过程。
t1会首先获取到lock,这过程与无竞争锁的获取是一样的,主要的不同点在于t2获取锁和t1释放锁的过程。
在idea中可以在下入这个位置切换调试的线程
在t1线程获取到锁之后,我们切换到t2线程,发现idea此时已经给我们标注了lock这把锁已经被t1占用了。
然后会进入到acquire方法,由于此时t1已经占用了锁,所以state ≠ 0且拥有锁的当前线程为t1≠t2所以 tryAcquire返回的是false,因此程序会进入addWaiter方法。
在这个方法中,会首先将t2线程封装到一个Node对象当中,然后通过tail节点判断队列是否被初始化了,由于CLH队列此时并没有元素存在,所以会进入到enq方法进行队列的首次初始化。
在enq中会初始化这个队列会初始化队列,然后将传入的node插入到队尾,在这里面我们看到了for(;;)的死循环(优雅点可以叫做自旋),那么它的作用是什么呢?
在此次进入到enq中实际上for只进行了两次,第一次给头节点设置了一个没有实际数据的head节点,第二次将传入的node加入到了队尾,那么以上工作我们是可以在一次循环中完成的,就比如以下代码块中的实现方式
private Node enq(final Node node) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; }}
其实自旋是为了保证线程安全,在t2线程获取锁的时候可能也有其它线程正在争抢lock,就比如恰好有线程在t2执行Node t = tail
和 compareAndSetHead(new Node())
之间的时候初始化成功了队列设置了head节点, 那么compareAndSetHead
就会返回false不会进入这个分支,这时候就会重新获取tail节点再将传入的node节点插入到tail地next中,但是,此时tail可能也会被别的线程更改,那么就需要不断地自旋尝试修改直到成功位置,自旋结束。
addWaiter结束后会进入acquireQueued,这个方法主要是会进行锁地争抢以及阻塞等待,最后根据failed字段判断是否要取消获取线程,这种情况一般就是状态被置为了Canceled
shouldParkAfterFailedAcquire判断节点是否应该阻塞等待,如果这个节点为SIGNAL状态就说明该节点的后继节点应该被阻塞,继而会执行parkAndCheckInterrupt方法对其进行阻塞,并在它被唤醒的时候判断此线程是否是被interrupt的。
正常情况下如果如果t1线程不unlock,那么t2线程将一直阻塞在parkAndCheckInterrupt方法,当其被唤醒后会继续自旋尝试获取锁。
然后我们切换回t1线程,进入unlock方法,调用AQS的release方法,然后tryRelease里面操作跟上面两个实例相同不在赘述,唯一不同的是之前实例的等待队列都为空,也就是head节点都是null,所以不会去唤醒阻塞节点,因为此时我们有t2线程所在的节点是被存储到了队列中所以,程序会进入到unparkSuccessor方法中,执行完这个方法后t2线程会从之前的WAIT状态转换为RUNNING状态即被唤醒!
t2被唤醒后会去再次tryAcquire,成功后去执行临界区的内容,然后正常释放lock锁。
结尾
上面利用ReentrantLock介绍了AQS独占锁相关内容,除此之外,后面还会通过ReentrantReadWriteLock介绍共享锁的实现、Condition的实现以及其它相关的JUC类中AQS的使用。
原文:https://juejin.cn/post/7100515070072848398