

  ThreadPoolExecutor先抛出几个问题线程池参数有哪些都是什么意思 线程池状态 线程池如何保证核心线程不死亡 线程池执行过程 线程池有什么好处 常见的线程池以及使用场景
  public static void main(String[] args) {     BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);     ThreadFactory poolName = new ThreadFactoryBuilder().setNameFormat("handsame-guy-%d").build();     ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 1,                                                          TimeUnit.MINUTES, blockingQueue, poolName);     // 开心的话这里可以加一个随机休眠     executor.execute(() -> {         System.out.println("早上起来不要照镜子,不然会被自己帅死");     });  }
  先看构造:其实比较简单就是构造一些属性 /**  * Creates a new {@code ThreadPoolExecutor} with the given initial  * parameters.  *  * @param corePoolSize the number of threads to keep in the pool, even  *        if they are idle, unless {@code allowCoreThreadTimeOut} is set  * @param maximumPoolSize the maximum number of threads to allow in the  *        pool  * @param keepAliveTime when the number of threads is greater than  *        the core, this is the maximum time that excess idle threads  *        will wait for new tasks before terminating.  * @param unit the time unit for the {@code keepAliveTime} argument  * @param workQueue the queue to use for holding tasks before they are  *        executed.  This queue will hold only the {@code Runnable}  *        tasks submitted by the {@code execute} method.  * @param threadFactory the factory to use when the executor  *        creates a new thread  * @param handler the handler to use when execution is blocked  *        because the thread bounds and queue capacities are reached  * @throws IllegalArgumentException if one of the following holds:
  *         {@code corePoolSize < 0}
  *         {@code keepAliveTime < 0}
  *         {@code maximumPoolSize <= 0}
  *         {@code maximumPoolSize < corePoolSize}  * @throws NullPointerException if {@code workQueue}  *         or {@code threadFactory} or {@code handler} is null  */ /* corePoolSize    核心线程数 maximumPoolSize 最大线程数 keepAliveTime   空闲线程的最大超时时间 unit            超时时间的单位 workQueue       工作队列,保存未执行的Runnable 任务 threadFactory   创建线程的工厂类 handler         拒绝策略 */ public ThreadPoolExecutor(int corePoolSize,                           int maximumPoolSize,                           long keepAliveTime,                           TimeUnit unit,                           BlockingQueue workQueue,                           ThreadFactory threadFactory,                           RejectedExecutionHandler handler) {     if (corePoolSize < 0 ||         maximumPoolSize <= 0 ||         maximumPoolSize < corePoolSize ||         keepAliveTime < 0)         throw new IllegalArgumentException();     if (workQueue == null || threadFactory == null || handler == null)         throw new NullPointerException();     this.corePoolSize = corePoolSize;     this.maximumPoolSize = maximumPoolSize;     this.workQueue = workQueue;     this.keepAliveTime = unit.toNanos(keepAliveTime);     this.threadFactory = threadFactory;     this.handler = handler; }执行
  来看执行,执行有2个方法一个是 execute()另一个是submit()。 先看 execute() public void execute(Runnable command) {     if (command == null)         throw new NullPointerException();     /*      * Proceed in 3 steps:        分为3个步骤            * 1. If fewer than corePoolSize threads are running, try to      * start a new thread with the given command as its first      * task.  The call to addWorker atomically checks runState and      * workerCount, and so prevents false alarms that would add      * threads when it shouldn"t, by returning false.        1、当运行的线程数少于核心线程数的时候,尝试为当前的任务启动一个新的线程。           通过调用 addWorker 自动检查runState和 workerCount,防止可能增加的           错误警报当它不应该线程,返回false。            * 2. If a task can be successfully queued, then we still need      * to double-check whether we should have added a thread      * (because existing ones died since last checking) or that      * the pool shut down since entry into this method. So we      * recheck state and if necessary roll back the enqueuing if      * stopped, or start a new thread if there are none.        2、如果任务成功的排队,我们仍然需要做双重检查这个线程是否需要被加入。           池关闭后进入此方法。所以我们重新检查状态,如果有必要回滚排队停止,           或启动一个新线程            * 3. If we cannot queue task, then we try to add a new      * thread.  If it fails, we know we are shut down or saturated      * and so reject the task.        3、如果新的新城不能排队任务,尝试添加一个新的线程。如果它失败了,        关闭或饱和的所以拒绝这个任务。      */     int c = ctl.get();     if (workerCountOf(c) < corePoolSize) {         // 1、         if (addWorker(command, true))             return;         c = ctl.get();     }     // 2、     if (isRunning(c) && workQueue.offer(command)) {         int recheck = ctl.get();         if (! isRunning(recheck) && remove(command))             reject(command);         else if (workerCountOf(recheck) == 0)             addWorker(null, false);     }     // 3、     else if (!addWorker(command, false))         reject(command); }
  addWorker() /* firstTask 任务 core 是否核心线程 */ private boolean addWorker(Runnable firstTask, boolean core) {     // 1、自旋,确定是否可以创建worker。可以则跳出循环继续操作,否则返回false     retry:     for (;;) {         int c = ctl.get();         int rs = runStateOf(c);          // Check if queue empty only if necessary.         // 检查线程池状态及队列是否空         if (rs >= SHUTDOWN &&             ! (rs == SHUTDOWN &&                firstTask == null &&                ! workQueue.isEmpty()))             return false;                  //          for (;;) {             int wc = workerCountOf(c);             if (wc >= CAPACITY ||                 wc >= (core ? corePoolSize : maximumPoolSize))                 return false;             // CAS增长workerCount,成功则跳出循环             if (compareAndIncrementWorkerCount(c))                 break retry;             // 重新获取ctl, 状态改变则继续外层循环,否则在内层循环             c = ctl.get();  // Re-read ctl             if (runStateOf(c) != rs)                 continue retry;             // else CAS failed due to workerCount change; retry inner loop         }     }      // 2、创建worker,这部分使用ReentrantLock锁     boolean workerStarted = false;     boolean workerAdded = false;     Worker w = null;     try {         w = new Worker(firstTask);         final Thread t = w.thread;         if (t != null) {             final ReentrantLock mainLock = this.mainLock;             mainLock.lock();             try {                 // Recheck while holding lock.                 // Back out on ThreadFactory failure or if                 // shut down before lock acquired.                 // 获取到锁以后仍需要二次检查ctl,                 // 上一个获取到锁处理的线程可能会改变runState                 //  如 ThreadFactory 创建失败 或线程池被 shut down等                 int rs = runStateOf(ctl.get());                  if (rs < SHUTDOWN ||                     (rs == SHUTDOWN && firstTask == null)) {                     if (t.isAlive()) // precheck that t is startable                         throw new IllegalThreadStateException();                     // 加入工作队列                     workers.add(w);                     int s = workers.size();                     if (s > largestPoolSize)                         largestPoolSize = s;                     workerAdded = true;                 }             } finally {                 // 释放锁                 mainLock.unlock();             }             // 启动线程             if (workerAdded) {                 t.start();                 // 更改标志位                 workerStarted = true;             }         }     } finally {         // 启动失败,处理启动失败线程。         if (! workerStarted)             addWorkerFailed(w);     }     return workerStarted; }
  Worker是ThreadPoolExecutor的内部类,实现了 AQS 并继承了 Runnable。 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{     /**      * This class will never be serialized, but we provide a      * serialVersionUID to suppress a javac warning.      */     private static final long serialVersionUID = 6138294804551838833L;      /** Thread this worker is running in.  Null if factory fails.          每个worker有自己的内部线程,ThreadFactory创建失败时是null     */     final Thread thread;     /** Initial task to run.  Possibly null. */     Runnable firstTask;     /** Per-thread task counter  完成的任务数 */     volatile long completedTasks;      /**      * Creates with given first task and thread from ThreadFactory.      * @param firstTask the first task (null if none)      */     Worker(Runnable firstTask) {         // inhibit interrupts until runWorker 禁止线程在启动前被打断         setState(-1);          this.firstTask = firstTask;         this.thread = getThreadFactory().newThread(this);     }      /** Delegates main run loop to outer runWorker         run方法     */     public void run() {         runWorker(this);     }      // Lock methods     //     // The value 0 represents the unlocked state.     // The value 1 represents the locked state.     // state = 0 代表未锁;state = 1 代表已锁     protected boolean isHeldExclusively() {         return getState() != 0;     }      protected boolean tryAcquire(int unused) {         if (compareAndSetState(0, 1)) {             setExclusiveOwnerThread(Thread.currentThread());             return true;         }         return false;     }      protected boolean tryRelease(int unused) {         setExclusiveOwnerThread(null);         setState(0);         return true;     }      public void lock()        { acquire(1); }     public boolean tryLock()  { return tryAcquire(1); }     public void unlock()      { release(1); }     public boolean isLocked() { return isHeldExclusively(); }     // 终端启动的线程     void interruptIfStarted() {         Thread t;         if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {             try {                 t.interrupt();             } catch (SecurityException ignore) {             }         }     } }
  worker实现了简单的  非重入互斥锁 。worker 实现锁的目的与传统锁的意义不太一样。其主要是为了控制线程是否可中断
  再来看下 worker的核心run方法 final void runWorker(Worker w) {     Thread wt = Thread.currentThread();     Runnable task = w.firstTask;     w.firstTask = null;     w.unlock(); // allow interrupts     boolean completedAbruptly = true;     try {         // 循环直到 task = null (线程池关闭、超时等)         // 注意这里的 getTask() 方法,我们配置的阻塞队列会在这里起作用         while (task != null || (task = getTask()) != null) {             // 执行前先加锁             w.lock();             // If pool is stopping, ensure thread is interrupted;             // if not, ensure thread is not interrupted.  This             // requires a recheck in second case to deal with             // shutdownNow race while clearing interrupt             // 为了确保线程池运行/停止的时候线程运行/中断,需要在第二种情况下进行重新获取ctl             if ((runStateAtLeast(ctl.get(), STOP) ||                  (Thread.interrupted() &&                   runStateAtLeast(ctl.get(), STOP))) &&                 !wt.isInterrupted())                 wt.interrupt();             try {                 beforeExecute(wt, task);                 Throwable thrown = null;                 try {                     // 执行真正的run方法                     task.run();                 } catch (RuntimeException x) {                     thrown = x; throw x;                 } catch (Error x) {                     thrown = x; throw x;                 } catch (Throwable x) {                     thrown = x; throw new Error(x);                 } finally {                     afterExecute(task, thrown);                 }             } finally {                 task = null;                 w.completedTasks++;                 w.unlock();             }         }         completedAbruptly = false;     } finally {         // 线程退出工作         processWorkerExit(w, completedAbruptly);     } }
  当没有任务的时候runWorker()方法会循环getTask() 这个方法可能会阻塞 private Runnable getTask() {     boolean timedOut = false; // Did the last poll() time out?      for (;;) {         int c = ctl.get();         int rs = runStateOf(c);          // Check if queue empty only if necessary.         // 检查是否还继续处理任务         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {             decrementWorkerCount();             return null;         }          int wc = workerCountOf(c);          // Are workers subject to culling?  获取是否允许超市         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;          if ((wc > maximumPoolSize || (timed && timedOut))             && (wc > 1 || workQueue.isEmpty())) {             if (compareAndDecrementWorkerCount(c))                 return null;             continue;         }          try {             Runnable r = timed ?                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                 workQueue.take();             if (r != null)                 return r;             timedOut = true;         } catch (InterruptedException retry) {             timedOut = false;         }     } }回答疑问线程池参数有哪些都是什么意思corePoolSize 核心线程数 maximumPoolSize 最大线程数 keepAliveTime 空闲线程的最大超时时间 unit 超时时间的单位 workQueue 工作队列,保存未执行的Runnable 任务 threadFactory 创建线程的工厂类 handler 拒绝策略 线程池如何保证核心线程不死亡runWorker方法中调用了 getTask方法,这个方法会阻塞。 线程池执行过程
  线程池有什么好处1、节省资源、提高效率。线程的创建和销毁相对都是比较消耗资源,利用线程池可以在需要的时候直接获取一个线程这样能尽可能的避免了重复的创建和销毁线程。 2、方便管理 可以编写线程池管理代码对池中的线程统一进行管理,避免因无休止的创建线程导致系统崩溃。 线程池状态()RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED
  下面这两个问题算是留尾巴吧,后面再抽时间来看 常见的线程池以及使用场景 线程池的状态的流转 扩展
  在阅读源码的时候其实能看到一些变量定义初始化以及状态的判断都是用位运算判断的。比如 // 不同系统这里可能不一样,这里假设是32 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 线程个数的表示 [位数](不同平台 int 类型范围不一样)   // 不管int是多少位,反正高三位 就是表示线程状态,剩余的位数表示线程数量 // 线程数量 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程最大数量 private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 高三位表示 线程池状态  // 运行状态 private static final int RUNNING    = -1 << COUNT_BITS; private static final int SHUTDOWN   =  0 << COUNT_BITS; private static final int STOP       =  1 << COUNT_BITS; private static final int TIDYING    =  2 << COUNT_BITS; private static final int TERMINATED =  3 << COUNT_BITS; // 获取线程池状态 private static int runStateOf(int c)     { return c & ~CAPACITY; } // 获取线程池有效线程数量 private static int workerCountOf(int c)  { return c & CAPACITY; } // 获取上面提到的32位int类型的数值 private static int ctlOf(int rs, int wc) { return rs | wc; }
  这里拿RUNNING状态具体,看下具体是怎么算的。RUNNING = -1 << COUNT_BITS
  这里需要假设一下 Integer.SIZE =32 => COUNT_BITS = 29
  那么 RUNNING = -1 << COUNT_BITS 表示 -1二进制左移29位。 -1的二进制是多少在计算机中,负数以其正值的补码形式表达。 补码 = 反码 + 11的源码0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001反码1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1110补码 = 反码 + 11111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 -1 左移29位溢出位丢弃,空余位补0得到下面的结果1110 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000
  因此 RUNNING高三位为 111,同样方式可计算 SHUTDOWN :000 STOP:001``TIDYING: 010``TERMINATED:011 // 获取线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // CAPACITY 从上面可以得知为 1111…111 (29 bit),取反后就是 111 0000000(29个 0) // c & 上面的结果 就可以获取到高三位,而后29位全部为 0  // 获取线程池有效线程数量 private static int workerCountOf(int c)  { return c & CAPACITY; } // CAPACITY 为 000 111111(29个1) //  c & CAPACITY, 就可以获取变量c的低29位的值  // 获取上面提到的32位int类型的数值 // rs 为 线程状态, wc 表示 线程数量 private static int ctlOf(int rs, int wc) { return rs | wc; }
