高并发深度解析ScheduledThreadPoolExecutor类的源代码
在【高并发专题】的专栏中,我们深度分析了ThreadPoolExecutor类的源代码,而ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类。今天我们就来一起手撕ScheduledThreadPoolExecutor类的源代码。 构造方法
我们先来看下ScheduledThreadPoolExecutor的构造方法,源代码如下所示。 public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
从代码结构上来看,ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类,ScheduledThreadPoolExecutor类的构造方法实际上调用的是ThreadPoolExecutor类的构造方法。 schedule方法
接下来,我们看一下ScheduledThreadPoolExecutor类的schedule方法,源代码如下所示。 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { //如果传递的Runnable对象和TimeUnit时间单位为空 //抛出空指针异常 if (command == null || unit == null) throw new NullPointerException(); //封装任务对象,在decorateTask方法中直接返回ScheduledFutureTask对象 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask(command, null, triggerTime(delay, unit))); //执行延时任务 delayedExecute(t); //返回任务 return t; } public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) //如果传递的Callable对象和TimeUnit时间单位为空 //抛出空指针异常 if (callable == null || unit == null) throw new NullPointerException(); //封装任务对象,在decorateTask方法中直接返回ScheduledFutureTask对象 RunnableScheduledFuture t = decorateTask(callable, new ScheduledFutureTask(callable, triggerTime(delay, unit))); //执行延时任务 delayedExecute(t); //返回任务 return t; }
从源代码可以看出,ScheduledThreadPoolExecutor类提供了两个重载的schedule方法,两个schedule方法的第一个参数不同。可以传递Runnable接口对象,也可以传递Callable接口对象。在方法内部,会将Runnable接口对象和Callable接口对象封装成RunnableScheduledFuture对象,本质上就是封装成ScheduledFutureTask对象。并通过delayedExecute方法来执行延时任务。
在源代码中,我们看到两个schedule都调用了decorateTask方法,接下来,我们就看看decorateTask方法。 decorateTask方法
decorateTask方法源代码如下所示。 protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) { return task; } protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) { return task; }
通过源码可以看出decorateTask方法的实现比较简单,接收一个Runnable接口对象或者Callable接口对象和封装的RunnableScheduledFuture任务,两个方法都是将RunnableScheduledFuture任务直接返回。在ScheduledThreadPoolExecutor类的子类中可以重写这两个方法。
接下来,我们继续看下scheduleAtFixedRate方法。 scheduleAtFixedRate方法
scheduleAtFixedRate方法源代码如下所示。 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { //传入的Runnable对象和TimeUnit为空,则抛出空指针异常 if (command == null || unit == null) throw new NullPointerException(); //如果执行周期period传入的数值小于或者等于0 //抛出非法参数异常 if (period <= 0) throw new IllegalArgumentException(); //将Runnable对象封装成ScheduledFutureTask任务, //并设置执行周期 ScheduledFutureTask sft = new ScheduledFutureTask(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); //调用decorateTask方法,本质上还是直接返回ScheduledFutureTask对象 RunnableScheduledFuture t = decorateTask(command, sft); //设置执行的任务 sft.outerTask = t; //执行延时任务 delayedExecute(t); //返回执行的任务 return t; }
通过源码可以看出,scheduleAtFixedRate方法将传递的Runnable对象封装成ScheduledFutureTask任务对象,并设置了执行周期,下一次的执行时间相对于上一次的执行时间来说,加上了period时长,时长的具体单位由TimeUnit决定。采用固定的频率来执行定时任务。
ScheduledThreadPoolExecutor类中另一个定时调度任务的方法是scheduleWithFixedDelay方法,接下来,我们就一起看看scheduleWithFixedDelay方法。 scheduleWithFixedDelay方法
scheduleWithFixedDelay方法的源代码如下所示。 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { //传入的Runnable对象和TimeUnit为空,则抛出空指针异常 if (command == null || unit == null) throw new NullPointerException(); //任务延时时长小于或者等于0,则抛出非法参数异常 if (delay <= 0) throw new IllegalArgumentException(); //将Runnable对象封装成ScheduledFutureTask任务 //并设置固定的执行周期来执行任务 ScheduledFutureTask sft = new ScheduledFutureTask(command, null,triggerTime(initialDelay, unit), unit.toNanos(-delay)); //调用decorateTask方法,本质上直接返回ScheduledFutureTask任务 RunnableScheduledFuture t = decorateTask(command, sft); //设置执行的任务 sft.outerTask = t; //执行延时任务 delayedExecute(t); //返回任务 return t; }
从scheduleWithFixedDelay方法的源代码,我们可以看出在将Runnable对象封装成ScheduledFutureTask时,设置了执行周期,但是此时设置的执行周期与scheduleAtFixedRate方法设置的执行周期不同。此时设置的执行周期规则为:下一次任务执行的时间是上一次任务完成的时间加上delay时长,时长单位由TimeUnit决定。也就是说,具体的执行时间不是固定的,但是执行的周期是固定的,整体采用的是相对固定的延迟来执行定时任务。
如果大家细心的话,会发现在scheduleWithFixedDelay方法中设置执行周期时,传递的delay值为负数,如下所示。 ScheduledFutureTask sft = new ScheduledFutureTask(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));
这里的负数表示的是相对固定的延迟。
在ScheduledFutureTask类中,存在一个setNextRunTime方法,这个方法会在run方法执行完任务后调用,这个方法更能体现scheduleAtFixedRate方法和scheduleWithFixedDelay方法的不同,setNextRunTime方法的源码如下所示。 private void setNextRunTime() { //距离下次执行任务的时长 long p = period; //固定频率执行, //上次执行任务的时间 //加上任务的执行周期 if (p > 0) time += p; //相对固定的延迟 //使用的是系统当前时间 //加上任务的执行周期 else time = triggerTime(-p); }
在setNextRunTime方法中通过对下次执行任务的时长进行判断来确定是固定频率执行还是相对固定的延迟。 triggerTime方法
在ScheduledThreadPoolExecutor类中提供了两个triggerTime方法,用于获取下一次执行任务的具体时间。triggerTime方法的源码如下所示。 private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
这两个triggerTime方法的代码比较简单,就是获取下一次执行任务的具体时间。有一点需要注意的是:delay < (Long.MAX_VALUE >> 1判断delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,则直接返回delay,否则需要处理溢出的情况。
我们看到在triggerTime方法中处理防止溢出的逻辑使用了overflowFree方法,接下来,我们就看看overflowFree方法的实现。 overflowFree方法
overflowFree方法的源代码如下所示。 private long overflowFree(long delay) { //获取队列中的节点 Delayed head = (Delayed) super.getQueue().peek(); //获取的节点不为空,则进行后续处理 if (head != null) { //从队列节点中获取延迟时间 long headDelay = head.getDelay(NANOSECONDS); //如果从队列中获取的延迟时间小于0,并且传递的delay //值减去从队列节点中获取延迟时间小于0 if (headDelay < 0 && (delay - headDelay < 0)) //将delay的值设置为Long.MAX_VALUE + headDelay delay = Long.MAX_VALUE + headDelay; } //返回延迟时间 return delay; }
通过对overflowFree方法的源码分析,可以看出overflowFree方法本质上就是为了限制队列中的所有节点的延迟时间在Long.MAX_VALUE值之内,防止在ScheduledFutureTask类中的compareTo方法中溢出。
ScheduledFutureTask类中的compareTo方法的源码如下所示。 public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
compareTo方法的主要作用就是对各延迟任务进行排序,距离下次执行时间靠前的任务就排在前面。 delayedExecute方法
delayedExecute方法是ScheduledThreadPoolExecutor类中延迟执行任务的方法,源代码如下所示。 private void delayedExecute(RunnableScheduledFuture<?> task) { //如果当前线程池已经关闭 //则执行线程池的拒绝策略 if (isShutdown()) reject(task); //线程池没有关闭 else { //将任务添加到阻塞队列中 super.getQueue().add(task); //如果当前线程池是SHUTDOWN状态 //并且当前线程池状态下不能执行任务 //并且成功从阻塞队列中移除任务 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) //取消任务的执行,但不会中断执行中的任务 task.cancel(false); else //调用ThreadPoolExecutor类中的ensurePrestart()方法 ensurePrestart(); } }
可以看到在delayedExecute方法内部调用了canRunInCurrentRunState方法,canRunInCurrentRunState方法的源码实现如下所示。 boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
可以看到canRunInCurrentRunState方法的逻辑比较简单,就是判断线程池当前状态下能够执行任务。
另外,在delayedExecute方法内部还调用了ThreadPoolExecutor类中的ensurePrestart()方法,接下来,我们看下ThreadPoolExecutor类中的ensurePrestart()方法的实现,如下所示。 void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
在ThreadPoolExecutor类中的ensurePrestart()方法中,首先获取当前线程池中线程的数量,如果线程数量小于corePoolSize则调用addWorker方法传递null和true,如果线程数量为0,则调用addWorker方法传递null和false。
关于addWork()方法的源码解析,大家可以参考【高并发专题】中的《高并发之——通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程》一文,这里,不再赘述。 reExecutePeriodic方法
reExecutePeriodic方法的源代码如下所示。 void reExecutePeriodic(RunnableScheduledFuture<?> task) { //线程池当前状态下能够执行任务 if (canRunInCurrentRunState(true)) { //将任务放入队列 super.getQueue().add(task); //线程池当前状态下不能执行任务,并且成功移除任务 if (!canRunInCurrentRunState(true) && remove(task)) //取消任务 task.cancel(false); else //调用ThreadPoolExecutor类的ensurePrestart()方法 ensurePrestart(); } }
总体来说reExecutePeriodic方法的逻辑比较简单,但是,这里需要注意和delayedExecute方法的不同点:调用reExecutePeriodic方法的时候已经执行过一次任务,所以,并不会触发线程池的拒绝策略;传入reExecutePeriodic方法的任务一定是周期性的任务。 onShutdown方法
onShutdown方法是ThreadPoolExecutor类中的钩子函数,它是在ThreadPoolExecutor类中的shutdown方法中调用的,而在ThreadPoolExecutor类中的onShutdown方法是一个空方法,如下所示。 void onShutdown() { }
ThreadPoolExecutor类中的onShutdown方法交由子类实现,所以ScheduledThreadPoolExecutor类覆写了onShutdown方法,实现了具体的逻辑,ScheduledThreadPoolExecutor类中的onShutdown方法的源码实现如下所示。 @Override void onShutdown() { //获取队列 BlockingQueue q = super.getQueue(); //在线程池已经调用shutdown方法后,是否继续执行现有延迟任务 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); //在线程池已经调用shutdown方法后,是否继续执行现有定时任务 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); //在线程池已经调用shutdown方法后,不继续执行现有延迟任务和定时任务 if (!keepDelayed && !keepPeriodic) { //遍历队列中的所有任务 for (Object e : q.toArray()) //取消任务的执行 if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); //清空队列 q.clear(); } //在线程池已经调用shutdown方法后,继续执行现有延迟任务和定时任务 else { //遍历队列中的所有任务 for (Object e : q.toArray()) { //当前任务是RunnableScheduledFuture类型 if (e instanceof RunnableScheduledFuture) { //将任务强转为RunnableScheduledFuture类型 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; //在线程池调用shutdown方法后不继续的延迟任务或周期任务 //则从队列中删除并取消任务 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { if (q.remove(t)) t.cancel(false); } } } } //最终调用tryTerminate()方法 tryTerminate(); }
ScheduledThreadPoolExecutor类中的onShutdown方法的主要逻辑就是先判断线程池调用shutdown方法后,是否继续执行现有的延迟任务和定时任务,如果不再执行,则取消任务并清空队列;如果继续执行,将队列中的任务强转为RunnableScheduledFuture对象之后,从队列中删除并取消任务。大家需要好好理解这两种处理方式。最后调用ThreadPoolExecutor类的tryTerminate方法。有关ThreadPoolExecutor类的tryTerminate方法的源码解析,大家可以参考【高并发专题】中的《高并发之——通过源码深度分析线程池中Worker线程的执行流程》一文,这里不再赘述。
至此,ScheduledThreadPoolExecutor类中的核心方法的源代码,我们就分析完了。
油价调整信息今天3月15号,国内加油站调整后9295号汽油价格今天是2022年3月15日星期二,国内成品油预计调整幅度仍旧处于正值运行,各位车主要小心油价上涨了。国际原油期货下跌,跌幅在2。9美元桶左右。WTI盘中最高触及104。77美元桶,
今日油价调整信息3月14日调整后,全国9295汽油价格最新售价表今日最新油价调整今天是3月14日,星期一。油价要连涨6次,本周油价又要创高了从2021年末至今,油价已经连涨5次,而新一轮油价的十个工作日统计周期已经进行到第7个工作日,预计油价上
奋进虎年虎力全开,为何近期中国最耀眼的省份是湖北?湖北自从2019年年底2020年年初开始,新冠病毒已经在全球肆虐已有两年多时间了,且至今仍然没有消散的迹象,而全国受到新冠影响最严重的地区,无疑就是湖北了。而根据近期国家发布的,关
对未来十年房地产行业的推演先说结论,如果下面的推演没有问题,那么到2030年,中国房地产市场将累计新增128。3亿平方米的住房面积,累计销售金额143。2万亿。下面是整个推演的完整过程2016年12月30日
居民中长期贷款历史首次负增长,楼市还要加大剂量奶一奶2月经济数据出炉,新增人民币贷款1。23万亿,不及市场预期,中断了1月开启的信贷复苏态势。从结构来看,短期贷款与票据融资继续发挥支撑作用,信贷结构恶化情况并没有缓解。分部门来看,企
明天这些板块大概率这样走白酒,医药,新能源今天来看三大指数恐慌性下跌,似乎由于是战争的影响,但总体来看还是属于内在基本面因素,三大指数上涨已经连续两两年,现在已经出现上涨无力的态势,所以将会开启深度回调探底行情。板块方面来
10个关于成长的句子,清醒且透彻见字第1044期01hr如果你只读每个人都在读的书,你也只能想到每个人都能想到的事。02hr在人间挑选属于自己的颜色,去追那只风筝,去做你想做的人。03hr成长很大一部分是接受,接
散文时光雅静,带来春天的爱恋作者子墨活着就是活一种心情,人生过的就是一种心境,简单快乐随缘,随心,也就看清了人生起落,浮生风雨,懂得珍惜和拥有,万事随缘,才能看见岁月花开。题记三月,春风吹乱了思绪,凌乱了一场
优美的句子(30)一个好的东西往往是说不清楚的,说得清楚的往往不是好东西。马云我不需要任何人来反驳我,我本身就足以反驳我自己。尼采人性的太人性的早晨的最大好处,是让我们知道今天能从头开始。生活不是为
年轮梁晓声,饥饿的记忆,还没有远到遗忘的地步年轮梁晓声,饥饿的记忆,还没有远到遗忘的地步看完这本书,我的心情久久不能平复。对于德保的死很震惊,那么善良,有责任的一个人就这样离开了爱他的家人,朋友,真的很可惜。我感动于徐克,吴
最近哪一段比较治愈你?一点文艺与美好。1。hr我会在冬日最后的一场雪结束后动身沿途采撷凛冬散去后初发的绿意在春雷响时抵达你身旁把冬天遗落在后将春天作为伴手礼献于你那便是我一生的迁徙。这句真的太美了,适合