专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

一网打尽异步神器CompletableFuture

  最近一直畅游在RocketMQ的源码中,发现在RocketMQ中很多地方都使用到了CompletableFuture,所以今天就跟大家来聊一聊JDK1。8提供的异步神器CompletableFuture,并且最后会结合RocketMQ源码分析一下CompletableFuture的使用。Future接口以及它的局限性
  我们都知道,Java中创建线程的方式主要有两种方式,继承Thread或者实现Runnable接口。但是这两种都是有一个共同的缺点,那就是都无法获取到线程执行的结果,也就是没有返回值。于是在JDK1。5以后为了解决这种没有返回值的问题,提供了Callable和Future接口以及Future对应的实现类FutureTask,通过FutureTask的就可以获取到异步执行的结果。
  于是乎,我们想要开启异步线程,执行任务,获取结果,就可以这么实现。FutureTaskStringfutureTasknewFutureTask(()三友);newThread(futureTask)。start();System。out。println(futureTask。get());
  或者使用线程池的方式ExecutorServiceexecutorServiceExecutors。newSingleThreadExecutor();FutureStringfutureexecutorService。submit(()三友);System。out。println(future。get());executorService。shutdown();
  线程池底层也是将提交的Callable的实现先封装成FutureTask,然后通过execute方法来提交任务,执行异步逻辑。Future接口的局限性
  虽然通过Future接口的get方法可以获取任务异步执行的结果,但是get方法会阻塞主线程,也就是异步任务没有完成,主线程会一直阻塞,直到任务结束。
  Future也提供了isDone方法来查看异步线程任务执行是否完成,如果完成,就可以获取任务的执行结果,代码如下。
  ExecutorServiceexecutorServiceExecutors。newSingleThreadExecutor();FutureStringfutureexecutorService。submit(()三友);while(!future。isDone()){任务有没有完成,没有就继续循环判断}System。out。println(future。get());executorService。shutdown();
  但是这种轮询查看异步线程任务执行状态,也是非常消耗cpu资源。
  同时对于一些复杂的异步操作任务的处理,可能需要各种同步组件来一起完成。
  所以,通过上面的介绍可以看出,Future在使用的过程中还是有很强的局限性,所以为了解决这种局限性,在JDK1。8的时候,DougLea大神为我们提供了一种更为强大的类CompletableFuture。什么是CompletableFuture?
  CompletableFuture在JDK1。8提供了一种更加强大的异步编程的api。它实现了Future接口,也就是Future的功能特性CompletableFuture也有;除此之外,它也实现了CompletionStage接口,CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。
  CompletableFuture相比于Future最大的改进就是提供了类似观察者模式的回调监听的功能,也就是当上一阶段任务执行结束之后,可以回调你指定的下一阶段任务,而不需要阻塞获取结果之后来处理结果。CompletableFuture常见api详解
  CompletableFuture的方法api多,但主要可以分为以下几类。1、实例化CompletableFuture构造方法创建
  CompletableFutureStringcompletableFuturenewCompletableFuture();System。out。println(completableFuture。get());
  此时如果有其它线程执行如下代码,就能执行打印出三友completableFuture。complete(三友)静态方法创建
  除了使用构造方法构造,CompletableFuture还提供了静态方法来创建publicstaticUCompletableFutureUsupplyAsync(SupplierUsupplier);publicstaticUCompletableFutureUsupplyAsync(SupplierUsupplier,Executorexecutor);publicstaticCompletableFutureVoidrunAsync(Runnablerunnable);publicstaticCompletableFutureVoidrunAsync(Runnablerunnable,Executorexecutor);
  supply和run的主要区别就是supply可以有返回值,run没有返回值。至于另一个参数Executor就是用来执行异步任务的线程池,如果不传Executor的话,默认是ForkJoinPool这个线程池的实现。
  一旦通过静态方法来构造,会立马开启异步线程执行Supplier或者Runnable提交的任务。CompletableFutureStringcompletableFutureCompletableFuture。supplyAsync(()三友);System。out。println(completableFuture。get());
  一旦任务执行完成,就可以打印返回值,这里的使用方法跟Future是一样的。
  所以对比两个两种实例化的方法,使用静态方法的和使用构造方法主要区别就是,使用构造方法需要其它线程主动调用complete来表示任务执行完成,因为很简单,因为在构造的时候没有执行异步的任务,所以需要其它线程主动调用complete来表示任务执行完成。2、获取任务执行结果publicTget();publicTget(longtimeout,TimeUnitunit);publicTgetNow(TvalueIfAbsent);publicTjoin();
  get()和get(longtimeout,TimeUnitunit)是实现了Future接口的功能,两者主要区别就是get()会一直阻塞直到获取到结果,get(longtimeout,TimeUnitunit)值可以指定超时时间,当到了指定的时间还未获取到任务,就会抛出TimeoutException异常。
  getNow(TvalueIfAbsent):就是获取任务的执行结果,但不会产生阻塞。如果任务还没执行完成,那么就会返回你传入的valueIfAbsent参数值,如果执行完成了,就会返回任务执行的结果。
  join():跟get()的主要区别就是,get()会抛出检查时异常,join()不会。3、主动触发任务完成
  publicbooleancomplete(Tvalue);publicbooleancompleteExceptionally(Throwableex);
  complete:主动触发当前异步任务的完成。调用此方法时如果你的任务已经完成,那么方法就会返回false;如果任务没完成,就会返回true,并且其它线程获取到的任务的结果就是complete的参数值。
  completeExceptionally:跟complete的作用差不多,complete是正常结束任务,返回结果,而completeExceptionally就是触发任务执行的异常。4、对任务执行结果进行下一步处理只能接收任务正常执行后的回调publicUCompletionStageUthenApply(Functionlt;?superT,?extendsUfn);publicCompletableFutureVoidthenRun(Runnableaction);publicCompletionStageVoidthenAccept(Consumerlt;?superTaction);
  这类回调的特点就是,当任务正常执行完成,没有异常的时候就会回调。
  thenApply:可以拿到上一步任务执行的结果进行处理,并且返回处理的结果thenRun:拿不到上一步任务执行的结果,但会执行Runnable接口的实现thenAccept:可以拿到上一步任务执行的结果进行处理,但不需要返回处理的结果
  thenApply示例:CompletableFutureStringcompletableFutureCompletableFuture。supplyAsync(()10)。thenApply(v(上一步的执行的结果为:v));System。out。println(completableFuture。join());
  执行结果:上一步的执行的结果为:10
  thenRun示例:CompletableFutureVoidcompletableFutureCompletableFuture。supplyAsync(()10)。thenRun(()System。out。println(上一步执行完成));
  执行结果:
  上一步执行完成
  thenAccept示例:
  CompletableFutureVoidcompletableFutureCompletableFuture。supplyAsync(()10)。thenAccept(vSystem。out。println(上一步执行完成,结果为:v));
  执行结果:上一步执行完成,结果为:10
  thenApply有异常示例:CompletableFutureStringcompletableFutureCompletableFuture。supplyAsync((){模拟异常inti10;return10;})。thenApply(v(上一步的执行的结果为:v));System。out。println(completableFuture。join());
  执行结果:Exceptioninthreadmainjava。util。concurrent。CompletionException:java。lang。ArithmeticException:byzeroatjava。util。concurrent。CompletableFuture。encodeThrowable(CompletableFuture。java:273)atjava。util。concurrent。CompletableFuture。completeThrowable(CompletableFuture。java:280)atjava。util。concurrent。CompletableFutureAsyncSupply。run(CompletableFuture。java:1606)
  当有异常时是不会回调的只能接收任务处理异常后的回调publicCompletionStageTexceptionally(FunctionThrowable,?extendsTfn);
  当上面的任务执行过程中出现异常的时候,会回调exceptionally方法指定的回调,但是如果没有出现异常,是不会回调的。
  exceptionally能够将异常给吞了,并且fn的返回值会返回回去。
  其实这个exceptionally方法有点像降级的味道。当出现异常的时候,走到这个回调,可以返回一个默认值回去。
  没有异常情况下:CompletableFutureIntegercompletableFutureCompletableFuture。supplyAsync((){return100;})。exceptionally(e{System。out。println(出现异常了,返回默认值);return110;});System。out。println(completableFuture。join());
  执行结果:100
  有异常情况下:CompletableFutureIntegercompletableFutureCompletableFuture。supplyAsync((){inti10;return100;})。exceptionally(e{System。out。println(出现异常了,返回默认值);return110;});System。out。println(completableFuture。join());
  执行结果:出现异常了,返回默认值110能同时接收任务执行正常和异常的回调
  publicUCompletionStageUhandle(BiFunctionlt;?superT,Throwable,?extendsUfn);publicCompletionStageTwhenComplete(BiConsumerlt;?superT,?superThrowableactin);
  不论前面的任务执行成功还是失败都会回调的这类方法指定的回调方法。
  handle:跟exceptionally有点像,但是exceptionally是出现异常才会回调,两者都有返回值,都能吞了异常,但是handle正常情况下也能回调。
  whenComplete:能接受正常或者异常的回调,并且不影响上个阶段的返回值,也就是主线程能获取到上个阶段的返回值;当出现异常时,whenComplete并不能吞了这个异常,也就是说主线程在获取执行异常任务的结果时,会抛出异常。
  这里演示一下whenComplete处理异常示例情况,handle跟exceptionally对异常的处理差不多。
  whenComplete处理异常示例:CompletableFutureIntegercompletableFutureCompletableFuture。supplyAsync((){inti10;return10;})。whenComplete((r,e){System。out。println(whenComplete被调用了);});System。out。println(completableFuture。join());
  执行结果:whenComplete被调用了Exceptioninthreadmainjava。util。concurrent。CompletionException:java。lang。ArithmeticException:byzeroatjava。util。concurrent。CompletableFuture。encodeThrowable(CompletableFuture。java:273)atjava。util。concurrent。CompletableFuture。completeThrowable(CompletableFuture。java:280)atjava。util。concurrent。CompletableFutureAsyncSupply。run(CompletableFuture。java:1606)
  5、对任务结果进行合并publicU,VCompletionStageVthenCombine(CompletionStagelt;?extendsUother,BiFunctionlt;?superT,?superU,?extendsVfn);
  这个方法的意思是,当前任务和other任务都执行结束后,拿到这两个任务的执行结果,回调BiFunction,然后返回新的结果。
  thenCombine的例子请往下继续看。6、以Async结尾的方法
  上面说的一些方法,比如说thenAccept方法,他有两个对应的Async结尾的方法,如下:publicCompletionStageVoidthenAcceptAsync(Consumerlt;?superTaction,Executorexecutor);publicCompletionStageVoidthenAcceptAsync(Consumerlt;?superTaction);
  thenAcceptAsync跟thenAccept的主要区别就是thenAcceptAsync会重新开一个线程来执行下一阶段的任务,而thenAccept还是用上一阶段任务执行的线程执行。
  两个thenAcceptAsync主要区别就是一个使用默认的线程池来执行任务,也就是ForkJoinPool,一个是使用方法参数传入的线程池来执行任务。
  当然除了thenAccept方法之外,上述提到的方法还有很多带有Async结尾的对应的方法,他们的主要区别就是执行任务是否开启异步线程来执行的区别。
  当然,还有一些其它的api,可以自行查看CompletableFuture在RocketMQ中的使用
  CompletableFuture在RocketMQ中的使用场景比较多,这里我举一个消息存储的场景。
  在RocketMQ中,Broker接收到生产者产生的消息的时候,会将消息持久化到磁盘和同步到从节点中。持久化到磁盘和消息同步到从节点是两个独立的任务,互不干扰,可以相互独立执行。当消息持久化到磁盘和同步到从节点中任务完成之后,需要统计整个存储消息消耗的时间,所以统计整个存储消息消耗的时间是依赖前面两个任务的完成。
  实现代码如下
  消息存储刷盘任务和主从复制任务:PutMessageResultputMessageResultnewPutMessageResult(PutMessageStatus。PUTOK,result);提交刷盘的请求CompletableFuturePutMessageStatusflushResultFuturesubmitFlushRequest(result,msg);提交主从复制的请求CompletableFuturePutMessageStatusreplicaResultFuturesubmitReplicaRequest(result,msg);刷盘和主从复制两个异步任务通过thenCombine联合returnflushResultFuture。thenCombine(replicaResultFuture,(flushStatus,replicaStatus){当两个刷盘和主从复制任务都完成的时候,就会回调如果刷盘没有成功,那么就将消息存储的状态设置为失败if(flushStatus!PutMessageStatus。PUTOK){putMessageResult。setPutMessageStatus(flushStatus);}如果主从复制没有成功,那么就将消息存储的状态设置为失败if(replicaStatus!PutMessageStatus。PUTOK){putMessageResult。setPutMessageStatus(replicaStatus);}最终返回消息存储的结果returnputMessageResult;});
  对上面两个合并的任务执行结果通过thenAccept方法进行监听,统计消息存储的耗时:消息存储的开始时间longbeginTimethis。getSystemClock()。now();存储消息,然后返回CompletableFuture,也就是上面一段代码得返回值CompletableFuturePutMessageResultputResultFuturethis。commitLog。asyncPutMessage(msg);监听消息存储的结果putResultFuture。thenAccept((result){消息存储完成之后会回调longelapsedTimethis。getSystemClock()。now()beginTime;if(elapsedTime500){log。warn(putMessagenotinlockelapsedtime(ms){},bodyLength{},elapsedTime,msg。getBody()。length);}this。storeStatsService。setPutMessageEntireTimeMax(elapsedTime);if(nullresult!result。isOk()){this。storeStatsService。getPutMessageFailedTimes()。add(1);}});
  CompletableFuture的优点
  1、异步函数式编程,实现优雅,易于维护;
  2、它提供了异常管理的机制,让你有机会抛出、管理异步任务执行中发生的异常,监听这些异常的发生;
  3、拥有对任务编排的能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。
  文章来自https:www。cnblogs。comzzyangp16446007。html

iPhone14和智能手表等将会在今年的第一场苹果秋季发布会发布苹果的发布会现在距离越来越近,关于苹果的消息也就越来越多,现在有外媒表示苹果已经开始录制今年秋季发布会的内容视频,并表示这次的发布会将会开两场,就像之前iPhone13发布的那次。揭秘2699元起的华为nova10系列为何人气如此之高?在Z时代,智能手机市场的购买主力都是90后00后,而年轻人对于手机的要求也会更高。为了满足年轻人的用机要求,各大手机品牌在科技领域不断创新,而首当其冲的就是华为品牌。华为旗下的no陌生人半夜敲门!VOC猫眼可视智能门锁i系列为家门安全加分上周,小李晚上将睡之际突然听到自己的宠物狗在客厅疯狂吼叫,下床正准备大声呵斥的时候,发现大门的内门把手突然动了一下!随后调取监控发现一名陌生男子一直站在门外,偶尔下压门把手,试图开华为Mate50系列手机入网,三款机型疑似均不支持5G最近有消息称,华为Mate50系列将于9月12日发布,随着时间推进,网上也爆出了更多关于此款手机的消息,近日,华为有三款型号手机已入网,预计就是即将发布的华为Mate50系列手机。ampampquot金马影后ampampquot马思纯被周冬雨嘲讽走后门,千亿身家倒贴海王张哲轩年纪轻轻就躺赚1。2亿被京圈富婆周冬雨都直呼有钱的马思纯身价比想象的还要惊人当我们起早贪黑赶地铁上班为房租发愁时马思纯早已背靠大山住上了千万豪宅这里是京圈最高档成熟的香江别墅商圈有月升沧海同是才貌俱佳,文帝为何对越妃疼爱有加,却对宣后无感月升沧海凌不疑出征寿春的时候,在众多将士面前送给了程少商自己的私印。文帝与越妃宣皇后三人在高台上注视着这一幕。文帝称大庭广众之下成何体统,怎么就那么的难舍难分越妃则回怼表示也不知道好声音学员毕夏官宣生子,丈夫是同季同战队的张恒远2022好声音正在热播中,没想到好声音第二季的两个学员却传来好消息了,张恒远毕夏居然生孩子了。看到这里,不少网友一脸懵他们结婚了吗?确实,两人自好声音第二季出道以后事业发展平平,几韩国女星韩素希在社交平台发布求救信号,很像已逝崔雪梨近日,韩国女星韩素希因面部受伤所以处于修养状态,而她也在社交平台上分享自己的画作,被网友指出有些诡异,且不少网友担心其状况。同时还有网友指出,韩素希晒出的人物肖像图和已经逝世的崔雪从中国产品到中国品牌,看中国男士皮鞋品牌崛起的底气近日,中研产业研究院公布20222027年中国皮鞋行业深度调研及投资前景预测研究报告。报告显示,从20202021年皮鞋行业运行情况来看,行业销售收入有所增加。其中,2021年中国新消费观察乐高涨价带动国产积木品牌关注度上升,精品化IP化成崛起关键词封面新闻记者吴雨佳日前,乐高集团旗下超百款积木宣布涨价,涨幅最高达25,不少积木玩家纷纷将目光转向国产积木品牌。消费内容社区什么值得买数据显示,自1月1日至7月31日,站内国产积木汽车出口年内大增50。6,中国汽车都被谁买走了?中国车企正加速出海,已有企业开始订购船舶以运送汽车出口图视觉中国文财经记者郭怀毅编辑王静仪国内销量好国外出口佳,在刚刚过去的7月,汽车市场如同天气一样火热。8月11日,中国汽车工业
春天的中国,让世界再次惊艳阳春三月,春暖花开,这世间所能想象到的颜色都将在960万平方公里的土地上绽放,真可谓是目之所及,皆是美好!可恨的新冠疫情虽然阻挡了我们踏春的脚步,但是它挡不住我们那颗春暖花开,面朝春日限定浪漫,枫林花海郁金香惊艳绽放春天是远道而来的浪漫,而郁金香是春天最浪漫的文案不知从何时起,每过一段时间郁金香就会刮起一阵潮流。短视频的拍照教程,朋友圈的打卡照但这并非无迹可寻,毕竟,这种植物已经火了好几个世纪2010年,火箭选择送走麦迪,和尼克斯交易,他们得到了什么回报?2010年,因姚麦组合长期未能创造佳绩,火箭管理层决定打破僵局。当赛季交易市场大门刚开启,他们便与其余球队管理层开始了交易谈判。最终,在2月19日的时候,他们摁下决定键,将麦迪送去17分大胜,4人得分20!联盟第一不讲理,火箭惨遭3连败太阳照常升起联盟第一的菲尼克斯太阳,依旧在连胜的道路上,克里斯保罗的受伤,并没有影响到这支球队的脚步。佩恩很好的扮演起了指挥官的角色,布克和艾顿两大天才少年,也总是能够挺身而出。可平价游戏主播设备!网友该给的都给了什么设备玩手游最舒服?很多网友第一时间想到的是iPad或者游戏手机,但是小编今天为大家带来的是3。10日拯救者发布的第一款游戏平板拯救者Y700。拯救者作为联想旗下的游戏设备系列,吝啬鬼!火箭老板欲再购一支球队,赚钱才是目的吝啬鬼!火箭老板欲再购一支球队,赚钱才是目的作者小吕本人目前火箭队17胜51负排在西部道说第一的位置,但是这看起来很糟糕的战绩却和他们的建队目标非常的吻合。因为火箭队本赛季处于一个3。15曝光!智能设备危机潜伏万众瞩目的央视315晚会如期而至,各行各业又将掀起一场打假曝光风暴。近年来,网络安全逐步成为国民关注的领域,315晚会曝光所涉及的数据泄露数据滥用个人隐私泄露等问题的案例不断增多。双赢!火箭奇才酝酿1换1交易!小波特东契奇接班人互换东家?据美媒华盛顿邮报透露,奇才队在送走丁威迪之后,在控卫位置上严重缺乏人手,为了补强这个位置的空缺,奇才管理层准备在下赛季交易日开启时向火箭队提供一份1换1的交易报价,目的是得到火箭队大页面才看的清晰汉印U100作业打印机学习版轻体验云瑞将军图文版权声明图文均属原创,版权所有,未经许可或授权,禁止转载或引用。人生,就是在不断感受,不断体验,不断修行!感谢在这一路上给予我帮助的老师和朋友们。时代发展很快,孩子们的女赌神赵苏茜赢遍美国各大赌场,一局赢400万,33岁被焚尸相信不少八零后九零后包括七零后,都对港台曾经火爆一时的赌神系列电影印象深刻。不过比起运筹帷幄的男赌神们,我印象最深的还是邱淑贞在赌神2中一袭红衣的叼牌场面。不论电影还是现实,男赌王红极一时到无人问津仅仅一刹那,有人28岁就凉凉,有人蹭网红热度娱乐圈是最不缺乏帅哥美女的地方,尤其是在如今这个看脸的时代,不管你有没有才华,有没有实力,只要有一张优越的脸蛋或者傲人的身材,就很容易获得大众和市场的青睐,轻松一夜爆红。有不少我们
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网