专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

并发编程CompletableFuture异步编程详解

  前言
  在Java开发的web项目中,我们经常会遇到接口响应耗时过长,或者定时任务处理过慢,那在Java中最常见的解决方法就是并行了,想必大家也都不陌生了。
  今天的分享主要带大家从一个实际的串行场景出发,如何一步步优化,同时也会分享在Java中实现并行处理的多种方式,以及它们之间的区别和优缺点,通过对比总结更加深入的了解并且使用Java中并发编程的相关技术。一个串行调用的例子
  现在我们有一个查询carrier下所有Load的接口,它需要查询Loads信息、Instruction信息、Stops信息、Actions信息后然后组装数据。privateListLoadgetHydratedLoads(OptionalPageablepageable,Stringpredicate,ListObjectparams){1。耗时3秒ListLoadloadsexecuteQuery(查询Loads列表);2。耗时4秒ListInstructioninstructionsexecuteQuery(查询instructions列表);3。耗时2秒ListStopstopsexecuteQuery(查询stops列表);4。耗时3秒ListactionsexecuteQuery(查询actions列表);MultimapString,InstructioninstructionsByLoadIdindex(instructions,ii。getLoad()。getId());MultimapString,StopstopsByLoadIdindex(stops,ss。getLoad()。getId());MultimapString,ActionactionsByStopIdindex(actions,aa。getStop()。getId());数据处理handle(loads,instructions,stops,actions);returnloads;}上面实现中查询Load、Instruction、Stop、Action等信息是串行的,那串行的系统要做性能优化很常见的就是利用多线程并行了。
  这种相互之间没有影响的任务,利用并行处理后耗时就可以优化为4s。并行调用实现的几种方式
  因为项目中多线程都用线程池,所以Thread。join()这种方式就不演示了。1。FutureCallable
  Future接口在Java5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操作把调用线程解放出来,让调用线程能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。
  因为我们都是需要获取任务的返回值的,所以大家肯定想到是用FutureCallable来做。
  ThreadPoolExecutor提供了3个submit方法支持我们需要获取任务执行结果的需求。TFutureTsubmit(CallableTtask);TFutureTsubmit(Runnabletask,Tresult);Futurelt;?submit(Runnabletask);
  简单介绍下这三个submit方法:提交Runnable任务submit(Runnabletask),这个方法入参是Runnable接口,它只有一个run()方法没有返回值,所以它返回的Future只能用来判断任务是否结束;提交Callable任务submit(Callabletask),它的入参是Callable接口,只有一个call()方法,是有返回值的,所以可以获取任务执行结果;提交Runnable任务及结果引用submit(Runnabletask,Tresult),这个方法返回的Future,调用get()方法的返回值就是传入的result对象,一般用法就是实现Runnable接口时,声明一个有参构造函数,将result传进去,result相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。
  这三个方法的返回值都是Future接口,Future提供了5个方法:
  分别是取消任务的方法cancel()、判断任务是否已取消的方法isCancelled()、判断任务是否已结束的方法isDone()以及2个获得任务执行结果的get()和get(timeout,unit),其中最后一个get(timeout,unit)支持超时机制。
  需要注意的是:这两个get()方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用get()方法的线程会阻塞,直到任务执行完才会被唤醒。2。FutureTask实现并行调用
  我们再介绍下FutureTask工具类,这是一个实实在在的工具类,有两个构造函数,和上面类似,一看就明白了。FutureTask(Callablecallable);FutureTask(Runnablerunnable,Vresult);publicFutureTask(Runnablerunnable,Vresult){this。callableExecutors。callable(runnable,result);this。stateNEW;ensurevisibilityofcallable}publicstaticTCallableTcallable(Runnabletask,Tresult){if(tasknull)thrownewNullPointerException();returnnewRunnableAdapterT(task,result);}privatestaticfinalclassRunnableAdapterTimplementsCallableT{privatefinalRunnabletask;privatefinalTresult;RunnableAdapter(Runnabletask,Tresult){this。tasktask;this。resultresult;}publicTcall(){task。run();returnresult;}publicStringtoString(){returnsuper。toString()〔Wrappedtasktask〕;}}
  这个类实现了Runnable和Future接口,可以理解就是将任务和结果结合起来了,变成一个可以有响应结果的任务进行提交,本质上FutureTask里面封装的还是一个Callable接口,它实现可以有返回值就是因为它的run方法里面调用了Callable的call()方法,将结果赋值给result,然后返回。
  下面我们看下如何优化我们上面的查询接口,实现并行查询:privateListLoadgetHydratedLoadsUsingFutureTask()throwsExecutionException,InterruptedException{ExecutorServiceexecutorServiceExecutors。newCachedThreadPool();FutureTaskListLoadqueryLoadFutureTasknewFutureTask(()executeQuery(sql1));executorService。submit(queryLoadFutureTask);FutureTaskListInstructionqueryInstructionFutureTasknewFutureTask(()executeQuery(sql2));executorService。submit(queryInstructionFutureTask);FutureTaskListStopqueryStopFutureTasknewFutureTask(()executeQuery(sql3));executorService。submit(queryStopFutureTask);FutureTaskListqueryActionFutureTasknewFutureTask(()executeQuery(sql4));executorService。submit(queryActionFutureTask);获取结果ListLoadloadsqueryLoadFutureTask。get();ListInstructioninstructionsqueryInstructionFutureTask。get();ListStopstopsqueryStopFutureTask。get();ListactionsqueryActionFutureTask。get();Wegotalltheentitiesweneed,sonowletsfillinalloftheirreferencestoeachother。handleData(loads,instructions,stops,actions);returnloads;}
  那你可能会想到,如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,该怎么处理呢?
  这种问题基本上也都可以用Future来解决,但是需要将对应的FutureTask传入到当前任务中,然后调用get()方法即可。
  比如,我们创建了两个FutureTaskft1和ft2,ft1需要等待ft2执行完毕后才能做最后的数据处理,所以ft1内部需要引用ft2,并在执行数据处理前,调用ft2的get()方法实现等待。创建任务T2的FutureTaskFutureTaskStringft2newFutureTask(newT2Task());创建任务T1的FutureTaskFutureTaskStringft1newFutureTask(newT1Task(ft2));线程T1执行任务ft1ThreadT1newThread(ft1);T1。start();线程T2执行任务ft2ThreadT2newThread(ft2);T2。start();等待线程T1执行结果System。out。println(ft1。get());T1Task需要执行的任务:classT1TaskimplementsCallableString{FutureTaskStringft2;T1任务需要T2任务的FutureTaskT1Task(FutureTaskStringft2){this。ft2ft2;}OverrideStringcall()throwsException{获取T2线程结果Stringtfft2。get();return处理完的数据结果;}}T2Task需要执行的任务:classT2TaskimplementsCallableString{OverrideStringcall()throwsException{return检验查询数据;}}
  通过这上面的的例子,我们明显的发现Future实现异步编程时的一些不足之处:Future对于结果的获取很不方便,只能通过get()方法阻塞或者轮询的方式得到任务的结果。阻塞的方式显然是效率低下的,轮询的方式又十分耗费CPU资源,如果前一个任务执行比较耗时的话,get()方法会阻塞,形成排队等待的情况。将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。等待Future集合中的所有任务都完成。仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。
  我们很难表述Future结果之间的依赖性,从文字描述上这很简单。比如,下面文字描述的关系,如果用Future去实现时还是很复杂的。
  比如:当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并
  在JDK8中引入了CompletableFuture,对Future进行了改进,可以在定义CompletableFuture时传入回调对象,任务在完成或者异常时,自动回调,再也不需要每次主动通过Future去询问结果了,我们接着往下看。3。CompletableFuture
  Java在1。8版本提供了CompletableFuture来支持异步编程,CompletableFuture类实现了CompletionStage和Future接口,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过完成时回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。Callable,有结果的同步行为,比如做饭,就能产出一盘菜;Runnable,无结果的同步行为,比如吃饭,仅仅就是吃就完事了;Future,异步封装CallableRunnable,比如委托给你媳妇去做饭(其他线程);CompletableFuture,封装Future,使其拥有回调功能,比如让你媳妇做好饭了,主动告诉你做好了;
  为了体会到CompletableFuture异步编程的优势,我们还是先用CompletableFuture重新实现前面的程序。publicstaticListLoadgetHydratedLoadsUsingCompletableFuture()throwsExecutionException,InterruptedException{ExecutorServiceexecutorServiceExecutors。newCachedThreadPool();try{任务1:查询loads列表CompletableFutureListLoadqueryLoadsCompletableFuture。supplyAsync(()executeQuery(sql1),executorService);任务2:查询instructions列表CompletableFutureListInstructionqueryInstructionsCompletableFuture。supplyAsync(()executeQuery(sql2),executorService);任务3:查询stops列表CompletableFutureListStopqueryStopsCompletableFuture。supplyAsync(()executeQuery(sql3),executorService);任务4:查询actions列表CompletableFutureListqueryActionsCompletableFuture。supplyAsync(()executeQuery(sql4),executorService);任务1,2,3,4执行完成后执行数据组装CompletableFutureVoidcombineFutureCompletableFuture。allOf(queryLoads,queryInstructions,queryStops,queryActions)。thenRun(()handleData(queryLoads。join(),queryInstructions。join(),queryStops。join(),queryActions。join()));System。out。println(Thread。currentThread()。getName():主线程执行到这里了);combineFuture。get();System。out。println(String。format(queryLoads:s,queryInstructions:s,queryStops:s,queryActions:s,queryLoads。isDone(),queryInstructions。isDone(),queryStops。isDone(),queryActions。isDone()));returnqueryLoads。get();}finally{executorService。shutdown();}}
  通过上面的代码我们可以发现CompletableFuture有以下优势:无需手工维护线程,省去了手工提交任务到线程池这一步;语义更清晰,例如CompletableFuture。allOf(f1,f2,f3,f4)能够清晰地表述需要等指定的4个任务都完成才能执行后续的任务;代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。CompletableFuture解析1。CompletableFuture创建
  CompletableFuture提供了四个静态方法来创建一个异步操作:publicstaticCompletableFutureVoidrunAsync(Runnablerunnable)publicstaticCompletableFutureVoidrunAsync(Runnablerunnable,Executorexecutor)publicstaticUCompletableFutureUsupplyAsync(SupplierUsupplier)publicstaticUCompletableFutureUsupplyAsync(SupplierUsupplier,Executorexecutor)
  这四个方法区别在于:runAsync方法以Runnable函数式接口类型为参数,没有返回结果,supplyAsync方法以Supplier函数式接口类型为参数,返回结果类型为U;没有指定Executor的方法会使用ForkJoinPool。commonPool()作为它的线程池执行异步代码。如果指定了线程池,则使用指定的线程池运行。
  ForkJoinPool是JDK7提供的,叫做分支合并框架。可以通过将一个任务递归分成很多分子任务,形成不同的流,进行并行执行,同时还伴随着强大的工作窃取算法,极大的提高效率,这个不属于今天我们讨论的点,感兴趣的话可以后面再聊。
  注意:如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的IO操作,就会导致线程池中所有线程都阻塞在IO操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
  问题:为什么supplyAsync方法接收一个Supplier函数式接口类型参数而不是一个Callable类型的参数呢?FunctionalInterfacepublicinterfaceCallableV{Vcall()throwsException;}FunctionalInterfacepublicinterfaceSupplierT{Tget();}
  看了接口定义,我们发现它们其实都是一个不接受任何参数类型的函数式接口,在实践中它们做的是相同的事情(定义一个业务逻辑去处理然后有返回值),但在原则上它们的目的是做不同的事情:从语义上来看Callable是返回结果的任务,而Supplier是结果的供应商。可以理解为Callable引用了一个未执行的工作单元,Supplier引用了一个未知的值。侧重点可能不一样,如果关心的是提供一个什么值而不关心具体做了啥工作使用Supplier感觉更合适。例如,ExecutorService与Callable一起工作,因为它的主要目的是执行工作单元。CompletableFuture使用Supplier,因为它只关心提供的值,而不太关心可能需要做多少工作。两个接口定义之间的一个基本区别是,Callable允许从其实现中抛出检查异常,而Supplier不允许。2。理解CompletionStage接口
  通过接口的继承关系,我们可以发现这里的异步操作到底什么时候结束、结果如何获取,都可以通过Future接口来解决。
  另外CompletableFuture类还实现了CompletionStage接口,这个接口就比较关键了,之所以能实现响应式编程,都是通过这个接口提供的方法。
  下面介绍下CompletionStage接口,看字面意思可以理解为完成动作的一个阶段,官方注释文档:CompletionStage是一个可能执行异步计算的阶段,这个阶段会在另一个CompletionStage完成时调用去执行动作或者计算,一个CompletionStage会以正常完成或者中断的形式完成,并且它的完成会触发其他依赖的CompletionStage。CompletionStage接口的方法一般都返回新的CompletionStage,因此构成了链式的调用。
  这个看完还是有点懵逼的,不清楚什么是CompletionStage?
  在Java中什么是CompletionStage?
  一个Function、Comsumer、Supplier或者Runnable都会被描述为一个CompletionStage。stage。thenApply(xsquare(x))。thenAccept(xSystem。out。print(x))。thenRun(()System。out。println())xsquare(x)就是一个Function类型的Stage,它返回了x。xSystem。out。println(x)就是一个Comsumer类型的Stage,用于接收上一个Stage的结果x。()System。out。println()就是一个Runnable类型的Stage,既不消耗结果也不产生结果。
  但是CompletionStage这里面一共有40多个方法,我们该如何理解呢?
  CompletionStage接口可以清晰的描述任务之间的关系,可以分为顺序串行、并行、汇聚关系以及异常处理。串行关系
  CompletionStage接口里面描述串行关系,主要是thenApply、thenAccept、thenRun和thenCompose这四个系列的接口。publicUCompletionStageUthenApply(Functionlt;?superT,?extendsUfn);publicUCompletionStageUthenApplyAsync(Functionlt;?superT,?extendsUfn);publicUCompletionStageUthenApplyAsync(Functionlt;?superT,?extendsUfn,Executorexecutor);publicCompletionStageVoidthenAccept(Consumerlt;?superTaction);publicCompletionStageVoidthenAcceptAsync(Consumerlt;?superTaction);publicCompletionStageVoidthenAcceptAsync(Consumerlt;?superTaction,Executorexecutor);publicCompletionStageVoidthenRun(Runnableaction);publicCompletionStageVoidthenRunAsync(Runnableaction);publicCompletionStageVoidthenRunAsync(Runnableaction,Executorexecutor);publicUCompletionStageUthenCompose(Functionlt;?superT,?extendsCompletionStageUfn);publicUCompletionStageUthenComposeAsync(Functionlt;?superT,?extendsCompletionStageUfn);publicUCompletionStageUthenComposeAsync(Functionlt;?superT,?extendsCompletionStageUfn,Executorexecutor);thenApply系列方法里参数fn的类型是接口Function,使用该函数处理上一个CompletableFuture调用的结果,并返回一个具有处理结果的CompletionStage对象,这个方法既能接收参数也支持返回值,可以理解为对于结果的转换;thenAccept系列方法里参数action的类型是接口Consumer,这个方法虽然支持参数,但却不支持回值,可以理解为对于结果的消费;thenRun系列方法里action的参数是Runnable,所以action既不能接收参数也不支持返回值,也是对于结果的一种消费,和thenAccept区别在于Runnable并不使用前一步CompletableFuture计算的结果;thenCompose的参数为一个返回CompletableFuture实例的函数,该函数的参数是先前计算步骤的结果,和thenApply执行结果类似,区别在于会生成一个新的CompletableFuture返回,也可以理解为对于结果的转换;
  thenApply()和thenCompose()的区别?thenApply转换的是泛型中的类型,是同一个CompletableFuture,thenCompose用来连接两个CompletableFuture,是生成一个新的CompletableFuture。他们都是让CompletableFuture可以对返回的结果进行后续操作,就像Stream一样进行map和flatMap的转换。publicstaticvoidmain(String〔〕args)throwsInterruptedException,ExecutionException{CompletableFutureStringfutureCompletableFuture。supplyAsync(()Hello);CompletableFutureStringresult1future。thenApply(paramparamWorld);CompletableFutureStringresult2future。thenCompose(paramCompletableFuture。supplyAsync(()paramWorld));System。out。println(result1。get());System。out。println(result2。get());}
  这些方法里面Async代表的是异步执行fn、consumer或者action。CompletableFutureStringf0CompletableFuture。supplyAsync(()HelloWorld)。thenApply(ssQQ)。thenApply(String::toUpperCase);System。out。println(f0。join());输出结果HELLOWORLDQQ
  可以看一下thenApply()方法是如何使用的。首先通过supplyAsync()启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务却是串行执行的,依赖的执行结果,依赖的执行结果。
  CompletableFuture中thenApply如何实现?先看下静态创建CompletableFuture的方法supplyAsync;静态方法,如果没有传入线程池,使用ForkJoinPool的common线程池publicstaticUCompletableFutureUsupplyAsync(SupplierUsupplier){returnasyncSupplyStage(ASYNCPOOL,supplier);}staticUCompletableFutureUasyncSupplyStage(Executore,SupplierUf){if(fnull)thrownewNullPointerException();新建CompletableFuture对象CompletableFutureUdnewCompletableFutureU();构造AsyncSupply对象,线程池提交AsyncSupply任务e。execute(newAsyncSupplyU(d,f));将CompletableFuture对象返回returnd;}staticfinalclassAsyncSupplyTextendsForkJoinTaskVoid可以看到AsyncSupply是一个Runnable对象implementsRunnable,AsynchronousCompletionTask{CompletableFutureTdep;Supplierlt;?extendsTfn;AsyncSupply(CompletableFutureTdep,Supplierlt;?extendsTfn){this。depdep;this。fnfn;}publicfinalVoidgetRawResult(){returnnull;}publicfinalvoidsetRawResult(Voidv){}publicfinalbooleanexec(){run();returnfalse;}publicvoidrun(){CompletableFutureTd;Supplierlt;?extendsTf;if((ddep)!null(ffn)!null){depnull;fnnull;CompletableFuture对象的result为空时if(d。resultnull){try{调用传入的supplier的get方法,并将结果放入result字段注意:这是在线程池中提交的,所以是异步处理的d。completeValue(f。get());}catch(Throwableex){d。completeThrowable(ex);}}处理完当前方法后,处理依赖它的栈顶方法,后面的回调方法入栈和这块呼应d。postComplete();}}}finalvoidpostComplete(){变量f存储的是当前已经完成的CompletableFutureCompletableFuturelt;?fthis;Completionh;while((hf。stack)!null(f!this(h(fthis)。stack)!null)){CompletableFuturelt;?d;Completiont;CAS操作,将依赖此阶段的栈顶元素取出,并且设置为下一个if(STACK。compareAndSet(f,h,th。next)){if(t!null){if(f!this){如果f不是this,将刚出栈的h入this的栈顶pushStack(h);continue;}将h剥离出来,h。nextnull,帮助gcNEXT。compareAndSet(h,t,null);trytodetach}调用tryFiref(dh。tryFire(NESTED))null?this:d;}}}再看下异步处理完supplyAsync后的回调方法thenApply方法,看看它是如何实现回调的;publicUCompletableFutureUthenApply(Functionlt;?superT,?extendsUfn){returnuniApplyStage(null,fn);}privateVCompletableFutureVuniApplyStage(Executore,Functionlt;?superT,?extendsVf){if(fnull)thrownewNullPointerException();Objectr;如果当前阶段结果已经返回,则直接运行回调方法if((rresult)!null)returnuniApplyNow(r,e,f);CompletableFutureVdnewIncompleteFuture();构造Completion放入等待栈的顶unipush(newUniApplyT,V(e,d,this,f));returnd;}privateVCompletableFutureVuniApplyNow(Objectr,Executore,Functionlt;?superT,?extendsVf){Throwablex;CompletableFutureVdnewIncompleteFuture();如果依赖的方法异常中断,则直接处理并返回异常if(rinstanceofAltResult){if((x((AltResult)r)。ex)!null){d。resultencodeThrowable(x,r);returnd;}rnull;}try{执行到这里说明依赖的任务已经有结果了,用它的结果当作参数调用回调方法注意这里都是线程池中的线程在执行,所以是异步执行if(e!null){e。execute(newUniApplyT,V(null,d,this,f));}else{SuppressWarnings(unchecked)Tt(T)r;d。resultd。encodeValue(f。apply(t));}}catch(Throwableex){d。resultencodeThrowable(ex);}returnd;}finalvoidunipush(Completionc){if(c!null){CAS自旋将回调方法压入栈顶while(!tryPushStack(c)){if(result!null){NEXT。set(c,null);break;}}可能在重试中完成,判断result不为空就执行if(result!null)c。tryFire(SYNC);}}再次尝试判断依赖方法是否处理完成,处理完成则调用目标回调方法finalCompletableFutureVtryFire(intmode){CompletableFutureVd;CompletableFutureTa;Objectr;Throwablex;Functionlt;?superT,?extendsVf;if((asrc)null(ra。result)null(ddep)null(ffn)null)returnnull;tryComplete:if(d。resultnull){if(rinstanceofAltResult){if((x((AltResult)r)。ex)!null){d。completeThrowable(x,r);breaktryComplete;}rnull;}try{if(mode0!claim())returnnull;else{SuppressWarnings(unchecked)Tt(T)r;d。completeValue(f。apply(t));}}catch(Throwableex){d。completeThrowable(ex);}}srcnull;depnull;fnnull;成功处理完依赖方法和回调方法后进行处理,可能唤醒其他的回调方法或者清理栈returnd。postFire(a,mode);}描述AND汇聚关系
  CompletionStage接口里面描述AND汇聚关系,主要是thenCombine、thenAcceptBoth和runAfterBoth系列的接口,这些接口的区别是源自fn、consumer、action这三个核心参数不同。publicU,VCompletionStageVthenCombine(CompletionStagelt;?extendsUother,BiFunctionlt;?superT,?superU,?extendsVfn);publicU,VCompletionStageVthenCombineAsync(CompletionStagelt;?extendsUother,BiFunctionlt;?superT,?superU,?extendsVfn);publicU,VCompletionStageVthenCombineAsync(CompletionStagelt;?extendsUother,BiFunctionlt;?superT,?superU,?extendsVfn,Executorexecutor);publicUCompletionStageVoidthenAcceptBoth(CompletionStagelt;?extendsUother,BiConsumerlt;?superT,?superUaction);publicUCompletionStageVoidthenAcceptBothAsync(CompletionStagelt;?extendsUother,BiConsumerlt;?superT,?superUaction);publicUCompletionStageVoidthenAcceptBothAsync(CompletionStagelt;?extendsUother,BiConsumerlt;?superT,?superUaction,Executorexecutor);publicCompletionStageVoidrunAfterBoth(CompletionStagelt;?other,Runnableaction);publicCompletionStageVoidrunAfterBothAsync(CompletionStagelt;?other,Runnableaction);publicCompletionStageVoidrunAfterBothAsync(CompletionStagelt;?other,Runnableaction,Executorexecutor);
  Async后缀的方法表示,前面的CompletionStage执行完成,在执行后续操作时会提交到线程池处理,否则就还是使用同一个处理线程完成CompletableFuture的所有任务。
  这三种方法意思都是等两个CompletionStage都完成了计算才会执行下一步的操作,区别在于参数接口类型不一样。thenCombine参数接口类型为BiFunction,可以拿到前一步两个CompletionStage的运算结果,进行下一步处理,同时有返回值(转化操作);thenAcceptBoth参数接口类型为BiConsumer,也可以拿到前一步的运算结果进行下一步处理,但是无返回值(消费操作);runAfterBoth参数接口类型为Runnable,即不能获取到上一步的执行结果,也无返回值(不关心运行结果);
  CompletableFuture中thenAcceptBoth如何实现?talkischeap!!publicUCompletableFutureVoidthenAcceptBoth(CompletionStagelt;?extendsUother,BiConsumerlt;?superT,?superUaction){returnbiAcceptStage(null,other,action);}privateUCompletableFutureVoidbiAcceptStage(Executore,CompletionStageUo,BiConsumerlt;?superT,?superUf){CompletableFutureUb;Objectr,s;if(fnull(bo。toCompletableFuture())null)thrownewNullPointerException();CompletableFutureVoiddnewIncompleteFuture();如果两个阶段有任何一个没有执行完成,则将回调方法分别放到两个互相依赖阶段的栈顶if((rresult)null(sb。result)null)bipush(b,newBiAcceptT,U(e,d,this,b,f));elseif(enull)如果两个依赖的阶段都执行完成则调用回调方法d。biAccept(r,s,f,null);elsetry{e。execute(newBiAcceptT,U(null,d,this,b,f));}catch(Throwableex){d。resultencodeThrowable(ex);}returnd;}描述OR汇聚关系
  OR的关系,表示谁运行快就用谁的结果执行下一步操作。publicUCompletionStageUapplyToEither(CompletionStagelt;?extendsTother,Functionlt;?superT,Ufn);publicUCompletionStageUapplyToEitherAsync(CompletionStagelt;?extendsTother,Functionlt;?superT,Ufn);publicUCompletionStageUapplyToEitherAsync(CompletionStagelt;?extendsTother,Functionlt;?superT,Ufn,Executorexecutor);publicCompletionStageVoidacceptEither(CompletionStagelt;?extendsTother,Consumerlt;?superTaction);publicCompletionStageVoidacceptEitherAsync(CompletionStagelt;?extendsTother,Consumerlt;?superTaction);publicCompletionStageVoidacceptEitherAsync(CompletionStagelt;?extendsTother,Consumerlt;?superTaction,Executorexecutor);publicCompletionStageVoidrunAfterEither(CompletionStagelt;?other,Runnableaction);publicCompletionStageVoidrunAfterEitherAsync(CompletionStagelt;?other,Runnableaction);publicCompletionStageVoidrunAfterEitherAsync(CompletionStagelt;?other,Runnableaction,Executorexecutor);
  同样也是有Async后缀的表示,当前面的CompletionStage执行完成,在执行后续操作时会提交到线程池处理。applyToEither、acceptEither、runAfterEither三个方法的区别还是来自于不同的接口参数类型:Function、Consumer、Runnable。
  CompletableFuture中applyToEither如何实现?publicUCompletableFutureUapplyToEitherAsync(CompletionStagelt;?extendsTother,Functionlt;?superT,Ufn){returnorApplyStage(defaultExecutor(),other,fn);}privateUextendsT,VCompletableFutureVorApplyStage(Executore,CompletionStageUo,Functionlt;?superT,?extendsVf){CompletableFutureUb;if(fnull(bo。toCompletableFuture())null)thrownewNullPointerException();Objectr;CompletableFuturelt;?extendsTz;这块是重点,有任何一个阶段的结果不为空就直接执行functionif((r(zthis)。result)!null(r(zb)。result)!null)returnz。uniApplyNow(r,e,f);CompletableFutureVdnewIncompleteFuture();如果都为空则将回调方法分别push到被依赖的两个阶段的栈顶orpush(b,newOrApplyT,U,V(e,d,this,b,f));returnd;}异常处理
  在Java编程中,异常处理当然是必不可少的一环,那你可能会想到如果在使用CompletableFuture进行异步链式编程时,如果出现异常该怎么处理呢?
  首先上面我们提到的fn、consumer、action它们的核心方法是不允许抛出可检查异常的,但是却无法限制它们抛出运行时异常。在同步方法中,我们可以使用trycatch{}来捕获并处理异常,但在异步编程里面异常该如何处理?CompletionStage接口给我们提供的方案非常简单,比trycatch{}还要简单。
  下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。publicCompletableFutureTwhenComplete(BiConsumerlt;?superT,?superThrowableaction)publicCompletableFutureTwhenCompleteAsync(BiConsumerlt;?superT,?superThrowableaction)publicCompletableFutureTwhenCompleteAsync(BiConsumerlt;?superT,?superThrowableaction,Executorexecutor)publicCompletableFutureTexceptionally(FunctionThrowable,?extendsTfn)参数的类型是BiConsumerlt;?superT,?superThrowable,它可以处理正常的计算结果,或者异常情况,可以获取到上一步的执行结果作为参数;无论是否发生异常都会执行whenComplete()中的回调函数action;方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行;这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。CompletableFutureVoidfutureCompletableFuture。runAsync((){try{TimeUnit。SECONDS。sleep(1);}catch(InterruptedExceptione){}if(newRandom()。nextInt()20){inti120;}System。out。println(执行结束!);});future。whenComplete(newBiConsumerVoid,Throwable(){Overridepublicvoidaccept(Voidt,Throwableaction){System。out。println(执行完成!);}});future。exceptionally(newFunctionThrowable,Void(){OverridepublicVoidapply(Throwablet){System。out。println(执行失败:t。getMessage());returnnull;}})。join();
  handle也是执行任务完成时对结果的处理,whenComplete()和handle()的区别在于whenComplete()不支持返回结果,而handle()是支持返回结果的。
  当上一个的CompletableFuture的值计算完成或者抛出异常的时候,会触发handle方法中定义的函数,结果由BiFunction参数计算而得,因此这组方法兼有whenComplete和转换的两个功能。publicUCompletionStageUhandle(BiFunctionlt;?superT,Throwable,?extendsUfn);publicUCompletionStageUhandleAsync(BiFunctionlt;?superT,Throwable,?extendsUfn);publicUCompletionStageUhandleAsync(BiFunctionlt;?superT,Throwable,?extendsUfn,Executorexecutor);JDK8流式编程结合publicstaticListStringexampleCompletableFutureAndStream(){ExecutorServiceexecutorServiceExecutors。newCachedThreadPool();ListStringloadsnull;try{所有需要查询远程服务的load列表ListStringrequestListLists。newArrayList(load1,load2,load3,load4);ListCompletableFutureStringcompletableFuturesrequestList。stream()使用CompletableFuture以异步方式查询数据。map(reqCompletableFuture。supplyAsync(()invokeReq(req),executorService))。map(futurefuture。thenApply(Load::getStatus))。map(futurefuture。thenCompose(statusCompletableFuture。supplyAsync(()status。name()。toUpperCase())))。toList();loadscompletableFutures。stream()。map(CompletableFuture::join)。toList();System。out。println(Thread。currentThread()。getName():CompletableFuture异步方式查询请求已完成:loads。size());}finally{executorService。shutdown();}returnloads;}
  注意到了吗?这里使用了两个不同的Stream流水线,是否可以在同一个处理流的流水线上一个接一个地放置多个map操作。publicstaticListStringexampleCompletableFutureAndStream(){ExecutorServiceexecutorServiceExecutors。newCachedThreadPool();ListStringloadsnull;try{所有需要查询远程服务的load列表ListStringrequestListLists。newArrayList(load1,load2,load3,load4);loadsrequestList。stream()使用CompletableFuture以异步方式查询数据。map(reqCompletableFuture。supplyAsync(()invokeReq(req),executorService))。map(futurefuture。thenApply(Load::getStatus))。map(futurefuture。thenCompose(statusCompletableFuture。supplyAsync(()status。name()。toUpperCase())))。map(CompletableFuture::join)。toList();System。out。println(Thread。currentThread()。getName():CompletableFuture异步方式查询请求已完成:loads。size());}finally{executorService。shutdown();}returnloads;}
  这其实是有原因的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,不同的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定服务请求的动作、通知join方法返回结果。
  再来看一个例子:
  我们的系统提供的运费价格是以美元计价的,但是你希望以人民币(RMB)的方式提供给你的客户。你可以用异步的方式向计费中心查询指定Load的价格,同时从远程的汇率服务那里查到人民币和美元之间的汇率。当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以人民币计价的商品价格。publicclassMultiThreadTest{Testpublicvoidtest18(){longstartSystem。nanoTime();ListCompletableFutureDoublefuturesloads。stream()。map(laodCompletableFuture查商品价格操作和查兑换汇率操作同时进行,当两者都完成时将结果进行整合。supplyAsync(()load。getPrice(load1))。thenCombine(CompletableFuture。supplyAsync(()RateService。getRate(RMB,USD)),(price,rate)pricerate))。collect(toList());ListDoubleusdPricesfutures。stream()。map(CompletableFuture::join)。collect(toList());}}
  通过上述例子,可以看到相对于采用Java8之前提供的Future实现,CompletableFuture版本实现所具备的巨大优势。CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。
  为了更直观地感受一下使用CompletableFuture在代码可读性上带来的巨大提升,下面尝试仅使用Java7中提供的特性,重新实现上述例子的功能。publicclassMultiThreadTest{Testpublicvoidtest19()throwsExecutionException,InterruptedException{longstartSystem。nanoTime();ListFutureDoubleusdFuturePricesnewArrayList(shops。size());for(Shopshop:shops){创建一个查询人民币到美元转换汇率的FuturefinalFutureDoubleusdFutureRateexecutor。submit(newCallableDouble(){publicDoublecall(){returnRateService。getRate(RMB,USD);}});在第二个Future中查询指定商店中特定商品的价格FutureDoubleusdFuturePriceexecutor。submit(newCallableDouble(){publicDoublecall()throwsExecutionException,InterruptedException{doublermbPriceshop。getPrice(肥皂);在查找价格操作的同一个Future中,将价格和汇率做乘法计算出汇后价格returnrmbPriceusdFutureRate。get();}});usdFuturePrices。add(usdFuturePrice);}ListDoubleusdPricesnewArrayList(usdFuturePrices。size());for(FutureDoubleusdFuturePrice:usdFuturePrices){usdPrices。add(usdFuturePrice。get());}}}
  这里我们思考这样一个问题:并行使用流还是CompletableFuture?
  对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待IO而发生阻塞。同时也可以提供更多描述任务之间关系的接口,我们不需要为之编写更多的代码。
  这里对使用这些API的建议如下:如果你进行的是计算密集型的操作,并且没有IO,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。反之,如果你并行的工作单元还涉及等待IO的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好。总结
  今天大家学到了哪些知识呢?如何优化接口性能?某些场景下可以使用多线程并行代替串行。如何实现接口并行调用?通过今天的学习可以使用FutureCallable、FutureTask、CompletableFuture。详细介绍了CompletableFuture的强大,掌握CompletableFuture提供的函数式编程的能力,以及与JDK8流式编程结合使用,使代码更加美观优雅,写起来简洁和便利;在接口设计时可以参考CompletableFuture的实现,将两个无关的接口能力组装在一起以实现更加强大的功能。

做好战斗准备恩比德每个人都必须站出来北京时间12月25日,在昨日击败洛杉矶快船赛后,费城76人队内讨论了他们即将迎来的自2019年以来的首场圣诞大战。谈到本场比赛,76人球星乔尔恩比德难掩兴奋这是(麦迪逊广场)花园,CCTV5直播!辽宁战同曦双子星给双枪上课,阿丘尔内线无人匹敌北京时间12月25日,CBA常规赛第2阶段比赛将会继续展开争夺,在今天晚上1935CBA将会迎来一场焦点之战,那就是卫冕冠军辽宁男篮将会对阵本赛季大黑马之一,同时也有着网红球队之称一周球鞋!莫兰特1代来了,KD16发售确定莫兰特1代要来了?耐克和莫兰特都在个人社交账号上,晒出了印有贾莫兰特个人Logo的礼盒。小贾大概率将在明天对阵勇士的圣诞大战上穿上自己的签名鞋!莫兰特1代签名鞋的鞋盒之前就有曝光了33岁!生涯赚了2。7亿,从顶流到板凳,这次他真的想争冠了年轻的时候也是在天上摘星星的人,今天宁愿在泥地里打滚也要把这个球给救回来。这是2021年篮网与雄鹿的东部半决赛第一场第二节中,格里芬飞身抢断波蒂斯的过程中,管泽元的一句解说词。作为NBA全明星投票开始!时代的交接!威少还能否再进一次全明星?全明星投票,如火如荼的进行着。技巧君爱看全明星,不仅仅是因为这是NBA一年一度的大会,更因为全明星投票的结果,正印证了一个时代的交接。昔日满满少年意气的新人王们走下九重天,留下满地特殊时期,不管阳或者阴,多吃这5种食物,避免复阳疫情就喜欢欺负老实人,全民都在阳的特殊时期,老实人只会戴口罩躲在家。对此不少专家都在呼吁应该多吃富含维生素C的食物,因为维生素C对于我们的免疫力来说有着非常重要的作用。很多食物中都这两个习惯,拉开了普通人和优秀者的差距人和人的差距有多大?能有多大?但又能有多大?但诚如这两个公式,说明了不同人的微小的选择,将会在时间的积累下,发生多么巨大的变化。有人坚持每天进步一点点,一年下来,就获得了超过37倍3D食物打印机不管你信不信,3D打印正在颠覆我们的生活!不仅是工业科技,3D打印已经开始入侵我们的食物领地,目前,已有不少公司进军餐饮业的食物打印技术,研发了通过3D打印机制造出各种形状且营养丰改革开放,中国经济活力足行稳致远二二二年终经济观察改革开放,当代中国发展进步的活力之源。深入推进改革创新,坚定不移扩大开放,着力破解深层次体制机制障碍,不断彰显中国特色社会主义制度优势,不断增强社会主义现代化建设的动力和活力,把我数据发布谢菲联考文垂罗瑟汉姆斯托克城普雷斯顿哈德斯抽了点时间,给我的头条粉丝们再整理了3场比赛的数据。一谢菲联VS考文垂1基本信息谢菲尔德联VS考文垂交战历史双方近6次交战,谢菲联2胜1平3负,进6球,失9球,大球3次,小球3次谢刀片嗓太难受?试试这个缓解症状的方法宝娟,我的嗓子疼!近来,不少市民表示自己阳了以后开始出现喉咙痛喉咙干,喝水时喉咙疼得犹如吞刀片痛苦经历一言难尽。长沙市第三医院口腔科主任徐红表示,新冠病毒可引起上呼吸道咽喉部粘膜的
欧文密谋与詹皇联手?名记曝湖人秘密计划,欧文詹皇是我偶像大家都知道,今夏湖人队想要操作威少交易得到欧文,不过随着杜兰特确认留队之后,这笔交易随之搁浅,但是问题也来了,难道欧文真的不会再跟詹姆斯联手了吗?实际上也未必,至少休赛季想要看到湖复杂多元新能源电网智能协同测控关键技术研究项目顺利通过科技成果鉴定近日,中国计量测试学会组织对北京市检验检测认证中心所属北京市计量检测科学研究院复杂多元新能源电网智能协同测控关键技术研究项目进行科技成果鉴定。鉴定专家组由哈尔滨工业大学谭久彬院士等法甲巴黎圣日耳曼胜图卢兹8月31日,巴黎圣日耳曼队球员在比赛中庆祝进球。新华社路透当日,在20222023赛季法国足球甲级联赛第五轮比赛中,巴黎圣日耳曼队客场以3比0战胜图卢兹队。8月31日,巴黎圣日耳曼官厅水库附近的房子奔赴山川湖海享受自然与爱,热爱可抵岁月漫长,热爱家人热爱生活热爱自然,热爱每一个热爱,这也许就是这片山湖给予每个人的力量盼着盼着,中秋佳节终于快到了。还记得去年中秋佳节一直待在家中与新能源同行开启万亿级自动驾驶干线物流市场光明网讯日前,商用车自动驾驶公司卡睿智行Corage宣布,已完成数千万人民币的天使轮融资,投资方为辰韬资本丰元资本和日本PKSHASPARXAlgorithm基金。卡睿智行首次提出红米K60mini渲染图,四边等宽2K屏天玑8200,小屏旗舰有了新体验如今的手机市场相当繁杂,为了凸现优势,各大手机纷纷开启了内卷模式,均在不断创新,研发众多黑科技,以此来吸引用户。毕竟如今的手机市场千篇一律,无论是产品设计还是性能配置都非常接近,很中国99。63的人在银行存款不超过50万买不起房,结不起婚,养不起孩子,生不起病,才是绝大多数人生活的真实状态。根据咱们央行的统计,中国99。63的人在银行的存款不超过50万,到目前为止,中国人均存款也就是55000元,我军将领率600人投日寇,公然吃喝嫖赌,粟裕大喜给他连升三级他头顶汉奸的骂名,率领600名官兵奉命假投降,打入敌军内部,多次打破敌人的清乡计划和对我方的经济封锁。他将敌军重要信息枪支弹药送给我党。在假投降的160多天后,接到我方指令发动攻击成龙与刘德华为什么会屹立不倒,而李易峰与吴亦凡为什么会翻车?这几天被李易峰PC刷了屏,其实明星也是人,他落到如今这个下场,不是什么新鲜事。六七十年代,大家追捧的明星是雷锋王进喜等榜样式的人物八十年代初,随着改革开放的推进,外国及港台明星纷纷秋季保湿,男人该怎么用乳液面霜?季节变换,皮肤也需要不同的护理。与炎热多雨的夏季相比,秋季的空气湿度降低了很多,尤其是皮肤干燥的朋友。很多男人总是误以为自己的皮肤爱出油,就不去做,也不需要保湿。其实健康人要做到水22个护肤小常识,送给想要变美的你1护肤之前一定要了解自己的肤质,根据自己的肤质选择适合自己的护肤品。2少吃油炸和甜食,多吃水果和蔬菜,有利于皮肤的保养。3每天洗脸不能过于频繁,早晚一次就好。4洗面奶选择氨基酸,温
友情链接:快好知快生活快百科快传网中准网文好找聚热点快软网