AQS(AbstractQueuedSynchronizer)是JUC中的核心类,这个类的原理搞定了,对于理解JUC中其他工具类,比如:ReentrantLock、CountDownLatch、CyclicBarrier、Semaphore的原理就是小意思了。目录什么是AQS基于CAS的状态更新CLH队列恢复与挂起其他特性独占锁如何实现共享锁如何实现条件处理如何使用AQS总结精华推荐什么是AQS 并发使计算机得以充分利用计算能力,有效率地完成各类程序任务。当深入地学习Java中的并发,不可避免地将学习到锁使并发的资源能被正确访问的手段。锁的学习也将分为两部分,一部分是如何加解锁,另一部分是把锁分配给谁。 AQS(AbstractQueuedSynchronizer)也叫抽象队列同步器,它提供了把锁分配给谁这一问题的一种解决方案,使得锁的开发人员可以将精力放在如何加解锁上,避免陷于把锁进行分配而带来的种种细节陷阱之中。 例如JUC中,如CountDownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock等并发工具,均是借助AQS完成他们的所需要的锁分配问题。基于CAS的状态更新 AQS要把锁正确地分配给请求者,就需要其他的属性来维护信息,那么自身也要面对并发问题,因为信息将会被更改,而且可能来源于任意线程。 AQS使用了CAS(compareandset)协助完成自身要维护的信息的更新(后续的源码处处可见)。CAS的意义为:期望对象为某个值并设置为新的值。那么,如果不为期望的值或更新值失败,返回false;如果为期望的值并且设置成功,那么返回true。用例子表达就是我认为我的家门是开着的,我将把它关上。那么只有在家门是开着的,并且我把他关上了,这句断言为ture。 CAS是硬件层面上提供的原子操作保证,意味着任意时刻只有一个线程能访问CAS操作的对象。那么,AQS使用CAS的原因在于:CAS足够快如果并发时CAS失败时,可能通过自旋再次尝试,因为AQS知道维护信息的并发操作需要等待的时间非常短AQS对信息的维护不能导致其它线程的阻塞 因此,AQS对于自身所需要的各种信息更新,均使用CAS协助并发正确。CLH队列 CLH队列得名于Craig、Landin和Hagersten的名字缩写,他们提出实现了以自旋锁方式在并发中构建一个FIFO(先入先出)队列。在AQS中,也维护着这样一个同步队列,来记录各个线程对锁的申请状态。 每一记录单元,以AQS的内部类Node作为体现:staticfinalclassNode{表示线程取消申请锁staticfinalintCANCELLED1;表示线程正在申请锁,等待被分配staticfinalintSIGNAL1;表示线程在等待某些条件达成,再进入下一阶段staticfinalintCONDITION2;表示把对当前节点进行的操作,继续往队列传播下去staticfinalintPROPAGATE3;表示当前线程的状态volatileintwaitStatus;指向前一个节点,也叫前驱节点volatileNodeprev;指向后一个节点,也叫后继节点volatileNodenext;节点代表的线程volatileThreadthread;指向下一个代表要等待某些条件达成时,才进行下阶段的线程的节点NodenextWaiter;} 以Node的结构来看,prev和next属性将可以支持AQS可以将请求锁的线程构成双向队列,而入队列出队列,以及先入先出的特性,需要方法来支持。privatetransientvolatileNodehead;privatetransientvolatileNodetail;privateNodeenq(finalNodenode){for(;;){Nodettail;if(tnull){进入到这里,说明没有head节点,CAS操作创建一个head节点失败也不要紧,失败说明发生了并发,会走到下面的elseif(compareAndSetHead(newNode()))tailhead;}else{node。prevt;把Node加入到尾部,保证加入到为止,并发会重走if(compareAndSetTail(t,node)){t。nextnode;returnt;}}}} AQS中,以head为CLH队列头部,以tail为CLH队列尾部,当加入节点时,通过CAS和自旋保证节点正确入队。 上图解释了插入Node时,可能发生的并发情况和解决过程。AQS支持独占锁和共享锁,那么CLH队列也就需要能区分节点类型。无论那种节点,都能通过addWaiter()将节点插入到队列而不是直接调用enq()staticfinalclassNode{表明是共享锁节点staticfinalNodeSHAREDnewNode();表明是独占锁节点staticfinalNodeEXCLUSIVEnull;}privateNodeaddWaiter(Nodemode){NodenodenewNode(Thread。currentThread(),mode);Nodepredtail;if(pred!null){node。prevpred;if(compareAndSetTail(pred,node)){如果插入尾部成功,就直接返回pred。nextnode;returnnode;}}通过CAS自旋确保入队enq(node);returnnode;} 根据前面的内容,Node。waitStatus表示Node处于什么样的状态,意味着状态是可以改变的,那么CLH队列中的节点也是可以取消等待的:privatevoidcancelAcquire(Nodenode){if(nodenull)return;node。threadnull;Nodeprednode。prev;首先,找到当前节点前面未取消等待的节点while(pred。waitStatus0)node。prevpredpred。prev;方便操作NodepredNextpred。next;记录当前节点状态为取消,这样,如果发生并发,也能正确地处理掉node。waitStatusNode。CANCELLED;如果当前节点为tail,通过CAS将tail设置为找到的没被取消的pred节点if(nodetailcompareAndSetTail(node,pred)){compareAndSetNext(pred,predNext,null);}else{intws;if(pred!head((wspred。waitStatus)Node。SIGNAL(ws0compareAndSetWaitStatus(pred,ws,Node。SIGNAL)))pred。thread!null){Nodenextnode。next;if(next!nullnext。waitStatus0)移除掉找到的CANCELLED节点,整理CLH队列compareAndSetNext(pred,predNext,next);}else{表示当pred头节点,唤醒下一节点unparkSuccessor(node);}node。nextnode;helpGC}} 对于代码中处进入的情况为:pred不为头节点pred记录的线程不为空及pred的状态为SIGNAL,即等待分配到锁或及pred的状态小于0时,能通过CAS设置为SIGNAL cancelAcquire()将CLH队列整理成了新的状态,完成了并发状态下将已取消等待的节点的移除操作 那么,AQS的CLH队列如何完成FIFO的呢?恢复与挂起 前面提到,AQS只解决锁分配的问题,锁的加解锁控制就由子类进行控制,为了便于阅读,子类要实现的方法就先一笔带过。publicfinalvoidacquire(intarg){如果获取到锁,获取锁的成程序就执行下去如果获取不到锁,插入代表当前线程的Node节点放入队列中,并请求锁if(!tryAcquire(arg)acquireQueued(addWaiter(Node。EXCLUSIVE),arg))中断selfInterrupt();} 以独占锁请求锁的实现方法acquire()来看,tryAcquire()是子类要实现的控制的锁获取成功与否逻辑。addWaiter(),将新的代表当前线程的独占锁Node加入到CLH队列中,然后请求锁。finalbooleanacquireQueued(finalNodenode,intarg){booleanfailedtrue;try{booleaninterruptedfalse;for(;;){自旋读取前驱结点,因为前驱节点可能发生了改变,如取消等待操作finalNodepnode。predecessor();if(pheadtryAcquire(arg)){只有当前驱节点为head时,才有资格获取锁设置head为当前节点setHead(node);p。nextnull;helpGCfailedfalse;返回是否发生过中断returninterrupted;}更新当前节点状态,并检查线程是否发生过中断if(shouldParkAfterFailedAcquire(p,node)parkAndCheckInterrupt())interruptedtrue;}}finally{if(failed)说明发生了意料之外的异常,将节点移除,避免影响到其他节点cancelAcquire(node);}} acquireQueued()表达的逻辑为:只有当自己的前驱节点为head时,才有资格去获取锁,这表达了FIFO。获取锁成功后,会返回线程是否被中断过,结合acquire()看,如果线程被中断过,会让线程回到中断状态。以acquireQueued()看,请求锁是的过程是公平的,按照队列排列顺序申请锁。以acquire()看,请求锁的过程是不公平的,因为acquire()会先尝试获取锁再入队,意味着将在某一时刻,有线程完成插队。 那么,shouldParkAfterFailedAcquire()是把Node状态更新,parkAndCheckInterrupt则将线程挂起,恢复后返回线程是否被中断过。privatestaticbooleanshouldParkAfterFailedAcquire(Nodepred,Nodenode){intwspred。waitStatus;if(wsNode。SIGNAL)前驱节点状态为SIGNAL直接返回returntrue;if(ws0){这里和cancelAcquire()类似,整合移除node之前被取消的节点do{node。prevpredpred。prev;}while(pred。waitStatus0);pred。nextnode;}else{CAS设置前驱节点状态为SIGNALcompareAndSetWaitStatus(pred,ws,Node。SIGNAL);}returnfalse;}privatefinalbooleanparkAndCheckInterrupt(){挂起当前线程LockSupport。park(this);returnThread。interrupted();} 那么,获取锁的过程就清晰了,进入到acquireQueued()的方法,可能预见的情况如下图: 情况一:Node的前驱节点为head,那么直接拿到锁,调用acquire()的线程继续执行。 情况二:Node的前驱节点不为head,并且也是申请锁状态,那么在parkAndCheckInterrupt()中此线程将被挂机。等到线程从parkAndCheckInterrupt()中回复后,再次中acquireQueued()的自旋逻辑,此时可能发生情况一、情况二、情况三 情况三:Node的前驱节点被取消了,那么通过shouldParkAfterFailedAcquire()整合CLH队列后,走到情况一。 目前,没有申请到锁的Node在CLH队列中排队,其线程阻塞在parkAndCheckInterrupt()等待唤醒,然后继续尝试获取锁。 那么,在何时恢复线程?privatevoidunparkSuccessor(Nodenode){intwsnode。waitStatus;if(ws0)CAS修改节点状态为0compareAndSetWaitStatus(node,ws,0);Nodesnode。next;if(snulls。waitStatus0){如果s的后继节点为空或者状态大于0snull;for(Nodettail;t!nullt!node;tt。prev)从tail开始,找到最靠近head的状态不为0的节点if(t。waitStatus0)st;}if(s!null)唤醒节点中记录的线程LockSupport。unpark(s。thread);} 线程唤醒发生在取消请求时cancelAcquire(),或释放锁时,对unparkSuccessor()的调用。unparkSuccessor()将从CLH队列中唤醒最靠前的应该被唤醒的Node记录的线程,此之后,线程从parkAndCheckInterrupt()继续执行下去。 这里也以独占锁的释放锁的方法看unparkSuccessor()的调用publicfinalbooleanrelease(intarg){子类的实现,尝试解锁if(tryRelease(arg)){Nodehhead;if(h!nullh。waitStatus!0)释放锁,唤醒下一线程unparkSuccessor(h);returntrue;}returnfalse;}其他特性 上面借助独占锁的acquire()和release(),说明了AQS如何通过CLH队列对锁进行分配。此外,AQS还支持了其他的特性。 可中断向AQS请求锁的线程是可以中断的,从parkAndCheckInterrupt()会检查恢复的线程的中断状态,以让更上层的调用决定如何处理。以acquire()来看,它会让已中断过的线程回到中断状态。 可重入性控制可以通过isHeldExclusively()设置可重入性控制,在AQS中是为了共享锁服务的。当然,也可以在子类tryAcquire()等加锁的方法中,借助setExclusiveOwnerThread()和getExclusiveOwnerThread()一起实现是否可重入。 可控获取锁时间申请锁的时间,也可以控制,实现只需要通过在申请不到锁入队时,设置线程唤醒时间即可。AQS提供了其他版本的申请锁方法,流程大体一致。 并发量控制AQS通过属性state来提供控制并发量的方式,state只能通过原子性的操作修改。子类控制加解锁操作时,可以通过控制state来做出判断。独占锁如何实现 在前文中,借用独占锁的例子acquire()和release()说明了AQS是如何运作的。这里主要为其他补充。 可中断、可控获取锁时间这样的特性,提供了不同的入口方法,也实现了不同版本的acquireQueued(),其仅有少处不同。下面以中断的方式获取锁为例子抛砖引玉privatevoiddoAcquireInterruptibly(intarg)throwsInterruptedException{。。。。。。if(shouldParkAfterFailedAcquire(p,node)parkAndCheckInterrupt())与acquireQueued()主要不同,向上抛出了异常thrownewInterruptedException();。。。。。。} 中断方式获取锁关联方法为:acquireInterruptibly()doAcquireInterruptibly() 可控获取锁时间关联方法为:doAcquireNanos()doAcquireSharedNanos()共享锁如何实现 与独占锁的实现相比,共享锁的实现更复杂一些。从申请锁看privatevoiddoAcquireShared(intarg){。。。。。。与独占锁相比差异为这一段if(phead){尝试获取锁,r表示资源情况intrtryAcquireShared(arg);if(r0){获取到了锁,重新设置head,并传播setHeadAndPropagate(node,r);p。nextnull;helpGCif(interrupted)selfInterrupt();failedfalse;return;}}。。。。。。}privatevoidsetHeadAndPropagate(Nodenode,intpropagate){Nodehhead;重新设置headsetHead(node);if(propagate0hnullh。waitStatus0(hhead)nullh。waitStatus0){Nodesnode。next;if(snulls。isShared())唤醒其他的NodedoReleaseShared();}} 在共享锁的情况下,申请锁成功后,还需要考虑到有更多的资源能支持更多的并发,那么,可以唤醒Node。 进入处可能为以下任意情况:有更多的资源,即propagate0旧的head为空或未被取消新的head为空或未被取消 判断新旧head来调用doReleaseShared()的原因在于,如果旧的head已经被释放,不去检查新的head的状态,就有可能少唤醒一个Node。privatevoiddoReleaseShared(){for(;;){Nodehhead;if(h!nullh!tail){intwsh。waitStatus;if(wsNode。SIGNAL){设置头为0if(!compareAndSetWaitStatus(h,Node。SIGNAL,0))continue;唤醒下一节点unparkSuccessor(h);}elseif(ws0!compareAndSetWaitStatus(h,0,Node。PROPAGATE))continue;}if(hhead)break;}} unparkSuccessor()就不再赘述。微妙之处在于,如图 unparkSuccessor()需要唤醒一个状态小于0的节点,设想某一时刻,A、B在申请锁,C释放了锁,A拿到了锁,head状态被设置为0。时间片分配给了D,D释放锁,但是发现head状态为0,因此不进行唤醒。A获得了时间片,继续调用setHeadAndPropagate(),传入的propagate值为0,也不进行唤醒。而我们期望的结果是,B被唤醒。 如果不进行处理,那么随着程序运行,将不断地减少并发量。那么,处将头节点状态设置为PROPAGATE就避免了这个问题。在上面的例子中,A进入到setHeadAndPropagate()后将唤醒B。因此,PROPAGATE就表示了将某个行为传播下去。 与独占锁类似,其他的特性也提供了对应的入口,这里就不放出源码:可中断方式获取锁的方法为:acquireSharedInterruptibly()doAcquireSharedInterruptibly() 可控获取锁时间关联方法为:tryAcquireSharedNanos()doAcquireSharedNanos()条件处理 与独占锁不同的是,共享锁需要支持条件,即有时候,需要达到一些条件后,线程才应继续运行下去。Condition就表达了这一协作关系,它提供了模板方法,其中await()系列:表示等待条件的完成signal()、signalAll():表示条件达成的信号 AQS以ConditionObject实现了Condition的语义publicclassConditionObjectimplementsCondition,java。io。Serializable{privatetransientNodefirstWaiter;privatetransientNodelastWaiter;}staticfinalclassNode{表示下一个CONDITION状态的节点NodenextWaiter;} ConditionObject维护了一个单向队列,用来记录等待Condition达成的节点。privateNodeaddConditionWaiter(){NodetlastWaiter;if(t!nullt。waitStatus!Node。CONDITION){如果尾部节点已经不为CONDITION,那么把这些节点移除unlinkCancelledWaiters();重新指向尾部节点tlastWaiter;}NodenodenewNode(Thread。currentThread(),Node。CONDITION);if(tnull)作为头节点firstWaiternode;else作为下一节点t。nextWaiternode;更新尾部节点lastWaiternode;returnnode;}privatevoidunlinkCancelledWaiters(){NodetfirstWaiter;Nodetrailnull;while(t!null){从头结点开始,移除所有不为Node。CONDITION的节点Nodenextt。nextWaiter;if(t。waitStatus!Node。CONDITION){t。nextWaiternull;if(trailnull)firstWaiternext;elsetrail。nextWaiternext;if(nextnull)lastWaitertrail;}elsetrailt;tnext;}} CONDITION节点的插入的操作较简单,移除操作如图 任意时刻,如果条件达成,则signal()doSignal()privatevoiddoSignal(Nodefirst){do{if((firstWaiterfirst。nextWaiter)null)lastWaiternull;移除first节点的下一记录first。nextWaiternull;}while(!transferForSignal(first)加入CLH队列(firstfirstWaiter)!null);}finalbooleantransferForSignal(Nodenode){更新node的状态if(!compareAndSetWaitStatus(node,Node。CONDITION,0))returnfalse;将节点加入CLH队列Nodepenq(node);intwsp。waitStatus;if(ws0!compareAndSetWaitStatus(p,ws,Node。SIGNAL))如果取消等待或者不能设置为SIGNAL,唤起线程LockSupport。unpark(node。thread);returntrue;} 当Conditon条件达成时,将把节点从ConditionObject维护的队列移动到CLH队列,这样,当有资源时,才可被正确唤醒。挂起处位于:publicfinalvoidawait()throwsInterruptedException{if(Thread。interrupted())如果线程中断了,抛出异常thrownewInterruptedException();加入到CONDITION队列中NodenodeaddConditionWaiter();intsavedStatefullyRelease(node);记录中断的场景intinterruptMode0;while(!isOnSyncQueue(node)){自旋如果没有被加入到CLH队列中,那么挂起线程LockSupport。park(this);更新中断场景if((interruptModecheckInterruptWhileWaiting(node))!0)break;}尝试获取锁,此时Node已经在CLH队列中了if(acquireQueued(node,savedState)interruptMode!THROWIE)interruptModeREINTERRUPT;if(node。nextWaiter!null)cleanupifcancelledunlinkCancelledWaiters();if(interruptMode!0)根据中断场景做不同的处理reportInterruptAfterWait(interruptMode);} 因此,当Condition达成时,被唤醒的线程将从while(!isOnSyncQueue(node)){。。。}自旋中继续执行,粗略地来看,其过程为: 如何使用AQS AQS解决了锁的分配过程,加解锁的过程就需要子类自行实现。子类可以根据需要,提供独占锁或共享锁的实现。tryAcquire(int):获取独占锁tryRelease(int):释放独占锁tryAcquireShared(int):获取共享锁tryReleaseShared(int):释放共享锁 子类要实现的方法中,都带有int参数,一般而言,此int参数用于辅助控制AQS的state属性,也就是说,可以通过保证更改state的状态为原子性操作,即可保证并发状态。AQS也提供了compareAndSetState()的CAS操作对state进行更改。 一个简单的例子为publicstaticclassMySyncextendsAbstractQueuedSynchronizer{publicvoidlock(){acquire(0);}publicvoidunlock(){release(0);}OverrideprotectedbooleantryAcquire(intarg){returncompareAndSetState(0,1);}OverrideprotectedbooleantryRelease(intarg){returncompareAndSetState(1,0);}} 这个例子中,实现了一个独占锁,在这个例子中acquire()和release()传入的参数是无意义的,因为只是单并发,因此直接通过compareAndSetState()的成功与否完成了加解锁。总结 AQS是解决并发过程中锁分配的问题,使锁的实现者可以聚焦于加解锁的实现上。AQS的实现概要为:维护一个CLH队列里,记录每一个需要获取锁的线程;在首次请求锁时,是不公平的;在队列里的锁请求时,是公平的。当Node锁代表的线程没有请求到锁时,将被挂起,等被唤醒后,尝试再次请求锁,如果还是没有获取到锁,重复此过程。当一个Node入队时,将从队尾移除取消等待的节点,直到找到第一个未取消等待的节点,插入此节点后。当释放锁时,从CLH队里头部开始,找到第一个未取消等待的节点,唤醒。对于共享锁,如果需要等待条件,则Node进入一个单项队列,自旋,挂起;待条件达成后,将Node加入到CLH队里,请求锁;若请求到锁,继续执行线程。 此外,AQS的还支持的特性为通过CAS和自旋控制自身状态并发,足够快支持重入性判断,通过控制isHeldExclusively(),其代码位于操作CONDITION节点的各处,较零碎,因此没有将代码放出。可在tryAcquire()等子类的加锁方法中,借助setExclusiveOwnerThread()和getExclusiveOwnerThread()一起实现是否可重入支持中断。支持锁的获取时间控制。