池化技术学习
池化技术学习
参考文章:https://www.jianshu.com/p/b47222f89c2f
其实看上面那一篇文章就会对池化技术有一个大概的了解,先不要急着离开去看上面那篇(✿◕‿◕✿)。 前言
池化技术提出来已经很久了,我们在实际工作中都使用过。什么数据库连接池啊,线程池啊等等。相信都听过也用过,但是池化技术是如何实现的。估计就有一些人没有看过。这里就有我,虽然一直想看,但是没时间(其实就是懒。好吧现在来还债了)。
我们先从一个实际生活的场景来看。毕竟艺术源于生活,需求来自生活,灵感来自生活。
小明家有三口人,平时吃饭的时候都是四副碗筷(多一副备用)。过年了,家里来了客人。很热闹。现在家里的人数是10个人。这个时候小明在准备吃饭前就要多拿7副碗筷(这里就不要提公筷啥的了)。有的人吃得快就先吃完了,并且把碗筷洗干净了。这个时候又来了几个客人,这个时候小明就将刚才洗干净的碗筷发给新来的客人(饭菜是足够吃到地老天荒)。新来的客人还在吃,结果又来了一波客人(小明很受欢迎)。小明去取了新的碗筷来,但是还是有人没有拿到碗筷。那这几个人就只能等待其他人吃完。但是此时小明家里已经没有碗筷了,于是新来的客人小明也只能拒绝了。当客人都走了,家里又剩下小明家的三口人,小明将多余的碗筷收起来。
例子不太恰当凑合用。 从JAVA线程池来看池化技术
这里看 JDK1.8 的线程池
ThreadPoolExecutor : ThreadFactory threadFactory :负责创建新的线程HashSet works : 保存当前所有的Worker(对thread的包装)BlockingQueue workQueue : 当前的corePoolSize达到的时候,新提交的任务保存在这里int corePoolSize : 核心Worker数量int maxPoolSize : 最大的Worker数量
线程池的常用调用方式: ThreadPoolUtils.getThreadPool().execute(() -> { });
这里我们看下 execute() 方法(这里同上面的博客):// 家里来客人啦,现在的碗筷都够 addWorker(command, true); // 记下来从现有的碗筷中拿一副碗筷 new Worker(firstTask); workers.add(w); t = w.thread; // 拿碗筷 t.start();
当前worker数量大于等于corePoolSize的时候把任务添加到workQueue (碗筷不够了,要增加碗筷来) workQueue.offer(command); // 当前worker数量超过了workQueue的capacity的时候创建新的线程并用这个线程执行提交的任务 // 这里注意addWorker的第二个参数为false addWorker(command, false); // 内部使用逻辑: if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; new Worker(firstTask); workers.add(w); t = w.thread; t.start();
当前worker数量大于maxPoolSize的时候执行拒绝策略:(没有可以使用的碗筷了,在来的客人都拒之门外) reject(command);
线程重复利用以及回收
Worker核类心属性及方法: Thread thread : 用于执行任务的线程Runnable firstTask : 提交时候的任务Worker(Runnable firstTask) : 创建一个Workervoid run() : 启动thread执行任务
Worker(Runnable firstTask)代码片段 通过构造方法调用threadFactory创建新的线程 Worker(Runnable firstTask) { // 标记碗筷使用状态 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
run()代码片段 直接调用 ThreadPoolExecutor 的 runWorker(this)方法 while (task != null || (task = getTask()) != null) { beforeExecute(wt, task); task.run(); afterExecute(task, thrown); 线程执行抛出的一些异常处理 } processWorkerExit(w, completedAbruptly); 从works移除work
getTask()代码片段如下: boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); druid
DruidDataSource核心类属性及方法: DruidConnectionHolder[] connections : 保存空闲的连接int poolingCount :当前池内资源对象的计算DruidConnectionHolder[] evictConnections :要被移除的连接ReentrantLock lock : 重入锁保证connections数组的安全访问Condition notEmpty : 空闲连接全部被使用等待其他客户释放链接,当poolingCount为0的时候await,归还连接的时候signal。Condition empty : 创建连接的线程里面控制,当poolingCount为0的时候signal创建连接,当前active的连接超过maxActive进行await。DruidPooledConnection getConnectionInternal(long maxWait) : 获取一个空闲连接recycle(DruidPooledConnection pooledConnection) : 回收连接
创建连接 DruidDataSource: init -> CreateConnectionThread.run -> createPhysicalConnection
从数组获取一个空闲连接 DruidDataSource: getConnection(long maxWaitMillis) -> getConnectionInternal(maxWaitMillis) -> pollLast(nanos) -> DruidConnectionHolder last = connections[poolingCount] 这里可以看到每次都是获取数组的最后一个元素
归还连接 DruidPooledConnection: DruidPooledConnection 实现javax.sql.PooledConnection;这个方法在框架(mybatis)里面会执行sql操作然后在finally代码块执行 javax.sql.PooledConnection.close() close() -> recycle() -> dataSource.recycle(this) -> putLast(holder, lastActiveTimeMillis) -> connections[poolingCount] = e
关闭过期的连接 DruidDataSource: 在shrink方法内部会判断idleTime是否满足条件 init -> createAndStartDestroyThread() -> run -> shrink(true, keepAlive) -> evictConnections[evictCount++] = connection -> close() 注意这里的close和上面归还连接的close是不同的,这里是物理关闭 总结:
池化技术有一下几个特点。 有公共资源的集合。 资源的获取、释放、回收、销毁。 资源状态的监控 拒绝策略 过期策略