SpringCloud升级之路2020。0。x版38。实现自定义WebClient
本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent 实现 WeClient 的 NamedContextFactory
我们要实现的是不同微服务自动配置装载不同的 WebClient Bean,这样就可以通过 NamedContextFactory 实现。我们先来编写下实现这个 NamedContextFactory 整个的加载流程的代码,其结构图如下所示:
spring.factories # AutoConfiguration org.springframework.boot.autoconfigure.EnableAutoConfiguration= com.github.jojotech.spring.cloud.webflux.auto.WebClientAutoConfiguration
在 spring.factories 定义了自动装载的自动配置类 WebClientAutoConfiguration
WebClientAutoConfiguration @Import(WebClientConfiguration.class) @Configuration(proxyBeanMethods = false) public class WebClientAutoConfiguration { }
WebClientAutoConfiguration 这个自动配置类 Import 了 WebClientConfiguration
WebClientConfiguration @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(WebClientConfigurationProperties.class) public class WebClientConfiguration { @Bean public WebClientNamedContextFactory getWebClientNamedContextFactory() { return new WebClientNamedContextFactory(); } }
WebClientConfiguration 中创建了 WebClientNamedContextFactory 这个 NamedContextFactory 的 Bean。在这个 NamedContextFactory 中,定义了默认配置 WebClientDefaultConfiguration。在这个默认配置中,主要是给每个微服务都定义了一个 WebClient 定义 WebClient 的配置类
我们编写下上一节定义的配置,包括: 微服务名称 微服务地址,服务地址,不填写则为 http://微服务名称 连接超时,使用 Duration,这样我们可以用更直观的配置了,例如 5ms,6s,7m 等等 响应超时,使用 Duration,这样我们可以用更直观的配置了,例如 5ms,6s,7m 等等 可以重试的路径,默认只对 GET 方法重试,通过这个配置增加针对某些非 GET 方法的路径的重试;同时,这些路径可以使用 * 等路径匹配符,即 Spring 中的 AntPathMatcher 进行路径匹配多个路径。例如 /query/order/**
WebClientConfigurationProperties @Data @NoArgsConstructor @ConfigurationProperties(prefix = "webclient") public class WebClientConfigurationProperties { private Map configs; @Data @NoArgsConstructor public static class WebClientProperties { private static AntPathMatcher antPathMatcher = new AntPathMatcher(); private Cache retryablePathsMatchResult = Caffeine.newBuilder().build(); /** * 服务地址,不填写则为 http://serviceName */ private String baseUrl; /** * 微服务名称,不填写就是 configs 这个 map 的 key */ private String serviceName; /** * 可以重试的路径,默认只对 GET 方法重试,通过这个配置增加针对某些非 GET 方法的路径的重试 */ private List retryablePaths; /** * 连接超时 */ private Duration connectTimeout = Duration.ofMillis(500); /** * 响应超时 */ private Duration responseTimeout = Duration.ofSeconds(8); /** * 是否匹配 * @param path * @return */ public boolean retryablePathsMatch(String path) { if (CollectionUtils.isEmpty(retryablePaths)) { return false; } return retryablePathsMatchResult.get(path, k -> { return retryablePaths.stream().anyMatch(pattern -> antPathMatcher.match(pattern, path)); }); } } } 粘合 WebClient 与 resilience4j
接下来粘合 WebClient 与 resilience4j 实现断路器以及重试逻辑,WebClient 基于 project-reactor 实现,resilience4j 官方提供了与 project-reactor 的粘合库: io.github.resilience4j resilience4j-reactor
参考官方文档,我们可以像下面这样给普通的 WebClient 增加相关组件:
增加重试器 : //由于还是在前面弄好的 spring-cloud 环境下,所以还是可以这样获取配置对应的 retry Retry retry; try { retry = retryRegistry.retry(name, name); } catch (ConfigurationNotFoundException e) { retry = retryRegistry.retry(name); } Retry finalRetry = retry; WebClient.builder().filter((clientRequest, exchangeFunction) -> { return exchangeFunction.exchange(clientRequest) //核心就是加入 RetryOperator .transform(RetryOperator.of(finalRetry)); })
这个 RetryOperator 其实就是使用了 project-reactor 中的 retryWhen 方法实现了 resilience4j 的 retry 机制:
RetryOperator @Override public Publisher apply(Publisher publisher) { //对于 mono 的处理 if (publisher instanceof Mono) { Context context = new Context<>(retry.asyncContext()); Mono upstream = (Mono) publisher; return upstream.doOnNext(context::handleResult) .retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors))) .doOnSuccess(t -> context.onComplete()); } else if (publisher instanceof Flux) { //对于 flux 的处理 Context context = new Context<>(retry.asyncContext()); Flux upstream = (Flux) publisher; return upstream.doOnNext(context::handleResult) .retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors))) .doOnComplete(context::onComplete); } else { //不可能是 mono 或者 flux 以外的其他的 throw new IllegalPublisherException(publisher); } }
可以看出,其实主要填充了: doOnNext(context::handleResult): 在有响应之后调用,将响应结果传入 retry 的 Context,判断是否需要重试以及重试间隔是多久,并且抛出异常 RetryDueToResultException retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors))):捕捉异常 RetryDueToResultException,根据其中的间隔时间,返回 reactor 的重试间隔:Mono.delay(Duration.ofMillis(waitDurationMillis)) doOnComplete(context::onComplete):请求完成,没有异常之后,调用 retry 的 complete 进行清理
增加断路器 : //由于还是在前面弄好的 spring-cloud 环境下,所以还是可以这样获取配置对应的 circuitBreaker CircuitBreaker circuitBreaker; try { circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, finalServiceName); } catch (ConfigurationNotFoundException e) { circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId); } CircuitBreaker finalCircuitBreaker = circuitBreaker; WebClient.builder().filter((clientRequest, exchangeFunction) -> { return exchangeFunction.exchange(clientRequest) //核心就是加入 CircuitBreakerOperator .transform(CircuitBreakerOperator.of(finalCircuitBreaker)); })
类似的,CircuitBreakerOperator 其实也是粘合断路器与 reactor 的 publisher 中的一些 stage 方法,将结果的成功或者失败记录入断路器,这里需要注意,可能有的链路能走到 onNext,可能有的链路能走到 onComplete,也有可能都走到,所以 这两个方法都要记录成功,并且保证只记录一次 :
CircuitBreakerSubscriber class CircuitBreakerSubscriber extends AbstractSubscriber { private final CircuitBreaker circuitBreaker; private final long start; private final boolean singleProducer; private final AtomicBoolean successSignaled = new AtomicBoolean(false); private final AtomicBoolean eventWasEmitted = new AtomicBoolean(false); protected CircuitBreakerSubscriber(CircuitBreaker circuitBreaker, CoreSubscriber<? super T> downstreamSubscriber, boolean singleProducer) { super(downstreamSubscriber); this.circuitBreaker = requireNonNull(circuitBreaker); this.singleProducer = singleProducer; this.start = circuitBreaker.getCurrentTimestamp(); } @Override protected void hookOnNext(T value) { if (!isDisposed()) { //正常完成时,断路器也标记成功,因为可能会触发多次(因为 onComplete 也会记录),所以需要 successSignaled 标记只记录一次 if (singleProducer && successSignaled.compareAndSet(false, true)) { circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), value); } //标记事件已经发出,就是已经执行完 WebClient 的请求,后面判断取消的时候会用到 eventWasEmitted.set(true); downstreamSubscriber.onNext(value); } } @Override protected void hookOnComplete() { //正常完成时,断路器也标记成功,因为可能会触发多次(因为 onNext 也会记录),所以需要 successSignaled 标记只记录一次 if (successSignaled.compareAndSet(false, true)) { circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit()); } downstreamSubscriber.onComplete(); } @Override public void hookOnCancel() { if (!successSignaled.get()) { //如果事件已经发出,那么也记录成功 if (eventWasEmitted.get()) { circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit()); } else { //否则取消 circuitBreaker.releasePermission(); } } } @Override protected void hookOnError(Throwable e) { //记录失败 circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), e); downstreamSubscriber.onError(e); } }
我们会使用这个库进行粘合,但是不会直接使用上面的代码,因为考虑到: 需要在重试以及断路中加一些日志,便于日后的优化 需要定义重试的 Exception,并且与断路器相结合,将非 2xx 的响应码也封装成特定的异常 需要在断路器相关的 Operator 中增加类似于 FeignClient 中的负载均衡的数据更新,使得负载均衡更加智能
在下面一节我们会详细说明我们是如何实现的有断路器以及重试逻辑和负载均衡数据更新的 WebClient。
迅雷没有做错什么,只是用户不需要了1hr说起迅雷,每个人都会想起那标志性的蜂鸟图案和下载成功的通知铃声。叮。每当这个声音响起时,总能让在场无数老司机展露出各种意义不明的微笑。欢乐时光,就要开始了。但现在,再谈起迅雷
国产生态破冰,芯动科技传来好消息,完成国产操作系统适配在信息技术体系的生态大环境,曾经也是摆在我国面前的一座高峰,曾经也是我国没有取得突破的高端领域,但是,华为事件犹如平地惊雷,让我国渐渐意识到高端科技和独立自主的重要性。因此,许许多
vivoX80系列背面外观曝光蔡司加持矩阵相机模组熟悉vivo的用户都知道,vivo旗下的X系列一直定位全能影像旗舰,此前的X系列多款机型均获得了广泛的好评。而在不久前,vivo新一代影像旗舰vivoX80系列也开始得到外界曝光,
本是同根生,何不用华为?在中国智能汽车历史上,3月23日或许是一个关键性的节点。因为在这一天,目前中国最大的本土智能车商比亚迪,高调宣布与人工智能计算制造商英伟达在智能驾驶技术方面达成合作。依据比亚迪公布
3500左右,有哪些高性价比手机推荐?3500左右推荐以下机型!我们买手机无非是看性能(芯片),拍照(摄像头),续航(电池容量和快充)。荣耀30搭载麒麟9855gsoc芯片,性能介于麒麟820和980之间,能打挖孔屏,
2000元手机6万多人没砍下来拼多多的回应?在3月17号的时候游戏主播超级小桀在直播间里提了一句我们来试一下拼多多的砍一刀活动,说起砍一刀活动前段时间刘律师的事大家也知道吧?其实也不一定有6万人帮砍可两三千总有吧?况且本人也
年过四十还有翻身的机会吗?看到题目,思绪万千。我47了,同样在奋斗的路上一切才刚刚开始人到中年,确实多了很多羁绊,上有老下有小,沉重的负担让我们步履维艰。不过,正像许巍在歌中唱的没有什么能够阻挡,我对自由的
现在网络上充斥着大量的短视频创业经验分享,有几个是真的?百分之九十都是假的,他们只是为了吸引你去看。有经验的人不会分享给大家的,他们分享给大家的只是一些基本的经验。要想做好短视频就得自己专心的去做,做一些真实的视频,慢慢的就会有经验了。
编写高性能Java程序的技术汇编性能并不总是我们需要首要考虑的因素,但当我们需要处理的数据量较大,或者对象计算需要消耗较多资源时,性能问题就会自然浮上来,需要我们花些心思进行调优。本文搜集整理了40多条编写高性能
普冉股份新一代摄像头模组EEPROM产品应用于海外手机头部厂商旗舰机型普冉股份新一代摄像头模组EEPROM产品应用于海外手机头部厂商旗舰机型财联社3月20日电,普冉股份公告,公司NORFlash全系列产品研发完成并成为量产交付主力,晶圆良率达到95以
苹果等退出俄市场,中国手机乘胜追击,出货量暴增成大赢家俄乌战争正在打得如火如荼,这给许多行业带来了巨大的变动和调整,紧跟着各种的制裁给到俄罗斯,这其中不乏诸如苹果三星等手机企业。各种因素机缘巧合之下,却推动了中国手机在俄罗斯的销量暴增
滴滴退市Uber巨亏Lyft暴跌,网约车车载媒体还好吗?车载广告或成为网约车的救命稻草。网约车凭借高效便捷舒适成为大众喜爱的出行交通之一,并且快速异军突起,成功在市场中取得一席之地。滴滴退市Uber巨亏Lyft暴跌滴滴在去年远赴美国上市
关于光速问题光速问题在物理学中具有特殊的地位。光速不变是A爱因斯坦狭义相对论的基本假设之一。根据爱因斯坦的狭义相对论,光速在真空中速度恒为c。c为宇宙中的极限速度,没有任何物体或信息运动的速度
节能风电拟收购腾煌公司100股权北京商报讯(记者董亮丁宁)5月13日晚间,节能风电(601016)发布公告称,拟通过支付现金和承债的方式收购巨鹿县腾煌新能源科技有限公司(以下简称腾煌公司)100股权,股权收购价格
Docker从入门到精通之Dockerfile详解Dockerfile详解Dockerfile是一个构建镜像的文本文件,其中包含用户可以在命令行上调用组装镜像的所有命令,使用dockerbuild构建格式CommentINSTRU
SE否认被索尼收购市场猜测被传收购背后或可见多种合作可能本报记者陈溢波吴可仲北京报道两家颇具历史渊源的日本游戏厂商,近期出现合作传闻。有报道称,索尼正在收购日本游戏厂商史克威尔艾尼克斯(SQUAREENIX,以下简称SE)。对此,中国经
上海京东运输车被拦截后续!北京超市被抢空,拿着手机等发布会昨日,上海2271869,数据有所反弹!近日,有网友反映京东运输车辆在上海一个道口被拦截,不让走,引发广泛关注。昨天有关方作出回应,系工作人员表达不准确引起误会。16家快递分拨处理
让更多机器人走进餐厅,擎朗智能坚持以稳打市场REAL100记者佘晓晨编辑疫情肆虐的时代,以技术为基底的无接触配送成为常态。和两年前相比,商用机器人市场并没有发生结构性的变化,但客户的需求更为多元。基于机器人的移动技术,有商家希望实现营销推
JavaScript任务池线程池在多线程语言中,我们通常不会随意的在需要启动线程的时候去启动,而是会选择创建一个线程池。所谓线程池,本意其实就是(不止这些作用,其余作用可以自行查阅)节省操作系统资源限制最大
极狐阿尔法S全新HI版上市懂车更懂中国车5月7日,极狐阿尔法S全新HI版终于上市了,新车带来了三个闪亮头衔全球率先搭载华为智能汽车全栈解决方案的量产车全球率先支持城市道路高阶智能驾驶的量产车全球率先搭载华为智能座舱鸿蒙车
新能源车充电桩的扩容新能源车作为十四五规划之一的重要内容势不可挡,优点枚不胜举,比如省油省钱节能环保。可是车一旦买了,充电成了一个比较现实的问题,许多小区即使业主有固定的停车位,由于电容量不足,依旧无
Prometheus技术白皮书整理(六)下面的一些功能是我们即将要做的事情。如果你想查看整个计划和当前工作的完整概述,请查看github上Prometheus项目的issue,如Prometheus服务新的存储引擎现在一