JavaJava8的CompletableFuture介绍
【参考】 Introduction to CompletableFuture in Java 8:www.youtube.com/watch?v=Imt… 1. non-blocking操作
这里的asynchronous操作即non-blocking操作,非阻塞式的操作,表示主线程可以提交task给独立的线程,独立的线程可以自行运行该task,主线程则会继续运行别的业务逻辑代码。
提交的task可以是Runnable实现,也可以是Callable实现(有返回值):
2. 通过ExecutorService进行task的提交
ExecutorService 提交Callable的task后,可以通过future.get() 拿到task的执行结果,get() 方法是阻塞式的获取结果,即如果该task还没有执行完毕,此时main thread需要等待,直到task执行完毕并返回结果。
ExecutorService存在的问题是如果一次性提交了4个tasks,那么在拿结果的时候,需要依次拿(future.get()),但这个方法是阻塞式的,假如task3或4提交完成了,也需要等待main thread从task1先拿到结果:
更为复杂的例子,假如处理1个order分5个步骤,即拿到order、补齐order必要的属性、付款、发送order,最后是邮件发送:
代码如下,因为get是阻塞式的,所以for循环里order1的处理,会影响后续order的速度,因为main thread有可能会卡在order1,而这时候后续的order可能已经处理完毕,这也使得使用多线程处理变的没有必要,因为order的处理顺序依旧是线性执行的: public void orderLifeCycle() { ExecutorService service = Executors.newFixedThreadPool(10); for (int i = 0; i < 5; i ++) { try { Future future = service.submit(getOrderTask()); Order order = future.get(); // 阻塞 Future future1 = service.submit(enrichTask(order)); order = future1.get(); // 阻塞 Future future2 = service.submit(performPaymentTask(order)); order = future2.get(); // 阻塞 Future future3 = service.submit(dispatchTask(order)); order = future3.get(); // 阻塞 Future future4 = service.submit(sendEmailTask(order)); order = future4.get(); // 阻塞 } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
而我们想要的效果是,order本身的flow是一个整体(虽然它可能分了很多个Callable步骤),但order与order之间,不应该相互等待,比如order1可以由某个线程执行并正在执行sendEmail方法,而order2可以由另外一个线程执行并正在执行payment方法,即每个order都是独立的流水线(independent flows):
3. CompletableFuture
可以使用 CompletableFuture 来实现上述想要的期望:public void orderSubmit() { for (int i =0; i < 5; i ++) { CompletableFuture.supplyAsync(() -> getOrder()) .thenApply(order -> enrich(order)) .thenApply(order -> performPayment(order)) .thenApply(order -> dispatch(order)) .thenAccept(order -> sendEmail(order)); } } 3.1 supplyAsync方法
supplyAsync() 方法是CompletableFuture 中的static 方法,从定义可以看到它接受Supplier 接口的参数,Supplier 接口位于java的java.util.function 包中,也是Java 8引入的函数式接口,与之配套的还有另外两个接口:Function 和Consumer ,具体来说:Supplier 接口:不接收参数,但返回一个对象(用消息组件来类比,可以看作是生产者)。Function 接口:接收参数后,再返回另一个对象(用消息组件来类比,即接收一个消息,进行处理后再发送至另一个地方)。Consumer 接口:接收参数,但不会再返回对象(用消息组件来类比,可以看作是消费者)。public static CompletableFuture supplyAsync(Supplier supplier) { return asyncSupplyStage(ASYNC_POOL, supplier); }
在我们order的例子中, supplyAsync(() -> getOrder()) 即通过线程来处理拿到一个order的业务,然后返回。
supplyAsync() 除了接收suppilier函数外,还可接收线程池,即传入自定义的线程池来处理。如果没有指定,则会创建一个:Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor() 。3.2 thenApply方法
thenApply() 方法接收的是Function 接口,Function 接口在上面介绍过了,它能接收一个参数,并返回一个结果。在我们的order例子中,它接收了getOrder() 返回的order参数,并进行处理后(通过方法enrich(order) )再返回order结果。public CompletableFuture thenApply(Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); }
与 thenApply() 方法类似的还有一个方法叫thenApplyAsync() ,两者的区别是:thenApply() 会沿用上一个方法相同的线程。thenApplyAsync() 会使用另一个线程来执行方法体内的task,也可以传入线程池。
具体来说,假设我们的 getOrder() 方法是IO开销比较大的(比如需要从DB里查询),而我们的enrich(order) 方法只是一些计算相关,即CPU开销比较大,那么我们在执行的时候,可能会用两个不同的线程池来执行,IO开销大的可以设置线程数多一些。public void orderSubmit() { ExecutorService cpuService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); ExecutorService ioService = Executors.newCachedThreadPool(); for (int i =0; i < 5; i ++) { CompletableFuture.supplyAsync(() -> getOrder(), ioService) .thenApplyAsync(order -> enrich(order), cpuService) .thenApply(order -> performPayment(order)) .thenApply(order -> dispatch(order)) .thenAccept(order -> sendEmail(order)); } } 3.3 thenAccept方法
thenAccept() 方法接收Comsumer 接口(接收一个参数,但没有返回值)。
与 thenAccept() 方法相对的,有thenAcceptAsync() 方法,同样的,该方法能传入一个线程池,使得方法里的task可以在传入的线程池中进行提交执行。public CompletableFuture thenAccept(Consumer<? super T> action) { return uniAcceptStage(null, action); } 3.4 exceptionally方法
异常处理。如果 getOrder() 或enrich(order) 或performPayment(order) 中出现了一些异常,我们可以使用exceptionally() 来捕获异常,然后返回别的类,这样dispatch(order) 方法就知道在前面的步骤中出现了异常(比如拿到参数后先进行类型判断,是normal的Order 还是FailedOrder 。CompletableFuture.supplyAsync(() -> getOrder(), ioService) .thenApplyAsync(order -> enrich(order), cpuService) .thenApply(order -> performPayment(order)) .exceptionally(e -> new FailedOrder()) .thenApply(order -> dispatch(order)) .thenAccept(order -> sendEmail(order)); 3.5 其它方法,如thenCombine
这个可以连接两个stage的结果。但这个写起来比较复杂,推荐java的另一个框架,叫 Reactor 框架,实现起来可能更加便捷,代码也更易读。4. 总结
CompletableFuture 中suplyAsnc() 或thenApply() 或thenAccept() 都不会阻塞main thread,如果我们有100个order flow需要处理,那么每个order的流程都不会相互阻塞。
给易建联打工,出任易建联训练营总教头,王仕鹏的年薪有多少呢?广东五虎曾经带给我们无数的记忆和感动,时光荏苒,曾经的青葱少年,只剩下三旬老汉易建联还在为中国男篮遮风挡雨,朱芳雨杜锋继续为广东的辉煌延续努力着,陈江华回家老婆孩子热炕头顺便做着小
一代神机小米8钉子户还有多少?小米8不是一代神机吧?小米6是神机。小米6钉子户路过,2017年9月入手,到今天为止,除了系统无法更新,电池不太耐用了之外,没有啥毛病。耐摔耐磨,小米6实在不舍得换现在还在坚持小米
有哪些面膜是值得回购的?不管是懒宅还是现充,姑娘们的冬季模式里,一定少不了穿着厚睡衣,敷面膜煲电视剧这夜间生活三件套。双十一过了怎么办?双十二囤上一波面膜也是赶得及的呢!如果你还在一堆高人气爆款面膜前摇摆
泰安新泰的房子是未来第二个鹤岗的房价吗?山东省泰安市新泰市新汶良庄煤矿西岭社区的房子3万元一套都卖不出去,想抄房的去买吧,保你赔个底朝天。新泰市的煤炭都快挖没了,有很多塌陷地,新汶矿业集团把很多煤矿工人都分流到内蒙,山西
20万左右的君越迈腾和雅阁,选哪个比较好?这三辆车有两辆我都开过很长时间,迈腾稍微开的少一点,目前来说这三辆车都是现阶段合资车型比较热门的,也都是各自品牌主打的车,三辆车型优惠力度也很大,配置选择也都很丰富,下面扳手爸爸就
现在的昆明滇池,还值得一去吗?现在的滇池还真值得去看一看,尤其是每年的十一月至次年的二月,成群的红嘴欧在滇池过冬。与游人亲密接近嬉戏。现在的水质也较从前有明显改善。滇池大坝和海埂公园都是免费开放的景区,海欧到来
洛阳612所(中国空空导弹研究院)怎么样?西工大硕士毕业好进吗?1014基地,承担kkdd研发及生产,位置解放路2202厂,承担kkdd生产,90年代初由汉中搬至洛阳涧西丽春西路90年代中期,202厂与014基地合并为新的612所3中航光电所(
中国女篮的功勋教练李亚光在哪里?为什么不执教女篮?李亚光作为中国篮球著名的少帅和女篮最成功的主教练,在球员和教练期间为中国篮球做出了巨大的贡献,在球员时期,李亚光帮助中国男篮获得第十届世锦赛的第九名,在教练期间,李亚光帮助中国女篮
同为亚洲人,为什么日本韩国足球会比中国高一个档次?原因在于同样是砸钱,日本砸钱都砸对了地方。1。烧钱有道日本J联赛开张之时,各个职业球队背后都有世界级的大企业作金主。日本人用巨量的资金招兵买马,买来的都是过了职业黄金期的一大票过气
你对王一博的印象是怎样的?说说我的真实感受,看有翡之前,每次翻到他的视频我都会直接划过去,看了有翡之后,我突然发现,咦,这个人不错哦,然后就开始翻看有关他的视频,看到他参加的这就是街舞,就彻底哇塞了呲牙,本
4am战队到底稳不稳定?4am在国内算不算顶尖的吃鸡战队?4AM战队应该是PUBG绝地求生上最具争议性的战队了,强的时候根本不讲道理,弱的时候会让粉丝心痛到无法呼吸。上下限极高,只要赛事没有结束,大家永远不知道下一刻会发生什么。在PCPI