透过ApacheThrift服务端启动读懂Netty和线程池的使用
1. 前言
Thrift是Facebook贡献给apache的rpc框架,但是这款框架的java版本在公司内部并不是那么受待见,因为其性能相比C++版本差了很多,但是后续基于netty重写了以后性能得到了极大的提升,相比于C++版本已经差距不大了。为此取了个新的名字Nifty = Netty + Thrift。
如果你使用过thrift的话,基本都会使用自动生成的代码,那真的是没法看,即使定义一个简单的类都会生成巨多的代码,把read,write方法全部写到里面去了。总之早期的thrfit各方面都似乎不那么友好。后面架构进行了升级,提供了新的swift库,注意这个不是ios的swift,从而生成的java类和普通的java类基本一致,无非多了点注解,而序列化反序列化也都移到了相应的包中,从而使得我们的代码非常简洁易懂。
其实这款rpc框架的性能是非常不错的,早几年性能是好过grpc的,目前小米还是在用的。这款框架很轻量,即不提供服务治理的功能。如果公司规模不大急需做功能,暂时没精力去做服务治理的话可能还是会选择dubbo等带服务治理功能的rpc框架。但是恰恰是thrift不提供服务治理,这样公司可以自己去定义服务治理的功能。
目前不管是书籍还是博客等关于thrift的都是少之又少,最近突然爱学习了,所以打算写一系列thrift相关的博客,关于使用基本不会介绍的过多,因为基本使用也就十几行代码,主要是介绍内部处理逻辑。具体的包括Thrift框架分析,netty框架分析,分布式服务治理等三个方面。
为了方便后续统称为Thrift而不是Nifty,因为很多代码还是沿用的Thrift。2. Thrfit服务端创建与核心组件介绍EchoServiceImpl service = new EchoServiceImpl(); ThriftCodecManager manager = new ThriftCodecManager(); ThriftServiceProcessor processor = new ThriftServiceProcessor(manager, ImmutableList.of(), service); ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081)); server.start();
其中EchoServiceImpl是使用swift代码生成器生成的接口的实现类,有个echo方法,简单说下这几个组件。ThriftCodecManager:编解码器的管理类,会将各类编解码器,比如StringCodec添加到缓存中。ThriftServiceProcessor:thrift服务处理器,服务端收到数据后,最终将由这个类来进行处理,可以理解为最核心的类了。ThriftServer: thrift服务器,主要用来设置参。启动服务,说具体点就是设置好netty的一些处理器等参数,然后启动netty服务。ThriftServerConfig: Thrfit服务启动的一些配置类,包括了端口号,线程池,线程数量等。3. 服务端启动流程极简介绍
从上面的组件分析是不是能猜到一点thrift的内部处理流程。简单两句话就是,创建各类自定义处理器handler,添加到netty的处理器集合中,然后启动netty服务。当收到客户端发来的数据后,交由特定处理器进行数据的处理,根据协议和编解码器从buffer中进行数据的解析和转换,最终得到类名,方法的参数和方法名等(各类信息都能解析到);从ThriftServicProcessor查到Method,传入方法参数,反射执行得到结果,然后将结果通过netty响应给客户端。4. 服务端启动全解析
先从ThriftServer 创建和启动来看,了解总体的流程,后续再回过头来看各个组件的处理流程。ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081)); public ThriftServer(NiftyProcessor processor, ThriftServerConfig config){ this(processor, config, new NiftyTimer("thrift")); }
继续往下,追到最终的构造方法public ThriftServer( final NiftyProcessor processor, ThriftServerConfig config, @ThriftServerTimer Timer timer, Map availableFrameCodecFactories, Map availableProtocolFactories, @ThriftServerWorkerExecutor Map availableWorkerExecutors, NiftySecurityFactoryHolder securityFactoryHolder){ NiftyProcessorFactory processorFactory = new NiftyProcessorFactory(){ @Override public NiftyProcessor getProcessor(TTransport transport) { return processor; } }; String transportName = config.getTransportName(); String protocolName = config.getProtocolName(); checkState(availableFrameCodecFactories.containsKey(transportName), "No available server transport named " + transportName); checkState(availableProtocolFactories.containsKey(protocolName), "No available server protocol named " + protocolName); configuredPort = config.getPort(); workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors); acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build()); acceptorThreads = config.getAcceptorThreadCount(); ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build()); ioThreads = config.getIoThreadCount(); serverChannelFactory = new NioServerSocketChannelFactory(new NioServerBossPool(acceptorExecutor, acceptorThreads, ThreadNameDeterminer.CURRENT), new NioWorkerPool(ioExecutor, ioThreads, ThreadNameDeterminer.CURRENT)); ThriftServerDef thriftServerDef = ThriftServerDef.newBuilder() .name("thrift") .listen(configuredPort) .limitFrameSizeTo((int) config.getMaxFrameSize().toBytes()) .clientIdleTimeout(config.getIdleConnectionTimeout()) .withProcessorFactory(processorFactory) .limitConnectionsTo(config.getConnectionLimit()) .limitQueuedResponsesPerConnection(config.getMaxQueuedResponsesPerConnection()) .thriftFrameCodecFactory(availableFrameCodecFactories.get(transportName)) .protocol(availableProtocolFactories.get(protocolName)) .withSecurityFactory(securityFactoryHolder.niftySecurityFactory) .using(workerExecutor) .taskTimeout(config.getTaskExpirationTimeout()) .withSecurityFactory(config.getSecurityFactory()) .withHeader(config.getHeader()) .withServerHandler(config.getNiftyServerHandler()) .build(); NettyServerConfigBuilder nettyServerConfigBuilder = NettyServerConfig.newBuilder(); nettyServerConfigBuilder.getServerSocketChannelConfig().setBacklog(config.getAcceptBacklog()); nettyServerConfigBuilder.setBossThreadCount(config.getAcceptorThreadCount()); nettyServerConfigBuilder.setWorkerThreadCount(config.getIoThreadCount()); nettyServerConfigBuilder.setTimer(timer); NettyServerConfig nettyServerConfig = nettyServerConfigBuilder.build(); transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels); }
构造方法简单来说主要做了两件事:创建和获取netty需要的连接线程池和io处理线程池以及数量,从而构建netty服务组件NioServerSocketChannelFactory ;构建netty服务的配置类nettyServerConfig, thrift服务的配置类ThriftServerDef
我们再来看下细节点的东西,思考一些问题,如果不感兴趣可以跳过。
工作线程池workerExecutor 的创建:workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors);
默认传进来的availableWorkerExecutors 是空的,所以最终是构建一个新的线程池,
最终调用的方法是makeDefaultWorkerExecutor ,下面的代码稍微简化了一点。默认得到的就是个无界的队列;如果你需要构建有限容量队列的线程池,可以在创建config后调用setMaxQueuedRequests 来设置队列容量超出队列容量后将执行线程池拒绝策略(throw RejectedExecutionException )默认核心线程数和最大线程数是一样的,数值为200;使用guava提供的ThreadFactoryBuilder来构建线程工厂,主要是取个容易理解的名字;在thrift中大量使用了guava 提供的工具类。private ExecutorService makeDefaultWorkerExecutor(){ BlockingQueue queue = new LinkedBlockingQueue<>(); return new ThreadPoolExecutor(getWorkerThreads(), getWorkerThreads(), 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactoryBuilder().setNameFormat("thrift-worker-%s").build(), new ThreadPoolExecutor.AbortPolicy()); }
netty的连接线程池和io线程池创建acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build()); acceptorThreads = config.getAcceptorThreadCount(); ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build()); ioThreads = config.getIoThreadCount(); 该线程池创建的方法是netty提供的,看名字是不是像无界队列的线程池呢?连接数量的限定是1,io线程数量的限定默认是电脑核数*2,但都是可以在配置类中指定的。
关于netty配置类NettyServerConfig的构建nettyServerConfigBuilder.build() , 在之前设置了连接线程的线程数和io线程池线程数以及定时器timer,调用build后如下:public NettyServerConfig build() { Timer timer = getTimer(); ExecutorService bossExecutor = getBossExecutor(); int bossThreadCount = getBossThreadCount(); ExecutorService workerExecutor = getWorkerExecutor(); int workerThreadCount = getWorkerThreadCount(); return new NettyServerConfig( getBootstrapOptions(), timer != null ? timer : new NiftyTimer(threadNamePattern("")), bossExecutor != null ? bossExecutor : buildDefaultBossExecutor(), bossThreadCount, workerExecutor != null ? workerExecutor : buildDefaultWorkerExecutor(), workerThreadCount ); } private ExecutorService buildDefaultBossExecutor() { return newCachedThreadPool(renamingThreadFactory(threadNamePattern("-boss-%s"))); } private ExecutorService buildDefaultWorkerExecutor() { return newCachedThreadPool(renamingThreadFactory(threadNamePattern("-worker-%s"))); }
由于没有设置两个线程池,所以会设置默认的线程池,注意这里一个是Boss线程池一个是Worker线程池。 其中的timer在后续构建netty处理器的时候会多次用到。
allChannels 是一个netty 提供的channelGroup:private final DefaultChannelGroup allChannels = new DefaultChannelGroup();
在介绍后续流程前,先了解下netty服务端创建步骤,因为这里用的是netty3,和我们比较熟悉的netty4差距有点大,可以对比着看下。ChannelFactory factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), ); ServerBootstrap bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("handler1", new Handler1()); return pipeline; } }); Channel channel = bootstrap.bind(new InetSocketAddress(8081));
构建NettyServerTransport 的时候,在构造方法里面就是进行netty的一些设置。transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels);
在这里面主要是进行一些对象成员变量的设置,最后也是最重要的就是构建ChannelPipelineFactory ,在其中设置各种处理器,大部分继承自SimpleChannelUpstreamHandler ,少部分继承自ChannelDownstreamHandler ,可以类比netty4 的ChannelInboundHandler,ChannelOutboundHandler 。this.pipelineFactory = new ChannelPipelineFactory(){ @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline cp = Channels.pipeline(); // 设置处理器handler return cp; } }
我们来看有哪些处理器。ConnectionLimiter : 连接限流器, 创建的时候需要指定最大连接数,以及初始值为0的一个计数器;每次建立连接的时候计数器数值 + 1,关闭的时候数值 - 1;当连接数达到上限,则关闭通道,即channel。ChannelStatistics : 传入allChannels来构建对象,内部持有一个channelCount = 0 ,来统计建立通道channel数, 每次建立连接接受到数据的时候channelCount + 1 ,提供了get方法来获取channelCount ;同时将channel 加入到allChannels 中。编解码处理器DefaultThriftFrameCodec ,每次收到的数据和传出的数据都需要进行一次编码或者解码。连接的上下文处理器ConnectionContextHandler ,在建立连接的时候创建NiftyConnectionContext ,连接的上下文环境,包含了removeAddress和属性map,绑定到ChannelHandlerContext ,即ctx.setAttachment(context); 。netty 提供的关于超时处理器IdleStateHandler 和IdleDisconnectHandler 。NiftyDispatcher ,这个处理器最为核心,收到buffer经过解码后数据就传到了该处理器,该处理器会对数据进行一系列的处理和方法的调用等。异常事件处理器NiftyExceptionLogger ,重写了log 方法,如果出现异常事件,该处理器会打印响应的异常日志。
来一览处理过程吧public NettyServerTransport(final ThriftServerDef def, final NettyServerConfig nettyServerConfig, final ChannelGroup allChannels){ this.def = def; this.nettyServerConfig = nettyServerConfig; this.port = def.getServerPort(); this.allChannels = allChannels; final ConnectionLimiter connectionLimiter = new ConnectionLimiter(def.getMaxConnections()); this.channelStatistics = new ChannelStatistics(allChannels); this.pipelineFactory = new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline cp = Channels.pipeline(); TProtocolFactory inputProtocolFactory = def.getDuplexProtocolFactory().getInputProtocolFactory(); NiftySecurityHandlers securityHandlers = def.getSecurityFactory().getSecurityHandlers(def, nettyServerConfig); cp.addLast("connectionContext", new ConnectionContextHandler()); cp.addLast("connectionLimiter", connectionLimiter); cp.addLast(ChannelStatistics.NAME, channelStatistics); cp.addLast("frameCodec", def.getThriftFrameCodecFactory().create(def.getMaxFrameSize(), inputProtocolFactory)); if (def.getClientIdleTimeout() != null) { // Add handlers to detect idle client connections and disconnect them cp.addLast("idleTimeoutHandler", new IdleStateHandler(nettyServerConfig.getTimer(), def.getClientIdleTimeout().toMillis(), NO_WRITER_IDLE_TIMEOUT, NO_ALL_IDLE_TIMEOUT, TimeUnit.MILLISECONDS)); cp.addLast("idleDisconnectHandler", new IdleDisconnectHandler()); } cp.addLast("dispatcher", new NiftyDispatcher(def, nettyServerConfig.getTimer())); cp.addLast("exceptionLogger", new NiftyExceptionLogger()); return cp; } }; }
我们再回到server.start() ,看下ThriftServer如何启动的
ThriftServer : 初始状态是NOT_STARTED public synchronized ThriftServer start(){ checkState(state != State.CLOSED, "Thrift server is closed"); if (state == State.NOT_STARTED) { transport.start(serverChannelFactory); state = State.RUNNING; } return this; }
NettyServerTransport: 标准的netty服务端创建过程,其中pipelineFactory 就是在前面NettyServerTransport 构造方法中所创建的。public void start(ServerChannelFactory serverChannelFactory){ bootstrap = new ServerBootstrap(serverChannelFactory); bootstrap.setOptions(nettyServerConfig.getBootstrapOptions()); bootstrap.setPipelineFactory(pipelineFactory); serverChannel = bootstrap.bind(new InetSocketAddress(port)); // ... }
到这里Thrift 的服务端启动就介绍完了,下一部分将会介绍服务端接受数据,处理数据和响应结果的流程。
28国联合调查,倪光南多次呼吁,美国芯片巨头美梦破碎去年九月份,美国芯片巨头英伟达放出消息,将豪掷400亿美元,从日本软银集团手中收购ARM公司,引起了国际市场的强烈反响。这一方面是因为400亿美元的天价,创下了全球半导体市场史上最
华为地位飙升,欧洲多国解禁华为,美国又会做出什么疯狂的举动?华为老总任正非在一次采访做出了一项重大的全新决定,那就是把华为的核心科技毫无保留地上交给国家。为的就是能有朝一日带领其他国产企业打破美国的封锁,做自己真正的主人。说到这里,不免让人
今日头条在你心中代表什么?充实。在今日头条可看新闻,不出门便知天下事。可以学习你需要的知识,而且都是免费的。可以交朋友,相互学习。可以领金币,玩玩。如此等等,不一而足。所以,有了今日头条,每天过的很充实。可
商业保险是不是都是骗人的?如果是正规商业保险公司承保的保险,一般都不是骗局。但是大家要注意,保险销售人员可能会出于自身利益的角度考虑,对有些事情表述不清,让大家有一部分误解。真正等到出险了,由于符合保险的免
如何对笔记本电脑进行真正有效的散热?笔记本电脑不同于台式机,它的机箱结构是封闭的,也就是说在使用的时候很难像用台式机那样拆开一面挡板直接怼风扇或者对它的内部进行改造,那么散热就是笔记本电脑使用所要面临的一个大问题,如
你最喜欢的锁屏壁纸是什么?每个人的风格不一样,所选的锁屏壁纸也迥异。我以前喜欢一些风景类的锁屏壁纸,并且还特意去买了个单反去玩的时候会拍一些风景照当作壁纸,下面发几张,拍的不好大家别笑。以上这组是在广州与厦
告别钴焦虑,蜂巢能源引领无钴时代?随着新能源汽车产业快速发展,对动力电池的需求愈发急切,无论是降钴增镍还是无钴化,都成为锂电行业备受关注的话题。特斯拉掀起的无钴风云没了下文,2021成都车展上,蜂巢能源的无钴电池包
三星Galaxyzfilp3系列的出现,意味着折叠屏手机获得新生?今年折叠屏手机确实都有进步,当然三星的翻盖折叠屏手机看起来今年定价会创新低?Galaxyz系列手机,应该是未来折叠屏手机的发展思路?可能是这样吧!但如果明年主要手机厂商不推广折叠屏
小东西也有大本领,向日葵开机棒,远程操作不求人记得上大学的时候,每天下课回到宿舍饭也不吃,就是为了能够和室友一起打游戏,有时候跑着回去也是为了能够早一些上线进入到游戏。那个时候我就在想,如果能在课堂上就把游戏上线了,那么是不是
AppleWatchSeries7预计将采用新的表面设计以利用更大显示屏的优势随着即将推出的AppleWatchSeries7尺寸的增加,据传将采用更大的41毫米和45毫米尺寸,苹果计划包括新的手表表面,以利用更大的显示屏。彭博社的马克古尔曼在其最新出版的P
PHP时间函数总结1checkdate()验证格利高里日期即日期是否存在。checkdate(month,day,year)month必需。一个从1到12的数字,规定月。day必需。一个从1到31的