范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

Java9特性响应式流(ReactiveStream)

  什么是流
  形象的比喻来说就是如同水一样绵绵不绝的数据形式。而抽象点来说,是有一个生产者(source)产生,由一个或者多个消费者(sink)消费的数据元素(item)序列。那从这个抽象的描述就可以看出,使用流来承担数据交互的模式就是咱们经常说的生产者/消费者模型,而这种模型也可以称之为发布者/订阅者模型(后文将使用这个名字,因为JDK中使用的是这个名字)。
  对于流数据来说,一般有两种的数据流转方式: 拉(pull)数据模式:订阅者向发布者索要数据。 推(push)数据模式:发布者向订阅者推送数据(push)。
  这两种模式都是描述的单次信息传递的方式。如果发布者产生信息的速度和订阅者消费信息的速度一致的话,那这两种方法都将是十分有效的数据流转方式。 流有什么问题
  流的问题在于当两端的速度不匹配的时候(考虑一下各种mq主要处理的问题削峰平谷)。而速度的不匹配自然存在以下两种情况:
  订阅者消费速度快
  这种情况的时候会出现订阅者有处理能力了,但是订阅者无信息可以处理的情况。如果这种时候是同步的调用模式,则订阅者将会阻塞,直到有新的信息可以进行处理。而如果这时候是异步的信息处理模式,则订阅者可以在无消息处理的时候挂起,直接切换到其他的任务处理中(对于多核CPU的多线程来说)。也就是说,对于这种情况,比较理想的是 异步推模式 。
  发布者发布速度快
  当发布者发布速度快的时候,会发生订阅者来不及处理数据的情况。如果是同步的情况下发布者会一直阻塞,而如果是异步模式则对于订阅者来说有两种处理方式(可以类比一下线程池设计)可以处理: 损失数据:丢弃数据(在有限的队列缓存已经满了的情况下) 不损失数据:加入队列缓存数据(订阅者需要有拥有无限的缓冲队列暂存数据,以确保不会溢出)
  而还有另一种需要发布者加入的处理方式叫做背压(backpressure)。背压的实现方式是:由订阅者发出信号,让发布者降低信息的发布速度,从而让信息速度之间匹配。背压的优点是同样可以处理信息流速不一致问题。而更有意思的是,这时候信息的处理策略可以由发布者来选择: 损失数据:丢弃数据(在有限的队列缓存已经满了的情况下) 不损失数据:加入队列缓存数据(发布者需要有拥有无限的缓冲队列暂存数据,以确保不会溢出)
  没错,这两种情况是和订阅者一致的,不过选择权则由订阅者变成了发布者。
  也就是说,在发布者发布速度快的时候,要么发布者直接同步阻塞,要么可以先根据消息的主要关心方(是发布者还是订阅者)来确定是否使用背压,然后再根据数据的类型判断是否接受数据丢弃(不丢弃可能会导致系统崩溃)。往往我们的发布者可以由上层的mq或者程序的应答机制保护消息的可用性。
  那么结论是什么,我们需要异步非阻塞(订阅者消费快)、以及背压(发布者发布快)。 什么是响应式流
  Reactive Streams 是一项非阻塞背压的异步流处理标准的倡议,当然,如果我这个翻译看的不清楚的话就还是看原文吧(http://www.reactive-streams.org/)。
  Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.
  响应式流(Reactive Streams)概念被提出是在2013年,旨在处理上一小节中由于流速问题而产生的几种问题:订阅者订阅者的阻塞、由订阅者(数据下游)来选择是依赖无限队列(数据不丢)或直接丢弃数据。
  而对于一项标准而言,它的目是自然是用更少的协议来描述交互。而响应式流的模型也是十分简单: 订阅者异步的向发布者请求N个元素。 发布者一步的向订阅者发送M(0
  对他们的定义为:
  Publisher(发布者)
  是一个假定上游会产生无限数据的信息发布者。他们会向有发送请求的订阅者推送元素。
  Subscriber(订阅者)
  订阅者会从发布者那里领取令牌,然后根据令牌向发布者发送"获取请求"。同时当发布者部分准备好元素的时候,会通过令牌对订阅者进行调用,进行数据消费。
  Subscription(令牌)
  发布者和订阅者通过令牌来进行信息通信的约定。主要有:开始订阅、信息获取、信息推送、异常、结束、取消订阅。
  Processor(处理器)
  可以通过处理器连接发布者、订阅者以及其他处理器。Processor本身同时继承了Publisher与Subscriber接口,所以可以对元素进行处理转发。主要用于让数据从T转换为R。同时,由于Processor本身也可以接入Processor,所以Processor可以组成链来对数据进行处理。
  一次完整的调用流程大概可以描述为: 订阅者向发布者发送订阅请求。 发布者根据订阅请求生成令牌发送给订阅者。 订阅者根据令牌向发布者发送请求N个数据。 发送者根据订阅者的请求数量返回M(M<=N)个数据 重复3,4 数据发送完毕后由发布者发送给订阅者结束信号
  而Java API中的接口如下所示,其中所有的方法都是void,因为所有的方法都是异步执行的。 public interface Publisher {     //用于1.中订阅请求     public void subscribe(Subscriber<? super T> s); } public interface Subscriber {     //用于2.中回调发送令牌     public void onSubscribe(Subscription s);     //用于3.用于接受4中发送过来的数据     public void onNext(T t);     //用于3,4,5接收中间异常了之后的调用     public void onError(Throwable t);     //用于6.中结束信号的回调     public void onComplete(); } public interface Subscription {     //用于3.的发送请求N个数据     public void request(long n);     //用于3,4,5订阅者异步的向     public void cancel(); } public interface Processor extends Subscriber, Publisher { }JDK中的响应式流
  Java API中的流程使用方式看起来比较简单,但API背后的具体实现由于是全异步交互以及涉及具体背压处理而很困难。而JDK9中为用户提供了Publisher接口的简单实现,让开发人员可以基于此来扩展出自己的实际需求。
  JDK 9中的响应式流功能提供在java.util.concurrent包下,全响应式流的API接口被封装到 Flow接口中,其中包括需要使用的接口以及静态方法,关于上一小节中接口方法的详细描述也可以参见该接口上的方法描述。其中的静态接口为: Flow.Processor Flow.Publisher Flow.Subscriber Flow.Subscription
  除去上一小节说的4个接口外,Flow中还包含了一个默认方法defaultBufferSize(),用于返回默认的令牌中的缓冲区大小,而默认的值为DEFAULT_BUFFER_SIZE = 256。
  除去Flow外,其中还有一个刚刚说到的Publisher的简单实现类SubmissionPublisher。该接口在实现了publisher之外还实现了AutoCloseable接口,所以可以直接用try块来进行资源的管理。
  尽管JDK 9中没有提供Subscriber的简单实现,但是在SubmissionPublisher中提供了一个consume(Consumer<? super T> consumer)方法,用于让开发人员可以直接消费消息发布者的所有元素。实际上是在内部实现了简单的Subscriber为ConsumerSubscriber,但是并不是public的,所以不能直接使用。 简单的例子
  根据JDK 9中提供的SubmissionPublisher咱们来写一个小例子。     public static void main(String[] args) {         // 用于承接返回值的任务         CompletableFuture task;         // try-with-resource来控制资源         try (SubmissionPublisher publisher = new SubmissionPublisher<>()) {             System.out.println("默认缓冲容量: " + publisher.getMaxBufferCapacity());             // 传入打印方法来处理元素             task = publisher.consume(System.out::println);             // 打印数字,调用发布者进行信息处理             IntStream.range(1, 6)                     .forEach(publisher::submit);         }         if (task != null) {             try {                 // 当所有订阅者处理完毕后调用                 task.get();             } catch (InterruptedException | ExecutionException e) {                 e.printStackTrace();             }         }     }
  在这个例子里面进行了以下几件事。 声明一个CompletableFuture用于捕获后续的处理事件。 开启资源用于进行流消息订阅 设置流的订阅方法(订阅者) 进行发布者的信息发送 阻塞主方法等待处理完毕后结束
  其中pub.getMaxBufferCapacity()会打印默认的缓存空间256。在调用publisher.consume的时候,是奖传入的Consumer在内部封装成一个Subscribr的简单实现类,用于订阅信息的发送,实时上后续数据的订阅者就是在这步创建的。
  当publisher进行调用的时候,调用submit发送数据,publisher有两个方法用于发送数据,一个是submit,一个是offer。两个方法下面实际都是调用的doOffer方法,所以,offer方法提供了置顶延迟时间后丢弃的策略,而submit是offer的简单实现,是一致阻塞不丢弃。 最后
  不得不说响应式流是java中响应式编程的基础,而JDK 9中也提供了Reactive Streams的"简单"实现。之所示简单是打引号的是因为实际上还有点绕的,有兴趣的同学可以追一下SubmissionPublisher的实现,有一些思想的经典实现,比如用整数中的7位来作为状态机。在下一篇中我们再聊一下JDK 9中的数据交互顺序。

Deepin操作使用体验最近,小编尝试使用了我国团队研发的Linux发行版Deepin操作系统感觉还可以,和小伙伴们分享一下使用感受。系统安装步骤比较简单,网上搜索一下有很多详细教程,可以选择安装在虚拟机传奇耳机升级亮相Bose推出全新QuietComfort45消噪耳机2021年9月29日,Bose发布全新QuietComfort45消噪耳机,取代享誉盛名的QuietComfort35,带来即刻安静的全新消噪体验新增通透模式,一键清晰感知周遭具有绝绝子,升级iOS15。1Beta2后,iphone13多项BUG被修复果不其然,时隔一周苹果操作系统又准时更新了,此次带来的是iOS15。1Beta2,版本号为19B5052f。在使用近一天后,和大家分享我的体验。作为iphone13的首发用户,手机运动健康管理出色潮流百搭升级小米WatchColor2体验凭借多彩时尚的设计和丰富的个性化表盘,小米手表Color去年受到了市场的广泛好评。9月27日小米新品发布会上,新一代的小米WatchColor2正式到来,相信大家也都很好奇新品是否今年校招社招,最值得加入的公司有哪些?互联网领域,推荐腾讯阿里百度滴滴美团这些互联网大厂。软件领域,推荐万兴科技金山360这些第一梯队的软件大厂。像万兴科技(300624。SZ)为创意软件A股上市,万兴喵影Wonder你敢信?iPhone14挖孔屏长这样网友慌了不可能的手机中国新闻虽然现在iPhone13系列才刚发布不久,但是网上已经有了不少关于iPhone14系列的爆料。目前,爆料多是集中在iPhone14系列的屏幕设计上。不少爆料都显示,明年2021深圳少儿医保怎么扣费?一。在园和在校的学生,由学校统一收取。二。非在校幼儿从银行卡自动扣费,9。20前需要往扣费银行卡存入大于480元。扣费银行卡(1)在2016年4月14日之后激活了参保人本人的金融社现在社会上有许多50多岁的人,没有缴纳社保,老了怎么办?熬日子吧。社会上有很多50多岁的人(特别是农民),他们没有钱去买社保(不是不想买)。我们当地的现实情况是,60岁每月可以领近200元的养老金,说够用那是违背良心讲假话。钱不够用也没数据采集与分析神策数据VS友盟统计用户画像源于数据采集,在技术上名词可叫数据埋点。根据用户在使用App功能时候,对某些关键性的业务功能操作进行轨迹追踪,日志记录。交互事件,以及页面停留时长将被无痕记录。通过后台异步微软正式推送Windows11,符合条件的Win10用户可免费升级文电脑手机那些事儿据微软的最新消息,微软今日宣布开始正式向用户推送Windows11,等待多时的新版桌面操作系统终于要和大家见面了。按照微软官方的发布计划,满足基本硬件条件的Win最受年轻人关注的华为nova9系列手机发布100W超级快充续航无忧当前年轻群体在网络社交急速发展与强大的影响下,早已不再满足于单纯的文字语言交流恰此时机,华为正式发布了全新一代华为nova9系列旗舰手机,从更优秀的画面质量更流畅的拍摄体验更丰富的
反华势力又攻上来了,中国人警惕起来亚太日报牧之即将过去的这一周,以美国为首的境外反华势力们可谓是狂欢的一周。芯片产业巨头英特尔和国内份额巨大的仓储型超市山姆纷纷或明或暗的对我国新疆所谓的人权问题和强迫劳动发了难。英中车时代电气发布国内首款基于自主碳化硅大功率电驱产品澎湃财讯12月26日,由中车时代电气CCar平台孵化的全新一代产品CPower220s正式发布,该产品是国内首款基于自主碳化硅(SiC)大功率电驱产品,具有系统功率密度高系统损耗少内卷太内卷了十二月份刚换工作了,去了一家做机器人的公司。现在是周六的晚上2115,周围还有同事在加班,我得走了,家离的远,明天还得继续。周四晚2145和长城汽车(SH601633)子品牌沙龙汽花呗借呗是不是高利贷?支付宝有放高利贷的嫌疑吗?花呗分期是一场贷款诱导游戏吗?蚂蚁金服一年级几百亿的收入是怎么从各位身上所赚到的呢?为什么说蚂蚁金服是一场盛大的全民收割游戏呢?我们接下来分析一下。说蚂蚁国资60亿投资入场,成为第二大股东,蚂蚁集团真正大股东是谁?支付宝和微信都是国民级别的应用,支付宝母公司蚂蚁集团曾经是全球最大的独角兽公司,备受各方资本的期待和青睐,但是去年IPO暂缓之后至今也没有消息。不过,近期蚂蚁旗下公司获得了国资的投浅谈OPPOReno7Pro与VIVOS12Pro的配置优劣今年的最后一个月份,国内的手机市场是格外的热闹。继OPPO在月初发布发布的Reno7Pro之后,在接近月未的时候VIVO就发布了S12Pro。两款手机都是针对中端手机市场的布局,在关于要联想倒还是联想管理层倒的一些看法我的看法是,联想要倒,但不是立刻倒,管理层要倒,而且是要立刻倒。所谓上行下效,柳氏在联想深耕了这么些年,上上下下都已经换成了自己人,要更换管理层就要上上下下都换掉,这种情况还不如从只要钱不要脸!人民日报表态,联想该反思了热点科技咖原创谁也没想到,曾经被誉为国货之光的联想集团会因为产业大众化被推上风口浪尖,而作为联想创始人之一的商业传奇柳传志也因此成为了人们纷纷议论的焦点。在国内科技企业界,民间素来记一次。NET程序的性能优化实战(3)深入。NET源码前言前两篇文章part1和part2基本上理清了IsSplitter()运行缓慢的原因在函数内部使用了带Compile选项的正则表达式。但是没想到在IsSplitter()内部使用处理异常的标准姿势,你学废了吗?前言在Java中应该如何处理异常,这个话题看似简单,不就是trycatch嘛,但是往往BUG更容易出现在一些简单的容易忽略的地方。大多数成熟的开发团队对于如何进行异常处理都有一套规EffectiveC读书笔记条款16在operator中对所有数据成员赋值背景如果没写赋值运算符的话,编译器会提供默认的,但这个默认的有时效果不好(参见条款11)。所以能否有个两全其美的办法,让编译器生成一个缺省的赋值运算符,然后再有选择地重写不喜欢的部