创建客户端与远程调用FramedClientConnectorconnectornewFramedClientConnector(newInetSocketAddress(8081));ThriftClientManagermanagernewThriftClientManager(newThriftCodecManager(),newNiftyClient(),ImmutableSet。of());EchoService。Ifaceclientmanager。createClient(connector,EchoService。Iface。class)。get();Stringanswerclient。echo(abc,newOperationActivityRequest()。setKeywords(htae));System。out。println(answer); 创建客户端获取代理类EchoService。Iface,之后便可以像本地方法使用一样来进行远程调用;从代码中看到核心组件就是FramedClientConnector:与连接相关,包括协议和通道等;NiftyClient:与客户端相关ThriftClientManager:与客户端管理相关重要组件快速了解 这部分主要简单看下几个组件的作用,细节的东西等分析创建客户端的时候再来看FramedClientConnector 应该可以叫做客户端连接器吧,内部主要是持有地址SocketAddress和协议工厂TDuplexProtocolFactory。提供了三个重要的方法newThriftClientChannel:获取通道FramedClientChannel,通过该通道可以将请求request写出;newChannelPipelineFactory:获取netty的pipeline,用来后续创建netty客户端的;connect:netty客户端与服务端进行连接顶层接口如下:publicinterfaceNiftyClientConnectorTextendsRequestChannel{ChannelFutureconnect(ClientBootstrapbootstrap);TnewThriftClientChannel(Channelchannel,NettyClientConfigclientConfig);ChannelPipelineFactorynewChannelPipelineFactory(intmaxFrameSize,NettyClientConfigclientConfig);} 可以看到后面两个方法都是和netty相关的,第一个方法其实也主要是对netty的channel进行封装然后进行请求的写出。此外FramedClientChannel继承自AbstractClientChannel,该类作为netty处理器,非常重要,在后面会详细说到。NiftyClient 从名字来看就是Nifty客户端相关的类,其内部属性主要就是netty相关的参数,boss线程池,worker线程池,channel线程组ChannelGroup,netty配置类NettyClientConfig,以及NioClientSocketChannelFactory,先来看构造方法publicNiftyClient(){this(NettyClientConfig。newBuilder()。build());}publicNiftyClient(NettyClientConfignettyClientConfig){this。nettyClientConfignettyClientConfig;this。timernettyClientConfig。getTimer();this。bossExecutornettyClientConfig。getBossExecutor();this。workerExecutornettyClientConfig。getWorkerExecutor();this。defaultSocksProxyAddressnettyClientConfig。getDefaultSocksProxyAddress();intbossThreadCountnettyClientConfig。getBossThreadCount();intworkerThreadCountnettyClientConfig。getWorkerThreadCount();NioWorkerPoolworkerPoolnewNioWorkerPool(workerExecutor,workerThreadCount,ThreadNameDeterminer。CURRENT);NioClientBossPoolbossPoolnewNioClientBossPool(bossExecutor,bossThreadCount,timer,ThreadNameDeterminer。CURRENT);this。channelFactorynewNioClientSocketChannelFactory(bossPool,workerPool);} 从有参的构造方法看到,主要是从netty配置类获取netty相关参数赋值给对象的属性。接着创建netty客户端重要组件。 如果是空参的方法,会使用默认的netty配置参数,来看一眼build方法做了什么。publicNettyClientConfigbuild(){TimertimergetTimer();ExecutorServicebossExecutorgetBossExecutor();intbossThreadCountgetBossThreadCount();ExecutorServiceworkerExecutorgetWorkerExecutor();intworkerThreadCountgetWorkerThreadCount();returnnewNettyClientConfig(getBootstrapOptions(),defaultSocksProxyAddress,timer!null?timer:newNiftyTimer(threadNamePattern()),bossExecutor!null?bossExecutor:buildDefaultBossExecutor(),bossThreadCount,workerExecutor!null?workerExecutor:buildDefaultWorkerExecutor(),workerThreadCount);} 最开始就获取netty相关的5个参数,boss线程池线程数量为1,worker线程池线程数量为核数2,其它都是null。所以构建NettyClientConfig的时候会重新构建timer和两个线程池。使用的Executors工具类的无限数量线程的线程池方法newCachedThreadPool,使用了自定义的线程池工厂(guava提供ThreadFactoryBuilder),主要是取回个合适的名字。privateExecutorServicebuildDefaultBossExecutor(){returnnewCachedThreadPool(renamingDaemonThreadFactory(threadNamePattern(bosss)));}privateExecutorServicebuildDefaultWorkerExecutor(){returnnewCachedThreadPool(renamingDaemonThreadFactory(threadNamePattern(workers)));}privateThreadFactoryrenamingDaemonThreadFactory(StringnameFormat){returnnewThreadFactoryBuilder()。setNameFormat(nameFormat)。setDaemon(true)。build();} NiftyClient主要是提供了一个功能获取连接,包括异步连接和同步连接。也就是上面说过的FramedClientChannel,其实还是由FramedClientConnector创建的,只是放到了Future中。ThriftClientManager 名字上看就是客户端管理器嘛,内部主要是持有编解码管理器和NiftyClient两个属性。privatefinalThriftCodecManagercodecManager;privatefinalNiftyClientniftyClient; 在构造的时候就会设置好,大部分时候只需要写成newThriftClientManager()即可,默认会创建这两个对象进行设置。其实该类主要就是提供createClient方法来创建客户端(代理对象),程序拿到代理对象后就可以很方便的去进行远程方法调用了。客户端创建和数据发送流程分析 我们从manager。createClient(connector,EchoService。Iface。class)点进去开始看,设置完一些超时的默认参数,稍微简化下得到publicT,CextendsNiftyClientChannelListenableFutureTcreateClient(finalNiftyClientConnectorCconnector,finalClassTtype,NullablefinalDurationconnectTimeout,NullablefinalDurationreceiveTimeout,NullablefinalDurationreadTimeout,NullablefinalDurationwriteTimeout,finalintmaxFrameSize,NullablefinalStringclientName,finalListlt;?extendsThriftClientEventHandlereventHandlers,NullableInetSocketAddresssocksProxy){(1)获取FutureFramerClientChannelfinalListenableFutureCconnectFutureniftyClient。connectAsync(connector,connectTimeout,receiveTimeout,readTimeout,writeTimeout,maxFrameSize,socksProxy);(2)转换后获取客户端代理对象ListenableFutureTclientFutureFutures。transform(connectFuture,newFunctionC,T(){OverridepublicTapply(NotNullCchannel){StringnameStrings。isNullOrEmpty(clientName)?connector。toString():clientName;returncreateClient(channel,type,name,eventHandlers);}},Runnable::run);returnclientFuture;} 主要是两个步骤,获取FramerClientChannel后接着获取客户端代理对象。 我们先看第一部分:如何获取FutureClientBootstrapbootstrapnewClientBootstrap(channelFactory);bootstrap。setOptions(nettyClientConfig。getBootstrapOptions());bootstrap。setPipelineFactory(clientChannelConnector。newChannelPipelineFactory(maxFrameSize,nettyClientConfig));ChannelFuturenettyChannelFutureclientChannelConnector。connect(bootstrap); 最开始这部分就是netty相关组件的设置,我们就看下clientChannelConnector(FrameClientConnector)提供的两个方法NiftyClient:publicChannelFutureconnect(ClientBootstrapbootstrap){returnbootstrap。connect(address);}OverridepublicChannelPipelineFactorynewChannelPipelineFactory(finalintmaxFrameSize,finalNettyClientConfigclientConfig){returnnewChannelPipelineFactory(){OverridepublicChannelPipelinegetPipeline()throwsException{ChannelPipelinecpChannels。pipeline();TimeoutHandler。addToPipeline(cp);cp。addLast(frameEncoder,newLengthFieldPrepender(LENGTHFIELDLENGTH));cp。addLast(frameDecoder,newLengthFieldBasedFrameDecoder(maxFrameSize,LENGTHFIELDOFFSET,LENGTHFIELDLENGTH,LENGTHADJUSTMENT,INITIALBYTESTOSTRIP));cp。addLast(clientMessage,newClientMessageHandler());if(clientHeader!null){clientHeader。createHandler(cp);}returncp;}};} connect方法自然不用说了,netty提供的;newChannelPipelineFactory则是构建netty的pipelie工厂,内部会添加各种处理器,暂时不去过多讲解了。 继续往下看nettyChannelFuture。addListener(newChannelFutureListener(){OverridepublicvoidoperationComplete(ChannelFuturefuture)throwsException{Channelchannelfuture。getChannel();if(channel!nullchannel。isOpen()){allChannels。add(channel);}}});returnnewTNiftyFuture(clientChannelConnector,receiveTimeout,readTimeout,sendTimeout,nettyChannelFuture); 首先是添加监听器,连接成功后将channel加入到channelGroup中;接着创建TNiftyFuture,这一步很关键,点进去看privateclassTNiftyFutureTextendsNiftyClientChannelextendsAbstractFutureT{privateTNiftyFuture(finalNiftyClientConnectorTclientChannelConnector,NullablefinalDurationreceiveTimeout,NullablefinalDurationreadTimeout,NullablefinalDurationsendTimeout,finalChannelFuturechannelFuture){channelFuture。addListener(newChannelFutureListener(){OverridepublicvoidoperationComplete(ChannelFuturefuture)throwsException{if(future。isSuccess()){ChannelnettyChannelfuture。getChannel();TchannelclientChannelConnector。newThriftClientChannel(nettyChannel,nettyClientConfig);channel。setReceiveTimeout(receiveTimeout);channel。setReadTimeout(readTimeout);channel。setSendTimeout(sendTimeout);set(channel);}}});}} 在构造方法中其实也是添加了个监听器,在建立连接后,获取通道,通过该通道创建ThriftChannel,在这里是FramedClientChannel,并且设置给该future。 来看一眼clientChannelConnector提供的最后一个方法newThriftClientChannelpublicFramedClientChannelnewThriftClientChannel(ChannelnettyChannel,NettyClientConfigclientConfig){FramedClientChannelchannelnewFramedClientChannel(nettyChannel,clientConfig。getTimer(),getProtocolFactory());ChannelPipelinecpnettyChannel。getPipeline();cp。addLast(thriftHandler,channel);returnchannel;} 构建传入nettyChannel,构建FramedClientChannel,同时添加到pipeline中去,所以FramedClientChannel肯定也是一个处理器了。 FramedClientChannel很重要,涉及到通道的一些操作,比如收取请求进行处理,发送请求等,继承自AbstractClientChannel(处理器)。 现在得到了thriftChannel,即步骤(1)已经介绍完了;再来看(2)创建客户端代理对象。(2)转换后获取客户端代理对象ListenableFutureTclientFutureFutures。transform(connectFuture,newFunctionC,T(){OverridepublicTapply(NotNullCchannel){StringnameStrings。isNullOrEmpty(clientName)?connector。toString():clientName;returncreateClient(channel,type,name,eventHandlers);}},Runnable::run); transform方法是guava提供进行future转换的,将Future〔I〕转换为Future〔U〕,这里就是将FrameClientChannel转换为EchoService。Iface(实际上是代理对象)。来看createClient方法: ThriftClientManager:privatefinalLoadingCacheTypeAndName,ThriftClientMetadataclientMetadataCacheCacheBuilder。newBuilder()。build(newCacheLoaderTypeAndName,ThriftClientMetadata(){OverridepublicThriftClientMetadataload(TypeAndNametypeAndName)throwsException{returnnewThriftClientMetadata(typeAndName。getType(),typeAndName。getName(),codecManager);}});publicTTcreateClient(RequestChannelchannel,ClassTtype,Stringname,Listlt;?extendsThriftClientEventHandlereventHandlers){ThriftClientMetadataclientMetadataclientMetadataCache。getUnchecked(newTypeAndName(type,name));StringclientDescriptionclientMetadata。getName()channel。toString();ThriftInvocationHandlerhandlernewThriftInvocationHandler(clientDescription,channel,clientMetadata。getMethodHandlers(),ImmutableList。ThriftClientEventHandlerbuilder()。addAll(globalEventHandlers)。addAll(eventHandlers)。build());returntype。cast(Proxy。newProxyInstance(type。getClassLoader(),newClasslt;?〔〕{type,Closeable。class},handler));} clientMetadataCache是使用guavacache创建的本地缓存,key为TypeAndName,value为ThriftClientMetadata。ThriftClientMetadata和ThriftServiceMetadata是很类似的,其内部会持有ThriftServiceMetadata,type和name,以及Method和方法处理器的映射privatefinalMapMethod,ThriftMethodHandlermethodHandlers;,这些都是在创建对象的时候初始化好的。 剩余的部分是使用动态代理创建代理对象,所以可以猜到ThriftInvocationHandler是继承自InvocationHandler。方法调用privatestaticclassThriftInvocationHandlerimplementsInvocationHandler{privatestaticfinalObject〔〕NOARGSnewObject〔0〕;privatefinalStringclientDescription;privatefinalRequestChannelchannel;privatefinalMapMethod,ThriftMethodHandlermethods;privatestaticfinalAtomicIntegersequenceIdCursornewAtomicInteger(1);privatefinalListlt;?extendsThriftClientEventHandlereventHandlers;privateThriftInvocationHandler(StringclientDescription,RequestChannelchannel,MapMethod,ThriftMethodHandlermethods,Listlt;?extendsThriftClientEventHandlereventHandlers){this。clientDescriptionclientDescription;this。channelchannel;this。methodsmethods;this。eventHandlerseventHandlers;}publicRequestChannelgetChannel(){returnchannel;}OverridepublicObjectinvoke(Objectproxy,Methodmethod,Object〔〕args)throwsThrowable{intsequenceIdsequenceIdCursor。getAndIncrement();TChannelBufferInputTransportinputTransportnewTChannelBufferInputTransport();TChannelBufferOutputTransportoutputTransportnewTChannelBufferOutputTransport();TTransportPairtransportPairfromSeparateTransports(inputTransport,outputTransport);TProtocolPairprotocolPairchannel。getProtocolFactory()。getProtocolPair(transportPair);TProtocolinputProtocolprotocolPair。getInputProtocol();TProtocoloutputProtocolprotocolPair。getOutputProtocol();ThriftMethodHandlermethodHandlermethods。get(method);NiftyClientChannelniftyClientChannel(NiftyClientChannel)channel;SocketAddressremoteAddressniftyClientChannel。getNettyChannel()。getRemoteAddress();ClientRequestContextrequestContextnewNiftyClientRequestContext(inputProtocol,outputProtocol,channel,remoteAddress);ClientContextChaincontextnewClientContextChain(eventHandlers,methodHandler。getQualifiedName(),requestContext);returnmethodHandler。invoke(channel,inputTransportinputTransport,outputTransport,inputProtocol,outputProtocol,sequenceId,context,args);}} 当进行方法调用的时候会走到invoke方法,这个知道java反射的都知道。invoke方法主要是3步创建输入输出协议,这里是TBinaryProtocal,内部持有的transport分别是TChannelBufferInputTransport和TChannelBufferOutputTransport,可以理解为传输层组件,内部持有ChannelBuffer来存放数据;根据Method获得ThriftMethodHandler;获取地址,调用ThriftMethodHandler。invoke方法; 从methodHandler。invoke继续走下去,发现有同步调用和异步调用,我们这里走的会是同步调用。privateObjectsynchronousInvoke(RequestChannelchannel,TChannelBufferInputTransportinputTransport,TChannelBufferOutputTransportoutputTransport,TProtocolinputProtocol,TProtocoloutputProtocol,intsequenceId,ClientContextChaincontextChain,Object〔〕args)throwsException{Objectresultsnull;writerequestoutputTransport。resetOutputBuffer();writeArguments(outputProtocol,sequenceId,args);ChannelBufferrequestBufferoutputTransport。getOutputBuffer();ClientMessageclientMessagenewClientMessage(methodMetadata。getServiceFullName(),sequenceId,requestBuffer,name);ChannelBufferresponseBufferSyncClientHelpers。sendSynchronousTwoWayMessage(channel,clientMessage);readresultsinputTransport。setInputBuffer(responseBuffer);waitForResponse(inputProtocol,sequenceId);resultsreadResponse(inputProtocol);returnresults;} 首先调用outputTransport。resetOutputBuffer();来清空channelBuffer,以及重置一些指针。 使用writeArguments(outputProtocol,sequenceId,args);来循环将方法参数写到outputProtocol中持有的outputTransport中,即内部持有的ChannelBuffer中。这部分之前写过类似的服务端读取数据,参考服务端读取数据 此时outputTransport。outBuffer中已经有数据了,通过outputTransport。getOutputBuffer();来获取数据,并基于此构建ClientMessage。 接着使用SyncClientHelpers。sendSynchronousTwoWayMessage(channel,clientMessage)来讲数据发送到服务端,获取服务端的结果responseBuffer。 最后解析服务端响应的buffer并返回结果,这部分和写数据类似也不多讲了,参数服务端读取数据 我们主要来看发送数据SyncClientHelpers。sendSynchronousTwoWayMessage(channel,clientMessage);这部分实现 SyncClientHelpers:publicstaticChannelBuffersendSynchronousTwoWayMessage(RequestChannelchannel,finalClientMessagerequest)throwsTException,InterruptedException{finalChannelBuffer〔〕responseHoldernewChannelBuffer〔1〕;finalTException〔〕exceptionHoldernewTException〔1〕;finalCountDownLatchlatchnewCountDownLatch(1);responseHolder〔0〕null;exceptionHolder〔0〕null;channel。sendAsynchronousRequest(request,false,newRequestChannel。Listener(){OverridepublicvoidonRequestSent(){}OverridepublicvoidonResponseReceived(ChannelBufferresponse){responseHolder〔0〕response;latch。countDown();}OverridepublicvoidonChannelError(TExceptione){exceptionHolder〔0〕e;latch。countDown();}});latch。await();if(exceptionHolder〔0〕!null){throwexceptionHolder〔0〕;}returnresponseHolder〔0〕;} 通过channel来发送请求,同时需要给个监听器。注意到onResponseReceived方法会设置响应结果哦,同时调用latch。countDown(),此时await停止阻塞,方法返回结果。主要还是来看sendAsynchronousRequest的实现。 FramedClientChannel:publicvoidsendAsynchronousRequest(finalClientMessagemessage,finalbooleanoneway,finalListenerlistener)throwsTException{finalintsequenceIdmessage。getSeqid();获取消息id构建消息RequestrequestnewRequest(listener);requestMap。put(sequenceId,request);后续收到响应后会根据消息id移除请求finalRequestrequestmakeRequest(sequenceId,listener,oneway);发送消息,调用的是FramedClientChannel的writeRequest,之前已经说过,调用netty。channle的write方法ChannelFuturesendFuturewriteRequest(message);queueSendTimeout(request);} 到这消息就发送完了,也许你可能好奇,不是说sendSynchronousTwoWayMessage会等待消息的返回结果吗?是的这里的latch用的非常巧妙,解释全在处理器AbstractClientChannel(FramedClientChannel的父类)中OverridepublicvoidmessageReceived(ChannelHandlerContextctx,MessageEvente){ChannelBufferresponseextractResponse(e。getMessage());intsequenceIdextractSequenceId(response);onResponseReceived(sequenceId,response);} 首先从服务端响应中抽取ChannelBuffer;再获取序列号;继续看onResponseReceivedprivatevoidonResponseReceived(intsequenceId,finalChannelBufferresponse){finalRequestrequestrequestMap。remove(sequenceId);executorService。execute(newRunnable(){Overridepublicvoidrun(){fireResponseReceivedCallback(request。getListener(),response);}});} 根据序号从之前map中获取之前设置的Request(内部持有Listener),然后使用线程异步执行任务;继续跟下去privatevoidfireResponseReceivedCallback(Listenerlistener,ChannelBufferresponse){listener。onResponseReceived(response);} 看到了这里执行了监听器listener。onResponseReceived,此时SyncClientHelpers。sendSynchronousTwoWayMessage就终止阻塞返回结果了。 到这里进行远程方法调用的,发送请求获取结果,解析结果整个流程就介绍完了。