JDK9响应式流使用详解
上文中咱们简单提到了JDK9中Flow接口中的静态内部类实现了响应式流的JAVA API,并且提供了一个一个Publisher的实现类SubmissionPublisher。本文将先梳理一下接口中具体的处理流程,然后再以几个调用者的例子来帮助大家理解。 JDK9中的实现
再放上一下上文中的响应式流的交互流程: 订阅者向发布者发送订阅请求。 发布者根据订阅请求生成令牌发送给订阅者。 订阅者根据令牌向发布者发送请求N个数据。 发送者根据订阅者的请求数量返回M(M<=N)个数据 重复3,4 数据发送完毕后由发布者发送给订阅者结束信号
该流程的角度是以接口调用的交互来说的,而考虑实际的coding工作中,我们的调用流程其实为: 创建发布者 创建订阅者 订阅令牌交互 发送信息
接下来我们按照这个流程来梳理一下代码细节。 创建发布者
对于实现响应流的最开始的步骤,便是创建一个发布者。之前提到在JDK9中提供了一个发布者的简单实现SubmissionPublisher。SubmissionPublisher继承自Flow.Publisher,他有三种构造函数: public SubmissionPublisher() { this(ASYNC_POOL, Flow.defaultBufferSize(), null); } public SubmissionPublisher(Executor executor, int maxBufferCapacity) { this(executor, maxBufferCapacity, null); } public SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler)
SubmissionPublisher将使用Executor作为"线程池"向订阅者发送信息。如果需要需要设置线程池的话可以自己传入,否则的话再无参的构造函数中将默认使用ForkJoinPool类的commonPool()方法获取,即无餐构造方法中的ASYNC_POOL静态变量。
SubmissionPublisher会为 每一个订阅者 单独的建立一个缓冲空间,其大小由入参maxBufferCapacity决定。默认情况下直接使用Flow.defaultBufferSize()来设置,默认为256。如果缓冲区满了之后会根据发送信息时候的策略确定是阻塞等待还是抛弃数据。
SubmissionPublisher会在订阅者发生异常的时候(onNext处理中),会调用最后一个参数handler方法,然后才会取消订阅。默认的时候为null,也就是不会处理异常。
最简单的创建SubmissionPublisher的方法就是直接使用无参构造方法: SubmissionPublisher publisher = new SubmissionPublisher<>();
上文书说到,因为SubmissionPublisher实现了AutoCloseable接口,所以可以用try来进行资源回收可以省略close()的调用: try (SubmissionPublisher publisher = new SubmissionPublisher<>()){ }
但是也可以手动的调用close()方法来显示的关闭发布者,关闭后再发送数据就会抛出异常: if (complete) throw new IllegalStateException("Closed");创建订阅者
上文中咱们没有手动创建订阅者,而是直接调用SubmissionPublisher中的consume方法使用其内部的订阅者来消费消息。在本节可以实现接口Flow.Subscriber创建一个SimpleSubscriber类: public class SimpleSubscriber implements Flow.Subscriber { private Flow.Subscription subscription; /** * 订阅者名称 */ private String name; /** * 定义最大消费数量 */ private final long maxCount; /** * 计数器 */ private long counter; public SimpleSubscriber(String name, long maxCount) { this.name = name; this.maxCount = maxCount <= 0 ? 1 : maxCount; } @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; System.out.printf("订阅者:%s,最大消费数据: %d。%n", name, maxCount); // 实际上是等于消费全部数据 subscription.request(maxCount); } @Override public void onNext(Integer item) { counter++; System.out.printf("订阅者:%s 接收到数据:%d.%n", name, item); if (counter >= maxCount) { System.out.printf("准备取消订阅者: %s。已处理数据个数:%d。%n", name, counter); // 处理完毕,取消订阅 subscription.cancel(); } } @Override public void onError(Throwable t) { System.out.printf("订阅者: %s,出现异常: %s。%n", name, t.getMessage()); } @Override public void onComplete() { System.out.printf("订阅者: %s 处理完成。%n", name); } }
SimpleSubscriber是一个简单订阅者类,其逻辑是根据构造参数可以定义其名称name与最大处理数据值maxCount,最少处理一个数据。
当发布者进行一个订阅的时候会生成一个令牌Subscription作为参数调用onSubscribe方法。在订阅者需要捕获该令牌作为后续与发布者交互的纽带。一般来说在onSubscribe中至少调用一次request且参数需要>0,否则发布者将无法向订阅者发送任何信息,这也是为什么maxCount需要大于0。
当发布者开始发送数据后,会异步的调用onNext方法并将数据传入。该类中使用了一个计数器对数据数量进行校验,当达到最大值的时候,则会通过令牌(subscription)异步通知发布者订阅结束,然后发送者再异步的调用发订阅者的onComplete方法,以处理完成流程。
其中的onError和onComplete方法只进行打印,这里就不再说了。
以上的这个订阅者可以看作是一个push模型的实现,因为当开始订阅时订阅者就约定了需要接受的数量,然后在后续的处理(onNext)中不再请求新数据。
我们可以用以下的代码创建一个名称为S1,消费2个元素的订阅者: SimpleSubscriber sub1 = new SimpleSubscriber("S1", 2);订阅令牌交互
当我们可以创建了发送者和订阅者之后,我们需要确认一下进行交互的顺序,由于响应流的处理就是对于事件的处理,所以事件的顺序十分重要,具体顺序如下: 我们创建一个发布者publisher一个订阅者subscriber 订阅者subscriber通过调用发布者的subscribe()方法进行信息订阅。如果订阅成功,则发布者将生成一个令牌(Subscription)并作为入参调用订阅者的订阅事件方法onSubscribe()。如果调用异常则会直接调用订阅者的onError错误处理方法,并抛出IllegalStateException异常然后结束订阅。 在onSubscribe()中,订阅者需要通过调用令牌(Subscription)的请求方法request(long)来异步的向发布者请求数据。 当发布者有数据可以发布的时候,则会异步的调用订阅者的onNext()方法,直到所有消息的总数已经满足了订阅者调用request的数据请求上限。所以当订阅者请求订阅的消息数为Long.MAX_VALUE时,实际上是消费所有数据,即push模式。如果发布者没有数据要发布了,则可以会调用发布者自己的close()方法并异步的调用所有订阅者的onComplete()方法来通知订阅结束。 发布者可以随时向发布者请求更多的元素请求(一般在onNext里),而不用等到之前的处理完毕,一般是与之前的数据数量进行累加。 放发布者遇到异常的时候会调用订阅者的onError()方法。
上面的描述中是只使用的一个订阅者来进行描述的,后面的例子中将说明发布者可以拥有多个订阅者(甚至0个订阅者)。 发送信息
当发布者需要推送消息的时候会调用submit方法或者offer方法,上文中我们提到submit实际上是offer的一种简单实现,本节咱们自己比较一下。
首先他们的方法签名为: int offer(T item, long timeout, TimeUnit unit, BiPredicate,? super T> onDrop) int offer(T item, BiPredicate,? super T> onDrop) int submit(T item)
而submit 和 offer的直接方法为: public int submit(T item) { return doOffer(item, Long.MAX_VALUE, null); } public int offer(T item, BiPredicate, ? super T> onDrop) { return doOffer(item, 0L, onDrop);
可以看到他们的底层调用的都是 doOffer 方法,而doOffer的方法签名为: private int doOffer(T item, long nanos, BiPredicate, ? super T> onDrop)
所以我们可以直接看doOffer()方法。doOffer()方法是可选阻塞时长的,而时长根据入参数nanos来决定。而onDrop()是一个删除判断器,如果调用BiPredicate的test()方法结果为true则会再次重试(根据令牌中的nextRetry属性与发布器中的retryOffer()方法组合判断,但是具体实现还没梳理明白);如果结果为flase则直接删除内容。doOffer()返回的结果为正负两种,正数的结果为发送了数据,但是订阅者还未消费的数据(估计值,因为是异步多线程的);如果为负数,则返回的是重拾次数。
所以,根据submit()的参数我们可以发现,submit会一直阻塞直到数据可以被消费(因为不会阻塞超时,所以不需要传入onDrop()方法)。而我们可以根据需要配置offer()选择器。如果必须要求数据都要被消费的话,那就可以直接选择submit(),如果要设置重试次数的话就可以选择使用offer() 异步调用的例子
下面看一个具体的程序例子,程序将以3秒为周期进行数据发布: public class PeriodicPublisher { public static final int WAIT_TIME = 2; public static final int SLEEP_TIME = 3; public static void main(String[] args) { SubmissionPublisher publisher = new SubmissionPublisher<>(); // 创建4订阅者 SimpleSubscriber subscriber1 = new SimpleSubscriber("S1", 2); SimpleSubscriber subscriber2 = new SimpleSubscriber("S2", 4); SimpleSubscriber subscriber3 = new SimpleSubscriber("S3", 6); SimpleSubscriber subscriber4 = new SimpleSubscriber("S4", 10); // 前三个订阅者直接进行订阅 publisher.subscribe(subscriber1); publisher.subscribe(subscriber2); publisher.subscribe(subscriber3); // 第四个方法延迟订阅 delaySubscribeWithWaitTime(publisher, subscriber4); // 开始发送消息 Thread pubThread = publish(publisher, 5); try { // 等待处理完成 pubThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } public static Thread publish(SubmissionPublisher publisher, int count) { Thread t = new Thread(() -> { IntStream.range(1,count) .forEach(item ->{ publisher.submit(item); sleep(item); }); publisher.close(); }); t.start(); return t; } private static void sleep(Integer item) { try { System.out.printf("推送数据:%d。休眠 3 秒。%n", item); TimeUnit.SECONDS.sleep(SLEEP_TIME); } catch (InterruptedException e) { e.printStackTrace(); } } private static void delaySubscribeWithWaitTime(SubmissionPublisher publisher, Flow.Subscriber sub) { new Thread(() -> { try { TimeUnit.SECONDS.sleep(WAIT_TIME); publisher.subscribe(sub); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
代码后是运行结果如下: 订阅者:S1,最大消费数据: 2。 推送数据:1。休眠 3 秒。 订阅者:S3,最大消费数据: 6。 订阅者:S2,最大消费数据: 4。 订阅者:S2 接收到数据:1. 订阅者:S3 接收到数据:1. 订阅者:S1 接收到数据:1. 订阅者:S4,最大消费数据: 10。 推送数据:2。休眠 3 秒。 订阅者:S2 接收到数据:2. 订阅者:S3 接收到数据:2. 订阅者:S1 接收到数据:2. 订阅者:S4 接收到数据:2. 准备取消订阅者: S1。已处理数据个数:2。 推送数据:3。休眠 3 秒。 订阅者:S4 接收到数据:3. 订阅者:S2 接收到数据:3. 订阅者:S3 接收到数据:3. 推送数据:4。休眠 3 秒。 订阅者:S4 接收到数据:4. 订阅者:S3 接收到数据:4. 订阅者:S2 接收到数据:4. 准备取消订阅者: S2。已处理数据个数:4。 推送数据:5。休眠 3 秒。 订阅者:S3 接收到数据:5. 订阅者:S4 接收到数据:5. 订阅者: S3 处理完成。 订阅者: S4 处理完成。
由于是异步执行,所以在"接收数据"部分的顺序可能不同。
我们分析一下程序的执行流程。 创建一个发布者实例 创建四个订阅者实例S1、S2、S3、S4,可以接收数据的数量分别为:2、4、6、10。 前三个订阅者立即订阅消息。 S4的订阅者单独创建一个线程等待WAIT_TIME秒(2秒)之后进行数据的订阅。 新建一个线程来以SLEEP_TIME秒(3秒)为间隔发布5个数据。 将publish线程join()住等待流程结束。
执行的日志满足上述流程而针对一些关键点为: S4在发送者推送数据"1"的时候还未订阅,所以S4没有接收到数据"1"。 当发送数据"2"的时候S1已经接收够了预期数据2个,所以取消了订阅。之后只剩下S2、S3、S4。 当发送数据"4"的时候S2已经接收够了预期数据4个,所以取消了订阅。之后只剩下S3、S4。 当发送数据"5"的时候只剩下S3、S4,当发送完毕后publisher调用close()方法,通知S3、S4数据处理完成。
需要注意的是,如果在最后submit完毕之后直接close()然后结束进行的话可能订阅者并不能执行完毕。但是由于在任意一次submit()之后都有一次3秒的等待,所以本程序是可以执行完毕的。 最后
本文中的例子是是简单的实现,可以通过调整订阅者中的request的参数,与在onNext中添加request调用来测试背压的效果,还可以将submit调整为offer并添加onDrop方法以观察抛弃信息时的流程。同时本文没有提供Processor的例子,各位也可以自行学习。
总结一下流程: 订阅者向发布者进行订阅,然后发布者向订阅者发送令牌。订阅者使用令牌请求消息,发送者根据请求消息的数量推送消息。订阅者可以随时异步追加需要的更多信息。
JDK9中在Flow接口中实现了Java API的4个接口,并提供了SubmissionPublisher作为Publisher接口的简单实现。
一个新手如何做抖音?新手入门三大坑很多人刚开始玩抖音,并没有受过专业的培训,只觉得抖音可以赚钱,就进来了,但是由于有好多细节不知道,导致走了很多弯路,今天我就给大家普及一下,做抖音的是三个大坑,你看你有没有中过。1
一键采集阿里巴巴商品评论图,1688阿里巴巴商品评论图片下载1688阿里巴巴上有着丰富的商品种类与数量,因此阿里巴巴上的商品信息也是十分丰富的,而其中就不乏大量客户对于商品的评论图片等图片信息。利用这些1688阿里巴巴商品评论图片,我们能够
有什么可以批量去除片头片尾的软件,一键处理追剧现在是大部人日常的放松方式,在忙碌了一天的后,宅在床上追着剧,想想简直就是人人间仙境,但是由于漫长的广告很多人都会选择下载后本地观看,现在是一个快节奏的时代,去除掉啰嗦的广告以
呼伦贝尔益丰祥泰比亚迪4S店尽享优惠本周秦最新报价比亚迪益丰祥泰店即日起到10月31日限时促销,限时特惠4。62,如此优惠的降幅,大家可千万不要错过,店铺地址内蒙古自治区呼伦贝尔市鄂温克族自治旗巴彦托海镇南工业园区利
宋MAX家用商务首选刚刚小编在车友圈看到好多朋友都在讨论宋MAX购车优惠0。4万元,这么诱人的降价幅度,想要购车的朋友们不考虑一下趁机拿下?比亚迪益丰祥泰店,活动时间截止到10月30日,机会难得,不容
呼伦贝尔益丰祥泰比亚迪4S店宋MAX优惠不要错过比亚迪宋MAX购车优惠0。4万元,这么诱人的降价幅度,想要购车的朋友们不考虑一下趁机拿下?比亚迪益丰祥泰店,活动时间截止到10月31日,机会难得,不容错过,让自己花更少的钱买到心仪
比亚迪新能源核心技术刀片电池日前,我们从外媒获悉,比亚迪已经通过当地经销商开始向挪威首批客户交付七座版唐(参数询价)EV车型。新车在挪威的起售价格为59。99万挪威克朗,约合人民币44。1万元,相比国内市场补
比亚迪唐欢迎您来试驾本周秦最新报价比亚迪益丰祥泰店即日起到10月30日限时促销,限时特惠4。62,如此优惠的降幅,大家可千万不要错过,店铺地址内蒙古自治区呼伦贝尔市鄂温克族自治旗巴彦托海镇南工业园区利
呼伦贝尔唐购车优惠1。81欢迎试乘试驾心心念念的唐,等了这么久终于有优惠了,小编打听到比亚迪王朝呼伦贝尔益丰祥泰店截止到10月31日购车优惠0。3万元,这么好的机会朋友们是不是也和小编一样迫不及待的想到店去感受一下呢促
比亚迪DM混动技术,新能源汽车领导者2003年开始研发,2008年正式上市的混合动力车型比亚迪F3DM,是中国品牌首款量产插电式混合动力汽车。这款具有里程碑式意义的车型奠定了比亚迪在中国新能源汽车市场中的开拓者和引领
呼伦贝尔益丰祥泰比亚迪4S店尽享优惠本周宋Pro最新报价比亚迪益丰祥泰店11。1日限时促销,降价10。78,如此优惠的降幅,大家可千万不要错过,店铺地址内蒙古自治区呼伦贝尔市鄂温克族自治旗巴彦托海镇南工业园区利丰汽车