范文健康探索娱乐情感热点
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

WebFlux前置知识(四)

  1.Backpressure
  Backpressure 在国内被翻译成背压,这个翻译在网上被很多人吐槽,我觉得大家的吐槽是有道理的,背压单纯从字面上确实看不出来有什么意思。所以松哥这里直接用英文 Backpressure 吧。
  Backpressure 是一种现象:当数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure。
  换句话说,上游生产数据,生产完成后通过管道将数据传到下游,下游消费数据,当下游消费速度小于上游数据生产速度时,数据在管道中积压会对上游形成一个压力,这就是 Backpressure,从这个角度来说,Backpressure 翻译成反压、回压似乎更合理一些。
  Backpressure 会出现在有 Buffer 上限的系统中,当出现 Buffer 溢出的时候,就会有 Backpressure,对于 Backpressure,它的应对措施只有一个:丢弃新事件。那么什么是 Buffer 溢出呢?例如我的服务器可以同时处理 2000 个用户请求,那么我就把请求上限设置为 2000,这个 2000 就是我的 Buffer,当超出 2000 的时候,就产生了 Backpressure。  2.Flow API
  JDK9 中推出了 Flow API,用以支持 Reactive Programming,即响应式编程。
  在响应式编程中,会有一个数据发布者 Publisher 和数据订阅者 Subscriber,Subscriber 接收 Publisher 发布的数据并进行消费,在 Subscriber 和 Publisher 之间还存在一个 Processor,类似于一个过滤器,可以对数据进行中间处理。
  JDK9 中提供了 Flow API 用以支持响应式编程,另外 RxJava 和 Reactor 等框架也提供了相关的实现。
  我们来看看 JDK9 中的 Flow 类:
  非常简洁,基本上就是按照 Reactive Programming 的设计来的:
  Publisher
  Publisher 为数据发布者,这是一个函数式接口,里边只有一个方法,通过这个方法将数据发布出去,Publisher 的定义如下:  @FunctionalInterface public static interface Publisher {     public void subscribe(Subscriber<? super T> subscriber); }
  Subscriber
  Subscriber 为数据订阅者,这个里边有四个方法,如下:  public static interface Subscriber {     public void onSubscribe(Subscription subscription);     public void onNext(T item);     public void onError(Throwable throwable);     public void onComplete(); } onSubscribe:这个是订阅成功的回调方法,用于初始化 Subscription,并且表明可以开始接收订阅数据了。  onNext:接收下一项订阅数据的回调方法。  onError:在 Publisher 或 Subcriber 遇到不可恢复的错误时调用此方法,之后 Subscription 不会再调用 Subscriber 其他的方法。  onComplete:当接收完所有订阅数据,并且发布者已经关闭后会回调这个方法。
  Subscription
  Subscription 为发布者和订阅者之间的订阅关系,用来控制消息的消费,这个里边有两个方法:  public static interface Subscription {     public void request(long n);     public void cancel(); } request:这个方法用来向数据发布者请求 n 个数据。  cancel:取消消息订阅,订阅者将不再接收数据。
  Processor
  Processor 是一个空接口,不过它同时继承了 Publisher 和 Subscriber,所以它既能发布数据也能订阅数据,因此我们可以通过 Processor 来完成一些数据转换的功能,先接收数据进行处理,处理完成后再将数据发布出去,这个也有点类似于我们 JavaEE 中的过滤器。  public static interface Processor extends Subscriber, Publisher { } 2.1 消息订阅初体验
  我们通过如下一段代码体验一下消息的订阅与发布:  public class FlowDemo {     public static void main(String[] args) {         SubmissionPublisher publisher = new SubmissionPublisher<>();         Flow.Subscriber subscriber = new Flow.Subscriber() {             private Flow.Subscription subscription;             @Override             public void onSubscribe(Flow.Subscription subscription) {                 this.subscription = subscription;                 //向数据发布者请求一个数据                 this.subscription.request(1);             }             @Override             public void onNext(String item) {                 System.out.println("接收到 publisher 发来的消息了:" + item);                 //接收完成后,可以继续接收或者不接收                 //this.subscription.cancel();                 this.subscription.request(1);             }             @Override             public void onError(Throwable throwable) {                 //出现异常,就会来到这个方法,此时直接取消订阅即可                 this.subscription.cancel();             }             @Override             public void onComplete() {                 //发布者的所有数据都被接收,并且发布者已经关闭                 System.out.println("数据接收完毕");             }         };         //配置发布者和订阅者         publisher.subscribe(subscriber);         for (int i = 0; i < 5; i++) {             //发送数据             publisher.submit("hello:" + i);         }         //关闭发布者         publisher.close();         new Scanner(System.in).next();     } }
  松哥稍微解释一下上面这段代码:  首先创建一个 SubmissionPublisher 对象作为消息发布者。  接下来创建 Flow.Subscriber 对象作为消息订阅者,实现消息订阅者里边的四个方法,分别进行处理。  为 publisher 配置上 subscriber。  发送消息。  消息发送完成后关闭 publisher。  最后是让程序不要停止,观察消息订阅者打印情况。  2.2 模拟 Backpressure
  Backpressure 问题在 Flow API 中得到了很好的解决。Subscriber 会将 Publisher 发布的数据缓存在 Subscription 中,其长度默认为256,相关源码如下:  public final class Flow {     static final int DEFAULT_BUFFER_SIZE = 256;     public static int defaultBufferSize() {         return DEFAULT_BUFFER_SIZE;     }     ... }
  一旦超出这个数据量,publisher 就会降低数据发送速度。
  我们对上面的案例进行修改,如下:  public class FlowDemo {     public static void main(String[] args) {         SubmissionPublisher publisher = new SubmissionPublisher<>();          Flow.Subscriber subscriber = new Flow.Subscriber() {             private Flow.Subscription subscription;              @Override             public void onSubscribe(Flow.Subscription subscription) {                 this.subscription = subscription;                 //向数据发布者请求一个数据                 this.subscription.request(1);             }              @Override             public void onNext(String item) {                 System.out.println("接收到 publisher 发来的消息了:" + item);                 //接收完成后,可以继续接收或者不接收                 //this.subscription.cancel();                 try {                     Thread.sleep(2000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 this.subscription.request(1);             }              @Override             public void onError(Throwable throwable) {                 //出现异常,就会来到这个方法,此时直接取消订阅即可                 this.subscription.cancel();             }              @Override             public void onComplete() {                 //发布者的所有数据都被接收,并且发布者已经关闭                 System.out.println("数据接收完毕");             }         };         publisher.subscribe(subscriber);         for (int i = 0; i < 500; i++) {             System.out.println("i--------->" + i);             publisher.submit("hello:" + i);         }         //关闭发布者         publisher.close();         new Scanner(System.in).next();     } }
  一共修改了三个地方:  Subscriber#onNext 方法中,每次休息两秒再处理下一条数据。  发布数据时,一共发布 500 条数据。  打印数据发布的日志。
  修改完成后,我们再次启动项目,观察控制台输出:
  可以看到,生产者先是一股脑生产了 257 条数据(hello0 在一开始就被消费了,所以缓存中实际上是 256 条),消息则是一条一条的来,由于消费的速度比较慢,所以当缓存中的数据超过 256 条之后,接下来都是消费一条,再发送一条。  2.3 数据处理
  Flow.Processor  可以像过滤器一样,对数据进行预处理,数据从 publisher 出来之后,先进入 Flow.Processor  中进行预处理,然后再进入 Subscriber。
  修改后的代码如下:  public class FlowDemo {     public static void main(String[] args) {          class DataFilter extends SubmissionPublisher implements Flow.Processor{              private Flow.Subscription subscription;              @Override             public void onSubscribe(Flow.Subscription subscription) {                 this.subscription = subscription;                 this.subscription.request(1);             }              @Override             public void onNext(String item) {                 this.submit("【这是一条被处理过的数据】" + item);                 this.subscription.request(1);             }              @Override             public void onError(Throwable throwable) {                 this.subscription.cancel();             }              @Override             public void onComplete() {                 this.close();             }         }          SubmissionPublisher publisher = new SubmissionPublisher<>();         DataFilter dataFilter = new DataFilter();         publisher.subscribe(dataFilter);          Flow.Subscriber subscriber = new Flow.Subscriber() {             private Flow.Subscription subscription;              @Override             public void onSubscribe(Flow.Subscription subscription) {                 this.subscription = subscription;                 //向数据发布者请求一个数据                 this.subscription.request(1);             }              @Override             public void onNext(String item) {                 System.out.println("接收到 publisher 发来的消息了:" + item);                 //接收完成后,可以继续接收或者不接收                 //this.subscription.cancel();                 try {                     Thread.sleep(2000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 this.subscription.request(1);             }              @Override             public void onError(Throwable throwable) {                 //出现异常,就会来到这个方法,此时直接取消订阅即可                 this.subscription.cancel();             }              @Override             public void onComplete() {                 //发布者的所有数据都被接收,并且发布者已经关闭                 System.out.println("数据接收完毕");             }         };         dataFilter.subscribe(subscriber);         for (int i = 0; i < 500; i++) {             System.out.println("发送消息 i--------->" + i);             publisher.submit("hello:" + i);         }         //关闭发布者         publisher.close();         new Scanner(System.in).next();     } }
  简单起见,我这里创建了一个局部内部类 DataFilter,DataFilter 继承自 SubmissionPublisher 并实现了 Flow.Processor 接口,由于 DataFilter 继承自 SubmissionPublisher,所以它也兼具 SubmissionPublisher 的功能。
  在 DataFilter 中完成消息的处理并重新发送出去。接下来定义 publisher,让 dataFilter 作为其订阅者,再定义新的订阅者,作为 dataFilter 的订阅者。
  最终运行效果如下:
  3.小结
  好啦,这就是今天和大家介绍的 Java9 中的 Reactive Stream,那么至此,我们的 WebFlux 前置知识差不多告一段落了,下篇文章开始,正式开整 WebFlux。
  转载自:https://mp.weixin.qq.com/s/BfgQ760h_WeUOBRrgx1ubA

DNF为什么有玩家称剑魂传说勋章的守护珠要打双属强,不可以打负重,命中,暴击吗?DNF传说勋章是所有装备中比较重要的装备,搭配完美守护珠的情况下对于伤害能带来23的提升,一般来说守护珠的选择都是属强,暴击,移动,攻速释放,但是对于剑魂来说却是个例外。如果你去翻为什么大蛇丸忍术是无属性的?你好,这个问题就让小火来回答吧!首先在动漫火影忍者中,大蛇丸的官方查克拉属性设定是火风雷土水阴阳。而并非是无属性的。其次当我们回顾大蛇丸的历次战斗后,却惊奇的发现大蛇丸在战斗中所使热血传奇为何说30区是变态区?这里爆的装备属性你知道是多少吗?我们玩游戏时经常会听到一些特殊的服务器,什么变态服村服鬼服等等。当年盛大的热血传奇也存在这样的服务器,被玩家戏称为变态区。这些区服以极品装备泛滥而闻名,普通玩家做梦都得不到的极品,论WCG赛事的影响力,重启的WCG会让魔兽争霸3重新点燃赛事氛围吗?2019年,对于魔兽争霸爱好者来说,将是兴奋的一年具有电竞奥运会之称的WCG,自2013决赛结束后停办,暂离六年之久,今重启赛事,war3回归WCG正式比赛项目同时,魔兽争霸3重制LOL新手推荐去哪个区?先来给大家介绍一下联盟各大区英雄联盟国服总共有27个区电信艾欧尼亚祖安诺克萨斯班德尔城皮尔特沃夫战争学院巨神峰雷瑟守备钢铁烈阳裁决之地黑色玫瑰暗影岛均衡教派水晶之痕影流守望之海征服3。50棱镜更新,迈向次世代的无人深空还能再战十年近日画饼界巨人无人深空,迎来了最新的大型免费更新,该版本命名为棱镜(Prism),是游戏的3。50版本。这次更新有别于此前游戏每次更新都会添加新的玩法,或者干脆改变玩法的特点,此次梦嫂到底有多爱梦泪,看到她直播间取得一个标题,情侣羡慕极了在KPL的众多队伍中,AG超玩会有着很多的粉丝。之所以造成这种情况,主要还是因为2点原因。首先AG的成立时间比较早,在第一届KPL的时候,他们就已经存在了。其次AG超玩会的实力很强净化虽好可别乱用!遇到这3位英雄开净化,会适得其反害死你在王者荣耀中,净化是一个出场率很高的召唤师技能但是你知道吗?净化还隐藏着一些神奇作用,比如当你受到防御塔伤害的时候,使用净化可以一瞬间无敌,免疫掉防御塔伤害,不过呢净化虽好也需要慎体验服地震级更新,扁鹊米莱狄史诗级加强,登顶T0的梦奇凉凉在王者荣耀中,体验服又成为先行服,是正式服的先行版,也是正式服上线之前各项数据测试的重要依据。因此,充分了解体验服的更新信息,有助于我们更好地掌握未来版本的趋势。尤其是在S23赛季吃鸡新步枪FMR全面解析,自带8倍子弹不下坠,近战化身喷子大家好,欢迎来到由小鱼干开讲的吃鸡新鲜事速报,6月体验服终于是更新了,这个版本光子继续保持有多花哨搞多花哨的做法,弄出了重启未来的新版本。正如大家所料,重启未来玩法再次加入经典海岛热血传奇总是打不到自己用的装备,也别总怀疑是GM捣鬼那些年我们一起玩过的传奇第十二期奇怪的爆率不知道喜欢玩传奇的小伙伴们你们在玩传奇的时候,遇没遇到过这种事情,打到的装备很多时候都不是自己用的!很多玩传奇的小伙伴们相信都有类似的经历
Faker一打四击碎KZ翻盘梦,连续单杀胖将军,摘掉飞皇名号LCK春季赛季后赛,在SKT与KZ的季后赛争夺战中,KZ因为自己的失误,白白的让SKT拿到了两场的胜利,尤其是第二局,KZ本来前面就有了八千多的优势,但是在中期的时候因为deft以QQ飞车永久A车不再是梦想,3天攒1W点券很轻松,老玩家很无语QQ飞车永久A车不再是梦想,3天攒1W点券很轻松,老玩家很无语QQ飞车是无数90后青少年的童年梦想,因为在QQ飞车火爆的时候,90后们刚好处在上学的青春年华,这个时候的青少年时间比只狼里有哪些实力弱到手残也能随便虐的BOSS?只狼里的BOSS数量众多,而且普遍很具挑战性。但也并非个个都是精英,其中也有一些滥竽充数的假BOSS。它们看上去气势汹汹,其实就是个战五渣。只要你不怂,很轻松就可以把它们干翻。雾隐刺激战场它比M416伤害高,还比M416稳,近期非常推荐大家使用!雷猴啊!我是阁主!说到刺激战场中那个枪稳,首先想到的就是M416了,其实还有一把枪,它不仅比M416的伤害高,而且比M416稳,近期非常推荐大家使用!它就是DP28(大盘鸡),这把剑网3!这么多年了,玩家粘性为什么这么大?剑网3做为一款网络游戏,到现在历经9年有余。可是为什么这款游戏到现在还是有很多新人加入?说说我的游戏经历吧我其实在很多年前就知道这款游戏了,记得当初在网吧看上了这款游戏,在游戏里轻任天堂我不做手游,五年后手游真香据新浪科技消息,来自SensorTower商店情报预估数据显示,上季度任天堂手游在全球AppStore和GooglePlay的总营收达到8500万美元,较2018年第一季度8000绝地求生陪玩的小姐姐火爆了,月收入超六位,你怎么看呢?绝地求生陪玩的小姐姐火爆了,月收入超六位数,你怎么看呢?绝地求生的横空出世,真的让吃鸡类游戏火爆了相当长的一段时间!随着也出现了一堆相关的游戏,像刺激战场荒野求生等等!也是相当火爆王者荣耀1级时打蓝谁最快?猴子16秒,李元芳13秒,他只用8秒王者荣耀中,打野英雄通常都会围绕buff开局。打野英雄根据自己的英雄特性暴君的位置,决定蓝开或者红开。一般来说,对蓝buff依赖程度比较高的打野,都喜欢蓝开。那1级时打蓝速度最快的LOL智慧末刃改版改变排位生态蛮易信又能无脑上分了!在即将到来的9。7版本中,有一件重要的装备得到了重做,这件装备就是之前比较冷门的智慧末刃,在重做之后,这件装备所提供的属性和特殊效果,将会让一部分英雄得到非常大的加强。接下来我们就网友晒王思聪吃热狗的皮肤,没想到王思聪直接回复你号没了!八年逐梦终夺冠,iG冠军皮肤来了!!!这是iG电子竞技俱乐部的冠军皮肤也是我们的冠军皮肤,thisisforus,ForLPL英雄联盟赛事iGNing的青钢影,回城会亮出羽翼,抱起小学生爱吃鸡,父亲用纸皮制作吃鸡神器,试用一次后很满意!大家好,我是南美小猴子!绝地求生刺激战场这款手游上线已经有一年了,很多玩家从去年2月份公测就一直玩到现在,其中包括了学生上班族和宝妈都有不少吃鸡爱好者。最近有一位小学生粉丝投稿时和