java线程池的深入了解1
ThreadPoolExecutor先抛出几个问题线程池参数有哪些都是什么意思 线程池状态 线程池如何保证核心线程不死亡 线程池执行过程 线程池有什么好处 常见的线程池以及使用场景
直接上最简单是使用实例 public static void main(String[] args) { BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10); ThreadFactory poolName = new ThreadFactoryBuilder().setNameFormat("handsame-guy-%d").build(); ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, blockingQueue, poolName); // 开心的话这里可以加一个循环,里面做一个随机休眠 executor.execute(() -> { System.out.println("早上起来不要照镜子,不然会被自己帅死"); }); } // 早上起来不要照镜子,不然会被自己帅死二话不说直接看源码构造
先看构造:其实比较简单就是构造一些属性 /** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
* {@code maximumPoolSize <= 0}
* {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ /* corePoolSize 核心线程数 maximumPoolSize 最大线程数 keepAliveTime 空闲线程的最大超时时间 unit 超时时间的单位 workQueue 工作队列,保存未执行的Runnable 任务 threadFactory 创建线程的工厂类 handler 拒绝策略 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }执行
来看执行,执行有2个方法一个是 execute()另一个是submit()。 先看 execute() public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: 分为3个步骤 * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn"t, by returning false. 1、当运行的线程数少于核心线程数的时候,尝试为当前的任务启动一个新的线程。 通过调用 addWorker 自动检查runState和 workerCount,防止可能增加的 错误警报当它不应该线程,返回false。 * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. 2、如果任务成功的排队,我们仍然需要做双重检查这个线程是否需要被加入。 池关闭后进入此方法。所以我们重新检查状态,如果有必要回滚排队停止, 或启动一个新线程 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. 3、如果新的新城不能排队任务,尝试添加一个新的线程。如果它失败了, 关闭或饱和的所以拒绝这个任务。 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 1、 if (addWorker(command, true)) return; c = ctl.get(); } // 2、 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3、 else if (!addWorker(command, false)) reject(command); }
addWorker() /* firstTask 任务 core 是否核心线程 */ private boolean addWorker(Runnable firstTask, boolean core) { // 1、自旋,确定是否可以创建worker。可以则跳出循环继续操作,否则返回false retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 检查线程池状态及队列是否空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS增长workerCount,成功则跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; // 重新获取ctl, 状态改变则继续外层循环,否则在内层循环 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 2、创建worker,这部分使用ReentrantLock锁 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 获取到锁以后仍需要二次检查ctl, // 上一个获取到锁处理的线程可能会改变runState // 如 ThreadFactory 创建失败 或线程池被 shut down等 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 加入工作队列 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } // 启动线程 if (workerAdded) { t.start(); // 更改标志位 workerStarted = true; } } } finally { // 启动失败,处理启动失败线程。 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
来看看worker长什么样子。
Worker是ThreadPoolExecutor的内部类,实现了 AQS 并继承了 Runnable。 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. 每个worker有自己的内部线程,ThreadFactory创建失败时是null */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter 完成的任务数 */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { // inhibit interrupts until runWorker 禁止线程在启动前被打断 setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker run方法 */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. // state = 0 代表未锁;state = 1 代表已锁 protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } // 终端启动的线程 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
worker实现了简单的 非重入互斥锁 。worker 实现锁的目的与传统锁的意义不太一样。其主要是为了控制线程是否可中断
再来看下 worker的核心run方法 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 循环直到 task = null (线程池关闭、超时等) // 注意这里的 getTask() 方法,我们配置的阻塞队列会在这里起作用 while (task != null || (task = getTask()) != null) { // 执行前先加锁 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 为了确保线程池运行/停止的时候线程运行/中断,需要在第二种情况下进行重新获取ctl if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { // 执行真正的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 线程退出工作 processWorkerExit(w, completedAbruptly); } }
当没有任务的时候runWorker()方法会循环getTask() 这个方法可能会阻塞 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 检查是否还继续处理任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? 获取是否允许超市 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }回答疑问线程池参数有哪些都是什么意思corePoolSize 核心线程数 maximumPoolSize 最大线程数 keepAliveTime 空闲线程的最大超时时间 unit 超时时间的单位 workQueue 工作队列,保存未执行的Runnable 任务 threadFactory 创建线程的工厂类 handler 拒绝策略 线程池如何保证核心线程不死亡runWorker方法中调用了 getTask方法,这个方法会阻塞。 线程池执行过程
线程池有什么好处1、节省资源、提高效率。线程的创建和销毁相对都是比较消耗资源,利用线程池可以在需要的时候直接获取一个线程这样能尽可能的避免了重复的创建和销毁线程。 2、方便管理 可以编写线程池管理代码对池中的线程统一进行管理,避免因无休止的创建线程导致系统崩溃。 线程池状态()RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED
下面这两个问题算是留尾巴吧,后面再抽时间来看 常见的线程池以及使用场景 线程池的状态的流转 扩展
在阅读源码的时候其实能看到一些变量定义初始化以及状态的判断都是用位运算判断的。比如 // 不同系统这里可能不一样,这里假设是32 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 线程个数的表示 [位数](不同平台 int 类型范围不一样) // 不管int是多少位,反正高三位 就是表示线程状态,剩余的位数表示线程数量 // 线程数量 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程最大数量 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 高三位表示 线程池状态 // 运行状态 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 获取线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取线程池有效线程数量 private static int workerCountOf(int c) { return c & CAPACITY; } // 获取上面提到的32位int类型的数值 private static int ctlOf(int rs, int wc) { return rs | wc; }
这里拿RUNNING状态具体,看下具体是怎么算的。RUNNING = -1 << COUNT_BITS
这里需要假设一下 Integer.SIZE =32 => COUNT_BITS = 29
那么 RUNNING = -1 << COUNT_BITS 表示 -1二进制左移29位。 -1的二进制是多少在计算机中,负数以其正值的补码形式表达。 补码 = 反码 + 11的源码0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001反码1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1110补码 = 反码 + 11111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 -1 左移29位溢出位丢弃,空余位补0得到下面的结果1110 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000
因此 RUNNING高三位为 111,同样方式可计算 SHUTDOWN :000 STOP:001``TIDYING: 010``TERMINATED:011 // 获取线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // CAPACITY 从上面可以得知为 1111…111 (29 bit),取反后就是 111 0000000(29个 0) // c & 上面的结果 就可以获取到高三位,而后29位全部为 0 // 获取线程池有效线程数量 private static int workerCountOf(int c) { return c & CAPACITY; } // CAPACITY 为 000 111111(29个1) // c & CAPACITY, 就可以获取变量c的低29位的值 // 获取上面提到的32位int类型的数值 // rs 为 线程状态, wc 表示 线程数量 private static int ctlOf(int rs, int wc) { return rs | wc; }
星辰大海也能黑?某信公关被抓现行,小米或将采取行动互联网一直都不是随便发表言论的,今天就有一家公司恶意抹黑小米电视。原因是这样的,某信煽动写手发文抹黑小米,说解读海报,实则讽刺小米的星辰大海的目标。而且某信的公关还被抓了个现行,这
我,13年专卖创业失败废品,年销售过亿作者丨陈晓编辑丨房煜图源丨摄图网第二树创业还是要谨慎些,要么顺应国家政策和大形势,要么在大平台有一定的积累和人脉,再去创业。因为我见过太多的九死一生了。第二树董事长吴海卡对创业邦说
人民更需要抖音神曲过滤器图片来源视觉中国文道总有理你有邀请码吗?去年,一个社交软件突然爆火,因为一码难求,圈内人在朋友圈悠闲地晒出邀请码,圈外人则挤破头地想进来,Clubhouse俨然成了一块辨别上流人士
上门按摩体验感怎么样?大家好,我用我的亲身经历,来和大家分享下,上门按摩的体验如何?现在如果想要找上门按摩主要有三个途径。第一。就是经常去的按摩店,然后和里面的技师比较熟悉了,彼此有一定了解,然后由于一
玩树莓派能学到什么?树莓派(raspberrypi)是一个微型电脑,拥有完善的图形操作系统,它诞生的意义就是普及编程,它可以做的事很多。题目问树莓派能学到什么,倒不如问树莓派能做什么?因为你做到的才是
买TWS一定要和手机选同品牌吗?近些年来,体积小携带方便没有束缚的真无线(TWS)蓝牙耳机成为了市场上最受欢迎的音频设备。但如今的TWS市场选择众多,单就品牌数量而言都可谓百花齐放,除了手机本品牌的配套产品外,也
无损音乐播放器哪个好?谢谢邀请。这个问题有点模糊。无损音乐播放器,可以理解成,无损音乐,音乐播放器。无损音乐,指的是音源,音源分几个当次,流畅,高品,超品,无损。无损音乐的音质最好。音乐播放器,一是音乐
你觉得高德地图百度地图腾讯地图哪个比较准确?高德地图我认为最好用这个嘛,我跑过一万多公里的高速,告诉你实际体验。高德是哪条路最短导哪条,你要做好走烂路的准备。在呼市有一天晚上被导航引到坑里(在修路),幸好是suv,不然出不来
甘肃省今年新能源汽车领域全面增长我省今年新能源汽车领域全面增长每日甘肃网兰州讯(新甘肃甘肃日报记者王占东)记者近日从国网甘肃综合能源服务有限公司获悉,今年以来,随着我国有关鼓励新能源产业发展政策的落地,我省新能源
直播科技艺术家2022三星家电新品发布会科技艺术家2022三星家电新品发布会直播时间2022年4月7日19302200直播简介用科技为艺术加冕,让艺术融入生活,让家拥有与众不同的质感与气息,一起把家居环境装点得更和谐。活
新势力3月销量出炉市场竞争越发激烈,头部品牌先发优势缩小近日,新能源品牌纷纷发布了3月交付情况。小鹏汽车哪吒汽车理想汽车和零跑汽车3月交付量均破万,广汽埃安首次突破2万台,蔚来汽车增速依然处在相对低位。从目前公布的数据来看,中国新能源市