JAVA线程池的一些琐事
焦点:为什么要有线程池? java如何创建线程池,区别是什么? 几个思考的问题? 2.为什么要有线程池?
多线程特点: 资源占用多 默认一个线程的栈 1M(-Xss可配)(note: 堆外,栈溢出) 需要上下文切换 cpu切换之前的状态存储(pc,寄存器等)
线程池(资源池)都是为了解决一个主要问题:创建线程(资源)和销毁线程(资源)的成本,比维护 线程(资源)池 的成本高。
延伸可解决的问题: 限制程序对资源无限申请 可管理,可监控
对比:协程(内存8k),线程成本更高,线程池就成了常用手段之一 3. java如何创建线程池?
创建线程池的方法: new ThreadPoolExecutor() Executors.newXXXX()
Executors.newXXXX() 是 对 new ThreadPoolExecutor() 提供的封装 3.1 ThreadPoolExecutor
java中的线程池 ThreadPoolExecutor public ThreadPoolExecutor( int corePoolSize, // 核心线程数 int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize >= 0 && maximumPoolSize > 0 && maximumPoolSize >= corePoolSize && keepAliveTime >= 0L) { if (workQueue != null && threadFactory != null && handler != null) { this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } else { throw new NullPointerException(); } } else { throw new IllegalArgumentException(); } }public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } else { int c = this.ctl.get(); if (workerCountOf(c) < this.corePoolSize) { // 当前worker数量 < corePoolSize,将添加一个worker线程来工作 if (this.addWorker(command, true)) { return; } c = this.ctl.get(); } if (isRunning(c) && this.workQueue.offer(command)) { // 当前worker数量 >= corePoolSize, 将多余的任务放到 worker 队列中 int recheck = this.ctl.get(); if (!isRunning(recheck) && this.remove(command)) { this.reject(command); } else if (workerCountOf(recheck) == 0) { this.addWorker((Runnable)null, false); } } else if (!this.addWorker(command, false)) { // 如果 使用 worker 队列放不下,且 也无法增加worker线程的数量(达到maximumPoolSize)了,就执行拒绝 this.reject(command); } } } /** * 添加worker线程(core = true :占用 corePoolSize 线程的名额;core = false :占用 maximumPoolSize-corePoolSize 线程的名额) **/ private boolean addWorker(Runnable firstTask, boolean core) {}
BlockingQueue 有什么? ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。 LinkedBlockingQueue:一个基于链表结构的无界阻塞队列(capacity=2147483647),此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。 SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。 PriorityBlockingQueue:一个具有优先级的无界阻塞队列(capacity=2147483647), 内部使用PriorityQueue。
RejectedExecutionHandler 有什么? ThreadPoolExecutor.AbortPolicy 如果元素添加到线程池中失败,则直接抛运行时异常 RejectedExecutionException(默认) ThreadPoolExecutor.DiscardPolicy 如果元素添加线程池失败,则放弃,不抛异常。 ThreadPoolExecutor.CallerRunsPolicy 如果元素添加线程池失败,则主线程自己来运行任务。 ThreadPoolExecutor.DiscardOldestPolicy 如果元素添加线程池失败,会将队列中最早的元素删除之后,再尝试添加,一直重复成功为止。 3.2 Executors.newXXXX()
这些都啥区别? java.util.concurrent.Executors#newFixedThreadPool(int) // core size 和 max size 一样大,无界队列 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); }java.util.concurrent.Executors#newCachedThreadPool() // 无队列,来一个任务,开一个线程,直到 oom (内部逻辑,2147483647 之后就手动oom异常) public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory); }java.util.concurrent.Executors#newSingleThreadExecutor() // core 和 max 都是1,无界队列 public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory)); }java.util.concurrent.Executors#newScheduledThreadPool(int) // 定时执行的线程池 public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory)); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }java.util.concurrent.Executors#newWorkStealingPool(int) // 支持工作窃取的线程池 public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true); } public static ExecutorService newWorkStealingPool() { return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true); }4. 几个思考的问题4.1 核心线程怎么保活
使用阻塞队列(这就是为什么就算空队列,也一定要传入 workQueue 这个参数)。getTask 是每个worker线程,获取自己要执行的任务。
4.2 怎么实现keepAliveTime的
同上 blockQueue 的 poll 4.3 ScheduledExecutorService怎么实现任务定时执行
构造函数: // core size 传入,max size = 2147483647, workQueue = ScheduledThreadPoolExecutor.DelayedWorkQueue() public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, 2147483647, 10L, TimeUnit.MILLISECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, 2147483647, 10L, TimeUnit.MILLISECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, 2147483647, 10L, TimeUnit.MILLISECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, 2147483647, 10L, TimeUnit.MILLISECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), threadFactory, handler); }
ScheduledThreadPoolExecutor.DelayedWorkQueue: 相当于DelayQueue和PriorityQueue的结合体。(权重是 下次执行的时间),无界队列
delayQueue 是什么?是一个BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。 class MyDelayedTask implements Delayed{ private String name ; private long start = System.currentTimeMillis(); private long time ; public MyDelayedTask(String name,long time) { this.name = name; this.time = time; } /** * 需要实现的接口,获得延迟时间, 0的时候,就表示可以用了 */ @Override public long getDelay(TimeUnit unit) { return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS); } /** * 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间 */ @Override public int compareTo(Delayed o) { MyDelayedTask o1 = (MyDelayedTask) o; return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return "MyDelayedTask{" + "name="" + name + """ + ", time=" + time + "}"; } }
常用方法:
注意:execute 并不会实现延迟效果(因为execute是线程池通用方法,仍然依赖 先core, 后queue, 再max, 最后reject)
问题:schedule,scheduleAtFixedRate,scheduleWithFixedDelay 是如何实现延迟的? public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command != null && unit != null) { RunnableScheduledFuture t = this.decorateTask((Runnable)command, new ScheduledThreadPoolExecutor.ScheduledFutureTask(command, (Object)null, this.triggerTime(delay, unit), sequencer.getAndIncrement())); // 根据 任务(command)构建一个 延迟任务对象 this.delayedExecute(t); // 延迟执行这个任务 return t; } else { throw new NullPointerException(); } } private void delayedExecute(RunnableScheduledFuture<?> task) { if (this.isShutdown()) { this.reject(task); } else { super.getQueue().add(task); // 直接将任务加到 workQueue 里面,所以 ScheduledThreadPoolExecutor 是 先queue,再core,再max,在reject if (!this.canRunInCurrentRunState(task) && this.remove(task)) { task.cancel(false); } else { this.ensurePrestart(); } } }
scheduleAtFixedRate 和 scheduleWithFixedDelay 流程一样,但是延迟任务对象 构造出来的不一样 4.4 ForJoinPool4.4.1 什么是 forkjoin 什么是 mapreduce
forkjoin 和 mapreduce 都是分治的思想。
举例子: forkjoin: 1+2+3+4+5+6 => ((1+2+3)+(4+5+6)) => (((1+2)+(3))+((4+5)+(6))) 递归拆分 拆分之后 块与块之间仍然有联系 单机 mapreduce: 1+2+3+4+5+6 => sum((1+2), (5+6), (3+4)) 拆分一次,随机拆分,拆分成目标大小的块即可 reduce 也是随机的,块与块之间 不存在联系 分布式
forkjoin模式:
4.4.2 什么是 work staling
核心思想: work stealing 工作窃取
充分利用线程进行并行计算,减少线程间的竞争(在某些情况下还是会存在竞争,比如双端队列里只有一个任务时)。 4.4.3 Java ForkJoinPool
在 ForkJoinPool 中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数 mode 决定),然后以FIFO的顺序随机窃取其他队列中的任务。
具体思路如下: 每个线程都有自己的一个WorkQueue,该工作队列是一个双端队列。 队列支持三个功能push、pop、poll push/pop只能被队列的所有者线程调用,而poll可以被其他线程调用。 划分的子任务调用fork时,都会被push到自己的队列中。 默认情况下,工作线程从自己的双端队列获出任务并执行。 当自己的队列为空时,线程随机从另一个线程的队列末尾调用poll方法窃取任务。
4.4.4 java 怎么使用 ForkJoinPool
Fork/Join框架主要包含三个模块: 任务对象: ForkJoinTask (包括RecursiveTask、RecursiveAction 和 CountedCompleter) 执行Fork/Join任务的线程: ForkJoinWorkerThread 线程池: ForkJoinPool
ForkJoinPool 只接收 ForkJoinTask 任务(在实际使用中,也可以接收 Runnable/Callable 任务,但在真正运行时,也会把这些任务封装成 ForkJoinTask 类型的任务),RecursiveTask 是 ForkJoinTask 的子类,是一个可以递归执行的 ForkJoinTask,RecursiveAction 是一个无返回值的 RecursiveTask,CountedCompleter 在任务完成执行后会触发执行一个自定义的钩子函数。 import java.util.concurrent.RecursiveTask; public class CountTask extends RecursiveTask { private int start; private int end; private int limit; public CountTask(int start, int end, int limit) { this.start = start; this.end = end; this.limit = limit; } @Override protected Long compute() { long sum = 0; if (end - start < limit) { for (int i = start; i <= end; i++) { sum += i; } } else { int mid = (start + end) / 2; CountTask countTask1 = new CountTask(start, mid, limit); CountTask countTask2 = new CountTask(mid + 1, end, limit); countTask1.fork(); countTask2.fork(); Long result1 = countTask1.join(); Long result2 = countTask2.join(); sum = result1 + result2; } return sum; } }public class Main { public static void main(String[] args) { long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask countTask = new CountTask(1, 1000000, 5); Long invoke = forkJoinPool.invoke(countTask); long end = System.currentTimeMillis(); System.out.println(invoke + "," + (end - start)); } } // 500000500000,126
提交任务的区别: invoke()会等待任务计算完毕并返回计算结果; execute()是直接向池提交一个任务来异步执行,无返回结果; submit()也是异步执行,但是会返回提交的任务,在适当的时候可通过task.get()获取执行结果。 4.4.5 ForkJoinPool 的核心参数核心线程数:默认线程数 Runtime. getRuntime ().availableProcessors() 最小线程数 1 // 默认 Runtime.getRuntime().availableProcessors() ,cpu线程数 // 最大 32767 // keepAlive 60000 ms public ForkJoinPool() { this(Math.min(32767, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, false, 0, 32767, 1, (Predicate)null, 60000L, TimeUnit.MILLISECONDS); }
队列大小:
爱奇艺改的图标真的好看么,大品牌的logo的变迁史爱奇艺换新logo了爱奇艺启用全新Logo,突破边界新绿升级化方为圆开放舒展你觉得新logo做得真的好看么?这次爱奇艺的logo改动还是蛮大的。单从图标来看,取消了外框,我猜测这边
关于EUV光刻机的新消息来了,外媒不能坐着不动了导读关于EUV光刻机的新消息来了。外国媒体ASML不能坐着不动了!众所周知,科学技术的发展现在需要严重依赖半导体芯片。美国芯片禁令后,全球芯片市场出现了芯片短缺,这也带来了整个芯片
顶住!1500万美元设备交付于俄,华为迈出了这一步华为交付设备了随着一纸禁令,当年的光刻机厂商台积电高通等企业开始逐渐地断供于华,这是大家都知道的。没想到的是,在今年进入2月之后,不少美企也开始陆续地断供于俄,诸如苹果甲骨文微软等
SilkRoad创始人同意将69370枚比特币交由美国政府拍卖暗网丝绸之路(SilkRoad)的创始人RossUlbricht在2015年暗网市场关闭后被判终身监禁,并被勒令支付1。83亿美元的赔偿金。根据一份法庭文件,RossUlbrich
别太浮夸了!华为交付俄企设备,并不是你想的那样?一个猝不及防!华为交付通信设备,竟然也值得惊呼了?综合俄媒消息与各路专家点评来看,主要是2022年4月份的华为技术对外出货交付了约合9751。95万元人民币的设备。接收这批价值15
华为的海思半导体收入锐减81,麒麟芯片短时间真的没有希望了吗根据Gartner公布的2021全球半导体研究报告,由于美国的贸易制裁,中国在全球芯片市场的整体份额受到影响。目前,华为海思已退出全球前25位半导体供应商的行列,目前排名前10位的
也太明显了?不愿赴美建芯片厂,台积电再次出招了一些肆无忌惮口嗨内容过于泛滥,标题扯上外媒(实际是凭空造谣)给外界一种台积电不行了的错觉当然了,失去彼时两大客户之一的华为海思芯片产能订单,台积电确实被小美打乱了客户多元化业务架构
逐渐消失的中层岗位互联网时代,最早消失的岗位是什么?小宝最近对大宝不太满意,觉得大宝老是高高在上,不了解民间疾苦,更关键的是涨薪幅度远不及预期,所以一直在留意外面的机会,上班也有点得过且过的意思。大
买东西,网购,大家觉得是淘宝好还是京东好?网上购物简称网购,网购已经成为人们生活中必不可缺的一部分了,想起小编十多年的网购历程,从刚开始直接汇款的易趣到现在的淘宝京东拼夕夕等等,,从刚开始的怀疑不信任,到现在的生活必备,想
失业之后如何谋出路最近针对这个问题想了很多,有想过先临时去跑外卖,跑滴滴,跑代驾,然后三者也对比了一下。1外卖虽然辛苦但投入的确少,熬夜时间少,一个月下来也能有个0。81。2k左右。2跑滴滴,由于现
浅谈2022年互联网裁员,你想知道的在这里Emm,2min就能看完。关键词收缩首先,互联网企业在过去20年,是鼎盛黄金期。众多互联网公司赚得盆满钵满。十年电脑端互联网,十年移动端互联网。那么今后十年呢?一定是物联网。其实集
电动自行车充电桩市场前景广阔数据显示,我国电动自行车保有量已达3。5亿,是一个名副其实的电动自行车大国。如此庞大的规模,带动了充电需求的日益增长,再加上国家相关政策对电动车充电有了明确规定,使得电动自行车充电
新能源汽车充一次电要多少钱?现在买车的人是越来越多了,而且很多地方对购置新能源汽车都有一定的补贴,所以在汽车市场,新能源汽车的关注度也越来越高。相比于燃油车,新能源汽车有低碳环保的先天优势,但唯一存在的短板就
电动车飞线充电起火,场景令人触目惊心近年来,广大人民群众的安全意识普遍增强,也都认识到电动车进楼道进电梯飞线充电等行为存在极大的消防安全隐患,开始使用电动自行车充电桩来充电。然而,并不是所有的地方都有条件安装充电桩,
不吐不快!使用充电桩时遇到的那些糟心事儿电动车充电过去一直都在困扰着广大车主,飞线充电固然方便,但存在极大的安全隐患。8月1日起,小区电动车禁止飞线充电,使用充电桩成为居民充电既方便又安全的方式。随着多地大力推进小区充电
加油站建充电桩,汽车充电更方便现在很多地方都在大力推进汽车充电桩建设,充电桩覆盖面积的增加确实让新能源车主充电更加方便,然而仍有部分车主在使用公共充电桩时遇到了问题,比如说充电站不好找没人管理充电桩被占用等,给
物业不给装充电桩怎么办?现在很多新能源车主在买来新车之后,如果没有在自家车位安装汽车充电桩的话,想要充电就得去找外面的公共充电桩,如果遇到排队情况,就需要等很长时间。因此,很多车主在买车之前就做好在车位安
莫让汽车充电桩沦为僵尸桩如今随着国内新能源汽车销量不断再创新高,汽车充电桩的建设数量也水涨船高。汽车充电桩数量的增加固然能为新能源车主充电提供便利,然而并非所有充电桩都能很好地做到物尽其用,现在有不少地方
共享私人充电桩将大有可为相信国庆假期开新能源汽车上高速的车主基本都遇到充电难的问题,长假期间新能源汽车增多,充电需求大大增加,仅靠现有的充电桩数量根本无法满足车主的充电需求。在我国新能源汽车市场蓬勃发展的
抓住老旧小区改造机遇,推动电动车充电桩建设自禁止电动车上楼规定正式执行以来,在小区安装电动车充电桩是解决居民充电问题的最佳方案,然而对于老旧小区而言,由于基础设施先天不足,要顺利安装充电桩并非易事。为切实解决老旧小区老旧脏
在自家车位安装充电桩有哪些好处?近年来我国新能源汽车发展突飞猛进,连续多年产销量稳定上涨,在汽车市场的渗透率也逐年增加,相信未来越来越多车主将开上新能源汽车。日益增加的新能源汽车保有量,对充电提出了更高的需求。目
现在的电动车充电桩能有多智能?下班后,小王骑着电动车回到小区,将车子停放在车棚后,再把充电线插入充电桩插座,拿出手机扫一下充电桩上的二维码,支付费用,充电就开始了随着电动自行车充电桩在各地居民区大面积覆盖,车主