前言 在Java开发的web项目中,我们经常会遇到接口响应耗时过长,或者定时任务处理过慢,那在Java中最常见的解决方法就是并行了,想必大家也都不陌生了。 今天的分享主要带大家从一个实际的串行场景出发,如何一步步优化,同时也会分享在Java中实现并行处理的多种方式,以及它们之间的区别和优缺点,通过对比总结更加深入的了解并且使用Java中并发编程的相关技术。一个串行调用的例子 现在我们有一个查询carrier下所有Load的接口,它需要查询Loads信息、Instruction信息、Stops信息、Actions信息后然后组装数据。privateListLoadgetHydratedLoads(OptionalPageablepageable,Stringpredicate,ListObjectparams){1。耗时3秒ListLoadloadsexecuteQuery(查询Loads列表);2。耗时4秒ListInstructioninstructionsexecuteQuery(查询instructions列表);3。耗时2秒ListStopstopsexecuteQuery(查询stops列表);4。耗时3秒ListactionsexecuteQuery(查询actions列表);MultimapString,InstructioninstructionsByLoadIdindex(instructions,ii。getLoad()。getId());MultimapString,StopstopsByLoadIdindex(stops,ss。getLoad()。getId());MultimapString,ActionactionsByStopIdindex(actions,aa。getStop()。getId());数据处理handle(loads,instructions,stops,actions);returnloads;}上面实现中查询Load、Instruction、Stop、Action等信息是串行的,那串行的系统要做性能优化很常见的就是利用多线程并行了。 这种相互之间没有影响的任务,利用并行处理后耗时就可以优化为4s。并行调用实现的几种方式 因为项目中多线程都用线程池,所以Thread。join()这种方式就不演示了。1。FutureCallable Future接口在Java5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操作把调用线程解放出来,让调用线程能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。 因为我们都是需要获取任务的返回值的,所以大家肯定想到是用FutureCallable来做。 ThreadPoolExecutor提供了3个submit方法支持我们需要获取任务执行结果的需求。TFutureTsubmit(CallableTtask);TFutureTsubmit(Runnabletask,Tresult);Futurelt;?submit(Runnabletask); 简单介绍下这三个submit方法:提交Runnable任务submit(Runnabletask),这个方法入参是Runnable接口,它只有一个run()方法没有返回值,所以它返回的Future只能用来判断任务是否结束;提交Callable任务submit(Callabletask),它的入参是Callable接口,只有一个call()方法,是有返回值的,所以可以获取任务执行结果;提交Runnable任务及结果引用submit(Runnabletask,Tresult),这个方法返回的Future,调用get()方法的返回值就是传入的result对象,一般用法就是实现Runnable接口时,声明一个有参构造函数,将result传进去,result相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。 这三个方法的返回值都是Future接口,Future提供了5个方法: 分别是取消任务的方法cancel()、判断任务是否已取消的方法isCancelled()、判断任务是否已结束的方法isDone()以及2个获得任务执行结果的get()和get(timeout,unit),其中最后一个get(timeout,unit)支持超时机制。 需要注意的是:这两个get()方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用get()方法的线程会阻塞,直到任务执行完才会被唤醒。2。FutureTask实现并行调用 我们再介绍下FutureTask工具类,这是一个实实在在的工具类,有两个构造函数,和上面类似,一看就明白了。FutureTask(Callablecallable);FutureTask(Runnablerunnable,Vresult);publicFutureTask(Runnablerunnable,Vresult){this。callableExecutors。callable(runnable,result);this。stateNEW;ensurevisibilityofcallable}publicstaticTCallableTcallable(Runnabletask,Tresult){if(tasknull)thrownewNullPointerException();returnnewRunnableAdapterT(task,result);}privatestaticfinalclassRunnableAdapterTimplementsCallableT{privatefinalRunnabletask;privatefinalTresult;RunnableAdapter(Runnabletask,Tresult){this。tasktask;this。resultresult;}publicTcall(){task。run();returnresult;}publicStringtoString(){returnsuper。toString()〔Wrappedtasktask〕;}} 这个类实现了Runnable和Future接口,可以理解就是将任务和结果结合起来了,变成一个可以有响应结果的任务进行提交,本质上FutureTask里面封装的还是一个Callable接口,它实现可以有返回值就是因为它的run方法里面调用了Callable的call()方法,将结果赋值给result,然后返回。 下面我们看下如何优化我们上面的查询接口,实现并行查询:privateListLoadgetHydratedLoadsUsingFutureTask()throwsExecutionException,InterruptedException{ExecutorServiceexecutorServiceExecutors。newCachedThreadPool();FutureTaskListLoadqueryLoadFutureTasknewFutureTask(()executeQuery(sql1));executorService。submit(queryLoadFutureTask);FutureTaskListInstructionqueryInstructionFutureTasknewFutureTask(()executeQuery(sql2));executorService。submit(queryInstructionFutureTask);FutureTaskListStopqueryStopFutureTasknewFutureTask(()executeQuery(sql3));executorService。submit(queryStopFutureTask);FutureTaskListqueryActionFutureTasknewFutureTask(()executeQuery(sql4));executorService。submit(queryActionFutureTask);获取结果ListLoadloadsqueryLoadFutureTask。get();ListInstructioninstructionsqueryInstructionFutureTask。get();ListStopstopsqueryStopFutureTask。get();ListactionsqueryActionFutureTask。get();Wegotalltheentitiesweneed,sonowletsfillinalloftheirreferencestoeachother。handleData(loads,instructions,stops,actions);returnloads;} 那你可能会想到,如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,该怎么处理呢? 这种问题基本上也都可以用Future来解决,但是需要将对应的FutureTask传入到当前任务中,然后调用get()方法即可。 比如,我们创建了两个FutureTaskft1和ft2,ft1需要等待ft2执行完毕后才能做最后的数据处理,所以ft1内部需要引用ft2,并在执行数据处理前,调用ft2的get()方法实现等待。创建任务T2的FutureTaskFutureTaskStringft2newFutureTask(newT2Task());创建任务T1的FutureTaskFutureTaskStringft1newFutureTask(newT1Task(ft2));线程T1执行任务ft1ThreadT1newThread(ft1);T1。start();线程T2执行任务ft2ThreadT2newThread(ft2);T2。start();等待线程T1执行结果System。out。println(ft1。get());T1Task需要执行的任务:classT1TaskimplementsCallableString{FutureTaskStringft2;T1任务需要T2任务的FutureTaskT1Task(FutureTaskStringft2){this。ft2ft2;}OverrideStringcall()throwsException{获取T2线程结果Stringtfft2。get();return处理完的数据结果;}}T2Task需要执行的任务:classT2TaskimplementsCallableString{OverrideStringcall()throwsException{return检验查询数据;}} 通过这上面的的例子,我们明显的发现Future实现异步编程时的一些不足之处:Future对于结果的获取很不方便,只能通过get()方法阻塞或者轮询的方式得到任务的结果。阻塞的方式显然是效率低下的,轮询的方式又十分耗费CPU资源,如果前一个任务执行比较耗时的话,get()方法会阻塞,形成排队等待的情况。将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。等待Future集合中的所有任务都完成。仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。 我们很难表述Future结果之间的依赖性,从文字描述上这很简单。比如,下面文字描述的关系,如果用Future去实现时还是很复杂的。 比如:当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并 在JDK8中引入了CompletableFuture,对Future进行了改进,可以在定义CompletableFuture时传入回调对象,任务在完成或者异常时,自动回调,再也不需要每次主动通过Future去询问结果了,我们接着往下看。3。CompletableFuture Java在1。8版本提供了CompletableFuture来支持异步编程,CompletableFuture类实现了CompletionStage和Future接口,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过完成时回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。Callable,有结果的同步行为,比如做饭,就能产出一盘菜;Runnable,无结果的同步行为,比如吃饭,仅仅就是吃就完事了;Future,异步封装CallableRunnable,比如委托给你媳妇去做饭(其他线程);CompletableFuture,封装Future,使其拥有回调功能,比如让你媳妇做好饭了,主动告诉你做好了; 为了体会到CompletableFuture异步编程的优势,我们还是先用CompletableFuture重新实现前面的程序。publicstaticListLoadgetHydratedLoadsUsingCompletableFuture()throwsExecutionException,InterruptedException{ExecutorServiceexecutorServiceExecutors。newCachedThreadPool();try{任务1:查询loads列表CompletableFutureListLoadqueryLoadsCompletableFuture。supplyAsync(()executeQuery(sql1),executorService);任务2:查询instructions列表CompletableFutureListInstructionqueryInstructionsCompletableFuture。supplyAsync(()executeQuery(sql2),executorService);任务3:查询stops列表CompletableFutureListStopqueryStopsCompletableFuture。supplyAsync(()executeQuery(sql3),executorService);任务4:查询actions列表CompletableFutureListqueryActionsCompletableFuture。supplyAsync(()executeQuery(sql4),executorService);任务1,2,3,4执行完成后执行数据组装CompletableFutureVoidcombineFutureCompletableFuture。allOf(queryLoads,queryInstructions,queryStops,queryActions)。thenRun(()handleData(queryLoads。join(),queryInstructions。join(),queryStops。join(),queryActions。join()));System。out。println(Thread。currentThread()。getName():主线程执行到这里了);combineFuture。get();System。out。println(String。format(queryLoads:s,queryInstructions:s,queryStops:s,queryActions:s,queryLoads。isDone(),queryInstructions。isDone(),queryStops。isDone(),queryActions。isDone()));returnqueryLoads。get();}finally{executorService。shutdown();}} 通过上面的代码我们可以发现CompletableFuture有以下优势:无需手工维护线程,省去了手工提交任务到线程池这一步;语义更清晰,例如CompletableFuture。allOf(f1,f2,f3,f4)能够清晰地表述需要等指定的4个任务都完成才能执行后续的任务;代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。CompletableFuture解析1。CompletableFuture创建 CompletableFuture提供了四个静态方法来创建一个异步操作:publicstaticCompletableFutureVoidrunAsync(Runnablerunnable)publicstaticCompletableFutureVoidrunAsync(Runnablerunnable,Executorexecutor)publicstaticUCompletableFutureUsupplyAsync(SupplierUsupplier)publicstaticUCompletableFutureUsupplyAsync(SupplierUsupplier,Executorexecutor) 这四个方法区别在于:runAsync方法以Runnable函数式接口类型为参数,没有返回结果,supplyAsync方法以Supplier函数式接口类型为参数,返回结果类型为U;没有指定Executor的方法会使用ForkJoinPool。commonPool()作为它的线程池执行异步代码。如果指定了线程池,则使用指定的线程池运行。 ForkJoinPool是JDK7提供的,叫做分支合并框架。可以通过将一个任务递归分成很多分子任务,形成不同的流,进行并行执行,同时还伴随着强大的工作窃取算法,极大的提高效率,这个不属于今天我们讨论的点,感兴趣的话可以后面再聊。 注意:如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的IO操作,就会导致线程池中所有线程都阻塞在IO操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。 问题:为什么supplyAsync方法接收一个Supplier函数式接口类型参数而不是一个Callable类型的参数呢?FunctionalInterfacepublicinterfaceCallableV{Vcall()throwsException;}FunctionalInterfacepublicinterfaceSupplierT{Tget();} 看了接口定义,我们发现它们其实都是一个不接受任何参数类型的函数式接口,在实践中它们做的是相同的事情(定义一个业务逻辑去处理然后有返回值),但在原则上它们的目的是做不同的事情:从语义上来看Callable是返回结果的任务,而Supplier是结果的供应商。可以理解为Callable引用了一个未执行的工作单元,Supplier引用了一个未知的值。侧重点可能不一样,如果关心的是提供一个什么值而不关心具体做了啥工作使用Supplier感觉更合适。例如,ExecutorService与Callable一起工作,因为它的主要目的是执行工作单元。CompletableFuture使用Supplier,因为它只关心提供的值,而不太关心可能需要做多少工作。两个接口定义之间的一个基本区别是,Callable允许从其实现中抛出检查异常,而Supplier不允许。2。理解CompletionStage接口 通过接口的继承关系,我们可以发现这里的异步操作到底什么时候结束、结果如何获取,都可以通过Future接口来解决。 另外CompletableFuture类还实现了CompletionStage接口,这个接口就比较关键了,之所以能实现响应式编程,都是通过这个接口提供的方法。 下面介绍下CompletionStage接口,看字面意思可以理解为完成动作的一个阶段,官方注释文档:CompletionStage是一个可能执行异步计算的阶段,这个阶段会在另一个CompletionStage完成时调用去执行动作或者计算,一个CompletionStage会以正常完成或者中断的形式完成,并且它的完成会触发其他依赖的CompletionStage。CompletionStage接口的方法一般都返回新的CompletionStage,因此构成了链式的调用。 这个看完还是有点懵逼的,不清楚什么是CompletionStage? 在Java中什么是CompletionStage? 一个Function、Comsumer、Supplier或者Runnable都会被描述为一个CompletionStage。stage。thenApply(xsquare(x))。thenAccept(xSystem。out。print(x))。thenRun(()System。out。println())xsquare(x)就是一个Function类型的Stage,它返回了x。xSystem。out。println(x)就是一个Comsumer类型的Stage,用于接收上一个Stage的结果x。()System。out。println()就是一个Runnable类型的Stage,既不消耗结果也不产生结果。 但是CompletionStage这里面一共有40多个方法,我们该如何理解呢? CompletionStage接口可以清晰的描述任务之间的关系,可以分为顺序串行、并行、汇聚关系以及异常处理。串行关系 CompletionStage接口里面描述串行关系,主要是thenApply、thenAccept、thenRun和thenCompose这四个系列的接口。publicUCompletionStageUthenApply(Functionlt;?superT,?extendsUfn);publicUCompletionStageUthenApplyAsync(Functionlt;?superT,?extendsUfn);publicUCompletionStageUthenApplyAsync(Functionlt;?superT,?extendsUfn,Executorexecutor);publicCompletionStageVoidthenAccept(Consumerlt;?superTaction);publicCompletionStageVoidthenAcceptAsync(Consumerlt;?superTaction);publicCompletionStageVoidthenAcceptAsync(Consumerlt;?superTaction,Executorexecutor);publicCompletionStageVoidthenRun(Runnableaction);publicCompletionStageVoidthenRunAsync(Runnableaction);publicCompletionStageVoidthenRunAsync(Runnableaction,Executorexecutor);publicUCompletionStageUthenCompose(Functionlt;?superT,?extendsCompletionStageUfn);publicUCompletionStageUthenComposeAsync(Functionlt;?superT,?extendsCompletionStageUfn);publicUCompletionStageUthenComposeAsync(Functionlt;?superT,?extendsCompletionStageUfn,Executorexecutor);thenApply系列方法里参数fn的类型是接口Function,使用该函数处理上一个CompletableFuture调用的结果,并返回一个具有处理结果的CompletionStage对象,这个方法既能接收参数也支持返回值,可以理解为对于结果的转换;thenAccept系列方法里参数action的类型是接口Consumer,这个方法虽然支持参数,但却不支持回值,可以理解为对于结果的消费;thenRun系列方法里action的参数是Runnable,所以action既不能接收参数也不支持返回值,也是对于结果的一种消费,和thenAccept区别在于Runnable并不使用前一步CompletableFuture计算的结果;thenCompose的参数为一个返回CompletableFuture实例的函数,该函数的参数是先前计算步骤的结果,和thenApply执行结果类似,区别在于会生成一个新的CompletableFuture返回,也可以理解为对于结果的转换; thenApply()和thenCompose()的区别?thenApply转换的是泛型中的类型,是同一个CompletableFuture,thenCompose用来连接两个CompletableFuture,是生成一个新的CompletableFuture。他们都是让CompletableFuture可以对返回的结果进行后续操作,就像Stream一样进行map和flatMap的转换。publicstaticvoidmain(String〔〕args)throwsInterruptedException,ExecutionException{CompletableFutureStringfutureCompletableFuture。supplyAsync(()Hello);CompletableFutureStringresult1future。thenApply(paramparamWorld);CompletableFutureStringresult2future。thenCompose(paramCompletableFuture。supplyAsync(()paramWorld));System。out。println(result1。get());System。out。println(result2。get());} 这些方法里面Async代表的是异步执行fn、consumer或者action。CompletableFutureStringf0CompletableFuture。supplyAsync(()HelloWorld)。thenApply(ssQQ)。thenApply(String::toUpperCase);System。out。println(f0。join());输出结果HELLOWORLDQQ 可以看一下thenApply()方法是如何使用的。首先通过supplyAsync()启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务却是串行执行的,依赖的执行结果,依赖的执行结果。 CompletableFuture中thenApply如何实现?先看下静态创建CompletableFuture的方法supplyAsync;静态方法,如果没有传入线程池,使用ForkJoinPool的common线程池publicstaticUCompletableFutureUsupplyAsync(SupplierUsupplier){returnasyncSupplyStage(ASYNCPOOL,supplier);}staticUCompletableFutureUasyncSupplyStage(Executore,SupplierUf){if(fnull)thrownewNullPointerException();新建CompletableFuture对象CompletableFutureUdnewCompletableFutureU();构造AsyncSupply对象,线程池提交AsyncSupply任务e。execute(newAsyncSupplyU(d,f));将CompletableFuture对象返回returnd;}staticfinalclassAsyncSupplyTextendsForkJoinTaskVoid可以看到AsyncSupply是一个Runnable对象implementsRunnable,AsynchronousCompletionTask{CompletableFutureTdep;Supplierlt;?extendsTfn;AsyncSupply(CompletableFutureTdep,Supplierlt;?extendsTfn){this。depdep;this。fnfn;}publicfinalVoidgetRawResult(){returnnull;}publicfinalvoidsetRawResult(Voidv){}publicfinalbooleanexec(){run();returnfalse;}publicvoidrun(){CompletableFutureTd;Supplierlt;?extendsTf;if((ddep)!null(ffn)!null){depnull;fnnull;CompletableFuture对象的result为空时if(d。resultnull){try{调用传入的supplier的get方法,并将结果放入result字段注意:这是在线程池中提交的,所以是异步处理的d。completeValue(f。get());}catch(Throwableex){d。completeThrowable(ex);}}处理完当前方法后,处理依赖它的栈顶方法,后面的回调方法入栈和这块呼应d。postComplete();}}}finalvoidpostComplete(){变量f存储的是当前已经完成的CompletableFutureCompletableFuturelt;?fthis;Completionh;while((hf。stack)!null(f!this(h(fthis)。stack)!null)){CompletableFuturelt;?d;Completiont;CAS操作,将依赖此阶段的栈顶元素取出,并且设置为下一个if(STACK。compareAndSet(f,h,th。next)){if(t!null){if(f!this){如果f不是this,将刚出栈的h入this的栈顶pushStack(h);continue;}将h剥离出来,h。nextnull,帮助gcNEXT。compareAndSet(h,t,null);trytodetach}调用tryFiref(dh。tryFire(NESTED))null?this:d;}}}再看下异步处理完supplyAsync后的回调方法thenApply方法,看看它是如何实现回调的;publicUCompletableFutureUthenApply(Functionlt;?superT,?extendsUfn){returnuniApplyStage(null,fn);}privateVCompletableFutureVuniApplyStage(Executore,Functionlt;?superT,?extendsVf){if(fnull)thrownewNullPointerException();Objectr;如果当前阶段结果已经返回,则直接运行回调方法if((rresult)!null)returnuniApplyNow(r,e,f);CompletableFutureVdnewIncompleteFuture();构造Completion放入等待栈的顶unipush(newUniApplyT,V(e,d,this,f));returnd;}privateVCompletableFutureVuniApplyNow(Objectr,Executore,Functionlt;?superT,?extendsVf){Throwablex;CompletableFutureVdnewIncompleteFuture();如果依赖的方法异常中断,则直接处理并返回异常if(rinstanceofAltResult){if((x((AltResult)r)。ex)!null){d。resultencodeThrowable(x,r);returnd;}rnull;}try{执行到这里说明依赖的任务已经有结果了,用它的结果当作参数调用回调方法注意这里都是线程池中的线程在执行,所以是异步执行if(e!null){e。execute(newUniApplyT,V(null,d,this,f));}else{SuppressWarnings(unchecked)Tt(T)r;d。resultd。encodeValue(f。apply(t));}}catch(Throwableex){d。resultencodeThrowable(ex);}returnd;}finalvoidunipush(Completionc){if(c!null){CAS自旋将回调方法压入栈顶while(!tryPushStack(c)){if(result!null){NEXT。set(c,null);break;}}可能在重试中完成,判断result不为空就执行if(result!null)c。tryFire(SYNC);}}再次尝试判断依赖方法是否处理完成,处理完成则调用目标回调方法finalCompletableFutureVtryFire(intmode){CompletableFutureVd;CompletableFutureTa;Objectr;Throwablex;Functionlt;?superT,?extendsVf;if((asrc)null(ra。result)null(ddep)null(ffn)null)returnnull;tryComplete:if(d。resultnull){if(rinstanceofAltResult){if((x((AltResult)r)。ex)!null){d。completeThrowable(x,r);breaktryComplete;}rnull;}try{if(mode0!claim())returnnull;else{SuppressWarnings(unchecked)Tt(T)r;d。completeValue(f。apply(t));}}catch(Throwableex){d。completeThrowable(ex);}}srcnull;depnull;fnnull;成功处理完依赖方法和回调方法后进行处理,可能唤醒其他的回调方法或者清理栈returnd。postFire(a,mode);}描述AND汇聚关系 CompletionStage接口里面描述AND汇聚关系,主要是thenCombine、thenAcceptBoth和runAfterBoth系列的接口,这些接口的区别是源自fn、consumer、action这三个核心参数不同。publicU,VCompletionStageVthenCombine(CompletionStagelt;?extendsUother,BiFunctionlt;?superT,?superU,?extendsVfn);publicU,VCompletionStageVthenCombineAsync(CompletionStagelt;?extendsUother,BiFunctionlt;?superT,?superU,?extendsVfn);publicU,VCompletionStageVthenCombineAsync(CompletionStagelt;?extendsUother,BiFunctionlt;?superT,?superU,?extendsVfn,Executorexecutor);publicUCompletionStageVoidthenAcceptBoth(CompletionStagelt;?extendsUother,BiConsumerlt;?superT,?superUaction);publicUCompletionStageVoidthenAcceptBothAsync(CompletionStagelt;?extendsUother,BiConsumerlt;?superT,?superUaction);publicUCompletionStageVoidthenAcceptBothAsync(CompletionStagelt;?extendsUother,BiConsumerlt;?superT,?superUaction,Executorexecutor);publicCompletionStageVoidrunAfterBoth(CompletionStagelt;?other,Runnableaction);publicCompletionStageVoidrunAfterBothAsync(CompletionStagelt;?other,Runnableaction);publicCompletionStageVoidrunAfterBothAsync(CompletionStagelt;?other,Runnableaction,Executorexecutor); Async后缀的方法表示,前面的CompletionStage执行完成,在执行后续操作时会提交到线程池处理,否则就还是使用同一个处理线程完成CompletableFuture的所有任务。 这三种方法意思都是等两个CompletionStage都完成了计算才会执行下一步的操作,区别在于参数接口类型不一样。thenCombine参数接口类型为BiFunction,可以拿到前一步两个CompletionStage的运算结果,进行下一步处理,同时有返回值(转化操作);thenAcceptBoth参数接口类型为BiConsumer,也可以拿到前一步的运算结果进行下一步处理,但是无返回值(消费操作);runAfterBoth参数接口类型为Runnable,即不能获取到上一步的执行结果,也无返回值(不关心运行结果); CompletableFuture中thenAcceptBoth如何实现?talkischeap!!publicUCompletableFutureVoidthenAcceptBoth(CompletionStagelt;?extendsUother,BiConsumerlt;?superT,?superUaction){returnbiAcceptStage(null,other,action);}privateUCompletableFutureVoidbiAcceptStage(Executore,CompletionStageUo,BiConsumerlt;?superT,?superUf){CompletableFutureUb;Objectr,s;if(fnull(bo。toCompletableFuture())null)thrownewNullPointerException();CompletableFutureVoiddnewIncompleteFuture();如果两个阶段有任何一个没有执行完成,则将回调方法分别放到两个互相依赖阶段的栈顶if((rresult)null(sb。result)null)bipush(b,newBiAcceptT,U(e,d,this,b,f));elseif(enull)如果两个依赖的阶段都执行完成则调用回调方法d。biAccept(r,s,f,null);elsetry{e。execute(newBiAcceptT,U(null,d,this,b,f));}catch(Throwableex){d。resultencodeThrowable(ex);}returnd;}描述OR汇聚关系 OR的关系,表示谁运行快就用谁的结果执行下一步操作。publicUCompletionStageUapplyToEither(CompletionStagelt;?extendsTother,Functionlt;?superT,Ufn);publicUCompletionStageUapplyToEitherAsync(CompletionStagelt;?extendsTother,Functionlt;?superT,Ufn);publicUCompletionStageUapplyToEitherAsync(CompletionStagelt;?extendsTother,Functionlt;?superT,Ufn,Executorexecutor);publicCompletionStageVoidacceptEither(CompletionStagelt;?extendsTother,Consumerlt;?superTaction);publicCompletionStageVoidacceptEitherAsync(CompletionStagelt;?extendsTother,Consumerlt;?superTaction);publicCompletionStageVoidacceptEitherAsync(CompletionStagelt;?extendsTother,Consumerlt;?superTaction,Executorexecutor);publicCompletionStageVoidrunAfterEither(CompletionStagelt;?other,Runnableaction);publicCompletionStageVoidrunAfterEitherAsync(CompletionStagelt;?other,Runnableaction);publicCompletionStageVoidrunAfterEitherAsync(CompletionStagelt;?other,Runnableaction,Executorexecutor); 同样也是有Async后缀的表示,当前面的CompletionStage执行完成,在执行后续操作时会提交到线程池处理。applyToEither、acceptEither、runAfterEither三个方法的区别还是来自于不同的接口参数类型:Function、Consumer、Runnable。 CompletableFuture中applyToEither如何实现?publicUCompletableFutureUapplyToEitherAsync(CompletionStagelt;?extendsTother,Functionlt;?superT,Ufn){returnorApplyStage(defaultExecutor(),other,fn);}privateUextendsT,VCompletableFutureVorApplyStage(Executore,CompletionStageUo,Functionlt;?superT,?extendsVf){CompletableFutureUb;if(fnull(bo。toCompletableFuture())null)thrownewNullPointerException();Objectr;CompletableFuturelt;?extendsTz;这块是重点,有任何一个阶段的结果不为空就直接执行functionif((r(zthis)。result)!null(r(zb)。result)!null)returnz。uniApplyNow(r,e,f);CompletableFutureVdnewIncompleteFuture();如果都为空则将回调方法分别push到被依赖的两个阶段的栈顶orpush(b,newOrApplyT,U,V(e,d,this,b,f));returnd;}异常处理 在Java编程中,异常处理当然是必不可少的一环,那你可能会想到如果在使用CompletableFuture进行异步链式编程时,如果出现异常该怎么处理呢? 首先上面我们提到的fn、consumer、action它们的核心方法是不允许抛出可检查异常的,但是却无法限制它们抛出运行时异常。在同步方法中,我们可以使用trycatch{}来捕获并处理异常,但在异步编程里面异常该如何处理?CompletionStage接口给我们提供的方案非常简单,比trycatch{}还要简单。 下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。publicCompletableFutureTwhenComplete(BiConsumerlt;?superT,?superThrowableaction)publicCompletableFutureTwhenCompleteAsync(BiConsumerlt;?superT,?superThrowableaction)publicCompletableFutureTwhenCompleteAsync(BiConsumerlt;?superT,?superThrowableaction,Executorexecutor)publicCompletableFutureTexceptionally(FunctionThrowable,?extendsTfn)参数的类型是BiConsumerlt;?superT,?superThrowable,它可以处理正常的计算结果,或者异常情况,可以获取到上一步的执行结果作为参数;无论是否发生异常都会执行whenComplete()中的回调函数action;方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行;这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。CompletableFutureVoidfutureCompletableFuture。runAsync((){try{TimeUnit。SECONDS。sleep(1);}catch(InterruptedExceptione){}if(newRandom()。nextInt()20){inti120;}System。out。println(执行结束!);});future。whenComplete(newBiConsumerVoid,Throwable(){Overridepublicvoidaccept(Voidt,Throwableaction){System。out。println(执行完成!);}});future。exceptionally(newFunctionThrowable,Void(){OverridepublicVoidapply(Throwablet){System。out。println(执行失败:t。getMessage());returnnull;}})。join(); handle也是执行任务完成时对结果的处理,whenComplete()和handle()的区别在于whenComplete()不支持返回结果,而handle()是支持返回结果的。 当上一个的CompletableFuture的值计算完成或者抛出异常的时候,会触发handle方法中定义的函数,结果由BiFunction参数计算而得,因此这组方法兼有whenComplete和转换的两个功能。publicUCompletionStageUhandle(BiFunctionlt;?superT,Throwable,?extendsUfn);publicUCompletionStageUhandleAsync(BiFunctionlt;?superT,Throwable,?extendsUfn);publicUCompletionStageUhandleAsync(BiFunctionlt;?superT,Throwable,?extendsUfn,Executorexecutor);JDK8流式编程结合publicstaticListStringexampleCompletableFutureAndStream(){ExecutorServiceexecutorServiceExecutors。newCachedThreadPool();ListStringloadsnull;try{所有需要查询远程服务的load列表ListStringrequestListLists。newArrayList(load1,load2,load3,load4);ListCompletableFutureStringcompletableFuturesrequestList。stream()使用CompletableFuture以异步方式查询数据。map(reqCompletableFuture。supplyAsync(()invokeReq(req),executorService))。map(futurefuture。thenApply(Load::getStatus))。map(futurefuture。thenCompose(statusCompletableFuture。supplyAsync(()status。name()。toUpperCase())))。toList();loadscompletableFutures。stream()。map(CompletableFuture::join)。toList();System。out。println(Thread。currentThread()。getName():CompletableFuture异步方式查询请求已完成:loads。size());}finally{executorService。shutdown();}returnloads;} 注意到了吗?这里使用了两个不同的Stream流水线,是否可以在同一个处理流的流水线上一个接一个地放置多个map操作。publicstaticListStringexampleCompletableFutureAndStream(){ExecutorServiceexecutorServiceExecutors。newCachedThreadPool();ListStringloadsnull;try{所有需要查询远程服务的load列表ListStringrequestListLists。newArrayList(load1,load2,load3,load4);loadsrequestList。stream()使用CompletableFuture以异步方式查询数据。map(reqCompletableFuture。supplyAsync(()invokeReq(req),executorService))。map(futurefuture。thenApply(Load::getStatus))。map(futurefuture。thenCompose(statusCompletableFuture。supplyAsync(()status。name()。toUpperCase())))。map(CompletableFuture::join)。toList();System。out。println(Thread。currentThread()。getName():CompletableFuture异步方式查询请求已完成:loads。size());}finally{executorService。shutdown();}returnloads;} 这其实是有原因的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,不同的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定服务请求的动作、通知join方法返回结果。 再来看一个例子: 我们的系统提供的运费价格是以美元计价的,但是你希望以人民币(RMB)的方式提供给你的客户。你可以用异步的方式向计费中心查询指定Load的价格,同时从远程的汇率服务那里查到人民币和美元之间的汇率。当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以人民币计价的商品价格。publicclassMultiThreadTest{Testpublicvoidtest18(){longstartSystem。nanoTime();ListCompletableFutureDoublefuturesloads。stream()。map(laodCompletableFuture查商品价格操作和查兑换汇率操作同时进行,当两者都完成时将结果进行整合。supplyAsync(()load。getPrice(load1))。thenCombine(CompletableFuture。supplyAsync(()RateService。getRate(RMB,USD)),(price,rate)pricerate))。collect(toList());ListDoubleusdPricesfutures。stream()。map(CompletableFuture::join)。collect(toList());}} 通过上述例子,可以看到相对于采用Java8之前提供的Future实现,CompletableFuture版本实现所具备的巨大优势。CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。 为了更直观地感受一下使用CompletableFuture在代码可读性上带来的巨大提升,下面尝试仅使用Java7中提供的特性,重新实现上述例子的功能。publicclassMultiThreadTest{Testpublicvoidtest19()throwsExecutionException,InterruptedException{longstartSystem。nanoTime();ListFutureDoubleusdFuturePricesnewArrayList(shops。size());for(Shopshop:shops){创建一个查询人民币到美元转换汇率的FuturefinalFutureDoubleusdFutureRateexecutor。submit(newCallableDouble(){publicDoublecall(){returnRateService。getRate(RMB,USD);}});在第二个Future中查询指定商店中特定商品的价格FutureDoubleusdFuturePriceexecutor。submit(newCallableDouble(){publicDoublecall()throwsExecutionException,InterruptedException{doublermbPriceshop。getPrice(肥皂);在查找价格操作的同一个Future中,将价格和汇率做乘法计算出汇后价格returnrmbPriceusdFutureRate。get();}});usdFuturePrices。add(usdFuturePrice);}ListDoubleusdPricesnewArrayList(usdFuturePrices。size());for(FutureDoubleusdFuturePrice:usdFuturePrices){usdPrices。add(usdFuturePrice。get());}}} 这里我们思考这样一个问题:并行使用流还是CompletableFuture? 对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待IO而发生阻塞。同时也可以提供更多描述任务之间关系的接口,我们不需要为之编写更多的代码。 这里对使用这些API的建议如下:如果你进行的是计算密集型的操作,并且没有IO,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。反之,如果你并行的工作单元还涉及等待IO的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好。总结 今天大家学到了哪些知识呢?如何优化接口性能?某些场景下可以使用多线程并行代替串行。如何实现接口并行调用?通过今天的学习可以使用FutureCallable、FutureTask、CompletableFuture。详细介绍了CompletableFuture的强大,掌握CompletableFuture提供的函数式编程的能力,以及与JDK8流式编程结合使用,使代码更加美观优雅,写起来简洁和便利;在接口设计时可以参考CompletableFuture的实现,将两个无关的接口能力组装在一起以实现更加强大的功能。