老司机发车了,CountDownLatch到底等不等你呀
前几天我们把ReentrantLock的原理进行了详细的讲解,不熟悉的同学可以翻看前文,今天我们介绍另一种基于AQS的同步工具CountDownLatch。
CountDownLatch被称为倒计时器,也叫闭锁,是juc包下的工具类,同时也是共享锁的一种实现。它的作用是可以让一个或多个线程等待,直到所有线程的任务都执行完之后再继续往下执行。
举个简单的例子:阿Q高中时期都是乘坐大巴往返于县城与农村,那时的司机为了利益的最大化,会在汽车满员的情况下才会发车。
如果我们把乘客去车站乘车比作一个一个的线程,那CountDownLatch做的事就是等大家到齐之前的等待工作。
我们从源码的角度来分析下它的工作原理
谁来决定公交车上的座位数?
公交车上的座位数是由汽车制造商决定的,在CountDownLatch中也会存在这样一个值count,用来表示需要等待的线程个数。
count值是在CountDownLatch的构造函数中进行初始化的publicCountDownLatch(intcount){if(count0)thrownewIllegalArgumentException(count0);this。syncnewSync(count);}Sync(intcount){设置AQS中的state为count值setState(count);}
计数值count是一次性的,当它的值减为0后就不会再变化了,这也是其存在的不足之处。
谁来确定乘客全部到齐?
在汽车发车前检票员会对车上的乘客数量进行清点,如果满员了就会通知司机开车。
当然也可以采用这种方法:在得知车座位数的前提下,每上来一位乘客,座位数进行减一操作。CountDownLatch就是采用的上述方法,它的countDown()方法会对state的值执行减1操作。
让我们从源码的角度来认识一下该方法。publicvoidcountDown(){释放共享锁sync。releaseShared(1);}publicfinalbooleanreleaseShared(intarg){if(tryReleaseShared(arg)){doReleaseShared();returntrue;}returnfalse;}
先尝试释放锁,如果返回true,则执行释放操作,反之不执行。我们分析下上边的两个方法protectedbooleantryReleaseShared(intreleases){for(;;){获取当前等待的线程数量intcgetState();等待线程数为0,表示没有等待线程,故不需要释放锁资源if(c0)returnfalse;执行减1操作intnextcc1;自旋CAS将state的属性值1if(compareAndSetState(c,nextc))returnnextc0;}}
最后一步中,如果减一之后为0,则说明没有其它线程等待,需要执行释放锁操作,返回true,反之不需要。
在开始分析doReleaseShared()之前,我们先来补全一下AQS中waitStatus的状态说明初始化状态:0,表示当前节点在同步队列中,等待获取锁;CANCELLED:1,表示当前节点取消获取锁;SIGNAL:1,表示后续节点等待当前节点唤醒;CONDITION:2,表示当前线程正在条件等待队列中;PROPAGATE:3,共享模式,前置节点唤醒后续节点后,唤醒操作无条件传播下去;释放锁:唤醒后续节点privatevoiddoReleaseShared(){for(;;){Nodehhead;不是null且不为尾节点,因为尾节点没有后续节点需要唤醒了if(h!nullh!tail){intwsh。waitStatus;只有状态为1才可以唤醒后续节点if(wsNode。SIGNAL){将waitStatus设置为0失败会继续循环if(!compareAndSetWaitStatus(h,Node。SIGNAL,0))continue;unparkSuccessor(h);}将waitStatus设置为PROPAGATE失败会继续循环elseif(ws0!compareAndSetWaitStatus(h,0,Node。PROPAGATE))continue;}if(hhead)break;}}
unparkSuccessor()方法用于唤醒AQS中被挂起的线程,在ReentrantLock的原理中讲过了,此处不再赘述。
小结:当线程使用countDown()方法时,其实是使用了tryReleaseShared()方法以CAS的操作来减少state,直至state为0,进而释放锁资源,唤醒后续节点。
谁来发车?
肯定是司机来发车呀,那我们的CountDownLatch是如何实现的呢?
CountDownLatch中的await()方法,就是等待线程的总开关,当发现state的值为0时会释放所有的等待线程,发车了。
我们从源码角度来看下它是如何工作的publicvoidawait()throwsInterruptedException{sync。acquireSharedInterruptibly(1);}publicfinalvoidacquireSharedInterruptibly(intarg)throwsInterruptedException{如果线程中断了,直接抛出中断异常if(Thread。interrupted())thrownewInterruptedException();如果小于0,代表state不为0,即还有任务未执行完毕,会执行获取共享锁的操作if(tryAcquireShared(arg)0)doAcquireSharedInterruptibly(arg);}protectedinttryAcquireShared(intacquires){return(getState()0)?1:1;}
我们来看看它到底是如何获取共享锁的privatevoiddoAcquireSharedInterruptibly(intarg)throwsInterruptedException{将当前线程封装成node放到队尾finalNodenodeaddWaiter(Node。SHARED);booleanfailedtrue;try{for(;;){finalNodepnode。predecessor();if(phead){intrtryAcquireShared(arg);state为0,表示此时等待线程全部执行完毕,r为1。if(r0){setHeadAndPropagate(node,r);p。nextnull;failedfalse;return;}}从当前node节点向前寻找有效节点,并保证有效节点的waitStatus状态为1if(shouldParkAfterFailedAcquire(p,node)挂起线程parkAndCheckInterrupt())在拿锁的期间,如果被中断了,那么会抛出异常,取消拿锁thrownewInterruptedException();}}finally{if(failed)将当前节点设置为失效节点,并挂到最近的有效节点后边,上文中有图解cancelAcquire(node);}}
其中最重要的就是setHeadAndPropagate()方法privatevoidsetHeadAndPropagate(Nodenode,intpropagate){Nodehhead;将当前node设置为head,并将node的线程置为空setHead(node);if(propagate0hnullh。waitStatus0(hhead)nullh。waitStatus0){Nodesnode。next;if(snulls。isShared())释放锁:唤醒后续节点doReleaseShared();}}
小结:当线程使用await()方法时会将当前线程封装成node加入AQS队列中,如果发现state不为0,说明还有任务未执行完成,继续阻塞;如果state为0,会释放掉所有的等待线程,执行await()之后的数据。
流程图了解一下
理论讲完了,那我们用代码来演示下上边的例子publicstaticvoidmain(String〔〕args)throwsInterruptedException{intcount10;设置线程池并发数ExecutorServiceexecutorServiceExecutors。newFixedThreadPool(count);假设大巴可以拉十个乘客,初始化stateCountDownLatchcountDownLatchnewCountDownLatch(count);for(inti0;icount;i){finalintnumi;executorService。execute((){try{Thread。sleep((long)(newRandom()。nextDouble()3000)1000);System。out。println(乘客坐在了(num1)号座位上);}catch(InterruptedExceptionexception){exception。printStackTrace();}finally{countDownLatch。countDown();}});}System。out。println(司机等待乘客上车);countDownLatch。await();System。out。println(发车了);executorService。shutdown();}
执行结果如下:
细心地同学肯定会问了:如果遇上刮风下雨,来坐车的人少了,那已经上车的乘客岂不是回不了家了?
当然不是了,大巴其实也是有时间观念的,即使车上的乘客不满员到了一定的时间司机也会发车的,另外还会在路上顺道稍几个人上车。那我们的CountDownLatch是如何实现的呢?
CountDownLatch还提供了一个await(longtimeout,TimeUnitunit)方法,在一定的时间间隔内会阻塞当前线程,等待count个线程执行任务,一旦超出了等待时间,便会继续往下执行。
我们将上边的countDownLatch。await();替换为countDownLatch。await(3,TimeUnit。SECONDS);,执行结果如下所示
上文中的例子是CountDownLatch的其中一种用法,即主线程等待其他线程执行完毕之后再执行。它还有另一种用法,即实现多个线程开始执行任务的最大并行性,类似发令枪响前,运动员统一在起跑线就位的场景。publicstaticvoidmain(String〔〕args)throwsInterruptedException{设置线程池并发数ExecutorServiceexecutorServiceExecutors。newFixedThreadPool(10);CountDownLatchcountDownLatchnewCountDownLatch(1);一组有6名运动员for(inti0;i6;i){finalintnumi;executorService。execute((){try{System。out。println(运动员(num1)等待发令枪响);countDownLatch。await();System。out。println(运动员(num1)开始起跑);}catch(InterruptedExceptionexception){exception。printStackTrace();}});}Thread。sleep(3000);countDownLatch。countDown();System。out。println(发令枪响);executorService。shutdown();}
执行结果如下
说了这么多,都是样例?你有没有在项目中应用过呢?
回答当然是Yes了,之前的运营端有个统计页面,要求统计用户新增数量、订单数量、商品交易总额等多张表的指标值,为了提高执行速率,我就启用了多个子线程分别去统计,用CountDownLatch来等待它们的统计结果。