在【高并发专题】的专栏中,我们深度分析了ThreadPoolExecutor类的源代码,而ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类。今天我们就来一起手撕ScheduledThreadPoolExecutor类的源代码。构造方法 我们先来看下ScheduledThreadPoolExecutor的构造方法,源代码如下所示。publicScheduledThreadPoolExecutor(intcorePoolSize){super(corePoolSize,Integer。MAXVALUE,0,NANOSECONDS,newDelayedWorkQueue());}publicScheduledThreadPoolExecutor(intcorePoolSize,ThreadFactorythreadFactory){super(corePoolSize,Integer。MAXVALUE,0,NANOSECONDS,newDelayedWorkQueue(),threadFactory);}publicScheduledThreadPoolExecutor(intcorePoolSize,RejectedExecutionHandlerhandler){super(corePoolSize,Integer。MAXVALUE,0,NANOSECONDS,newDelayedWorkQueue(),handler);}publicScheduledThreadPoolExecutor(intcorePoolSize,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){super(corePoolSize,Integer。MAXVALUE,0,NANOSECONDS,newDelayedWorkQueue(),threadFactory,handler);} 从代码结构上来看,ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类,ScheduledThreadPoolExecutor类的构造方法实际上调用的是ThreadPoolExecutor类的构造方法。schedule方法 接下来,我们看一下ScheduledThreadPoolExecutor类的schedule方法,源代码如下所示。publicScheduledFuturelt;?schedule(Runnablecommand,longdelay,TimeUnitunit){如果传递的Runnable对象和TimeUnit时间单位为空抛出空指针异常if(commandnullunitnull)thrownewNullPointerException();封装任务对象,在decorateTask方法中直接返回ScheduledFutureTask对象RunnableScheduledFuturelt;?tdecorateTask(command,newScheduledFutureTaskVoid(command,null,triggerTime(delay,unit)));执行延时任务delayedExecute(t);返回任务returnt;}publicVScheduledFutureVschedule(CallableVcallable,longdelay,TimeUnitunit)如果传递的Callable对象和TimeUnit时间单位为空抛出空指针异常if(callablenullunitnull)thrownewNullPointerException();封装任务对象,在decorateTask方法中直接返回ScheduledFutureTask对象RunnableScheduledFutureVtdecorateTask(callable,newScheduledFutureTaskV(callable,triggerTime(delay,unit)));执行延时任务delayedExecute(t);返回任务returnt;} 从源代码可以看出,ScheduledThreadPoolExecutor类提供了两个重载的schedule方法,两个schedule方法的第一个参数不同。可以传递Runnable接口对象,也可以传递Callable接口对象。在方法内部,会将Runnable接口对象和Callable接口对象封装成RunnableScheduledFuture对象,本质上就是封装成ScheduledFutureTask对象。并通过delayedExecute方法来执行延时任务。 在源代码中,我们看到两个schedule都调用了decorateTask方法,接下来,我们就看看decorateTask方法。decorateTask方法 decorateTask方法源代码如下所示。protectedVRunnableScheduledFutureVdecorateTask(Runnablerunnable,RunnableScheduledFutureVtask){returntask;}protectedVRunnableScheduledFutureVdecorateTask(CallableVcallable,RunnableScheduledFutureVtask){returntask;} 通过源码可以看出decorateTask方法的实现比较简单,接收一个Runnable接口对象或者Callable接口对象和封装的RunnableScheduledFuture任务,两个方法都是将RunnableScheduledFuture任务直接返回。在ScheduledThreadPoolExecutor类的子类中可以重写这两个方法。 接下来,我们继续看下scheduleAtFixedRate方法。scheduleAtFixedRate方法 scheduleAtFixedRate方法源代码如下所示。publicScheduledFuturelt;?scheduleAtFixedRate(Runnablecommand,longinitialDelay,longperiod,TimeUnitunit){传入的Runnable对象和TimeUnit为空,则抛出空指针异常if(commandnullunitnull)thrownewNullPointerException();如果执行周期period传入的数值小于或者等于0抛出非法参数异常if(period0)thrownewIllegalArgumentException();将Runnable对象封装成ScheduledFutureTask任务,并设置执行周期ScheduledFutureTaskVoidsftnewScheduledFutureTaskVoid(command,null,triggerTime(initialDelay,unit),unit。toNanos(period));调用decorateTask方法,本质上还是直接返回ScheduledFutureTask对象RunnableScheduledFutureVoidtdecorateTask(command,sft);设置执行的任务sft。outerTaskt;执行延时任务delayedExecute(t);返回执行的任务returnt;} 通过源码可以看出,scheduleAtFixedRate方法将传递的Runnable对象封装成ScheduledFutureTask任务对象,并设置了执行周期,下一次的执行时间相对于上一次的执行时间来说,加上了period时长,时长的具体单位由TimeUnit决定。采用固定的频率来执行定时任务。 ScheduledThreadPoolExecutor类中另一个定时调度任务的方法是scheduleWithFixedDelay方法,接下来,我们就一起看看scheduleWithFixedDelay方法。scheduleWithFixedDelay方法 scheduleWithFixedDelay方法的源代码如下所示。publicScheduledFuturelt;?scheduleWithFixedDelay(Runnablecommand,longinitialDelay,longdelay,TimeUnitunit){传入的Runnable对象和TimeUnit为空,则抛出空指针异常if(commandnullunitnull)thrownewNullPointerException();任务延时时长小于或者等于0,则抛出非法参数异常if(delay0)thrownewIllegalArgumentException();将Runnable对象封装成ScheduledFutureTask任务并设置固定的执行周期来执行任务ScheduledFutureTaskVoidsftnewScheduledFutureTaskVoid(command,null,triggerTime(initialDelay,unit),unit。toNanos(delay));调用decorateTask方法,本质上直接返回ScheduledFutureTask任务RunnableScheduledFutureVoidtdecorateTask(command,sft);设置执行的任务sft。outerTaskt;执行延时任务delayedExecute(t);返回任务returnt;} 从scheduleWithFixedDelay方法的源代码,我们可以看出在将Runnable对象封装成ScheduledFutureTask时,设置了执行周期,但是此时设置的执行周期与scheduleAtFixedRate方法设置的执行周期不同。此时设置的执行周期规则为:下一次任务执行的时间是上一次任务完成的时间加上delay时长,时长单位由TimeUnit决定。也就是说,具体的执行时间不是固定的,但是执行的周期是固定的,整体采用的是相对固定的延迟来执行定时任务。 如果大家细心的话,会发现在scheduleWithFixedDelay方法中设置执行周期时,传递的delay值为负数,如下所示。ScheduledFutureTaskVoidsftnewScheduledFutureTaskVoid(command,null,triggerTime(initialDelay,unit),unit。toNanos(delay)); 这里的负数表示的是相对固定的延迟。 在ScheduledFutureTask类中,存在一个setNextRunTime方法,这个方法会在run方法执行完任务后调用,这个方法更能体现scheduleAtFixedRate方法和scheduleWithFixedDelay方法的不同,setNextRunTime方法的源码如下所示。privatevoidsetNextRunTime(){距离下次执行任务的时长longpperiod;固定频率执行,上次执行任务的时间加上任务的执行周期if(p0)timep;相对固定的延迟使用的是系统当前时间加上任务的执行周期elsetimetriggerTime(p);} 在setNextRunTime方法中通过对下次执行任务的时长进行判断来确定是固定频率执行还是相对固定的延迟。triggerTime方法 在ScheduledThreadPoolExecutor类中提供了两个triggerTime方法,用于获取下一次执行任务的具体时间。triggerTime方法的源码如下所示。privatelongtriggerTime(longdelay,TimeUnitunit){returntriggerTime(unit。toNanos((delay0)?0:delay));}longtriggerTime(longdelay){returnnow()((delay(Long。MAXVALUE1))?delay:overflowFree(delay));} 这两个triggerTime方法的代码比较简单,就是获取下一次执行任务的具体时间。有一点需要注意的是:delay(Long。MAXVALUE1判断delay的值是否小于Long。MAXVALUE的一半,如果小于Long。MAXVALUE值的一半,则直接返回delay,否则需要处理溢出的情况。 我们看到在triggerTime方法中处理防止溢出的逻辑使用了overflowFree方法,接下来,我们就看看overflowFree方法的实现。overflowFree方法 overflowFree方法的源代码如下所示。privatelongoverflowFree(longdelay){获取队列中的节点Delayedhead(Delayed)super。getQueue()。peek();获取的节点不为空,则进行后续处理if(head!null){从队列节点中获取延迟时间longheadDelayhead。getDelay(NANOSECONDS);如果从队列中获取的延迟时间小于0,并且传递的delay值减去从队列节点中获取延迟时间小于0if(headDelay0(delayheadDelay0))将delay的值设置为Long。MAXVALUEheadDelaydelayLong。MAXVALUEheadDelay;}返回延迟时间returndelay;} 通过对overflowFree方法的源码分析,可以看出overflowFree方法本质上就是为了限制队列中的所有节点的延迟时间在Long。MAXVALUE值之内,防止在ScheduledFutureTask类中的compareTo方法中溢出。 ScheduledFutureTask类中的compareTo方法的源码如下所示。publicintcompareTo(Delayedother){if(otherthis)comparezeroifsameobjectreturn0;if(otherinstanceofScheduledFutureTask){ScheduledFutureTasklt;?x(ScheduledFutureTasklt;?)other;longdifftimex。time;if(diff0)return1;elseif(diff0)return1;elseif(sequenceNumberx。sequenceNumber)return1;elsereturn1;}longdiffgetDelay(NANOSECONDS)other。getDelay(NANOSECONDS);return(diff0)?1:(diff0)?1:0;} compareTo方法的主要作用就是对各延迟任务进行排序,距离下次执行时间靠前的任务就排在前面。delayedExecute方法 delayedExecute方法是ScheduledThreadPoolExecutor类中延迟执行任务的方法,源代码如下所示。privatevoiddelayedExecute(RunnableScheduledFuturelt;?task){如果当前线程池已经关闭则执行线程池的拒绝策略if(isShutdown())reject(task);线程池没有关闭else{将任务添加到阻塞队列中super。getQueue()。add(task);如果当前线程池是SHUTDOWN状态并且当前线程池状态下不能执行任务并且成功从阻塞队列中移除任务if(isShutdown()!canRunInCurrentRunState(task。isPeriodic())remove(task))取消任务的执行,但不会中断执行中的任务task。cancel(false);else调用ThreadPoolExecutor类中的ensurePrestart()方法ensurePrestart();}} 可以看到在delayedExecute方法内部调用了canRunInCurrentRunState方法,canRunInCurrentRunState方法的源码实现如下所示。booleancanRunInCurrentRunState(booleanperiodic){returnisRunningOrShutdown(periodic?continueExistingPeriodicTasksAfterShutdown:executeExistingDelayedTasksAfterShutdown);} 可以看到canRunInCurrentRunState方法的逻辑比较简单,就是判断线程池当前状态下能够执行任务。 另外,在delayedExecute方法内部还调用了ThreadPoolExecutor类中的ensurePrestart()方法,接下来,我们看下ThreadPoolExecutor类中的ensurePrestart()方法的实现,如下所示。voidensurePrestart(){intwcworkerCountOf(ctl。get());if(wccorePoolSize)addWorker(null,true);elseif(wc0)addWorker(null,false);} 在ThreadPoolExecutor类中的ensurePrestart()方法中,首先获取当前线程池中线程的数量,如果线程数量小于corePoolSize则调用addWorker方法传递null和true,如果线程数量为0,则调用addWorker方法传递null和false。 关于addWork()方法的源码解析,大家可以参考【高并发专题】中的《高并发之通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程》一文,这里,不再赘述。reExecutePeriodic方法 reExecutePeriodic方法的源代码如下所示。voidreExecutePeriodic(RunnableScheduledFuturelt;?task){线程池当前状态下能够执行任务if(canRunInCurrentRunState(true)){将任务放入队列super。getQueue()。add(task);线程池当前状态下不能执行任务,并且成功移除任务if(!canRunInCurrentRunState(true)remove(task))取消任务task。cancel(false);else调用ThreadPoolExecutor类的ensurePrestart()方法ensurePrestart();}} 总体来说reExecutePeriodic方法的逻辑比较简单,但是,这里需要注意和delayedExecute方法的不同点:调用reExecutePeriodic方法的时候已经执行过一次任务,所以,并不会触发线程池的拒绝策略;传入reExecutePeriodic方法的任务一定是周期性的任务。onShutdown方法 onShutdown方法是ThreadPoolExecutor类中的钩子函数,它是在ThreadPoolExecutor类中的shutdown方法中调用的,而在ThreadPoolExecutor类中的onShutdown方法是一个空方法,如下所示。voidonShutdown(){} ThreadPoolExecutor类中的onShutdown方法交由子类实现,所以ScheduledThreadPoolExecutor类覆写了onShutdown方法,实现了具体的逻辑,ScheduledThreadPoolExecutor类中的onShutdown方法的源码实现如下所示。OverridevoidonShutdown(){获取队列BlockingQueueRunnableqsuper。getQueue();在线程池已经调用shutdown方法后,是否继续执行现有延迟任务booleankeepDelayedgetExecuteExistingDelayedTasksAfterShutdownPolicy();在线程池已经调用shutdown方法后,是否继续执行现有定时任务booleankeepPeriodicgetContinueExistingPeriodicTasksAfterShutdownPolicy();在线程池已经调用shutdown方法后,不继续执行现有延迟任务和定时任务if(!keepDelayed!keepPeriodic){遍历队列中的所有任务for(Objecte:q。toArray())取消任务的执行if(einstanceofRunnableScheduledFuturelt;?)((RunnableScheduledFuturelt;?)e)。cancel(false);清空队列q。clear();}在线程池已经调用shutdown方法后,继续执行现有延迟任务和定时任务else{遍历队列中的所有任务for(Objecte:q。toArray()){当前任务是RunnableScheduledFuture类型if(einstanceofRunnableScheduledFuture){将任务强转为RunnableScheduledFuture类型RunnableScheduledFuturelt;?t(RunnableScheduledFuturelt;?)e;在线程池调用shutdown方法后不继续的延迟任务或周期任务则从队列中删除并取消任务if((t。isPeriodic()?!keepPeriodic:!keepDelayed)t。isCancelled()){if(q。remove(t))t。cancel(false);}}}}最终调用tryTerminate()方法tryTerminate();} ScheduledThreadPoolExecutor类中的onShutdown方法的主要逻辑就是先判断线程池调用shutdown方法后,是否继续执行现有的延迟任务和定时任务,如果不再执行,则取消任务并清空队列;如果继续执行,将队列中的任务强转为RunnableScheduledFuture对象之后,从队列中删除并取消任务。大家需要好好理解这两种处理方式。最后调用ThreadPoolExecutor类的tryTerminate方法。有关ThreadPoolExecutor类的tryTerminate方法的源码解析,大家可以参考【高并发专题】中的《高并发之通过源码深度分析线程池中Worker线程的执行流程》一文,这里不再赘述。 至此,ScheduledThreadPoolExecutor类中的核心方法的源代码,我们就分析完了。