前几天 老板突然过来说,系统对账越来越慢了,能不能优化下,业务过程还是很简单的,用户在商城下单,生产电子订单,保存在订单库。之后会生成物流派送订单,为了防止漏送或者重复派送还需要每天对账。 对账系统的处理逻辑很简单,首先查询订单库,然后查询派送库,之后对比订单和派送订单,将差异写入差异库。 核心代码经过抽象后,如下: while(存在未对账订单){ // 查询未对账订单 pos = getPOrders(); // 查询派送单 dos = getDOrders(); // 执行对账操作 diff = check(pos, dos); // 差异写入差异库 save(diff); } 利用并行优化对账系统 要优化性能,首先要想到这个系统的瓶颈所在。 目前的对账系统,由于订单量和派送单量巨大,所以查询未对账订单 getPOrders() 和查询派送单 getDOrders() 相对较慢,那有没有办法快速优化一下呢?目前对账系统是单线程执行的,图形化后是下图这个样子。对于串行化的系统, 优化性能首先想到的是能否利用多线程并行处理 。 所以你可以看到这里对账系统的瓶颈。查询未对账订单 getPOrders() 和查询派送单 getDOrders() 是否可以并行处理呢?显然是可以的,因为这两个操作并没有先后顺序的依赖。这两个最耗时的操作并行之后,执行过程如下图所示。对比一下单线程的执行示意图,你会发现同等时间里,并行执行的吞吐量近乎单线程的 2 倍,优化效果还是相对明显的。 思路有了,那么代码我们如何实现呢?我们创建了两个线程 T1 和 T2,并行执行查询未对账订单 getPOrders() 和查询派送单 getDOrders() 这两个操作。在主线程中执行对账操作 check() 和差异写入 save() 两个操作。不过需要注意的是:主线程需要等待线程 T1 和 T2 执行完才能执行 check() 和 save() 这两个操作,为此我们通过调用 T1.join() 和 T2.join() 来实现等待,当 T1 和 T2 线程退出时,调用 T1.join() 和 T2.join() 的主线程就会从阻塞态被唤醒,从而执行之后的 check() 和 save()。 while(存在未对账订单){ // 查询未对账订单 Thread T1 = new Thread(()->{ pos = getPOrders(); }); T1.start(); // 查询派送单 Thread T2 = new Thread(()->{ dos = getDOrders(); }); T2.start(); // 等待T1、T2结束 T1.join(); T2.join(); // 执行对账操作 diff = check(pos, dos); // 差异写入差异库 save(diff); } 用 CountDownLatch 实现线程等待 经过上面的优化,基本可以跟老板汇报工作了,但是还是有点不足,while循环里面每次都是创建新线程,而创建线程是个耗时的动作,所以最好就是创建好的线程循环使用,这是你估计想到线程池了,是的,线程池可以解决这个问题。 下面的代码就是用线程池进行了优化: // 创建2个线程的线程池 Executor executor = Executors.newFixedThreadPool(2); while(存在未对账订单){ // 查询未对账订单 executor.execute(()-> { pos = getPOrders(); }); // 查询派送单 executor.execute(()-> { dos = getDOrders(); }); /* ??如何实现等待??*/ // 执行对账操作 diff = check(pos, dos); // 差异写入差异库 save(diff); } 前面线程通过调用线程t1 t2 join()方法来等待线程,但是在线程池里,线程根本就不会退出,所以join()方法失效了。那么如何解决这个问题呢?最直接的办法就是设置一个计数器,第一个操作完成计数器-1 ,第二个操作完成计数器-1 ,主线程等待计数器为 0 。等待计数器为0 其实就是一个条件变量,管程也可以实现。 Java 中已经提供了类似的功能, CountDownLatch ,实现代码如下: // 创建2个线程的线程池 Executor executor = Executors.newFixedThreadPool(2); while(存在未对账订单){ // 计数器初始化为2 CountDownLatch latch = new CountDownLatch(2); // 查询未对账订单 executor.execute(()-> { pos = getPOrders(); latch.countDown(); }); // 查询派送单 executor.execute(()-> { dos = getDOrders(); latch.countDown(); }); // 等待两个查询操作结束 latch.await(); // 执行对账操作 diff = check(pos, dos); // 差异写入差异库 save(diff); }进一步优化性能 经过上面优化 长舒了一口气,终于可以交付了。那么还有没有其他可以优化的地方呢?仔细一看还是有的。 前面我们只是把 两个查询操作并行了,那么check的动作是不是可以并行呢?当然了…… 对账的查询依赖于两次查询的结果,明显有点生产者 - 消费者的意思。为此 我们设计两个队列,一个订单队列,一个派送队列。 下面再来看如何用双队列来实现完全的并行。一个最直接的想法是:一个线程 T1 执行订单的查询工作,一个线程 T2 执行派送单的查询工作,当线程 T1 和 T2 都各自生产完 1 条数据的时候,通知线程 T3 执行对账操作。这个想法虽看上去简单,但其实还隐藏着一个条件,那就是线程 T1 和线程 T2 的工作要步调一致,不能一个跑得太快,一个跑得太慢,只有这样才能做到各自生产完 1 条数据的时候,通知线程 T3。 下面这幅图形象地描述了上面的意图:线程 T1 和线程 T2 只有都生产完 1 条数据的时候,才能一起向下执行,也就是说,线程 T1 和线程 T2 要互相等待,步调要一致;同时当线程 T1 和 T2 都生产完一条数据的时候,还要能够通知线程 T3 执行对账操作。 用 CyclicBarrier 实现线程同步 下面我们来实现以上方案,这个方案有两个难点,一个是 线程 T1 和 T2 要做到步调一致,另一个是要能够通知到线程 T3。 同样,还是建议你不要在实际项目中这么做,因为 Java 并发包里也已经提供了相关的工具类:CyclicBarrier。在下面的代码中,我们首先创建了一个计数器初始值为 2 的 CyclicBarrier,你需要注意的是创建 CyclicBarrier 的时候,我们还传入了一个回调函数,当计数器减到 0 的时候,会调用这个回调函数。 CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别在这里还是有必要再强调一下: CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而 CyclicBarrier 是一组线程之间互相等待 ,更像是几个驴友之间不离不弃。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。 本章的示例代码中有两处用到了线程池,你现在只需要大概了解即可,因为线程池相关的知识咱们专栏后面还会有详细介绍。另外,线程池提供了 Future 特性,我们也可以利用 Future 特性来实现线程之间的等待,这个后面我们也会详细介绍。