保健励志美文体育育儿作文
投稿投诉
作文动态
热点娱乐
育儿情感
教程科技
体育养生
教案探索
美文旅游
财经日志
励志范文
论文时尚
保健游戏
护肤业界

如何设计一个短小精悍可拓展的RPC框架?(含实现代码)

  本文来自公众号读者王码农的投稿
  感谢王码农同学的技术分享
  简介
  如果大家对RPC有一些了解的话,或多或者都会听到过一些大名鼎鼎的RPC框架,比如Dobbo、gRPC。但是大部分人对于他们底层的实现原理其实不甚了解。
  有一种比较好的学习方式:就是如果你想要了解一个框架的原理,你可以尝试去写一个简易版的框架出来,就比如如果你想理解SpringIOC的思想,最好的方式就是自己实现一个小型的IOC容器,自己慢慢体会。
  所以本文尝试带领大家去设计一个小型的RPC框架,同时对于框架会保持一些拓展点。
  通过阅读本文,你可以收获:
  理解RPC框架最核心的理念
  学习在设计框架的时候,如何保持拓展性本文会依赖一些组件,他们是实现RPC框架必要的一些知识,文中会尽量降低这些知识带来的障碍。但是,最好期望读者有以下知识基础:
  Zookeeper基本入门
  Netty基本入门
  RPC框架应该长什么样子
  我们首先来看一下:一个RPC框架是什么东西?
  我们最直观的感觉就是:集成了RPC框架之后,通过配置一个注册中心的地址。一个应用(称为服务提供者)将某个接口(interface)暴露出去,另外一个应用(称为服务消费者)通过引用这个接口(interface),然后调用了一下,就很神奇的可以调用到另外一个应用的方法了
  给我们的感觉就好像调用了一个本地方法一样。即便两个应用不是在同一个JVM中甚至两个应用都不在同一台机器中。
  那他们是如何做到的呢?
  其实啊,当我们的服务消费者调用某个RPC接口的方法之后,它的底层会通过动态代理,然后经过网络调用,去到服务提供者的机器上,然后执行对应的方法。
  接着方法的结果通过网络传输返回到服务消费者那里,然后就可以拿到结果了。
  整个过程如下图:那么这个时候,可能有人会问了:服务消费者怎么知道服务提供者在哪台机器的哪个端口呢?
  这个时候,就需要注册中心登场了,具体来说是这样子的:
  服务提供者在启动的时候,将自己应用所在机器的信息提交到注册中心上面。
  服务消费者在启动的时候,将需要消费的接口所在机器的信息抓回来
  这样一来,服务消费者就有了一份服务提供者所在的机器列表了服务消费者拿到了服务提供者的机器列表就可以通过网络请求来发起请求了。
  网络客户端,我们应该采用什么呢?有几种选择:使用JDK原生BIO(也就是ServerSocket那一套)。阻塞式IO方法,无法支撑高并发。使用JDK原生NIO(Selector、SelectionKey那一套)。非阻塞式IO,可以支持高并发,但是自己实现复杂,需要处理各种网络问题。使用大名鼎鼎的NIO框架Netty,天然支持高并发,封装好,API易用。作为一个有追求的程序员,我们要求开发出来的框架要求支持高并发、又要求简单、还要快。当然是选择Netty来实现了,使用Netty的一些很基本的API就能满足我们的需求。
  网络协议定义
  当然了,既然我们要使用网络传输数据。我们首先要定义一套网络协议出来。
  你可能又要问了,啥叫网络协议?网络协议,通俗理解,意思就是说我们的客户端发送的数据应该长什么样子,服务端可以去解析出来知道要做什么事情。话不多说,上代码:
  假设我们现在服务提供者有两个类:com。study。rpc。test。producer。HelloService
  publicinterfaceHelloService{
  StringsayHello(TestBeantestBean);
  }
  com。study。rpc。test。producer。TestBean
  publicclassTestBean{
  privateStringname;
  privateIntegerage;
  publicTestBean(Stringname,Integerage){
  this。namename;
  this。ageage;
  }
  gettersetter
  }
  现在我要调用HelloService。sayHello(TestBeantestBean)这个方法
  作为服务消费者,应该怎么定义我们的请求,从而让服务端知道我是要调用这个方法呢?
  这需要我们将这个接口信息产生一个唯一的标识:这个标识会记录了接口名、具体是那个方法、然后具体参数是什么!
  然后将这些信息组织起来发送给服务端,我这里的方式是将信息保存为一个JSON格式的字符串来传输。
  比如上面的接口我们传输的数据大概是这样的:
  {
  interfaces:interfacecom。study。rpc。test。producer。HelloServicemethodsayHello
  parametercom。study。rpc。test。producer。TestBean,
  requestId:3,
  parameter:{
  com。study。rpc。test。producer。TestBean:{
  age:20,
  name:张三
  }
  }
  }
  嗯,我这里用一个JSON来标识这次调用是调用哪个接口的哪个方法,其中interface标识了唯一的类,parameter标识了里面具体有哪些参数,其中key就是参数的类全限定名,value就是这个类的JSON信息。
  可能看到这里,大家可能有意见了:数据不一定用JSON格式传输啊,而且使用JSON也不一定性能最高啊。
  你使用JDK的Serializable配合Netty的ObjectDecoder来实现,这当然也可以,其实这里是一个拓展点,我们应该要提供多种序列化方式来供用户选择
  但是这里选择了JSON的原因是因为它比较直观,对于写文章来说比较合理。
  开发服务提供者
  嗯,搞定了网络协议之后,我们开始开发服务提供者了。对于服务提供者,因为我们这里是写一个简单版本的RPC框架,为了保持简洁。
  我们不会引入类似Spring之类的容器框架,所以我们需要定义一个服务提供者的配置类,它用于定义这个服务提供者是什么接口,然后它具体的实例对象是什么:publicclassServiceConfig
  publicClass
  publicTinstance;
  publicServiceConfig(Class
  this。typetype;
  this。instanceinstance;
  }
  publicClass
  returntype;
  }
  publicvoidsetType(Class
  this。typetype;
  }
  publicTgetInstance{
  returninstance;
  }
  publicvoidsetInstance(Tinstance){
  this。instanceinstance;
  }
  }
  有了这个东西之后,我们就知道需要暴露哪些接口了。
  为了框架有一个统一的入口,我定义了一个类叫做ApplicationContext,可以认为这是一个应用程序上下文,他的构造函数,接收2个参数,代码如下:
  publicApplicationContext(StringregistryUrl,List
  1。保存需要暴露的接口配置
  this。serviceConfigsserviceConfigs?newArrayList:serviceConfigs;
  step2:实例化注册中心
  initRegistry(registryUrl);
  step3:将接口注册到注册中心,从注册中心获取接口,初始化服务接口列表
  RegistryInforegistryInfo;
  InetAddressaddrInetAddress。getLocalHost;
  Stringhostnameaddr。getHostName;
  StringhostAddressaddr。getHostAddress;
  registryInfonewRegistryInfo(hostname,hostAddress,port);
  doRegistry(registryInfo);
  step4:初始化Netty服务器,接受到请求,直接打到服务提供者的service方法中
  if(!this。serviceConfigs。isEmpty){
  需要暴露接口才暴露
  nettyServernewNettyServer(this。serviceConfigs,interfaceMethods);
  nettyServer。init(port);
  }
  }
  注册中心设计这里分为几个步骤,首先保存了接口配置,接着初始化注册中心,因为注册中心可能会提供多种来供用户选择,所以这里需要定义一个注册中心的接口:publicinterfaceRegistry{
  将生产者接口注册到注册中心
  paramclazz类
  paramregistryInfo本机的注册信息
  voidregister(Classclazz,RegistryInforegistryInfo)throwsException;
  }
  这里我们提供一个注册的方法,这个方法的语义是将clazz对应的接口注册到注册中心。接收两个参数,一个是接口的class对象,另一个是注册信息,
  里面包含了本机的一些基本信息,如下:publicclassRegistryInfo{
  privateStringhostname;
  privateStringip;
  privateIntegerport;
  publicRegistryInfo(Stringhostname,Stringip,Integerport){
  this。hostnamehostname;
  this。ipip;
  this。portport;
  }
  gettersetter
  }
  好了,定义好注册中心,回到之前的实例化注册中心的地方,代码如下:
  注册中心
  privateRegistryregistry;
  privatevoidinitRegistry(StringregistryUrl){
  if(registryUrl。startsWith(zookeeper:)){
  registryUrlregistryUrl。substring(12);
  registrynewZookeeperRegistry(registryUrl);
  }elseif(registryUrl。startsWith(multicast:)){
  registrynewMulticastRegistry(registryUrl);
  }
  }
  这里逻辑也非常简单,就是根据url的schema来判断是那个注册中心
  注册中心这里实现了2个实现类,分别使用zookeeper作为注册中心,另外一个是使用广播的方式作为注册中心。
  广播注册中心这里仅仅是做个示范,内部没有实现。我们主要是实现了zookeeper的注册中心。
  (当然了,如果有兴趣,可以实现更多的注册中心供用户选择,比如redis之类的,这里只是为了保持拓展点)
  那么实例化完注册中心之后,回到上面的代码:
  注册服务提供者
  step3:将接口注册到注册中心,从注册中心获取接口,初始化服务接口列表
  RegistryInforegistryInfo;
  InetAddressaddrInetAddress。getLocalHost;
  Stringhostnameaddr。getHostName;
  StringhostAddressaddr。getHostAddress;
  registryInfonewRegistryInfo(hostname,hostAddress,port);
  doRegistry(registryInfo);
  这里逻辑很简单,就是获取本机的的基本信息构造成RegistryInfo,然后调用了doRegistry方法:
  接口方法对应method对象
  privateMap
  privatevoiddoRegistry(RegistryInforegistryInfo)throwsException{
  for(ServiceConfigconfig:serviceConfigs){
  Classtypeconfig。getType;
  registry。register(type,registryInfo);
  MethoddeclaredMethodstype。getDeclaredMethods;
  for(Methodmethod:declaredMethods){
  StringidentifyInvokeUtils。buildInterfaceMethodIdentify(type,method);
  interfaceMethods。put(identify,method);
  }
  }
  }
  这里做了2件事情:
  将接口注册到注册中心中对于每一个接口的每一个方法,生成一个唯一标识,保存在interfaceMethods集合中
  下面分别分析这两件事情,首先是注册方法:因为我们用到了zookeeper,为了方便,引入了zookeeper的客户端框架curator
  dependency
  groupIdorg。apache。curatorgroupId
  curatorrecipesartifactId
  version2。3。0version
  dependency
  接着看代码:
  publicclassZookeeperRegistryimplementsRegistry{
  privateCuratorFrameworkclient;
  publicZookeeperRegistry(StringconnectString){
  RetryPolicyretryPolicynewExponentialBackoffRetry(1000,3);
  clientCuratorFrameworkFactory。newClient(connectString,retryPolicy);
  client。start;
  try{
  StatmyRPCclient。checkExists。forPath(myRPC);
  if(myRPC){
  client。create
  。creatingParentsIfNeeded
  。forPath(myRPC);
  }
  System。out。println(ZookeeperClient初始化完毕。。。。。。);
  }catch(Exceptione){
  e。printStackTrace;
  }
  }
  Override
  publicvoidregister(Classclazz,RegistryInforegistryInfo)throwsException{
  1。注册的时候,先从zk中获取数据
  2。将自己的服务器地址加入注册中心中
  为每一个接口的每一个方法注册一个临时节点,然后key为接口方法的唯一标识,data为服务地址列表
  MethoddeclaredMethodsclazz。getDeclaredMethods;
  for(Methodmethod:declaredMethods){
  StringkeyInvokeUtils。buildInterfaceMethodIdentify(clazz,method);
  StringpathmyRPCkey;
  Statstatclient。checkExists。forPath(path);
  List
  if(stat!){
  如果这个接口已经有人注册过了,把数据拿回来,然后将自己的信息保存进去
  bytebytesclient。getData。forPath(path);
  StringdatanewString(bytes,StandardCharsets。UTF8);
  registryInfosJSONArray。parseArray(data,RegistryInfo。class);
  if(registryInfos。contains(registryInfo)){
  正常来说,zk的临时节点,断开连接后,直接就没了,但是重启会经常发现存在节点,所以有了这样的代码
  System。out。println(地址列表已经包含本机【key】,不注册了);
  }else{
  registryInfos。add(registryInfo);
  client。setData。forPath(path,JSONArray。toJSONString(registryInfos)。getBytes);
  System。out。println(注册到注册中心,路径为:【path】信息为:registryInfo);
  }
  }else{
  registryInfosnewArrayList;
  registryInfos。add(registryInfo);
  client。create
  。creatingParentsIfNeeded
  临时节点,断开连接就关闭
  。withMode(CreateMode。EPHEMERAL)
  。forPath(path,JSONArray。toJSONString(registryInfos)。getBytes);
  System。out。println(注册到注册中心,路径为:【path】信息为:registryInfo);
  }
  }
  }
  }
  zookeeper注册中心在初始化的时候,会建立好连接。然后注册的时候,针对clazz接口的每一个方法,都会生成一个唯一标识
  这里使用了InvokeUtils。buildInterfaceMethodIdentify方法:
  publicstaticStringbuildInterfaceMethodIdentify(Classclazz,Methodmethod){
  MapString,StringmapnewLinkedHashMap;
  map。put(interface,clazz。getName);
  map。put(method,method。getName);
  Parameterparametersmethod。getParameters;
  if(parameters。length0){
  StringBuilderparamnewStringBuilder;
  for(inti0;iparameters。length;i){
  Parameterpparameters〔i〕;
  param。append(p。getType。getName);
  if(iparameters。length){
  param。append(,);
  }
  }
  map。put(parameter,param。toString);
  }
  returnmap2String(map);
  }
  publicstaticStringmap2String(MapString,Stringmap){
  StringBuildersbnewStringBuilder;
  Iteratormap。entrystring,spanStringiteratormap。entrySet。iterator;
  while(iterator。hasNext){
  Map。EntryString,Stringentryiterator。next;
  sb。append(entry。getKeyentry。getValue);
  if(iterator。hasNext){
  sb。append();
  }
  }
  returnsb。toString;
  }
  其实就是对接口的方法使用他们的限定名和参数来组成一个唯一的标识,比如
  strongtoutiaooriginspanHelloServicesayHello(TestBean)strong生成的大概是这样的:
  interfacecom。study。rpc。test。producer。HelloServicemethodsayHello
  parametercom。study。rpc。test。producer。TestBean
  接下来的逻辑就简单了,在Zookeeper中的myRPC路径下面建立临时节点,节点名称为我们上面的接口方法唯一标识,数据内容为机器信息。
  之所以采用临时节点是因为:如果机器宕机了,连接断开之后,消费者可以通过zookeeper的watcher机制感知到
  大概看起来是这样的:
  myRPCinterfacecom。study。rpc。test。producer。HelloServicemethodsayHello
  parametercom。study。rpc。test。producer。TestBean
  〔
  {
  hostname:peer1,
  port:8080
  },
  {
  hostname:peer2,
  port:8081
  }
  〕
  通过这样的方式,在服务消费的时候就可以拿到这样的注册信息,然后知道可以调用那台机器的那个端口。
  好了,注册中心弄完了之后,我们回到前面说的注册方法做的第二件事情,我们将每一个接口方法标识的方法放入了一个map中:
  接口方法对应method对象
  privateMapString,MethodinterfaceMethodsnewConcurrentHashMap;这个的原因是因为,我们在收到网络请求的时候,需要调用反射的方式调用method对象,所以存起来。
  启动网络服务端接受请求
  接下来我们就可以看第四步了:
  step4:初始化Netty服务器,接受到请求,直接打到服务提供者的service方法中
  if(!this。serviceConfigs。isEmpty){
  需要暴露接口才暴露
  nettyServernewNettyServer(this。serviceConfigs,interfaceMethods);
  nettyServer。init(port);
  }
  因为这里使用Netty来做的所以需要引入Netty的依赖:
  dependency
  groupIdio。nettygroupId
  nettyallartifactId
  version4。1。30。Finalversion
  dependency
  接着来分析:
  publicclassNettyServer{
  负责调用方法的handler
  privateRpcInvokeHandlerrpcInvokeHandler;
  publicNettyServer(List
  this。rpcInvokeHandlernewRpcInvokeHandler(serverConfigs,interfaceMethods);
  }
  publicintinit(intport)throwsException{
  EventLoopGroupbossGroupnewNioEventLoopGroup;
  EventLoopGroupworkerGroupnewNioEventLoopGroup;
  ServerBootstrapbnewServerBootstrap;
  b。group(bossGroup,workerGroup)
  。channel(NioServerSocketChannel。class)
  。option(ChannelOption。SOBACKLOG,1024)
  。childHandler(newChannelInitializer
  Override
  protectedvoidinitChannel(SocketChannelch)throwsException{
  ByteBufdelimiterUnpooled。copiedBuffer();
  设置按照分隔符来切分消息,单条消息限制为1MB
  ch。pipeline。addLast(newDelimiterBasedFrameDecoder(10241024,delimiter));
  ch。pipeline。addLast(newStringDecoder);
  ch。pipeline。addLast。addLast(rpcInvokeHandler);
  }
  });
  ChannelFuturesyncb。bind(port)。sync;
  System。out。println(启动NettyService,端口为:port);
  returnport;
  }
  }
  这部分主要的都是netty的api,我们不做过多的说明,就简单的说一下:我们通过作为标识符号来区分两条信息,然后一条信息的最大长度为1MB所有逻辑都在RpcInvokeHandler中,这里面传进去了配置的服务接口实例,以及服务接口实例每个接口方法唯一标识对应的Method对象的Map集合。
  publicclassRpcInvokeHandlerextendsChannelInboundHandlerAdapter{
  接口方法唯一标识对应的Method对象
  privateMapString,MethodinterfaceMethods;
  接口对应的实现类
  privateMapclass,ObjectinterfaceToInstance;
  线程池,随意写的,不要吐槽
  privateThreadPoolExecutorthreadPoolExecutornewThreadPoolExecutor(10,
  50,60,TimeUnit。SECONDS,newLinkedBlockingQueue(100),
  newThreadFactory{
  AtomicIntegermnewAtomicInteger(0);
  Override
  publicThreadnewThread(Runnabler){
  returnnewThread(r,IOthreadm。incrementAndGet);
  }
  });
  publicRpcInvokeHandler(List
  MapstrongtoutiaooriginspanStringstrong,MethodinterfaceMethods){
  strongtoutiaooriginspanthisstrong。interfaceToInstancestrongtoutiaooriginspannewstrongConcurrentHashMap;
  strongtoutiaooriginspanthisstrong。interfaceMethodsinterfaceMethods;
  strongtoutiaooriginspanforstrong(ServiceConfigconfig:serviceConfigList){
  interfaceToInstance。put(config。getType,config。getInstance);
  }
  }
  Override
  strongtoutiaooriginspanpublicstrongstrongtoutiaooriginspanvoidstrongchannelRead(ChannelHandlerContextctx,strongtoutiaooriginspanObjectstrongmsg)strongtoutiaooriginspanthrowsstrongException{
  strongtoutiaooriginspantrystrong{
  strongtoutiaooriginspanStringstrongmessage(strongtoutiaooriginspanStringstrong)msg;
  这里拿到的是一串JSON数据,解析为Request对象,
  事实上这里解析网络数据,可以用序列化方式,定一个接口,可以实现JSON格式序列化,或者其他序列化
  但是demo版本就算了。
  System。out。println(接收到消息:msg);
  RpcRequestrequestRpcRequest。parse(message,ctx);
  threadPoolExecutor。execute(strongtoutiaooriginspannewstrongRpcInvokeTask(request));
  }strongtoutiaooriginspanfinallystrong{
  ReferenceCountUtil。release(msg);
  }
  }
  Override
  strongtoutiaooriginspanpublicstrongstrongtoutiaooriginspanvoidstrongchannelReadComplete(ChannelHandlerContextctx)strongtoutiaooriginspanthrowsstrongException{
  ctx。flush;
  }
  Override
  strongtoutiaooriginspanpublicstrongstrongtoutiaooriginspanvoidstrongexceptionCaught(ChannelHandlerContextctx,Throwablecause)strongtoutiaooriginspanthrowsstrongException{
  System。out。println(发生了异常。。。cause);
  cause。printStackTrace;
  ctx。close;
  }
  strongtoutiaooriginspanpublicstrongclassRpcInvokeTaskimplementsRunnable{
  strongtoutiaooriginspanprivatestrongRpcRequestrpcRequest;
  RpcInvokeTask(RpcRequestrpcRequest){
  strongtoutiaooriginspanthisstrong。rpcRequestrpcRequest;
  }
  Override
  strongtoutiaooriginspanpublicstrongstrongtoutiaooriginspanvoidstrongrun{
  strongtoutiaooriginspantrystrong{
  数据大概是这样子的
  {interfaces:interfacecom。study。rpc。test。producer。HelloServicemethodsayHellometercom
  。study。rpc。test。producer。TestBean,requestId:3,parameter:{com。study。rpc。test。producer
  。TestBean:{age:20,name:张三}}}
  这里希望能拿到每一个服务对象的每一个接口的特定声明
  strongtoutiaooriginspanStringstronginterfaceIdentityrpcRequest。getInterfaceIdentity;
  MethodmethodinterfaceMethods。get(interfaceIdentity);
  MapstrongtoutiaooriginspanStringstrong,strongtoutiaooriginspanStringstrongmapstring2Map(interfaceIdentity);
  strongtoutiaooriginspanStringstronginterfaceNamemap。get(interface);
  ClassinterfaceClassClass。forName(interfaceName);
  strongtoutiaooriginspanObjectstrongointerfaceToInstance。get(interfaceClass);
  strongtoutiaooriginspanStringstrongparameterStringmap。get(parameter);
  strongtoutiaooriginspanObjectstrongresult;
  strongtoutiaooriginspanifstrong(parameterString!){
  strongtoutiaooriginspanStringstrongparameterTypeClassparameterString。split(,);
  MapstrongtoutiaooriginspanStringstrong,strongtoutiaooriginspanObjectstrongparameterMaprpcRequest。getParameterMap;
  strongtoutiaooriginspanObjectstrongparameterInstancestrongtoutiaooriginspannewstrongstrongtoutiaooriginspanObjectstrong〔parameterTypeClass。length〕;
  strongtoutiaooriginspanforstrong(inti0;iparameterTypeClass。length;i){
  strongtoutiaooriginspanStringstrongparameterClazzparameterTypeClass〔i〕;
  parameterInstance〔i〕parameterMap。get(parameterClazz);
  }
  resultmethod。invoke(o,parameterInstance);
  }strongtoutiaooriginspanelsestrong{
  resultmethod。invoke(o);
  }
  写回响应
  ChannelHandlerContextctxrpcRequest。getCtx;
  strongtoutiaooriginspanStringstrongrequestIdrpcRequest。getRequestId;
  RpcResponseresponseRpcResponse。create(strongtoutiaooriginspanJSONObjectstrong。toJSONString(result),interfaceIdentity,
  requestId);
  strongtoutiaooriginspanStringstrongsstrongtoutiaooriginspanJSONObjectstrong。toJSONString(response);
  ByteBufbyteBufUnpooled。copiedBuffer(s。getBytes);
  ctx。writeAndFlush(byteBuf);
  System。out。println(响应给客户端:s);
  }strongtoutiaooriginspancatchstrong(Exceptione){
  e。printStackTrace;
  }
  }
  strongtoutiaooriginspanpublicstrongstrongtoutiaooriginspanstaticstrongMapstrongtoutiaooriginspanStringstrong,strongtoutiaooriginspanStringstrongstring2Map(strongtoutiaooriginspanStringstrongstr){
  strongtoutiaooriginspanStringstrongsplitstr。split();
  MapstrongtoutiaooriginspanStringstrong,strongtoutiaooriginspanStringstrongmapstrongtoutiaooriginspannewstrongstrongtoutiaooriginspanHashMapstrong(16);
  strongtoutiaooriginspanforstrong(strongtoutiaooriginspanStringstrongs:split){
  strongtoutiaooriginspanStringstrongsplit1s。split();
  map。put(split1〔0〕,split1〔〕);
  }
  strongtoutiaooriginspanreturnstrongmap;
  }
  }
  }
  这里说明一下上面的逻辑:channelRead方法用于接收消息,接收到的就是我们前面分析的那个JSON格式的数据,接着我们将消息解析成RpcRequest
  publicclassRpcRequest{
  privateStringinterfaceIdentity;
  privateMapString,ObjectparameterMapnewHashMap;
  privateChannelHandlerContextctx;
  privateStringrequestId;
  publicstaticRpcRequestparse(Stringmessage,ChannelHandlerContextctx)throwsClassNotFoundException{
  {
  interfaces:interfacecom。study。rpc。test。producer。HelloServicemethodsayHello2meterjava。lang
  。String,com。study。rpc。test。producer。TestBean,
  parameter:{
  java。lang。String:haha,
  com。study。rpc。test。producer。TestBean:{
  name:小王,
  age:20
  }
  }
  }
  JSONObjectjsonObjectJSONObject。parseObject(message);
  StringinterfacesjsonObject。getString(interfaces);
  JSONObjectparameterjsonObject。getJSONObject(parameter);
  SetStringstringsparameter。keySet;
  RpcRequestrequestnewRpcRequest;
  request。setInterfaceIdentity(interfaces);
  MapString,ObjectparameterMapnewHashMap(16);
  StringrequestIdjsonObject。getString(requestId);
  for(Stringkey:strings){
  if(key。equals(java。lang。String)){
  parameterMap。put(key,parameter。getString(key));
  }else{
  ClassclazzClass。forName(key);
  Objectobjectparameter。getObject(key,clazz);
  parameterMap。put(key,object);
  }
  }
  request。setParameterMap(parameterMap);
  request。setCtx(ctx);
  request。setRequestId(requestId);
  returnrequest;
  }
  }
  接着从request中解析出来需要调用的接口,然后通过反射调用对应的接口,得到结果后我们将响应封装成PrcResponse写回给客户端:
  publicclassRpcResponse{
  privateStringresult;
  privateStringinterfaceMethodIdentify;
  privateStringrequestId;
  publicStringgetResult{
  returnresult;
  }
  publicvoidsetResult(Stringresult){
  this。resultresult;
  }
  publicstaticRpcResponsecreate(Stringresult,StringinterfaceMethodIdentify,StringrequestId){
  RpcResponseresponsenewRpcResponse;
  response。setResult(result);
  response。setInterfaceMethodIdentify(interfaceMethodIdentify);
  response。setRequestId(requestId);
  returnresponse;
  }
  }
  里面包含了请求的结果JSON串,接口方法唯一标识,请求ID。数据大概看起来这个样子:
  {interfaceMethodIdentify:interfacecom。study。rpc。test。producer。HelloServicemethodsayHello
  parametercom。study。rpc。test。producer。TestBean,requestId:,
  result:牛逼,我收到了消息:TestBean{name张三,age20}}
  通过这样的信息,客户端就可以通过响应结果解析出来。
  测试服务提供者
  既然我们代码写完了,现在需要测试一把:
  首先我们先写一个HelloService的实现类出来:
  publicclassHelloServiceImplimplementsHelloService{
  Override
  publicStringsayHello(TestBeantestBean){
  return牛逼,我收到了消息:testBean;
  }
  }
  接着编写服务提供者代码:
  publicclassTestProducer{
  publicstaticvoidmain(String〔〕args)throwsException{
  StringconnectionStringzookeeper:localhost1:2181,localhost2:2181,localhost3:2181;
  HelloServiceservicenewHelloServiceImpl;
  ServiceConfig
  List
  serviceConfigList。add(config);
  ApplicationContextctxnewApplicationContext(connectionString,serviceConfigList,
  ,50071);
  }
  }
  接着启动起来,看到日志:
  ZookeeperClient初始化完毕。。。。。。
  注册到注册中心,路径为:【myRPCinterfacecom。study。rpc。test。producer。HelloService
  methodsayHellometercom。study。rpc。test。producer。TestBean】
  信息为:RegistryInfo{hostnamelocalhost,ip192。168。16。7,port50071}
  启动NettyService,端口为:50071
  这个时候,我们期望用NettyClient发送请求:
  {
  interfaces:interfacecom。study。rpc。test。producer。HelloService
  methodsayHellometercom。study。rpc。test。producer。TestBean,
  requestId:3,
  parameter:{
  com。study。rpc。test。producer。TestBean:{
  age:20,
  name:张三
  }
  }
  }
  得到的响应应该是:
  {interfaceMethodIdentify:interfacecom。study。rpc。test。producer。HelloServicemethodsayHello
  parametercom。study。rpc。test。producer。TestBean,requestId:,
  result:牛逼,我收到了消息:TestBean{name张三,age20}}
  那么,可以编写一个测试程序(这个程序仅仅用于中间测试用,读者不必理解):
  publicclassNettyClient{
  publicstaticvoidmain(String〔〕args){
  EventLoopGroupgroupnewNioEventLoopGroup;
  try{
  BootstrapbnewBootstrap;
  b。group(group)
  。channel(NioSocketChannel。class)
  。option(ChannelOption。TCPNODELAY,true)
  。handler(newChannelInitializer
  Override
  protectedvoidinitChannel(SocketChannelch)throwsException{
  ch。pipeline。addLast(newStringDecoder);
  ch。pipeline。addLast(newNettyClientHandler);
  }
  });
  ChannelFuturesyncb。connect(127。0。0。1,50071)。sync;
  sync。channel。closeFuture。sync;
  }catch(Exceptione){
  e。printStackTrace;
  }finally{
  group。shutdownGracefully;
  }
  }
  privatestaticclassNettyClientHandlerextendsChannelInboundHandlerAdapter{
  Override
  publicvoidchannelActive(ChannelHandlerContextctx)throwsException{
  JSONObjectjsonObjectnewJSONObject;
  jsonObject。put(interfaces,interfacecom。study。rpc。test。producer
  。HelloServicemethodsayHellometercom。study。rpc。test。producer。TestBean);
  JSONObjectparamnewJSONObject;
  JSONObjectbeannewJSONObject;
  bean。put(age,20);
  bean。put(name,张三);
  param。put(com。study。rpc。test。producer。TestBean,bean);
  jsonObject。put(parameter,param);
  jsonObject。put(requestId,);
  System。out。println(发送给服务端JSON为:jsonObject。toJSONString);
  StringmsgjsonObject。toJSONString;
  ByteBufbyteBufUnpooled。buffer(msg。getBytes。length);
  byteBuf。writeBytes(msg。getBytes);
  ctx。writeAndFlush(byteBuf);
  }
  Override
  publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{
  System。out。println(收到消息:msg);
  }
  }
  }
  启动之后,看到控制台输出:
  发送给服务端JSON为:{interfaces:interfacecom。study。rpc。test。producer。HelloServicemethodsayHello
  parametercom。study。rpc。test。producer。TestBean,requestId:3,
  parameter:{com。study。rpc。test。producer。TestBean:{name:张三,age:20}}}
  收到消息:{interfaceMethodIdentify:interfacecom。study。rpc。test。producer。HelloService
  methodsayHellometercom。study。rpc。test。producer。TestBean,requestId:3,
  result:牛逼,我收到了消息:TestBean{name张三,age20}}
  bingo,完美实现了RPC的服务提供者。接下来我们只需要实现服务消费者就完成了。
  开发服务消费者
  服务消费者是同样的处理,我们同样要定义一个消费者的配置:
  publicclassReferenceConfig
  privateClass
  publicReferenceConfig(Class
  this。typetype;
  }
  publicClass
  returntype;
  }
  publicvoidsetType(Class
  this。typetype;
  }
  }
  然后我们是统一入口,在ApplicationContext中修改代码:
  publicApplicationContext(StringregistryUrl,List
  List
  step1:保存服务提供者和消费者
  this。serviceConfigsserviceConfigs?newArrayList:serviceConfigs;
  this。referenceConfigsreferenceConfigs?newArrayList:referenceConfigs;
  。。。。
  }
  privatevoiddoRegistry(RegistryInforegistryInfo)throwsException{
  for(ServiceConfigconfig:serviceConfigs){
  Classtypeconfig。getType;
  registry。register(type,registryInfo);
  MethoddeclaredMethodstype。getDeclaredMethods;
  for(Methodmethod:declaredMethods){
  StringidentifyInvokeUtils。buildInterfaceMethodIdentify(type,method);
  interfaceMethods。put(identify,method);
  }
  }
  for(ReferenceConfigconfig:referenceConfigs){
  List
  if(registryInfos!){
  interfacesMethodRegistryList。put(config。getType,registryInfos);
  initChannel(registryInfos);
  }
  }
  }
  在注册的时候,我们需要将需要消费的接口,通过注册中心抓取出来,所以注册中心要增加一个接口方法:
  publicinterfaceRegistry{
  将生产者接口注册到注册中心
  paramclazz类
  paramregistryInfo本机的注册信息
  voidregister(Classclazz,RegistryInforegistryInfo)throwsException;
  为服务提供者抓取注册表
  paramclazz类
  return服务提供者所在的机器列表
  List
  }
  获取服务提供者的机器列表
  具体在Zookeeper中的实现如下:
  Override
  publicList
  MethoddeclaredMethodsclazz。getDeclaredMethods;
  List
  for(Methodmethod:declaredMethods){
  StringkeyInvokeUtils。buildInterfaceMethodIdentify(clazz,method);
  StringpathmyRPCkey;
  Statstatclient。checkExists
  。forPath(path);
  if(stat){
  这里可以添加watcher来监听变化,这里简化了,没有做这个事情
  System。out。println(警告:无法找到服务接口:path);
  continue;
  }
  if(registryInfos){
  bytebytesclient。getData。forPath(path);
  StringdatanewString(bytes,StandardCharsets。UTF8);
  registryInfosJSONArray。parseArray(data,RegistryInfo。class);
  }
  }
  returnregistryInfos;
  }
  其实就是去zookeeper获取节点中的数据,得到接口所在的机器信息,获取到的注册信息诸侯,就会调用以下代码:
  if(registryInfos!){
  保存接口和服务地址
  interfacesMethodRegistryList。put(config。getType,registryInfos);
  初始化网络连接
  initChannel(registryInfos);
  }
  privatevoidinitChannel(List
  for(RegistryInfoinfo:registryInfos){
  if(!channels。containsKey(info)){
  System。out。println(开始建立连接:info。getIp,info。getPort);
  NettyClientclientnewNettyClient(info。getIp,info。getPort);
  client。setMessageCallback(message{
  这里收单服务端返回的消息,先压入队列
  RpcResponseresponseJSONObject。parseObject(message,RpcResponse。class);
  responses。offer(response);
  synchronized(ApplicationContext。this){
  ApplicationContext。this。notifyAll;
  }
  });
  等待连接建立
  ChannelHandlerContextctxclient。getCtx;
  channels。put(info,ctx);
  }
  }
  }
  我们会针对每一个唯一的RegistryInfo建立一个连接,然后有这样一段代码:
  client。setMessageCallback(message{
  这里收单服务端返回的消息,先压入队列
  RpcResponseresponseJSONObject。parseObject(message,RpcResponse。class);
  responses。offer(response);
  synchronized(ApplicationContext。this){
  ApplicationContext。this。notifyAll;
  }
  });
  设置一个callback,用于收到消息的时候,回调这里的代码,这部分我们后面再分析。
  然后在client。getCtx的时候,同步阻塞直到连接完成,建立好连接后通过,NettyClient的代码如下:
  publicclassNettyClient{
  privateChannelHandlerContextctx;
  privateMessageCallbackmessageCallback;
  publicNettyClient(Stringip,Integerport){
  EventLoopGroupgroupnewNioEventLoopGroup;
  try{
  BootstrapbnewBootstrap;
  b。group(group)
  。channel(NioSocketChannel。class)
  。option(ChannelOption。TCPNODELAY,true)
  。handler(newChannelInitializer
  Override
  protectedvoidinitChannel(SocketChannelch)throwsException{
  ByteBufdelimiterUnpooled。copiedBuffer(。getBytes);
  设置按照分隔符来切分消息,单条消息限制为1MB
  ch。pipeline。addLast(newDelimiterBasedFrameDecoder(10241024,delimiter));
  ch。pipeline。addLast(newStringDecoder);
  ch。pipeline。addLast(newNettyClientHandler);
  }
  });
  ChannelFuturesyncb。connect(ip,port)。sync;
  }catch(Exceptione){
  e。printStackTrace;
  }
  }
  publicvoidsetMessageCallback(MessageCallbackcallback){
  this。messageCallbackcallback;
  }
  publicChannelHandlerContextgetCtxthrowsInterruptedException{
  System。out。println(等待连接成功。。。);
  if(ctx){
  synchronized(this){
  wait;
  }
  }
  returnctx;
  }
  privateclassNettyClientHandlerextendsChannelInboundHandlerAdapter{
  Override
  publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{
  try{
  Stringmessage(String)msg;
  if(messageCallback!){
  messageCallback。onMessage(message);
  }
  }finally{
  ReferenceCountUtil。release(msg);
  }
  }
  Override
  publicvoidchannelActive(ChannelHandlerContextctx)throwsException{
  NettyClient。this。ctxctx;
  System。out。println(连接成功:ctx);
  synchronized(NettyClient。this){
  NettyClient。this。notifyAll;
  }
  }
  Override
  publicvoidchannelReadComplete(ChannelHandlerContextctx)throwsException{
  ctx。flush;
  }
  Override
  publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{
  cause。printStackTrace;
  }
  }
  publicinterfaceMessageCallback{
  voidonMessage(Stringmessage);
  }
  }
  这里主要是用了wait和notifyAll来实现同步阻塞等待连接建立。
  建立好连接后,我们保存到集合中:
  等待连接建立
  ChannelHandlerContextctxclient。getCtx;
  channels。put(info,ctx);
  发送请求好了,到了这里我们为每一个需要消费的接口建立了网络连接,接下来要做的事情就是提供一个接口给用户获取服务提供者实例:
  我把这个方法写在ApplicationContext中:
  负责生成requestId的类
  privateLongAdderrequestIdWorkernewLongAdder;
  获取调用服务
  SuppressWarnings(unchecked)
  public
  return(T)Proxy。newProxyInstance(getClass。getClassLoader,newClass{clazz},newInvocationHandler{
  Override
  publicObjectinvoke(Objectproxy,Methodmethod,Object〔〕args)throwsThrowable{
  StringmethodNamemethod。getName;
  if(equals。equals(methodName)hashCode。equals(methodName)){
  thrownewIllegalAccessException(不能访问methodName方法);
  }
  if(toString。equals(methodName)){
  returnclazz。getNamemethodName;
  }
  step1:获取服务地址列表
  List
  if(registryInfos){
  thrownewRuntimeException(无法找到服务提供者);
  }
  step2:负载均衡
  RegistryInforegistryInfoloadBalancer。choose(registryInfos);
  ChannelHandlerContextctxchannels。get(registryInfo);
  StringidentifyInvokeUtils。buildInterfaceMethodIdentify(clazz,method);
  StringrequestId;
  synchronized(ApplicationContext。this){
  requestIdWorker。increment;
  requestIdString。valueOf(requestIdWorker。longValue);
  }
  InvokerinvokernewDefaultInvoker(method。getReturnType,ctx,requestId,identify);
  inProgressInvoker。put(identifyrequestId,invoker);
  returninvoker。invoke(args);
  }
  });
  }
  这里主要是通过动态代理来实现的,首先通过class来获取对应的机器列表,接着通过loadBalancer来选择一个机器。这个LoaderBalance是一个接口:
  publicinterfaceLoadBalancer{
  选择一个生产者
  paramregistryInfos生产者列表
  return选中的生产者
  RegistryInfochoose(List
  }
  在ApplicationContext初始化的时候可以选择不同的实现,我这里主要实现了一个简单的随机算法(后续可以拓展为其他的,比如RoundRobin之类的):
  publicclassRandomLoadbalancerimplementsLoadBalancer{
  Override
  publicRegistryInfochoose(List
  RandomrandomnewRandom;
  intindexrandom。nextInt(registryInfos。size);
  returnregistryInfos。get(index);
  }
  }
  接着构造接口方法的唯一标识identify,还有一个requestId。
  为什么需要一个requestId呢?
  这是因为我们在处理响应的时候,需要找到某个响应是对应的哪个请求,但是仅仅使用identify是不行的
  因为我们同一个应用程序中可能会有多个线程同时调用同一个接口的同一个方法,这样的identify是相同的。
  所以我们需要用identifyrequestId的方式来判断,reqeustId是一个自增的LongAddr。服务端在响应的时候会将requestId返回。
  接着我们构造了一个Invoker,把它放入inProgressInvoker的集合中。调用了其invoke方法:
  InvokerinvokernewDefaultInvoker(method。getReturnType,ctx,requestId,identify);
  inProgressInvoker。put(identifyrequestId,invoker);
  阻塞等待结果
  returninvoker。invoke(args);
  publicclassDefaultInvoker
  privateChannelHandlerContextctx;
  privateStringrequestId;
  privateStringidentify;
  privateClass
  privateTresult;
  DefaultInvoker(Class
  this。returnTypereturnType;
  this。ctxctx;
  this。requestIdrequestId;
  this。identifyidentify;
  }
  SuppressWarnings(unckecked)
  Override
  publicTinvoke(Object〔〕args){
  JSONObjectjsonObjectnewJSONObject;
  jsonObject。put(interfaces,identify);
  JSONObjectparamnewJSONObject;
  if(args!){
  for(Objectobj:args){
  param。put(obj。getClass。getName,obj);
  }
  }
  jsonObject。put(parameter,param);
  jsonObject。put(requestId,requestId);
  System。out。println(发送给服务端JSON为:jsonObject。toJSONString);
  StringmsgjsonObject。toJSONString;
  ByteBufbyteBufUnpooled。buffer(msg。getBytes。length);
  byteBuf。writeBytes(msg。getBytes);
  ctx。writeAndFlush(byteBuf);
  waitForResult;
  returnresult;
  }
  Override
  publicvoidsetResult(Stringresult){
  synchronized(this){
  this。resultJSONObject。parseObject(result,returnType);
  notifyAll;
  }
  }
  privatevoidwaitForResult{
  synchronized(this){
  try{
  wait;
  }catch(InterruptedExceptione){
  e。printStackTrace;
  }
  }
  }
  }
  我们可以看到调用Invoker的invoke方法之后,会运行到waitForResult这里,这里已经把请求通过网络发送出去了,但是就会被卡住。
  这是因为我们的网络请求的结果不是同步返回的,有可能是客户端同时发起很多个请求,所以我们不可能在这里让他同步阻塞等待的。
  接受响应那么对于服务消费者而言,把请求发送出去但是卡住了,这个时候当服务端处理完之后,会把消息返回给客户端。返回的入口在
  NettyClient的onChannelRead中:
  Override
  publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{
  try{
  Stringmessage(String)msg;
  if(messageCallback!){
  messageCallback。onMessage(message);
  }
  }finally{
  ReferenceCountUtil。release(msg);
  }
  }
  这里通过callback回调出去。是否还记的我们在初始化NettyClient的时候,会设置一个callback?
  响应队列
  privateConcurrentLinkedQueue
  client。setMessageCallback(message{
  这里收单服务端返回的消息,先压入队列
  RpcResponseresponseJSONObject。parseObject(message,RpcResponse。class);
  responses。offer(response);
  synchronized(ApplicationContext。this){
  ApplicationContext。this。notifyAll;
  }
  });
  这里接受消息之后,解析成为一个RpcResponse对象,然后压入responses队列中,这样我们就把所有的请求响应放入队列中。
  但是这样一来,我们应该怎么把响应结果返回给调用的地方呢?我们可以这样做:起一个或多个后台线程,然后从队列中拿出响应,然后根据响应从我们之前保存的inProcessInvoker中找出对应的Invoker,然后把结果返回回去
  publicApplicationContext(。。。。){
  。。。。。
  step5:启动处理响应的processor
  initProcessor;
  }
  privatevoidinitProcessor{
  事实上,这里可以通过配置文件读取,启动多少个processor
  intnum;
  processorsnewResponseProcessor〔num〕;
  for(inti0;i;i){
  processors〔i〕createProcessor(i);
  }
  }
  处理响应的线程
  privateclassResponseProcessorextendsThread{
  Override
  publicvoidrun{
  System。out。println(启动响应处理线程:getName);
  while(true){
  多个线程在这里获取响应,只有一个成功
  RpcResponseresponseresponses。poll;
  if(response){
  try{
  synchronized(ApplicationContext。this){
  如果没有响应,先休眠
  ApplicationContext。this。wait;
  }
  }catch(InterruptedExceptione){
  e。printStackTrace;
  }
  }else{
  System。out。println(收到一个响应:response);
  StringinterfaceMethodIdentifyresponse。getInterfaceMethodIdentify;
  StringrequestIdresponse。getRequestId;
  StringkeyinterfaceMethodIdentifyrequestId;
  InvokerinvokerinProgressInvoker。remove(key);
  invoker。setResult(response。getResult);
  }
  }
  }
  }
  这里面如果从队列中拿不到数据,就会调用wait方法等待这里需要注意的是,在callbak中获取到响应的时候我们是会调用notifyAll来唤醒这里的线程的:
  responses。offer(response);
  synchronized(ApplicationContext。this){
  ApplicationContext。this。notifyAll;
  }
  这里被唤醒之后,就会有多个线程去争抢那个响应,因为队列是线程安全的,所以这里多个线程可以获取到响应结果。
  接着拿到结果之后,通过identifyrequestId构造成唯一的请求标识,从inProgressInvoker中获取对应的invoker,然后通过setResult将结果设置进去:
  StringkeyinterfaceMethodIdentifyrequestId;
  InvokerinvokerinProgressInvoker。remove(key);
  invoker。setResult(response。getResult);
  Override
  publicvoidsetResult(Stringresult){
  synchronized(this){
  this。resultJSONObject。parseObject(result,returnType);
  notifyAll;
  }
  }
  这里设置进去之后,就会将结果用json反序列化成为用户需要的结果,然后调用其notifyAll方法唤醒invoke方法被阻塞的线程:
  SuppressWarnings(unckecked)
  Override
  publicTinvoke(Object〔〕args){
  JSONObjectjsonObjectnewJSONObject;
  jsonObject。put(interfaces,identify);
  JSONObjectparamnewJSONObject;
  if(args!){
  for(Objectobj:args){
  param。put(obj。getClass。getName,obj);
  }
  }
  jsonObject。put(parameter,param);
  jsonObject。put(requestId,requestId);
  System。out。println(发送给服务端JSON为:jsonObject。toJSONString);
  StringmsgjsonObject。toJSONStringNettyServer。DELIMITER;
  ByteBufbyteBufUnpooled。buffer(msg。getBytes。length);
  byteBuf。writeBytes(msg。getBytes);
  ctx。writeAndFlush(byteBuf);
  这里被唤醒
  waitForResult;
  returnresult;
  }
  然后就可以返回结果了,返回的结果就会返回给用户了。
  整体测试到了这里我们的生产者和消费者的代码都写完了,我们来整体测试一遍。生产者的代码是和之前的一致的:
  publicclassTestProducer{
  publicstaticvoidmain(String〔〕args)throwsException{
  StringconnectionStringzookeeper:localhost1:2181,localhost2:2182,localhost3:2181;
  HelloServiceservicenewHelloServiceImpl;
  ServiceConfig
  List
  serviceConfigList。add(config);
  ApplicationContextctxnewApplicationContext(connectionString,serviceConfigList,,50071);
  }
  }
  消费者测试代码:
  publicclassTestConsumer{
  publicstaticvoidmain(String〔〕args)throwsException{
  StringconnectionStringzookeeper:localhost1:2181,localhost2:2182,localhost3:2181;
  ReferenceConfig
  ApplicationContextctxnewApplicationContext(connectionString,,Collections。singletonList(config),
  50070);
  HelloServicehelloServicectx。getService(HelloService。class);
  System。out。println(sayHello(TestBean)结果为:helloService。sayHello(newTestBean(张三,20)));
  }
  }
  接着启动生产者,然后启动消费者:
  生产者得到的日志如下:
  ZookeeperClient初始化完毕。。。。。。
  注册到注册中心,路径为:【myRPCinterfacecom。study。rpc。test。producer。HelloService
  methodsayHellometercom。study。rpc。test。producer。TestBean】
  信息为:RegistryInfo{hostnamelocalhost,ip192。168。16。7,port50071}
  启动NettyService,端口为:50071
  启动响应处理线程:Responseprocessor0
  启动响应处理线程:Responseprocessor2
  启动响应处理线程:Responseprocessor1
  接收到消息:{interfaces:interfacecom。study。rpc。test。producer。HelloService
  methodsayHellometercom。study。rpc。test。producer。TestBean,requestId:1,
  parameter:{com。study。rpc。test。producer。TestBean:{age:20,name:张三}}}
  响应给客户端:{interfaceMethodIdentify:interfacecom。study。rpc。test。producer。HelloService
  methodsayHellometercom。study。rpc。test。producer。TestBean,requestId:1,
  result:牛逼,我收到了消息:TestBean{name张三,age20}}
  消费者得到的日志为:
  ZookeeperClient初始化完毕。。。。。。
  开始建立连接:192。168。16。7,50071
  等待连接成功。。。
  启动响应处理线程:Responseprocessor1
  启动响应处理线程:Responseprocessor0
  启动响应处理线程:Responseprocessor2
  连接成功:ChannelHandlerContext(NettyClientNettyClientHandler0,
  〔id:0xb7a59701,L:192。168。16。7:58354R:192。168。16。7:50071〕)
  发送给服务端JSON为:{interfaces:interfacecom。study。rpc。test。producer。HelloService
  methodsayHellometercom。study。rpc。test。producer。TestBean,requestId:1,
  parameter:{com。study。rpc。test。producer。TestBean:{age:20,name:张三}}}
  收到一个响应:RpcResponse{result牛逼,我收到了消息:TestBean{name张三,age20},
  interfaceMethodIdentifyinterfacecom。study。rpc。test。producer。HelloService
  methodsayHellometercom。study。rpc。test。producer。TestBean,requestId1}
  sayHello(TestBean)结果为:牛逼,我收到了消息:TestBean{name张三,age20}
  总结通过完成这个RPC框架,大家应该会大致对RPC的实现原理有个感性的认识,这里总结一下特性:支持多种注册中心,可配置(虽然只实现了zookeeper,但是我们拓展是非常简单的)
  支持负载均衡当然了还有非常多的不足之处,这是无可否认的,随意写出来的框架和工业级使用的框架比较还是不一样
  我这里列举一些不完美的地方把(有兴趣的可以搞搞):
  实现序列化框架的拓展,多种序列化供用户选择
  网络请求错误处理,这里实现非常简陋,健壮性很差
  注册中心不支持故障感知和自动恢复
  调用监控,性能指标
  加油。
  END

为养育孩子,她沦落红灯区女性致贫,从生孩子开始前几天有女朋友向我请教,她已经奔三了,过年要相亲,都需要注意些什么问题。她以为我会告诉她相亲需要注意什么。但我只开玩笑似的回道:男人可以睡,别随便生孩子。为什……小白兔小学作文450字小白兔小学作文1我家养了一只小兔子,名叫小球球,它长得极其可爱,我非常喜欢。它的毛像雪一样白,像棉花一样柔软,并且油光发亮;它的眼睛像一对发光的灯泡似的炯炯有神;它的一张……我第一次走独木桥E度网专稿未经允许不得转载今天我出去玩了上人大玩了在人大走独木桥我老是很快我挺直了站好快步通过了妈妈在旁边站着教我她说好了今天再继续练……毕业毕业年复一年,秋到夏走我们在秋天时悄然相遇六十六个人组成了一个班级一个家每天和星辰作伴踏日月而来清晨的我们一起读书朗朗的书声是我们的回忆一起在课堂……当你负债累累,连亲人都瞧不起时,做好这几件事,好运会主动找你打卡美好生活很多时候,我们想把所有的鸡蛋放在一个篮子里,借钱创业,让我们的生活发生翻天覆地的变化,让我们的家庭从此过上美好幸福的生活。但谁又曾想过生意失败,我们不仅没有致富,而……一个人,喜欢你和爱你的区别打卡美好生活喜欢一个人,就像一首歌里的歌词:跟着感觉走,紧抓住梦的手,蓝天越来越近越来越温柔,心情就像风一样自由,突然发现一个完全不同的我爱一个人,不再是跟着……女人跟过的男人多了,身上就会有这4种特征,一看一个准婚恋手册如果女性和更多的男性在一起,她们将具有以下四个特点。有些女人更容易坠入爱河,但有些女人确很单纯。即使在他们三十多岁的时候,他们仍然只相爱一两次。那么,如何区分呢?……一个女人最好的气质干净温柔优雅打卡美好生活一个真正有气质的女人从不炫耀自己。她不会告诉人们她读过什么书,有多少衣服,买了什么珠宝,因为她没有自卑感。我不知道什么时候,我们总是以外表来判断一个人。男人喜……我的老公为什么结婚以后变化这么大,让我想都不敢想01hr在牵手之前,每对情侣都会期望未来美好而安心的一天,但他们不能想象的是婚姻比爱情复杂得多。婚姻和爱情是完全不同的两件事,由于日常生活、经济开支、人际关系等方面的摩擦,夫妻……患难见真情!孟晚舟与丈夫的不离不弃9月25日下午7点多,在加拿大三年的她终于回到了祖国的环抱。在下飞机的那一刻,孟晚舟的丈夫在人群中大声喊着我爱你。这一插曲也是迅速火遍互联网,让很多人赞叹,让人羡慕。她的……为什么男人喜欢和有家庭的女人发生感情?答案其实很简单女人总是相信年轻漂亮的女孩能吸引男人。事实上,当男人看女人时,长相好、身材好的女人总是更受男人青睐。然而,年轻漂亮的女孩很好,但已婚男人很容易不愿意招惹未婚女人。相反,一……父母,不止能使我们互相守护,也能互相伤害子女和父母不仅可以互相保护,也可以互相伤害,我的一个朋友,他和我一起长大,一起上学,一起去遥远的地方学习,然后一起回来工作。当她还是个孩子的时候,她总是最缺乏零食的人。她总是可……
描写同学神态的段落1、虎头虎脑眉清目秀面红耳赤白净柔嫩衣着得体眉开眼笑破涕为笑捧腹大笑笑逐颜开满面春风2、他的眉毛时而紧紧地皱起,眉宇间形成一个问号;时而愉快地舒展,像个感叹号。3、……侃红楼,林黛玉真的算美吗?林黛玉《红楼梦》读到5回,写到第三回,作为阅读的兴趣,放松下自己,想写写第一女主角林黛玉。其实,三百多年过去,林黛玉的形象放到现在并不讨喜,没有人尤其男人会以一个整……读书系列红楼梦第一回浅析(梦里依稀风铃)《红楼梦》插画甄士隐梦幻识通灵,贾雨村风尘怀闺秀所有古典章回小说都会以两句诗词起头,用于剧透内容,提醒阅者。开篇神话故事时代是由无……失去光明刘思易刘思易大家好!我是绍兴晚报的小记者:刘思易。今天我们要采访小灯桔补习班的牛津大学,他们正在举行盲人体验的游戏。向左一点,不对不对,再稍微向右一点。教室里热闹得不可开……读红楼神秘秦氏一门消亡史梦里依稀风铃字数3063《红楼梦》中最富有神秘色彩的人物之一是秦可卿,围绕着她的争议历来是最多的。也难怪,因为她身上的谜团最多,死因又跟第五回判词判曲相悖,尽……描写新年的六年级作文春节即农历新年,俗称过年,过去也叫元旦,一般从腊月二十三的祭灶到正月十五,也有的从腊月初八开,下面给大家分享描写新年的作文,一起来看看吧!描写新年的作文1盼星星,盼月亮,……伤感的中午我刚刚吃过午饭就在大庭里剔牙前台的那个美女玉臂轻摇向我打招呼你来一下来一下再近点再近点我告诉你一件事我辞工了我疑惑地问为什……小学生同学聚会作文100字汇总要说有什么才能快速增加同学之间感情的途径的话,那就非同学聚会莫属了。下面是小编采集关于同学聚会的材料:【作文一】今天我和万彦珊、洪晨洋约好一起去做蛋糕。我和万彦珊做……河坡上的放羊娃在村前的悠长的河畔那里野草闲花遍地那里羊群像朵朵洁白的云那里有翩翩起舞的蝴蝶和蜻蜓那里还有胡乱飞舞的蚊蝇那里的男女老少放肆的大笑里面夹杂着一……所有的灵感夜,是白天过后的深思,是喧嚣后的宁静,是回归自然的空灵。当黑夜最后的一抹光亮从眼角逝去,黑夜的寂静给这片空间又多填了一丝落寞之感。时光不会停留,过往,疼痛,宿命,一季又一……有关雪景的作文500字3篇有关雪景的作文1雪是冬天的使者,洁白的象征,它以自己独有的身姿装点着大地。像玉一样洁,像烟一样轻,像柳絮一样柔,从天空中纷纷扬扬的飘落,把这美丽的世界装点得银装素裹,分外……有关让心飞扬的叙事文天真的脸庞已被成熟的笑容俺去,失控的情绪已被冷静的心态遮住。我已经长大了!然而翻开旧时的相片,心中却如平静的湖水产生波澜。看,相片中的我背着大山,头仰天空,正在操控风筝,……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网