问题 抱着问题,看源码,不至于让自己在看源码时懵逼,看的同时参考网上的源码分析,加深自己的理解线程池的核心线程数,最大线程数,队列间的关系?设置核心线程数,能大于最大线程数么?释放线程时,怎样才能释放核心线程?线程池为什么能维持线程不释放,随时运行各种任务?简介 线程池常用的创建方法有那么几种: newFixedThreadPool() newSingleThreadExecutor() newCachedThreadPool() newScheduledThreadPool() 线程池的创建方式,是装饰模式的一种体现,底层封装ThreadPoolExecutor这个类 在阿里巴巴开放的代码规范里,因为线程池默认使用无界队列,会导致线程无限增长导致应用oom,所以要显示的创建ThreadPoolExecutorpublicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueRunnableworkQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler);}参数说明corePoolSize代表核心线程池的个数,当线程池当前的个数大于核心线程池的时候,线程池会回收多出来的线程maximumPoolSize代表最大的线程池个数,当线程池需要执行的任务大于核心线程池的时候,会创建更多的线程,但是最大不能超过这个数keepAliveTime代表空余的线程存活的时间,当多余的线程完成任务的时候,需要多长时间进行回收,时间单位是unit去控制workQueue非常重要,这个工作队列会存放所有待执行的Runnable对象ArrayBlockingQueue:有界队列。是一种基于数组的有界阻塞队列LinkedBlockingQueue:无界队列。是一个基于链表的阻塞队列,吞吐量通常要高于ArrayBlockingQueue,newSingleThreadExecutor,newFixedThreadPool默认队列SynchronousQueue:直接提交。是一种不存储元素的阻塞队列,每个插入操作必须等另一个线程调用移除操作,否则插入操作一直处于阻塞状态。newCachedThreadPool使用了这个队列PriorityBlockingQueue:是一种具有优先权的阻塞队列,优先级大的任务可以先执行,用户由此可以控制任务的执行顺序DelayedWorkQueue:优先级队列,newScheduledThreadPool默认这个队列threadFactory创建线程时,使用的工厂DefaultThreadFactory默认,创建一个同线程组且默认优先级的线程PrivilegedThreadFactory,使用访问权限创建一个权限控制的线程handler线程池满时,提供的拒绝策略AbortPolicy:默认,抛出异常Taskr。toString()rejectedfrome。toString()DiscardPolicy:不抛异常,直接丢弃当前线程DiscardOldestPolicy::如果线程池没有中止,移除队列第一个任务,再执行当前任务CallerRunsPolicy:如果线程池没有中止,直接用调用者的线程资源执行任务执行过程 判断是否大于核心线程数,小于,创建线程超过核心线程数,判断队列是否满,不满,创建线程队列已满,判断是否超过最大线程数,没有超过,创建线程超过最大线程数,执行拒绝策略类说明An{linkExecutorService}thatexecuteseachsubmittedtaskusingoneofpossiblyseveralpooledthreads,normallyconfiguredusing{linkExecutors}factorymethods。线程池主要解决两个问题:在执行大量异步线程时改善性能,应付减少每个任务的调用开销,并且提供了一种限制和管理资源(包含线程)的方法。ThreadPoolExecutor提供了一些基本的统计,比如完成任务的数量。pThreadpoolsaddresstwodifferentproblems:theyusuallyprovideimprovedperformancewhenexecutinglargenumbersofasynchronoustasks,duetoreducedpertaskinvocationoverhead,andtheyprovideameansofboundingandmanagingtheresources,includingthreads,consumedwhenexecutingacollectionoftasks。Each{codeThreadPoolExecutor}alsomaintainssomebasicstatistics,suchasthenumberofcompletedtasks。在大部分的上下文场景里,这个类提供了很多可调整的参数和可扩展的hook。不管怎样,开发者可以更方便的创建无界线可回收的线程池,固定的线程池,单个后台线程,这些都是为大部分应用场景设置的默认线程池配置。如果以上线程方法不足以满足应用场景,可以手动配置和调校线程池ThreadPoolExecutorpTobeusefulacrossawiderangeofcontexts,thisclassprovidesmanyadjustableparametersandextensibilityhooks。However,programmersareurgedtousethemoreconvenient{linkExecutors}factorymethods{linkExecutorsnewCachedThreadPool}(unboundedthreadpool,withautomaticthreadreclamation),{linkExecutorsnewFixedThreadPool}(fixedsizethreadpool)and{linkExecutorsnewSingleThreadExecutor}(singlebackgroundthread),thatpreconfiguresettingsforthemostcommonusagescenarios。Otherwise,usethefollowingguidewhenmanuallyconfiguringandtuningthisclass:dl核心和最大线程池数量dtCoreandmaximumpoolsizesdt线程池执行器将会根据核心线程池数量和最大线程池数量自动地调整线程池大小。ddA{codeThreadPoolExecutor}willautomaticallyadjustthepoolsize(see{linkgetPoolSize})accordingtotheboundssetbycorePoolSize(see{linkgetCorePoolSize})andmaximumPoolSize(see{linkgetMaximumPoolSize})。当一个新的线程提交到方法里,如果当前执行的线程小于核心线程数,会根据请求新建一个线程,即便其他线程仍旧闲置。如果多于核心线程数但小于最大线程数线程在执行。一个新线程将被创建,直到队列满。如果需要设置maximumPoolSize为无界的,比如Integer。MAXVALUE,那么将允许线程池容纳任意数量的任务并发执行。在典型的场景中,corePoolSize和maximumPoolSize仅仅在构造中设置,但是我们也可以动态的调整用setCorePoolSize和setMaximumPoolSize函数Whenanewtaskissubmittedinmethod{linkexecute(Runnable)},andfewerthancorePoolSizethreadsarerunning,anewthreadiscreatedtohandletherequest,evenifotherworkerthreadsareidle。IftherearemorethancorePoolSizebutlessthanmaximumPoolSizethreadsrunning,anewthreadwillbecreatedonlyifthequeueisfull。BysettingcorePoolSizeandmaximumPoolSizethesame,youcreateafixedsizethreadpool。BysettingmaximumPoolSizetoanessentiallyunboundedvaluesuchas{codeInteger。MAXVALUE},youallowthepooltoaccommodateanarbitrarynumberofconcurrenttasks。Mosttypically,coreandmaximumpoolsizesaresetonlyuponconstruction,buttheymayalsobechangeddynamicallyusing{linksetCorePoolSize}and{linksetMaximumPoolSize}。dddtOndemandconstructiondtddBydefault,evencorethreadsareinitiallycreatedandstartedonlywhennewtasksarrive,butthiscanbeoverriddendynamicallyusingmethod{linkprestartCoreThread}or{linkprestartAllCoreThreads}。Youprobablywanttoprestartthreadsifyouconstructthepoolwithanonemptyqueue。dddtCreatingnewthreadsdtddNewthreadsarecreatedusinga{linkThreadFactory}。Ifnototherwisespecified,a{linkExecutorsdefaultThreadFactory}isused,thatcreatesthreadstoallbeinthesame{linkThreadGroup}andwiththesame{codeNORMPRIORITY}priorityandnondaemonstatus。BysupplyingadifferentThreadFactory,youcanalterthethreadsname,threadgroup,priority,daemonstatus,etc。Ifa{codeThreadFactory}failstocreateathreadwhenaskedbyreturningnullfrom{codenewThread},theexecutorwillcontinue,butmightnotbeabletoexecuteanytasks。ThreadsshouldpossessthemodifyThread{codeRuntimePermission}。Ifworkerthreadsorotherthreadsusingthepooldonotpossessthispermission,servicemaybedegraded:configurationchangesmaynottakeeffectinatimelymanner,andashutdownpoolmayremaininastateinwhichterminationispossiblebutnotcompleted。dddtKeepalivetimesdt如果当前线程数大于核心线程数,那些超过keepAliveTime时间的限制线程将会被终止。当线程池没有充分利用的情况下,此策略可以减少资源的消耗如果线程池变得活跃,新的线程将会被构造。我们可以用setKeepAliveTime参数动态改变,用一个Long。MAXVALU,TimeUnitNANOSECONDS作为保活时间,那么空闲的线程可以避免在线程池关闭之前被终止。保活策略只有在当前线程池线程数量大于核心线程池数量时,才起作用。allowCoreThreadTimeOut用于控制当任务线程空闲时,是否允许线程等待keepAliveTime时间,以便在这个过程中,有新的任务进来ddIfthepoolcurrentlyhasmorethancorePoolSizethreads,excessthreadswillbeterminatediftheyhavebeenidleformorethanthekeepAliveTime(see{linkgetKeepAliveTime(TimeUnit)})。Thisprovidesameansofreducingresourceconsumptionwhenthepoolisnotbeingactivelyused。Ifthepoolbecomesmoreactivelater,newthreadswillbeconstructed。Thisparametercanalsobechangeddynamicallyusingmethod{linksetKeepAliveTime(long,TimeUnit)}。Usingavalueof{codeLong。MAXVALUE}{linkTimeUnitNANOSECONDS}effectivelydisablesidlethreadsfromeverterminatingpriortoshutdown。Bydefault,thekeepalivepolicyappliesonlywhentherearemorethancorePoolSizethreads。Butmethod{linkallowCoreThreadTimeOut(boolean)}canbeusedtoapplythistimeoutpolicytocorethreadsaswell,solongasthekeepAliveTimevalueisnonzero。dddtQueuingdtBlockingQueu用于存放提交的任务,队列的实际容量与线程池大小相关联ddAny{linkBlockingQueue}maybeusedtotransferandholdsubmittedtasks。Theuseofthisqueueinteractswithpoolsizing:ul如果当前线程池小于核心线程数,执行器总是优先创建一个新线程,而不是从队列获取liIffewerthancorePoolSizethreadsarerunning,theExecutoralwaysprefersaddinganewthreadratherthanqueuing。li如果当前线程池大于核心线程数,执行器是从队列获取空闲线程,而不是从新建一个空闲线程liIfcorePoolSizeormorethreadsarerunning,theExecutoralwaysprefersqueuingarequestratherthanaddinganewthread。li如果当前线程池任务数量大于核心线程池数量,且队列中无空闲任务线程,将会创建一个任务线程,直到超出最大线程数后,则任务将会被拒绝liIfarequestcannotbequeued,anewthreadiscreatedunlessthiswouldexceedmaximumPoolSize,inwhichcase,thetaskwillberejected。liulThreadPoolExecutor有3中出队列策略Therearethreegeneralstrategiesforqueuing:olliemDirecthandoffs。emAgooddefaultchoiceforaworkqueueisa{linkSynchronousQueue}thathandsofftaskstothreadswithoutotherwiseholdingthem。Here,anattempttoqueueataskwillfailifnothreadsareimmediatelyavailabletorunit,soanewthreadwillbeconstructed。Thispolicyavoidslockupswhenhandlingsetsofrequeststhatmighthaveinternaldependencies。DirecthandoffsgenerallyrequireunboundedmaximumPoolSizestoavoidrejectionofnewsubmittedtasks。Thisinturnadmitsthepossibilityofunboundedthreadgrowthwhencommandscontinuetoarriveonaveragefasterthantheycanbeprocessed。liliemUnboundedqueues。emUsinganunboundedqueue(forexamplea{linkLinkedBlockingQueue}withoutapredefinedcapacity)willcausenewtaskstowaitinthequeuewhenallcorePoolSizethreadsarebusy。Thus,nomorethancorePoolSizethreadswilleverbecreated。(AndthevalueofthemaximumPoolSizethereforedoesnthaveanyeffect。)Thismaybeappropriatewheneachtaskiscompletelyindependentofothers,sotaskscannotaffecteachothersexecution;forexample,inawebpageserver。Whilethisstyleofqueuingcanbeusefulinsmoothingouttransientburstsofrequests,itadmitsthepossibilityofunboundedworkqueuegrowthwhencommandscontinuetoarriveonaveragefasterthantheycanbeprocessed。liliemBoundedqueues。emAboundedqueue(forexample,an{linkArrayBlockingQueue})helpspreventresourceexhaustionwhenusedwithfinitemaximumPoolSizes,butcanbemoredifficulttotuneandcontrol。Queuesizesandmaximumpoolsizesmaybetradedoffforeachother:UsinglargequeuesandsmallpoolsminimizesCPUusage,OSresources,andcontextswitchingoverhead,butcanleadtoartificiallylowthroughput。Iftasksfrequentlyblock(forexampleiftheyareIObound),asystemmaybeabletoscheduletimeformorethreadsthanyouotherwiseallow。Useofsmallqueuesgenerallyrequireslargerpoolsizes,whichkeepsCPUsbusierbutmayencounterunacceptableschedulingoverhead,whichalsodecreasesthroughput。lioldd拒绝策略dtRejectedtasksdt当执行器关闭时,或者执行器用有界的最大线程池数量和任务队列容量饱和时,新提交的任务将会被拒绝。在其他情况下,execute方法将调用RejectedExecutionHandler的rejectedExecution方法处理任务。有四种处理策略提供如下:ddNewtaskssubmittedinmethod{linkexecute(Runnable)}willbeemrejectedemwhentheExecutorhasbeenshutdown,andalsowhentheExecutorusesfiniteboundsforbothmaximumthreadsandworkqueuecapacity,andissaturated。Ineithercase,the{codeexecute}methodinvokesthe{linkRejectedExecutionHandlerrejectedExecution(Runnable,ThreadPoolExecutor)}methodofits{linkRejectedExecutionHandler}。Fourpredefinedhandlerpoliciesareprovided:olliInthedefault{linkThreadPoolExecutor。AbortPolicy},thehandlerthrowsaruntime{linkRejectedExecutionException}uponrejection。liliIn{linkThreadPoolExecutor。CallerRunsPolicy},thethreadthatinvokes{codeexecute}itselfrunsthetask。Thisprovidesasimplefeedbackcontrolmechanismthatwillslowdowntheratethatnewtasksaresubmitted。liliIn{linkThreadPoolExecutor。DiscardPolicy},ataskthatcannotbeexecutedissimplydropped。liliIn{linkThreadPoolExecutor。DiscardOldestPolicy},iftheexecutorisnotshutdown,thetaskattheheadoftheworkqueueisdropped,andthenexecutionisretried(whichcanfailagain,causingthistoberepeated。)liolItispossibletodefineanduseotherkindsof{linkRejectedExecutionHandler}classes。Doingsorequiressomecareespeciallywhenpoliciesaredesignedtoworkonlyunderparticularcapacityorqueuingpolicies。dddtHookmethodsdtddThisclassprovides{codeprotected}overridable{linkbeforeExecute(Thread,Runnable)}and{linkafterExecute(Runnable,Throwable)}methodsthatarecalledbeforeandafterexecutionofeachtask。Thesecanbeusedtomanipulatetheexecutionenvironment;forexample,reinitializingThreadLocals,gatheringstatistics,oraddinglogentries。Additionally,method{linkterminated}canbeoverriddentoperformanyspecialprocessingthatneedstobedoneoncetheExecutorhasfullyterminated。pIfhookorcallbackmethodsthrowexceptions,internalworkerthreadsmayinturnfailandabruptlyterminate。dddtQueuemaintenancedtddMethod{linkgetQueue()}allowsaccesstotheworkqueueforpurposesofmonitoringanddebugging。Useofthismethodforanyotherpurposeisstronglydiscouraged。Twosuppliedmethods,{linkremove(Runnable)}and{linkpurge}areavailabletoassistinstoragereclamationwhenlargenumbersofqueuedtasksbecomecancelled。dddtFinalizationdtddApoolthatisnolongerreferencedinaprogramemANDemhasnoremainingthreadswillbe{codeshutdown}automatically。Ifyouwouldliketoensurethatunreferencedpoolsarereclaimedevenifusersforgettocall{linkshutdown},thenyoumustarrangethatunusedthreadseventuallydie,bysettingappropriatekeepalivetimes,usingalowerboundofzerocorethreadsandorsetting{linkallowCoreThreadTimeOut(boolean)}。dddlpbExtensionexampleb。Mostextensionsofthisclassoverrideoneormoreoftheprotectedhookmethods。Forexample,hereisasubclassthataddsasimplepauseresumefeature:pre{codeclassPausableThreadPoolExecutorextendsThreadPoolExecutor{privatebooleanisPaused;privateReentrantLockpauseLocknewReentrantLock();privateConditionunpausedpauseLock。newCondition();publicPausableThreadPoolExecutor(。。。){super(。。。);}protectedvoidbeforeExecute(Threadt,Runnabler){super。beforeExecute(t,r);pauseLock。lock();try{while(isPaused)unpaused。await();}catch(InterruptedExceptionie){t。interrupt();}finally{pauseLock。unlock();}}publicvoidpause(){pauseLock。lock();try{isPausedtrue;}finally{pauseLock。unlock();}}publicvoidresume(){pauseLock。lock();try{isPausedfalse;unpaused。signalAll();}finally{pauseLock。unlock();}}}}presince1。5authorDougLea代码解析代码分析1。如果少于核心线程数,则试着用给定的第一个task启动线程。对addworker以原子方式检查运行状态和任务统计,通过返回false,来防止在增加线程时出现错误警报2。如果一个task能成功排队,这是我们仍旧需要两次检查,无论我们是否应该加入线程(因为最后检查时存在的已经死掉)或者在刚进入时线程池就关掉。因此我们重新检查状态,以确保停止时回滚回队列;如果没有,则启动新线程,3。如果不能从队列拿去,我们尝试新建线程。如果失败,我们就知道已经停止或者饱和并且丢弃这个任务intcctl。get();如果少于核心线程数,则创建if(workerCountOf(c)corePoolSize){if(addWorker(command,true))return;cctl。get();}如果正在运行,并且可以进入队列if(isRunning(c)workQueue。offer(command)){intrecheckctl。get();if(!isRunning(recheck)remove(command))reject(command);elseif(workerCountOf(recheck)0)addWorker(null,false);}elseif(!addWorker(command,false))reject(command);} 核心工作方法addWorkerprivatebooleanaddWorker(RunnablefirstTask,booleancore){try{wnewWorker(firstTask);finalThreadtw。thread;}finally{if(!workerStarted)addWorkerFailed(w);}} 查看Worker这个类包装类,调用thread。start()方法时候,实际调用的就是Worker类的run()方法privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnableWorker(RunnablefirstTask){setState(1);inhibitinterruptsuntilrunWorkerthis。firstTaskfirstTask;this。threadgetThreadFactory()。newThread(this);}{publicvoidrun(){runWorker(this);}} 具体干活的方法runWorker死循环,等条件,执行线程finalvoidrunWorker(Workerw){while(task!null(taskgetTask())!null){w。lock();try{}finally{w。unlock();}}} 判断条件getTask()死循环,拿到任务才返回如果线程超时或者大于核心线程数设置,释放线程。否则队列为空,则一直阻塞privateRunnablegetTask(){for(;;){intwcworkerCountOf(c);booleantimedallowCoreThreadTimeOutwccorePoolSize;略Runnablertimed?workQueue。poll(keepAliveTime,TimeUnit。NANOSECONDS):workQueue。take();if(r!null)returnr;略}} 大于核心线程数时,会根据心跳时间释放队列的线程 当小于等于核心线程数,会卡在workQueue。take()方法,直到拿到Runnable然后返回总结线程池的核心线程数,最大线程数,队列,线程间的关系? 答:创建时,当小于核心线程数,无论队列是否有空闲线程,总会新建线程。当大于核心线程数,优先获取队列的空闲线程,当队列满了,则新建线程,如果大于最大线程数,则该任务被拒绝。 自动销毁时,大于核心线程数,会根据KeepAliveTime时间,释放队列中的线程,直至线程数总数等于核心线程数,不再释放,核心线程数通过循环维持设置核心线程数,能大于最大线程数么? 答:不能,看源码publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueRunnableworkQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){if(corePoolSize0maximumPoolSize0maximumPoolSizecorePoolSizekeepAliveTime0)thrownewIllegalArgumentException();。。。。。}publicvoidsetMaximumPoolSize(intmaximumPoolSize){if(maximumPoolSize0maximumPoolSizecorePoolSize)thrownewIllegalArgumentException();。。。。。}释放线程时,怎样才能释放核心线程? allowCoreThreadTimeOut设置为true 释放线程时,条件判断:privateRunnablegetTask(){。。。intwcworkerCountOf(c);Areworkerssubjecttoculling?allowCoreThreadTimeOut或者超过核心线程数booleantimedallowCoreThreadTimeOutwccorePoolSize;if((wcmaximumPoolSize(timedtimedOut))(wc1workQueue。isEmpty())){if(compareAndDecrementWorkerCount(c))returnnull;continue;}。。。}线程池为什么能维持线程不释放,随时运行各种任务? 答:核心线程是在一系列的死循环里,通过队列阻塞保留。从而可以不断接受任务来进行死循环,拿到任务才返回如果线程超时或者大于核心线程数设置,释放线程。否则队列为空,则一直阻塞privateRunnablegetTask(){for(;;){intwcworkerCountOf(c);booleantimedallowCoreThreadTimeOutwccorePoolSize;略Runnablertimed?workQueue。poll(keepAliveTime,TimeUnit。NANOSECONDS):workQueue。take();if(r!null)returnr;略}}