专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

一篇文章玩转RPC通信原理,并使用Netty实现一个PRC(

  1。什么是RPC
  RPC一般指远程过程调用。RPC是远程过程调用(RemoteProcedureCall)的缩写形式。首先看下服务的演变过程:单一应用架构MVC三层架构PRC分布式服务弹性计算架构
  接口请求也在慢慢演变:TCPIP报文协议RMI(仅JAVA可用)WebServiceHTTPGPRC(Thrift,Dubbo)SpringRestful(路径风格)
  总体而言就是随着服务的增多,也伴随着服务之间的调用频繁和繁琐,这就有了PRC这代名词。
  PRC普通应用在分布式架构中,先看下分布式服务派系阿里系:dubbozookeepernginxspring生态:cloudeurekagateway
  RPC的核心职能,以dubbo图解为例
  这个机制现在用的很广泛了,例如cloud中的注册中心和配置中心。大概了解一下理论后,接下来我们用代码来实操,以便更深入的认识PRC。2。Netty实现一个RPC2。1原理概述客户端1。通过bean的初始化回调判断是否需要注入动态代理2。在动态代理回调类中使用Netty调用远程服务,并发送约定协议的消息3。使用回调机制返回服务端响应,并返回原始类服务端1。在bean的回调判断是否为发布的服务,是的话保存在公共map中,初始化时启动Rpc服务2。调用服务解析消息后,通过请求的service获取指定的service,通过反射调用,并将结果返回2。2pom。xml依赖
  基于springboot2。5。6版本,额外引入lombok和fastjsonnetty依赖dependencygroupIdio。nettygroupIdnettyallartifactIdversion4。1。42。Finalversiondependency2。3apijar包
  自定义注解,api目录为待发布的API接口,protocol为公用的协议和工具包
  2。4。客户端架构2。4。1rpc目录下为公用代码,可以单独抽离的
  2。4。2Controller代码注意这两个声明,并没有加Autowired或ResourceRpcReferenceHellServicehellService;RpcReferenceOrderServiceorderService;GetMapping(hello)publicStringhello(RequestParamStringorderId){returnorderService。getOrder(orderId);}GetMapping(add)publicintadd(RequestParamIntegera,RequestParamIntegerb){returnhellService。add(a,b);}
  PS说明:上面的两个声明没有加Autowired或Resource,所以spring容器在注入的时候不会处理这里两个,本文使用的是反射注入。如果想交由spring处理可以参考mybatis第九话手写实现一个简单的mybatis版本中的Mapper接口注入原理2。4。3核心动态代理处理类RpcBeanPostProcessor实现环境配置回调EnvironmentAware该类为初始化类之后的回调还没到注入阶段因此在这里接收环境的回调,读取RPC的配置传递到代理类中Environmentenvironment;注册之前设置坏境变量OverridepublicvoidsetEnvironment(Environmentenvironment){this。environmentenvironment;}实现了InstantiationAwareBeanPostProcessor接口,重写postProcessAfterInitialization方法可以在bean初始化之前后返回继承类或者代理类,aop就是典型的例子OverridepublicObjectpostProcessAfterInitialization(Objectbean,StringbeanName)throwsBeansException{Classlt;?clazzbean。getClass();遍历所有的声明for(Fieldfield:clazz。getDeclaredFields()){如果包含这个注解就创建代理类,并用反射注入if(field。isAnnotationPresent(RpcReference。class)){Objectinstance;StringbeanClassNamefield。getType()。getName();try{单例缓存if(cacheProxyMap。containsKey(beanClassName)){instancecacheProxyMap。get(beanClassName);}else{根据不同的服务名称参数传递不同的rpc调用地址RpcReferenceannotationfield。getAnnotation(RpcReference。class);生成动态代理instanceProxy。newProxyInstance(field。getType()。getClassLoader(),newClass〔〕{field。getType()},可以配置注解参数以获取不同的RPC连接配置newProxyHandler(bean,beanClassName,this。environment。getProperty(annotation。name()。rpcHost),Integer。valueOf(this。environment。getProperty(annotation。name()。rpcPort))));}log。info(createproxybean:{},beanClassName);反射注入field。setAccessible(true);field。set(bean,instance);cacheProxyMap。put(field。getType()。getName(),instance);}catch(IllegalAccessExceptione){log。error(createbeanerror,beanClassName{},beanClassName);}}}returnbean;}2。4。4动态代理调用类ProxyHandlerinvoke方法OverridepublicObjectinvoke(Objectproxy,Methodmethod,Object〔〕args)throwsThrowable{组装协议RpcRequestrequestnewRpcRequest();设置一个唯一ID,用来回调request。setReqId(UUID。randomUUID()。toString());request。setService(this。service);request。setMethod(method。getName());request。setParamterType(method。getParameterTypes());request。setArgs(args);发起服务调用NettyClientnettyClientnewNettyClient();nettyClient。start(rpcHost,rpcPort,newMyRpcClientHandler());返回结果returnnettyClient。sendRequest(request);}2。4。5NettyClient公共类该类不是单例的,但是保存通道和回调的Map是单例的publicChannelchannel;publicvoidstart(Stringhost,intport,RpcHandlerrpcHandler){StringmapKeyhost:port;if(NettyConstans。clientMap。containsKey(mapKey)){this。channelNettyConstans。clientMap。get(mapKey);return;}NioEventLoopGroupb1newNioEventLoopGroup();BootstrapbsnewBootstrap()。group(b1)。channel(NioSocketChannel。class)。handler(newChannelInitializerChannel(){OverrideprotectedvoidinitChannel(Channelchannel)throwsException{ChannelPipelinepipelinechannel。pipeline();这里偷懒就直接用string的编解码了pipeline。addLast(newStringEncoder());pipeline。addLast(newStringDecoder());pipeline。addLast(rpcHandler);}});try{客户端连接服务端ChannelFuturefuturebs。connect(host,port)。sync();future。addListener(listen{if(listen。isSuccess()){log。info(connectrpcservicesuccess,{}:{},host,port);}});channelfuture。channel();保存为单例NettyConstans。clientMap。put(mapKey,channel);}catch(Exceptione){b1。shutdownGracefully();log。error(connectrpcserviceerror,{}:{},host,port);}}publicObjectsendRequest(RpcRequestrpcRequest)throwsException{自定义一个返回结果的回调保存到单例Map中RpcFutureRpcResponserpcFuturenewRpcFuture(newDefaultPromiseRpcResponse(newDefaultEventLoop()));NettyConstans。rpcFutureMap。put(rpcRequest。getReqId(),rpcFuture);消息发送,编解码为string,所以发送的是stringchannel。writeAndFlush(JSONObject。toJSONString(rpcRequest));实际上为阻塞等待回调由接收消息那里回调其实还有一个熔断线程处理这些超时或者一直没有回调的returnrpcFuture。getPromise()。get()。getContent();}2。4。6客户端接收消息handlerMyRpcClientHandler协议RpcResponseOverrideprotectedvoidchannelRead0(ChannelHandlerContextctx,Stringmsg)throwsException{log。info(RpcResponsereceivemsg:{},msg);RpcResponseresponseJSONObject。parseObject(msg,RpcResponse。class);未知的消息直接忽略if(responsenull!NettyConstans。rpcFutureMap。containsKey(response。getReqId()))return;给指定的ReqId回调NettyConstans。rpcFutureMap。get(response。getReqId())。getPromise()。setSuccess(response);NettyConstans。rpcFutureMap。remove(response。getReqId());}OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{log。error(连接出现异常,重置连接:{},ctx。channel()。remoteAddress());异常重连服务端重启之类的NettyConstans。clientMap。remove(ctx。channel()。remoteAddress()。toString());}
  客户端的代码基本上贴完了,比较复杂,服务端会比较简单,接下来看看服务端的代码2。5服务端架构2。5。2。1bean的初始化回调RpcBeanPostProcessorstaticMapString,ObjectbeanMapnewConcurrentHashMap();OverridepublicObjectpostProcessAfterInitialization(Objectbean,StringbeanName)throwsBeansException{Classlt;?clazzbean。getClass();只要包含该注解的就报保存到Map中if(clazz。isAnnotationPresent(RpcService。class)){存的是服务发布的接口类名称beanMap。put(clazz。getInterfaces()〔0〕。getName(),bean);log。info(registerrpcservice:{},clazz。getInterfaces()〔0〕。getName());}returnbean;}
  这里没有往注册中心上发布了,直接以本地Map的形式保存的。主要是为弄懂原理2。5。2NettyService初始化使用springboot的启动回调开始一个RPC服务Overridepublicvoidrun(String。。。args)throwsException{启动代码就不贴了编解码为StringNettyService。start(port,newMyRpcHandler());}自定义handler类MyRpcHandler协议RpcRequestOverrideprotectedvoidchannelRead0(ChannelHandlerContextctx,Stringmsg)throwsException{log。info(RpcRequestreceivemsg:{},msg);RpcRequestrequestJSONObject。parseObject(msg,RpcRequest。class);if(requestnullrequest。getReqId()null)return;Stringservicerequest。getService();ObjectbeanRpcBeanPostProcessor。beanMap。get(service);根据方法名称和参数类型获取类中的方法Methodmethodbean。getClass()。getMethod(request。getMethod(),request。getParamterType());Objectresultmethod。invoke(bean,request。getArgs());响应协议RpcResponseresponsenewRpcResponse();response。setReqId(request。getReqId());response。setContent(result);写出和发送同理ctx。writeAndFlush(JSONObject。toJSONString(response));}3。RPC测试
  分别启动客户端和服务端3。1客户端调用控制台日志createproxybean:com。exmaple。demo。api。HellServicecreateproxybean:com。exmaple。demo。api。OrderService执行http:127。0。0。1:8080hello?orderId1234567connectrpcservicesuccess,127。0。0。1:18080RpcResponsereceivemsg:{content:selectorderservicebyorderId:1234567,reqId:61a37ef56a974fe79ba9d8c3a955c8c0}3。2服务端调用日志startremoteservice:18080RpcRequestreceivemsg:{args:〔1234567〕,method:getOrder,paramterType:〔java。lang。String〕,reqId:61a37ef56a974fe79ba9d8c3a955c8c0,service:com。exmaple。demo。api。OrderService}第二次调用http:127。0。0。1:8080add?a4545b12日志RpcRequestreceivemsg:{args:〔4545,12〕,method:add,paramterType:〔int,int〕,reqId:4f312678b4634db9a861d8b4b9c9fc4a,service:com。exmaple。demo。api。HellService}4。总结4。1关于反射注入
  正常应该使用的是FactoryBean的方式注入的,这里只是为了搞懂原理,忽略!4。2关于Rpc服务地址
  正常的RPC服务,会先从注册中心获取这个服务发布的地址,也就是我们配置中的地址实际上是注册中心的地址建立连接后,应该会保持心跳,第二次调用不再重新建立连接4。3关于阻塞异步回调
  实际上还有熔断机制,应该处理掉一直等待的回调

今天才发现,微信长按2秒钟,竟隐藏6个小功能,太实用了微信对于每个人来说,基本上都是日常所需,然而有很多小功能都隐藏在细节里,可能很多人都不知道。今天才发现,微信长按2秒钟,还隐藏了6个小功能,太实用了。01。快捷启动菜单很多人使用微一起聆听风声雨声头条创作挑战赛乡间的小路弯弯曲曲伸向远方小溪环绕绿树红墙朵朵白云蓝蓝的天溪水轻轻的流淌没有江河翻滚的波浪静悄悄地流向远方就这样流淌着岁月如梭就这样流淌着人生的歌白云蓝天还是那样不会没有人比她更心痛照片中的他们笑的很开心,此时的她从来没有想到自己的宝贝儿子会用这种惨烈的方式与自己分别,这成为了她一生中的痛这个春节对于我们大部分人来说都是快乐的,但对于胡鑫宇的母亲和家人却是一种日益努力而后风生水起,众生皆苦不能认输今日的文案送哒水再浑浊,只要长久沉淀,依然会分外清澄人再愚钝,只要足够努力,一样能改写命运。上天不会亏待努力的人,也不会同情假勤奋的人,你有多努力,时光它知道。今天敢于做别人不敢做初春游山九灵山记济南城东八九里,有山名为九灵山,位于大汉峪村村南。在爬王八盖子山的时候听人提起九灵山有座庙宇,特意前往观瞻。从旅游路开始出发,沿着汉峪中路往南一直走,有一座小广场,顺着护栏绕过去,相爱时再美好,也抵不过背叛写完这封信,我们就彻底结束吧亲爱的周远我是余佳。不知道你看到这封信会不会惊讶,但是我想告诉你,到今天,正好是我喜欢你的两年零三个月,在这段时间里,我习惯了喜欢你的生活方式,习惯了把喜欢你当作生活的调味剂,和你冬日暖阳下的青白江美好时光这几日暖阳笼罩的青白江令人格外愉悦趁着晴好的天气不少市民也走出家门拥抱这难得的好光景凤凰湖内绿树成林小桥流水市民漫步其中感受着一山一色一花一草带来的无尽诗意舟行碧波上人在画中游小孩NBA再爆冷!争冠劲旅遭43分血洗,双星合砍57分,杜兰特证明之战北京时间2023年2月2日,在刚刚结束的一场NBA焦点大战中,坐镇主场的凯尔特人以13996击败篮网队,这样的结果,多少还是出乎大伙儿的意料。作为目前两支排名东部前4的球队,他们也广汽埃安hyperGT真有那么惊艳?定位中大型轿跑,零百加速4秒级说起国产新能源汽车,那广汽埃安的名号绝对是响当当的,旗下推出的车型销量都很不错。而广汽埃安未来要推出的广汽埃安hyperGT车型也算是赚足了眼球。广汽埃安hyperGT的定位为中大江诗丹顿有哪些款式值得入手?江诗丹顿手表有哪些不错的款式推荐吗江诗丹顿作为从未中断生产,历史最悠久的手表品牌,在国内可以说是深得人心,更是老一辈商界大佬最钟爱的手表品牌之一,其LOGO的马耳他十字标志,更是给首根超长单晶硅棒即将出炉,龙头股价大涨,硅料价格反弹明显,机构看好超跌反弹机会(附股)数据是个宝数据宝炒股少烦恼硅料价格显著反弹,相关上市公司业绩大增。港股协鑫科技今日大涨超11,现报2。41港元,总市值653亿港元。消息面上,协鑫科技子公司中能硅业彩虹工程CCZ(
宝宝什么时候会咯咯笑,几个月才会开始出现呢?宝宝一般多大开始哈哈大笑?一般宝宝在3个月的时候就会开始笑出声了,之前的笑都是微笑。宝宝最开始笑的时候是在月子里,但那时候的笑是无意识的肌肉活动。慢慢地,出了月子后,宝宝笑得越来越甄嬛离宫三年,为何只跟皇上见了一面就能复宠?表面上看上去,皇帝来到了甘露寺,皇帝只见了甄嬛一面,就不能忘怀,同她旧情复燃。可仔细分析,甄嬛走的每一步路都步步惊心。甄嬛的复宠,不仅有对人心的谋算,还有利益的交换,更有后宫格局的红警2的无限钱模式到底要怎么玩,怎么我老是输?这款游戏,主要你购买,兵种,搭配兵种,各个兵种都是相克的,你看他的出兵顺序,用克制的方法,很容易就把他击杀了。无限钱的模式,那就不用担心没有金币了,买克制对面的最贵的兵种,就完事了拳皇系列游戏中,哪个游戏人物的拳脚最具有观赏性?拳皇系列中的很多姿势我们都觉得很帅,其实大部分都是有原型的。目前位置,我们已知拳皇借鉴的漫画有刃牙阿基拉EVAJOJO奇妙冒险北斗神拳,而其中刃牙是被借鉴最多的。而这些被借鉴的技能中国足球为什么越整顿越差?因为都是一伙人都没整到点子上固为病根在足协!不一锅端了,重新建立新的组织和机制,特别是行风的重塑,消除贪腐和潜规则,还足球于大众,所谓的整顿,都是贼喊捉贼,瞎子戴眼镜,脱裤子放屁!什么牌子的手机信号好?影响手机网速,信号的第一个因素,就是芯片,芯片又主要看基带,用了好的基带,手机网速上限就高,多天线MIMO设计也能让信号水平更高,市面上的芯片,主要由高通华为联发科提供,顶级的芯片老人自己买了个盒子助听器,听起来声音很响,对老人身体有影响吗?长时间的使用,对耳朵可能会有一定的影响,建议把音量调小一点,用一段时间之后休息一会儿,可能会好一点,建议戴助听器最好要经过专业验配。你好,盒式的助听器只会让听力越来越差,建议到专业什么类型的衣服上档次?墨羽觉得,每个人有每个人的生活方式,别人喜欢的穿搭风格不一定你也喜欢,你喜欢的风格不一定适合别人,再者说,从来没有一个定义说穿正装比穿休闲装的上档次,所以一切以你觉得舒适喜欢为主。婴儿可以吃红薯吗?红薯又名山芋地瓜甘薯等,富含蛋白质淀粉果胶纤维素氨基酸维生素及多种矿物质,有长寿食品之誉。小时候不懂红薯还是营养比较丰富的,但从小就非常喜欢吃红薯。小时候妈妈有时煮一大锅红薯,我都拔丝土豆怎么做?谢谢邀请!在我们家乡,也特别喜欢做拔丝系列的菜品。通常我们都是做拔丝地瓜,或者拔丝芋头,拔丝土豆算是做的次数比较少。其实拔丝系列的菜品不属于我们闽菜,它属于鲁菜的一道特色菜品,如果老胡说希望薇娅痛改前非健康再生,你答应吗?不答应,首先带货主播多的要命,不差她一个,今天她复出了,其他人就会有样学样,各个偷税漏税,抱着侥幸心理,查不到,钱到自己袋子,查到了,补个税道个歉继续捞金,让普通纳税人情何以堪,国
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网