彻底了解线程池
我们开始今天的主题:线程池。线程池是面试中必问的八股文,我将涉及到到的问题分为3大类:基础使用线程池是什么?为什么要使用线程池?Executor框架是什么?Java提供了哪些线程池?实现原理线程池的底层原理是如何实现的?创建线程池的参数有哪些?线程池中的线程是什么时间创建的?系统设计如何合理的设置线程池的大小?如果服务器宕机,怎么处理队列中的任务?
希望今天的内容能够帮你解答以上的问题。
Tips:本文使用Java11源码进行分析;文章会在源码中添加注释,关键内容会有单独的分析。池化思想
在你的编程生涯中,一定遇到过各种各样的池,如:数据库连接池,常量池,以及今天的线程池。无一例外,它们都是借助池化思想来管理计算机中的资源。
维基百科中是这样描述池化的:
Inresourcemanagement,poolingisthegroupingtogetherofresources(assets,equipment,personnel,effort,etc。)forthepurposesofmaximizingadvantageorminimizingrisktotheusers。Thetermisusedinfinance,computingandequipmentmanagement。
池化指的是将资源汇聚到一起,以发挥优势或降低风险。
接着来看维基百科中对池的描述:
Incomputerscience,apoolisacollectionofresourcesthatarekept,inmemory,readytouse,ratherthanthememoryacquiredonuseandthememoryreleasedafterwards。Apoolclientrequestsaresourcefromthepoolandperformsdesiredoperationsonthereturnedresource。Whentheclientfinishesitsuseoftheresource,itisreturnedtothepoolratherthanreleasedandlost。
计算机科学中的池,是内存中保存资源的集合,创建资源以备使用,停用时回收,而不是使用时创建,停用时丢弃。客户端从池中请求资源,并执行操作,当不再使用资源时,将资源归还到池中,而不是释放或丢弃。为什么要使用池?
首先池是资源的集合,通过池可以实现对资源的统一管理;
其次,池内存放已经创建并初始化的资源,使用时直接从池内获取,跳过了创建及初始化的过程,提高了响应速度;
最后,资源使用完成后归还到池中,而非丢弃或销毁,提高资源的利用率。线程池
池化思想的引入是为了解决资源管理中遇到的问题,而线程池正是借助池化思想实现的线程管理工具。那么线程池可以帮助我们解决哪些实际的问题呢?
最直接的是控制线程的创建,不加以限制的创建线程会耗尽系统资源。不信的话你可以试试下面的代码:publicstaticvoidmain(String〔〕args){while(true){newThread((){})。start();}}复制代码
Tips:卡顿警告
其次,线程的创建和销毁是需要时间的,借助线程池可以有效的避免线程频繁的创建和销毁线程,提高程的序响应速度。
问题解答:线程池是什么?为什么要使用线程池?Executor体系
Java中提供了功能完善的Executor体系,用于实现线程池。先来了解下Executor体系中的核心成员间的关系:
Executor体系的最顶层是Executor接口和ExecutorService接口,它们定义了Executor体系的核心功能。Executor接口
Executor接口的注释:
AnobjectthatexecutessubmittedRunnabletasks。Thisinterfaceprovidesawayofdecouplingtasksubmissionfromthemechanicsofhoweachtaskwillberun,includingdetailsofthreaduse,scheduling,etc。AnExecutorisnormallyusedinsteadofexplicitlycreatingthreads。
Executor接口非常简单,只定义了execute方法,主要目的是将Runnable任务与执行机制(线程,调度任务等)解耦,提供了执行Runnable任务的方法。publicinterfaceExecutor{Executesthegivencommandatsometimeinthefuture。Thecommandmayexecuteinanewthread,inapooledthread,orinthecallingthread,atthediscretionofthe{codeExecutor}implementation。voidexecute(Runnablecommand);}复制代码ExecutorService接口
ExecutorService接口继承了Executor接口,拓展了Executor接口的能力。ExecutorService接口的注释:
AnExecutorthatprovidesmethodstomanageterminationandmethodsthatcanproduceaFuturefortrackingprogressofoneormoreasynchronoustasks。
ExecutorService接口关键方法的声明:publicinterfaceExecutorServiceextendsExecutor{Initiatesanorderlyshutdowninwhichpreviouslysubmittedtasksareexecuted,butnonewtaskswillbeaccepted。Invocationhasnoadditionaleffectifalreadyshutdown。voidshutdown();Attemptstostopallactivelyexecutingtasks,haltstheprocessingofwaitingtasks,andreturnsalistofthetasksthatwereawaitingexecution。Thismethoddoesnotwaitforactivelyexecutingtaskstoterminate。Use{linkawaitTerminationawaitTermination}todothat。ListRunnableshutdownNow();booleanisShutdown();booleanisTerminated();Blocksuntilalltaskshavecompletedexecutionafterashutdownrequest,orthetimeoutoccurs,orthecurrentthreadisinterrupted,whicheverhappensfirst。booleanawaitTermination(longtimeout,TimeUnitunit)throwsInterruptedException;TFutureTsubmit(CallableTtask);TFutureTsubmit(Runnabletask,Tresult);SubmitsaRunnabletaskforexecutionandreturnsaFuturerepresentingthattask。TheFutures{codeget}methodwillreturn{codenull}uponemsuccessfulemcompletion。Futurelt;?submit(Runnabletask);}复制代码
对关键方法做一个说明:继承自Executor接口:execute:执行Runnable任务;ExecutorService接口定义的方法:submit:执行Runnable或Callable任务,并返回Future;shutdown:允许已提交的任务执行完毕,但不接受新任务的关闭;shutdownNow:尝试关闭所有任务(正在等待执行),并返回等待执行的任务。
Tips:其余方法建议阅读源码中的注释,即便是提到的4个方法,也要阅读注释。
问题解答:Executor框架是什么?ThreadPoolExecutor核心流程
Executor体系中,大家最熟悉的一定是ThreadPoolExecutor实现了,也是我们能够实现自定义线程池的基础。接下来逐步分析ThreadPoolExecutor的实现原理。构造线程池
ThreadPoolExecutor提供了4个构造方法,我们来看参数最全的那个构造方法:publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueRunnableworkQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){if(corePoolSize0maximumPoolSize0maximumPoolSizecorePoolSizekeepAliveTime0)thrownewIllegalArgumentException();if(workQueuenullthreadFactorynullhandlernull)thrownewNullPointerException();this。corePoolSizecorePoolSize;this。maximumPoolSizemaximumPoolSize;this。workQueueworkQueue;this。keepAliveTimeunit。toNanos(keepAliveTime);this。threadFactorythreadFactory;this。handlerhandler;}复制代码
ThreadPoolExecutor的构造方法提供了7个参数:intcorePoolSize:线程池的核心线程数量,创建线程的数量小于等于corePoolSize时,会一直创建线程;intmaximumPoolSize:线程池的最大线程数量,当线程数量等于corePoolSize后且队列已满,允许继续创建(maximumPoolSizecorePoolSize)(maximumPoolSizecorePoolSize)(maximumPoolSizecorePoolSize)个线程;longkeepAliveTime:线程的最大空闲时间,当创建了超出corePoolSize数量的线程后,这些线程在不执行任务时能够存活的时间,超出keepAliveTime后会被销毁;TimeUnitunit:keepAliveTime的单位;BlockingQueueworkQueue:阻塞队列,用于保存等待执行的任务;ThreadFactorythreadFactory:线程工厂,用于创建线程,默认使用Executors。defaultThreadFactory()。RejectedExecutionHandlerhandler:拒绝策略,当队列已满,且没有空闲的线程时,执行的拒绝任务的策略。
Tips:有些小伙伴会疑问,如果每次执行一个任务,执行完毕后再执行新任务,线程池依旧会创建corePoolSize个线程吗?答案是会的,后文解释。
问题解答:创建线程池的参数有哪些?主控状态CTL与线程池状态
ThreadPoolExecutor中定义了主控状态CTL和线程池状态:Themainpoolcontrolstate,ctl,isanatomicintegerpackingtwoconceptualfieldsworkerCount,indicatingtheeffectivenumberofthreadsrunState,indicatingwhetherrunning,shuttingdownetcprivatefinalAtomicIntegerctlnewAtomicInteger(ctlOf(RUNNING,0));privatestaticfinalintCOUNTBITSInteger。SIZE3;29privatestaticfinalintCOUNTMASK(1COUNTBITS)1;00011111111111111111111111111111privatestaticfinalintRUNNING1COUNTBITS;11100000000000000000000000000000privatestaticfinalintSHUTDOWN0COUNTBITS;00000000000000000000000000000000privatestaticfinalintSTOP1COUNTBITS;00100000000000000000000000000000privatestaticfinalintTIDYING2COUNTBITS;01000000000000000000000000000000privatestaticfinalintTERMINATED3COUNTBITS;01100000000000000000000000000000privatestaticintrunStateOf(intc){returncCOUNTMASK;}privatestaticintworkerCountOf(intc){returncCOUNTMASK;}privatestaticintctlOf(intrs,intwc){returnrswc;}复制代码
CTL包含了两部分内容:线程池状态(runState,源码中使用rs替代)和工作线程数(workCount,源码中使用wc替代)。当看到位运算符和MASK一起出现时,就应该想到应用了位掩码技术。
主控状态CTL的默认值是RUNNING0即:11100000000000000000000000000000。runStateOf方法返回低29位为0的CTL,与之对应的是线程池状态,workerCountOf方法则返回高3位为0的CTl,用低29位表示工作线程数量,所以线程池最多允许536870911个线程。
Tips:工作线程指的是已经创建的线程,并不一定在执行任务,后文解释;位运算的可以参考编程技巧:高端的位运算;Java中二进制使用补码,注意原码,反码和补码间的转换。线程池的状态
注释中对线程池的状态做出了详细的说明:RUNNING:Acceptnewtasksandprocessqueuedtasks
SHUTDOWN:Dontacceptnewtasks,butprocessqueuedtasks
STOP:Dontacceptnewtasks,dontprocessqueuedtasks,andinterruptinprogresstasks
TIDYING:Alltaskshaveterminated,workerCountiszero,thethreadtransitioningtostateTIDYINGwillruntheterminated()hookmethod
TERMINATED:terminated()hascompletedRUNNING:接收新任务,处理队列中的任务;SHUTDOWN:不接收新任务,处理队列中的任务;STOP:不接收新任务,不处理队列中的任务,中断正在执行的任务;TIDYING:所有任务已经执行完毕,并且工作线程为0,转换到TIDYING状态后将执行Hook方法terminated();TERMINATED:terminated()方法执行完毕。状态的转换
注释中也对线程池状态的转换做出了详细说明:RUNNINGSHUTDOWNOninvocationofshutdown()
(RUNNINGorSHUTDOWN)STOPOninvocationofshutdownNow()
SHUTDOWNTIDYINGWhenbothqueueandpoolareempty
STOPTIDYINGWhenpoolisempty
TIDYINGTERMINATEDWhentheterminated()hookmethodhascompleted
我们通过一张状态转换图来了解线程池状态之间的转换:
结合源码,可以看到线程池的状态从RUNNING到TERMINATED其数值是单调递增的,换句话说线程池从活着到死透所对应的数值是逐步增大,所以可以使用数值间的比较去确定线程池处于哪一种状态。使用线程池
我们已经对ThreadPoolExecutor有了一个整体的认知,现在可以创建并使用线程池了:ThreadPoolExecutorthreadPoolExecutornewThreadPoolExecutor(2,4,10,TimeUnit。SECONDS,newLinkedBlockingQueue(6));threadPoolExecutor。submit((){业务逻辑});复制代码
这里我使用最简单的构造方法,我们看到在线程池中提交任务使用的是submit方法,该方法在抽象类AbstractExecutorService中实现:publicabstractclassAbstractExecutorServiceimplementsExecutorService{publicFuturelt;?submit(Runnabletask){if(tasknull)thrownewNullPointerException();RunnableFutureVoidftasknewTaskFor(task,null);execute(ftask);returnftask;}publicFuturelt;?submit(Runnabletask){if(tasknull)thrownewNullPointerException();RunnableFutureVoidftasknewTaskFor(task,null);execute(ftask);returnftask;}publicTFutureTsubmit(CallableTtask){if(tasknull)thrownewNullPointerException();RunnableFutureTftasknewTaskFor(task);execute(ftask);returnftask;}}复制代码
submit的重载方法之间只有参数列表的差别,实现逻辑是相同的,均是先封装RunnableFuture对象,再调用ThreadPoolExecutorexecute方法。
问题解答:submit()和execute()方法有什么区别?execute方法
继承自Executor接口的execute方法是线程池的关键方法:publicvoidexecute(Runnablecommand){检测待执行任务if(commandnull){thrownewNullPointerException();}获取主控状态CTLintcctl。get();STEP1:当工作线程数量小于核心线程时,执行addWorker方法if(workerCountOf(c)corePoolSize){if(addWorker(command,true)){return;}cctl。get();}当工作线程数量大于核心线程数量时STEP2:首先判断线程池是否处于运行状态,接着尝试添加到队列中if(isRunning(c)workQueue。offer(command)){再次检查线程池状态intrecheckctl。get();不再处于RUNNING,则从队列中删除当前任务,并执行拒绝策略if(!isRunning(recheck)remove(command)){reject(command);}elseif(workerCountOf(recheck)0){addWorker(null,false);}}STEP3:无法添加到队列时,尝试执行addWorkerelseif(!addWorker(command,false))addWorker执行失败,则执行拒绝策略reject(command);}复制代码
阅读execute方法的源码时需要知道一个前提,addWorker方法会检查线程池状态和工作线程数量,并执行工作任务。接着来看execute方法的3种执行情况:STEP1:线程池状态:RUNNING,工作线程数:小于核心线程数,此时执行addWorker(command,true);STEP2:线程池状态:RUNNING,工作线程数:等于核心线程数,队列:未饱和,添加到队列中;STEP3:线程池状态:RUNNING,工作线程数:等于核心线程数,队列:已饱和,执行addWorker(command,false)。
需要重点关注STEP1的部分,还记得构造线程池最后的问题吗?STEP1便解释了为什么一个接一个的执行任务,依旧会创建出corePoolSize个线程。接着我们通过一张流程图展示execute方法的执行流程:
流程图画得比较复杂,因为有些判断看似在一行中执行,实际上是借助了运算符短路的特性来决定是否执行,例如isRunning(c)workQueue。offer(command)中,如果isRunning(c)false则不会执行workQueue。offer(command)。addWorker方法privatebooleanaddWorker(RunnablefirstTask,booleancore)复制代码
返回值为布尔类型表示是否成功执行,参数列表中有两个参数:RunnablefirstTask,待执行任务;booleancore,true表示最多允许创建corePoolSize个线程,false表示使用最多允许创建maximumPoolSize个线程。
在分析execute方法的过程中,我们提前剧透了addWorker方法的功能:检查线程池状态和工作线程数量执行工作任务
因此addWorker方法的源码部分我们分成两部分来看。
Tips:再次强调本文使用Java11源码进行分析,在addWorker方法的实现上Java11与Java8存在差异。检查线程池状态和工作线程数量
第一部分是线程池状态和工作线程数量检查的源码:retry:获取主控状态CTLfor(intcctl。get();;){注释1Java11相对友好很多,减少了很多!的使用,看起来比较符合人的思维这部分判断可以分成两部分:1。至少为SHUTDOWN状态2。条件3选1满足:21,至少为STOP状态22,firstTask不为空23,workQueue为空if(runStateAtLeast(c,SHUTDOWN)(runStateAtLeast(c,STOP)firstTask!nullworkQueue。isEmpty())){returnfalse;}for(;;){coretrue,保证工作线程数量小于核心线程数量corefalse,保证线程数量小于最大线程数量if(workerCountOf(c)((core?corePoolSize:maximumPoolSize)COUNTMASK)){returnfalse;}增加工作线程数量并退出if(compareAndIncrementWorkerCount(c)){breakretry;}如果至少是SHUTDOWN状态,则重新执行cctl。get();if(runStateAtLeast(c,SHUTDOWN)){continueretry;}}}复制代码
注释1的代码并不复杂,只是需要结合线程池在不同状态下的处理逻辑来分析:当状态至少为SHUTDOWN时,什么情况不需要处理?添加新的任务(对应条件22)队列为空(对应条件23)当状态至少为STOP时,线程池应当立即停止,不接收,不处理。
Tips:线程池状态的部分说线程池状态从RUNNING到TERMINATED是单调递增的,因此在Java11的实现中才会出现runStateAtLeast方法。执行工作任务
第二部分是执行工作任务的源码:booleanworkerStartedfalse;booleanworkerAddedfalse;Workerwnull;try{创建Worker对象wnewWorker(firstTask);从worker对象中获取线程finalThreadtw。thread;if(t!null){上锁finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{intcctl。get();线程池状态检查RUNNING状态,或者小于STOP状态(处理队列中的任务)if(isRunning(c)(runStateLessThan(c,STOP)firstTasknull)){线程状态检查if(t。getState()!Thread。State。NEW){thrownewIllegalThreadStateException();}将Worker对象添加到workers中workers。add(w);workerAddedtrue;intsworkers。size();if(slargestPoolSize){记录线程池中出现过的最大线程数largestPoolSizes;}}}finally{mainLock。unlock();}if(workerAdded){启动线程t。start();workerStartedtrue;}}}finally{if(!workerStarted){addWorker执行失败addWorkerFailed中包含工作线程数减1的逻辑addWorkerFailed(w);}}returnworkerStarted;复制代码
结合两部分代码,一个正向流程是这样的:检查状态:检查是否允许创建Worker,如果允许执行compareAndIncrementWorkerCount(c),CTL中工作线程数量1;执行任务:创建Worker对象,通过Worker对象获取线程,添加到workers中,最后启动线程。
回过头看我们之前一直提到的工作线程,实际上是Worker对象,我们可以近似的将Worker对象和工作线程画上等号。
问题解答:线程池中的线程是什么时间创建的?三调addWorker
execute方法中,有3种情况调用addWorker方法:STEP1:addWorker(command,true)STEP2:addWorker(null,false)STEP3:addWorker(command,false)
STEP1和STEP3很好理解,STEP1最多允许创建corePoolSize个线程,STEP3最多允许创建maximumPoolSize个线程。STEP2就比较难理解了,传入了空任务然后调用addWorker方法。
什么情况下会执行到addWorker(null,false)?第1个条件:workerCountcorePoolSizeworkerCountgeqcorePoolSizeworkerCountcorePoolSize。第2个条件:isRunning(c)workQueue。offer(command)第3个条件:workerCountOf(recheck)0
处于RUNNING状态的条件不难理解,矛盾的是第1个条件和第3个条件。根据这两个条件可以得到:corePoolSizeworkCount0corePoolSizeleqworkCount0corePoolSizeworkCount0,也就是说允许创建核心线程数为0的线程池。
接着我们来看addWorker(null,false)做了什么?创建了Worker对象,添加到workers中,并调用了一次Thread。start,虽然没有任何待执行的任务。
为什么要创建一个Worker对象?别忘了,已经执行过workQueue。offer(command)了,需要保证线程池中至少有一个Worker,才能执行workQueue中的任务。工具人Worker
实际上ThreadPoolExecutor维护的工作线程就是Worker对象,我们来看Worker类的原码:privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{finalThreadthread;RunnablefirstTask;volatilelongcompletedTasks;Worker(RunnablefirstTask){setState(1);this。firstTaskfirstTask;通过默认线程工厂创建线程this。threadgetThreadFactory()。newThread(this);}publicvoidrun(){runWorker(this);}}复制代码
Worker继承自AbstractQueuedSynchronizer,并实现了Runnable接口。
我们重点关注构造方法,尤其是this。threadgetThreadFactory()。newThread(this),通过线程工厂创建线程,传入的Runnable接口是谁?
是Worker对象本身,也就是说如果有worker。getThread()。start(),此时会执行Worker。run方法。
Tips:AbstractQueuedSynchronizer就是大名鼎鼎的AQS,Worker借助AQS实现非重入独占锁,不过这部分不是今天的重点;Woker对象与自身的成员变量thread的关系可谓是水乳交融,好好梳理下,否则会很混乱。runWorker方法
runWorker方法传入的是Worker对象本身,来看方法实现:finalvoidrunWorker(Workerw){注释1ThreadwtThread。currentThread();Worker对象中获取执行任务Runnabletaskw。firstTask;将Worker对象中的任务置空w。firstTasknull;w。unlock();booleancompletedAbruptlytrue;try{注释2while(task!null(taskgetTask())!null){w。lock();线程池的部分状态要中断正在执行的任务if((runStateAtLeast(ctl。get(),STOP)(Thread。interrupted()runStateAtLeast(ctl。get(),STOP)))!wt。isInterrupted()){wt。interrupt();}try{beforeExecute(wt,task);try{执行任务task。run();afterExecute(task,null);}catch(Throwableex){afterExecute(task,ex);throwex;}}finally{tasknull;w。completedTasks;w。unlock();}}completedAbruptlyfalse;}finally{processWorkerExit(w,completedAbruptly);}}复制代码
大家可能会对注释1的部分比较迷惑,这个ThreadwtThread。currentThread()是什么鬼?别急,我带你从头梳理一下。
以使用线程池中的代码为例,假设是首次执行,我们看主线程做了什么:
刚才也说了,Worker对象的线程在启动后执行worker。run,也即是在runWorker方法中Thread。currentThread()是Worker对象的线程,并非主线程。
再来看注释2的部分,第一次进入循环时,执行的task是Runnabletaskw。firstTask,即初次判断task!null,第二次进入循环时,task是通过taskgetTask()获取的。
线程池中,除了当前Worker正在执行的任务,还有谁可以提供待执行任务?答案是队列,因此我们可以合理得猜测getTask()是获取队列中的任务。getTask方法privateRunnablegetTask(){上次从队列中获取任务是否超时booleantimedOutfalse;for(;;){线程池状态判断,某些状态下不需要处理队列中的任务intcctl。get();if(runStateAtLeast(c,SHUTDOWN)(runStateAtLeast(c,STOP)workQueue。isEmpty())){decrementWorkerCount();returnnull;}intwcworkerCountOf(c);allowCoreThreadTimeOut是否允许核心线程超时销毁,默认为false通过allowCoreThreadTimeOut方法设置wccorePoolSize为true表示启用了非核心线程booleantimedallowCoreThreadTimeOutwccorePoolSize;wcmaximumPoolSize,可能的情况是因为同时执行了setMaximumPoolSize方法timedtimedOut为true时,表示上次获取任务超时,当前需要进行超时控制wc1workQueue。isEmpty(),工作线程数量大于1或队列为空if((wcmaximumPoolSize(timedtimedOut))(wc1workQueue。isEmpty())){减少工作线程数量if(compareAndDecrementWorkerCount(c)){returnnull;}continue;}try{注释1从队列中获取待执行任务Runnablertimed?workQueue。poll(keepAliveTime,TimeUnit。NANOSECONDS):workQueue。take();if(r!null){returnr;}timedOuttrue;}catch(InterruptedExceptionretry){timedOutfalse;}}}复制代码
注释1的部分有两种获取任务的方式:workQueue。poll(keepAliveTime,TimeUnit。NANOSECONDS),获取队首元素,如果当前队列为空,则等待指定时间后返回null;workQueue。take(),获取队首元素,如果队列为空,则一直等待,直到有返回值。
线程池只会在一种情况下使用workQueue。take,即不允许核心线程超时销毁,同时线程池的工作线程数量小于核心线程数量,结合runWorker方法的源码我们可以得知,此时借助了阻塞队列的能力,保证runsWoker方法一直停留在taskgetTask()上,直到getTask()返回响应的任务。
而在选择使用workQueue。poll时存在两种情况:允许核心线程超时销毁,即allowCoreThreadTimeOuttrue;当前工作线程数大于核心线程数,即线程池已经创建足够数量的核心线程,并且队列已经饱和,开始创建非核心线程处理任务。
结合runWorker方法的源码我们可以知道,如果队列中的任务已经被消耗完毕,即getTask()返回null,则会跳出while循环,执行processWorkerExit方法。processWorkerExit方法privatevoidprocessWorkerExit(Workerw,booleancompletedAbruptly){runWorker执行失败的场景if(completedAbruptly){decrementWorkerCount();}finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{completedTaskCountw。completedTasks;从workers中删除Workerworkers。remove(w);}finally{mainLock。unlock();}根据线程池状态判断是否结束线程池tryTerminate();intcctl。get();STOP之下的状态,runWorker正常结束时completedAbruptlyfalse保证至少有1个worker,用于处理队列中的任务if(runStateLessThan(c,STOP)){if(!completedAbruptly){intminallowCoreThreadTimeOut?0:corePoolSize;if(min0!workQueue。isEmpty()){min1;}if(workerCountOf(c)min){return;}}runWorker异常退出时,即completedAbruptlytrue或者是workers存活少于1个addWorker(null,false);}}复制代码
processWorkerExit方法做了3件事:移除多余的Worker对象(允许销毁的核心线程或者非核心线程);尝试修改线程池状态;保证在至少存活1个Worker对象。
Tips:我跳过了tryTerminate()方法的分析,对,是故意的
问题解答:线程池的底层原理是如何实现的?销毁非核心线程
设想一个场景:已经创建了足够数量的核心线程,并且队列已经饱和,仍然有任务提交时,会是怎样的执行流程?
线程池创建非核心线程处理任务,当非核心线程执行完毕后并不会立即销毁,而是和核心线程一起去处理队列中的任务。那么当所有的任务都处理完毕之后呢?
回到runWorker中,当所有任务执行完毕后再次进入循环,getTask中判断工作线程数大于和核心线程数,此时启用workQueue。poll(keepAliveTime,TimeUnit。NANOSECONDS),而keepAliveTime就是构建线程池时设定的线程最大空闲时间,当超过keepAliveTime后仍旧没有获得任务返回null,跳出runWorker的循环,执行processWorkerExit销毁非核心线程。ThreadPoolExecutor拾遗
目前我们已经详细分析了线程池的执行流程,这里我会补充一些前文未涉及到的内容,因为是补充内容,所以涉及不会详细的解释源码。预创建线程
我们在提到线程池的优点时会特别强调一句,池内保存了创建好的资源,使用时直接取出,但线程池好像依旧是首次接到任务后才会创建资源啊?
实际上,线程池提供prestartCoreThread方法,用于预创建核心线程:publicbooleanprestartCoreThread(){returnworkerCountOf(ctl。get())corePoolSizeaddWorker(null,true);}复制代码
如果你的程序需要做出极致的优化,可以选择预创建核心线程。关闭和立即关闭
ThreadPoolExecutor提供了两个关闭的功能shutdown和shutdownNow:publicvoidshutdown(){finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{checkShutdownAccess();advanceRunState(SHUTDOWN);中断空闲线程interruptIdleWorkers();ScheduledThreadPoolExecutor的hookonShutdown();}finally{mainLock。unlock();}tryTerminate();}publicListRunnableshutdownNow(){ListRunnabletasks;finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{checkShutdownAccess();advanceRunState(STOP);中断所有线程interruptWorkers();tasksdrainQueue();}finally{mainLock。unlock();}tryTerminate();returntasks;}复制代码
两者的差别还是很明显的:shutdown将线程池的状态改为SHUTDOWN,而shutdownNow则改为STOP;shutdown不返回队列中的任务,而shutdownNow返回队列中的任务,因为STOP状态不会再去执行队列的任务;shutdown中断空闲线程,而shutdownNow则是中断所有线程。
从实现效果上来看关闭shutdown会更温和一些,而立即关闭shutdownNow则更为强烈,仿佛语气中带着不容置疑。拒绝策略
线程池不会无条件的接收任务,有两种情况下它会拒绝任务:核心线程已满,添加到队列后,线程池不再处于RUNNING状态,此时从队列删除任务,并执行拒绝策略;核心线程已满,队列已满,非核心线程已满,此时执行拒绝策略。
Java提供了RejectedExecutionHandler接口:publicinterfaceRejectedExecutionHandler{voidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor);}复制代码
因此,我们可以通过实现RejectedExecutionHandler接口,完成自定义拒绝策略。另外,Java中也提供了4种默认拒绝策略:AbortPolicy:直接抛出异常;CallerRunsPolicy:提交任务的线程执行;DiscardOldestPolicy:丢弃队列中最先加入的线程;DiscardPolicy:直接丢弃,就是啥也不干。
源码非常简单,大家自行阅读即可。Java提供了哪些线程池
如果不想自己定义线程池,Java也贴心的提供了4种内置线程池,默认线程池通过Executors获取。
Java的命名中,s后缀通常是对应工具类,通常提供大量静态方法,例如:Collections之于Collection。所以即便属于Executor体系中的一员,但却没办法在族谱上出现,打工人的悲惨命运。FixedThreadPoolpublicstaticExecutorServicenewFixedThreadPool(intnThreads){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueueRunnable());}publicstaticExecutorServicenewFixedThreadPool(intnThreads,ThreadFactorythreadFactory){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueueRunnable(),threadFactory);}复制代码
固定大小线程池,核心线程数和最大线程数一样,看起来都还不错,主要的问题是通过无参构造器创建的LinkedBlockingQueue,它允许的最大长度是Integer。MAXVALUE。
Tips:这也就是为什么《阿里巴巴Java开发手册》中不推荐的原因。CachedThreadPoolpublicstaticExecutorServicenewCachedThreadPool(){returnnewThreadPoolExecutor(0,Integer。MAXVALUE,60L,TimeUnit。SECONDS,newSynchronousQueueRunnable());}publicstaticExecutorServicenewCachedThreadPool(ThreadFactorythreadFactory){returnnewThreadPoolExecutor(0,Integer。MAXVALUE,60L,TimeUnit。SECONDS,newSynchronousQueueRunnable(),threadFactory);}复制代码
可以说是无限大的线程池,接到任务就创建新线程,另外SynchronousQueue是非常特殊的队列,不存储数据,每个put操作对应一个take操作。我们来分析下实际可能发生的情况:前提:大量并发涌入提交第一个任务,进入队列,判断核心线程数为0,执行addWorker(null,false),对应execute的SETP2;提交第二个任务,假设第一个任务未结束,第二个任务直接提交到队列中;提交第三个任务,假设第一个任务未结束,无法添加到队列中,执行addWorker(command,false)对应execute的SETP3。
也就是说,只要提交的够快,就会无限创建线程。SingleThreadExecutorpublicstaticExecutorServicenewSingleThreadExecutor(){returnnewFinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueueRunnable()));}publicstaticExecutorServicenewSingleThreadExecutor(ThreadFactorythreadFactory){returnnewFinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueueRunnable(),threadFactory));}复制代码
只有一个线程的线程池,问题也是在于LinkedBlockingQueue,可以无限的接收任务。ScheduledExecutorpublicstaticScheduledExecutorServicenewScheduledThreadPool(intcorePoolSize){returnnewScheduledThreadPoolExecutor(corePoolSize);}publicstaticScheduledExecutorServicenewScheduledThreadPool(intcorePoolSize,ThreadFactorythreadFactory){returnnewScheduledThreadPoolExecutor(corePoolSize,threadFactory);}publicstaticScheduledExecutorServicenewSingleThreadScheduledExecutor(){returnnewDelegatedScheduledExecutorService(newScheduledThreadPoolExecutor(1));}publicstaticScheduledExecutorServicenewSingleThreadScheduledExecutor(ThreadFactorythreadFactory){returnnewDelegatedScheduledExecutorService(newScheduledThreadPoolExecutor(1,threadFactory));}复制代码
用来执行定时任务,DelegatedScheduledExecutorService是对ScheduledExecutorService的包装。
在Executor体系的族谱中,是有体现到ScheduledExecutorService和ScheduledThreadPoolExecutor的,这部分留给大家自行分析了。
除了以上4种内置线程池外,Java还提供了内置的ForkJoinPool:publicstaticExecutorServicenewWorkStealingPool(intparallelism){returnnewForkJoinPool(parallelism,ForkJoinPool。defaultForkJoinWorkerThreadFactory,null,true);}publicstaticExecutorServicenewWorkStealingPool(){returnnewForkJoinPool(Runtime。getRuntime()。availableProcessors(),ForkJoinPool。defaultForkJoinWorkerThreadFactory,null,true);}复制代码
这部分是Java8之后提供的,我们暂时按下不表,放到后期关于ForkJoin框架中详细解释。
问题解答:Java提供了哪些线程池?合理设置线程池
通常我们在谈论合理设置线程池的时候,指的是设置线程池的corePoolSize和maximumPoolSize,合理的设置能够最大化的发挥线程池的能力。
我们先来看美团技术团队的调研结果:
无论是哪种公式,都是基于理论得出的结果,但往往理论到工程还有很长得一段路要走。
按照我的经验,合理的设置线程池可以汇总成一句话:根据理论公式预估初始设置,随后对核心业务进行压测调整线程池设置。
Java也提供了动态调整线程池的能力:publicvoidsetThreadFactory(ThreadFactorythreadFactory);publicvoidsetRejectedExecutionHandler(RejectedExecutionHandlerhandler);publicvoidsetCorePoolSize(intcorePoolSize);publicvoidsetMaximumPoolSize(intmaximumPoolSize);publicvoidsetKeepAliveTime(longtime,TimeUnitunit);复制代码
除了workQueue都能调整,本文不讨论线程池动态调整的实现。
Tips:调研结果源自于《Java线程池实现原理及其在美团业务中的实践》;该篇文章中也详细的讨论了动态化线程池的思路,推荐阅读。结语
线程池的大部分内容就到这里结束了,希望大家够通过本篇文章解答绝大部分关于线程池的问题,带给大家一些帮助,如果有错误或者不同的想法,欢迎大家留言讨论
整合粤港澳优势以科技创新提升湾区竞争力大湾观察光明日报记者安胜蓝龚亮1月28日,广东省高质量发展大会在广州召开。会上提出,广东将突出深化粤港澳合作,扎实推动横琴前海南沙三个平台重大政策重大改革重大任务落地落实。粤港澳大
君子盟的烂尾,只成就了辜清章的高光和催泪一口气追完了君子盟,没有被井柏然宋威龙这俩双男主圈粉,反而是被剧末才出现的疯批皇子辜清章圈粉了!君子盟本身具有爆相,由知名ip张公案改编一线男星井柏然宋威龙搭档,又适逢山河令这种耽
逛吃买看!首届中国非遗保护年会打卡攻略来啦非遗与旅游的深度体验可以有多好玩?首届中国非物质文化遗产保护年会告诉你答案。4800m非遗沉浸式精品馆,沉浸式体验陕西河南山东四川的非遗区域特色实景式非遗商业街区非遗大集,汇聚了以
今日欧美明星时尚街拍图集(2023年2月16日)今日欧美明星时尚街拍图集(2023年2月16日)时尚欧美明星的街拍就是行走在马路上的秀场,明星们的街拍就是时尚界的潮流风向标。明星们身穿什么牌子的衣服肩背什么大牌包包,脚蹬什么潮流
孙俪国外彻底放飞!白T外只披一件披肩,过膝靴长得可以当裤子穿女性真正的魅力,其实往往体现在一些看不见的地方,比如她的气质涵养自信和面对生活的态度,都需要你了解她之后才知道。看一个女人到底是否真的有气质?千万不要肤浅的看到她的外表,而且要从她
成本45块卖1500?除了鹿晗还有他们,看看明星潮牌有多赚钱明星想开展副业赚粉丝钱有多容易?前有欧阳娜娜一件聚酯纤维的浴袍卖999块,这几天鹿晗自创的潮牌又出了事,网上一位从事服装行业多年的测评博主自费1500元购买了鹿晗潮牌的一件上衣,测
摆脱冬日的沉闷,春日清新穿搭指南是否已经厌倦了冬季的黑白灰,在春天暖风吹的日子里,清新明亮的衣服也会让你的心情跟着明媚起来。不知道怎么搭吗,看看这里有没有适合的。打褶宽松的奶油色长裤,看上去温柔有很美好,这种宽松
学会知足总是在安慰别人,其实自己何偿又不是在找别人安慰呢?人活着的目的其实很简单也很单一,那就是活得更好些,生活,工作,家人,朋友所有的所有。可就是这么个简单的目标却也很难很难实现,因为人
客家娘酒陈桂峰一场约会,水和酒饼,以及糯米这些本质的事物,因期待,因向往携手进入土瓮里缱绻缠绵,发酵酝酿甜蜜,一些喜欢,一些爱意一些世俗的仪式,都得到确认和呈现以醇厚的方式以规定的时间谁说
一个人,能顺应上天,老天就不会辜负你01hr命由天定,事在人为,顺者昌,逆者难。人活在天地之间,要生存,就要顺天应人,与天地同心,上天才不会亏待你。有道是善者,天助之。不善者,天弃之。夫大人者,与天地合其德,与日月合
上下五千年,惟有琅琊王羲之自驾游临沂王羲之故居(三)分享历史五千年上下五千年,惟有琅琊王羲之自驾游临沂王羲之故居(三)岁次癸卯正月廿五,晴天,冷锋凛冽,当真。昨日济南15度,今早临沂0度,晚上连云港零下3度,玩我的节奏。早上,临沂名