范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

透过ApacheThrift服务端启动读懂Netty和线程池的使用

  1. 前言
  Thrift是Facebook贡献给apache的rpc框架,但是这款框架的java版本在公司内部并不是那么受待见,因为其性能相比C++版本差了很多,但是后续基于netty重写了以后性能得到了极大的提升,相比于C++版本已经差距不大了。为此取了个新的名字Nifty = Netty + Thrift。
  如果你使用过thrift的话,基本都会使用自动生成的代码,那真的是没法看,即使定义一个简单的类都会生成巨多的代码,把read,write方法全部写到里面去了。总之早期的thrfit各方面都似乎不那么友好。后面架构进行了升级,提供了新的swift库,注意这个不是ios的swift,从而生成的java类和普通的java类基本一致,无非多了点注解,而序列化反序列化也都移到了相应的包中,从而使得我们的代码非常简洁易懂。
  其实这款rpc框架的性能是非常不错的,早几年性能是好过grpc的,目前小米还是在用的。这款框架很轻量,即不提供服务治理的功能。如果公司规模不大急需做功能,暂时没精力去做服务治理的话可能还是会选择dubbo等带服务治理功能的rpc框架。但是恰恰是thrift不提供服务治理,这样公司可以自己去定义服务治理的功能。
  目前不管是书籍还是博客等关于thrift的都是少之又少,最近突然爱学习了,所以打算写一系列thrift相关的博客,关于使用基本不会介绍的过多,因为基本使用也就十几行代码,主要是介绍内部处理逻辑。具体的包括Thrift框架分析,netty框架分析,分布式服务治理等三个方面。
  为了方便后续统称为Thrift而不是Nifty,因为很多代码还是沿用的Thrift。2. Thrfit服务端创建与核心组件介绍EchoServiceImpl service = new EchoServiceImpl(); ThriftCodecManager manager = new ThriftCodecManager(); ThriftServiceProcessor processor = new ThriftServiceProcessor(manager, ImmutableList.of(), service); ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081)); server.start();
  其中EchoServiceImpl是使用swift代码生成器生成的接口的实现类,有个echo方法,简单说下这几个组件。ThriftCodecManager:编解码器的管理类,会将各类编解码器,比如StringCodec添加到缓存中。ThriftServiceProcessor:thrift服务处理器,服务端收到数据后,最终将由这个类来进行处理,可以理解为最核心的类了。ThriftServer: thrift服务器,主要用来设置参。启动服务,说具体点就是设置好netty的一些处理器等参数,然后启动netty服务。ThriftServerConfig: Thrfit服务启动的一些配置类,包括了端口号,线程池,线程数量等。3. 服务端启动流程极简介绍
  从上面的组件分析是不是能猜到一点thrift的内部处理流程。简单两句话就是,创建各类自定义处理器handler,添加到netty的处理器集合中,然后启动netty服务。当收到客户端发来的数据后,交由特定处理器进行数据的处理,根据协议和编解码器从buffer中进行数据的解析和转换,最终得到类名,方法的参数和方法名等(各类信息都能解析到);从ThriftServicProcessor查到Method,传入方法参数,反射执行得到结果,然后将结果通过netty响应给客户端。4. 服务端启动全解析
  先从ThriftServer  创建和启动来看,了解总体的流程,后续再回过头来看各个组件的处理流程。ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081));  public ThriftServer(NiftyProcessor processor, ThriftServerConfig config){     this(processor, config, new NiftyTimer("thrift")); }
  继续往下,追到最终的构造方法public ThriftServer(         final NiftyProcessor processor,         ThriftServerConfig config,         @ThriftServerTimer Timer timer,         Map availableFrameCodecFactories,         Map availableProtocolFactories,         @ThriftServerWorkerExecutor Map availableWorkerExecutors,         NiftySecurityFactoryHolder securityFactoryHolder){      NiftyProcessorFactory processorFactory = new NiftyProcessorFactory(){         @Override         public NiftyProcessor getProcessor(TTransport transport)         {             return processor;         }     };      String transportName = config.getTransportName();     String protocolName = config.getProtocolName();      checkState(availableFrameCodecFactories.containsKey(transportName), "No available server transport named " + transportName);     checkState(availableProtocolFactories.containsKey(protocolName), "No available server protocol named " + protocolName);      configuredPort = config.getPort();      workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors);      acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build());     acceptorThreads = config.getAcceptorThreadCount();     ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build());     ioThreads = config.getIoThreadCount();      serverChannelFactory = new NioServerSocketChannelFactory(new NioServerBossPool(acceptorExecutor, acceptorThreads, ThreadNameDeterminer.CURRENT),                                                              new NioWorkerPool(ioExecutor, ioThreads, ThreadNameDeterminer.CURRENT));      ThriftServerDef thriftServerDef = ThriftServerDef.newBuilder()                                                      .name("thrift")                                                      .listen(configuredPort)                                                      .limitFrameSizeTo((int) config.getMaxFrameSize().toBytes())                                                      .clientIdleTimeout(config.getIdleConnectionTimeout())                                                      .withProcessorFactory(processorFactory)                                                      .limitConnectionsTo(config.getConnectionLimit())                                                      .limitQueuedResponsesPerConnection(config.getMaxQueuedResponsesPerConnection())                                                      .thriftFrameCodecFactory(availableFrameCodecFactories.get(transportName))                                                      .protocol(availableProtocolFactories.get(protocolName))                                                      .withSecurityFactory(securityFactoryHolder.niftySecurityFactory)                                                      .using(workerExecutor)                                                      .taskTimeout(config.getTaskExpirationTimeout())                                                      .withSecurityFactory(config.getSecurityFactory())                                                      .withHeader(config.getHeader())                                                      .withServerHandler(config.getNiftyServerHandler())                                                      .build();      NettyServerConfigBuilder nettyServerConfigBuilder = NettyServerConfig.newBuilder();      nettyServerConfigBuilder.getServerSocketChannelConfig().setBacklog(config.getAcceptBacklog());     nettyServerConfigBuilder.setBossThreadCount(config.getAcceptorThreadCount());     nettyServerConfigBuilder.setWorkerThreadCount(config.getIoThreadCount());     nettyServerConfigBuilder.setTimer(timer);      NettyServerConfig nettyServerConfig = nettyServerConfigBuilder.build();      transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels); }
  构造方法简单来说主要做了两件事:创建和获取netty需要的连接线程池和io处理线程池以及数量,从而构建netty服务组件NioServerSocketChannelFactory  ;构建netty服务的配置类nettyServerConfig, thrift服务的配置类ThriftServerDef
  我们再来看下细节点的东西,思考一些问题,如果不感兴趣可以跳过。
  工作线程池workerExecutor  的创建:workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors);
  默认传进来的availableWorkerExecutors  是空的,所以最终是构建一个新的线程池,
  最终调用的方法是makeDefaultWorkerExecutor  ,下面的代码稍微简化了一点。默认得到的就是个无界的队列;如果你需要构建有限容量队列的线程池,可以在创建config后调用setMaxQueuedRequests  来设置队列容量超出队列容量后将执行线程池拒绝策略(throw RejectedExecutionException  )默认核心线程数和最大线程数是一样的,数值为200;使用guava提供的ThreadFactoryBuilder来构建线程工厂,主要是取个容易理解的名字;在thrift中大量使用了guava  提供的工具类。private ExecutorService makeDefaultWorkerExecutor(){     BlockingQueue queue = new LinkedBlockingQueue<>();      return new ThreadPoolExecutor(getWorkerThreads(),                                   getWorkerThreads(),                                   0L,                                   TimeUnit.MILLISECONDS,                                   queue,                                   new ThreadFactoryBuilder().setNameFormat("thrift-worker-%s").build(),                                   new ThreadPoolExecutor.AbortPolicy()); }
  netty的连接线程池和io线程池创建acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build()); acceptorThreads = config.getAcceptorThreadCount(); ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build()); ioThreads = config.getIoThreadCount(); 该线程池创建的方法是netty提供的,看名字是不是像无界队列的线程池呢?连接数量的限定是1,io线程数量的限定默认是电脑核数*2,但都是可以在配置类中指定的。
  关于netty配置类NettyServerConfig的构建nettyServerConfigBuilder.build()  , 在之前设置了连接线程的线程数和io线程池线程数以及定时器timer,调用build后如下:public NettyServerConfig build() {     Timer timer = getTimer();     ExecutorService bossExecutor = getBossExecutor();     int bossThreadCount = getBossThreadCount();     ExecutorService workerExecutor = getWorkerExecutor();     int workerThreadCount = getWorkerThreadCount();      return new NettyServerConfig(             getBootstrapOptions(),             timer != null ? timer : new NiftyTimer(threadNamePattern("")),             bossExecutor != null ? bossExecutor : buildDefaultBossExecutor(),             bossThreadCount,             workerExecutor != null ? workerExecutor : buildDefaultWorkerExecutor(),             workerThreadCount     ); }  private ExecutorService buildDefaultBossExecutor() {     return newCachedThreadPool(renamingThreadFactory(threadNamePattern("-boss-%s"))); } private ExecutorService buildDefaultWorkerExecutor() {     return newCachedThreadPool(renamingThreadFactory(threadNamePattern("-worker-%s"))); }
  由于没有设置两个线程池,所以会设置默认的线程池,注意这里一个是Boss线程池一个是Worker线程池。 其中的timer在后续构建netty处理器的时候会多次用到。
  allChannels  是一个netty  提供的channelGroup:private final DefaultChannelGroup allChannels = new DefaultChannelGroup();
  在介绍后续流程前,先了解下netty服务端创建步骤,因为这里用的是netty3,和我们比较熟悉的netty4差距有点大,可以对比着看下。ChannelFactory factory = new NioServerSocketChannelFactory(                  Executors.newCachedThreadPool(),                  Executors.newCachedThreadPool(),          ); ServerBootstrap bootstrap = new ServerBootstrap(factory);  bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  public ChannelPipeline getPipeline() throws Exception {      ChannelPipeline pipeline = Channels.pipeline();      pipeline.addLast("handler1", new Handler1());      return pipeline;  } });  Channel channel = bootstrap.bind(new InetSocketAddress(8081));
  构建NettyServerTransport  的时候,在构造方法里面就是进行netty的一些设置。transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels);
  在这里面主要是进行一些对象成员变量的设置,最后也是最重要的就是构建ChannelPipelineFactory  ,在其中设置各种处理器,大部分继承自SimpleChannelUpstreamHandler  ,少部分继承自ChannelDownstreamHandler  ,可以类比netty4  的ChannelInboundHandler,ChannelOutboundHandler  。this.pipelineFactory = new ChannelPipelineFactory(){     @Override     public ChannelPipeline getPipeline() throws Exception {         ChannelPipeline cp = Channels.pipeline();         // 设置处理器handler         return cp;     } }
  我们来看有哪些处理器。ConnectionLimiter  : 连接限流器, 创建的时候需要指定最大连接数,以及初始值为0的一个计数器;每次建立连接的时候计数器数值 + 1,关闭的时候数值 - 1;当连接数达到上限,则关闭通道,即channel。ChannelStatistics  : 传入allChannels来构建对象,内部持有一个channelCount = 0  ,来统计建立通道channel数, 每次建立连接接受到数据的时候channelCount + 1  ,提供了get方法来获取channelCount  ;同时将channel  加入到allChannels  中。编解码处理器DefaultThriftFrameCodec  ,每次收到的数据和传出的数据都需要进行一次编码或者解码。连接的上下文处理器ConnectionContextHandler  ,在建立连接的时候创建NiftyConnectionContext  ,连接的上下文环境,包含了removeAddress和属性map,绑定到ChannelHandlerContext  ,即ctx.setAttachment(context);  。netty  提供的关于超时处理器IdleStateHandler  和IdleDisconnectHandler  。NiftyDispatcher  ,这个处理器最为核心,收到buffer经过解码后数据就传到了该处理器,该处理器会对数据进行一系列的处理和方法的调用等。异常事件处理器NiftyExceptionLogger  ,重写了log  方法,如果出现异常事件,该处理器会打印响应的异常日志。
  来一览处理过程吧public NettyServerTransport(final ThriftServerDef def, final NettyServerConfig nettyServerConfig, final ChannelGroup allChannels){         this.def = def;         this.nettyServerConfig = nettyServerConfig;         this.port = def.getServerPort();         this.allChannels = allChannels;         final ConnectionLimiter connectionLimiter = new ConnectionLimiter(def.getMaxConnections());          this.channelStatistics = new ChannelStatistics(allChannels);          this.pipelineFactory = new ChannelPipelineFactory()         {             @Override             public ChannelPipeline getPipeline()                     throws Exception             {                 ChannelPipeline cp = Channels.pipeline();                 TProtocolFactory inputProtocolFactory = def.getDuplexProtocolFactory().getInputProtocolFactory();                 NiftySecurityHandlers securityHandlers = def.getSecurityFactory().getSecurityHandlers(def, nettyServerConfig);                 cp.addLast("connectionContext", new ConnectionContextHandler());                 cp.addLast("connectionLimiter", connectionLimiter);                 cp.addLast(ChannelStatistics.NAME, channelStatistics);                 cp.addLast("frameCodec", def.getThriftFrameCodecFactory().create(def.getMaxFrameSize(),                                                                                  inputProtocolFactory));                 if (def.getClientIdleTimeout() != null) {                     // Add handlers to detect idle client connections and disconnect them                     cp.addLast("idleTimeoutHandler", new IdleStateHandler(nettyServerConfig.getTimer(),                                                                           def.getClientIdleTimeout().toMillis(),                                                                           NO_WRITER_IDLE_TIMEOUT,                                                                           NO_ALL_IDLE_TIMEOUT,                                                                           TimeUnit.MILLISECONDS));                     cp.addLast("idleDisconnectHandler", new IdleDisconnectHandler());                 }                  cp.addLast("dispatcher", new NiftyDispatcher(def, nettyServerConfig.getTimer()));                 cp.addLast("exceptionLogger", new NiftyExceptionLogger());                 return cp;             }         };     }
  我们再回到server.start()  ,看下ThriftServer如何启动的
  ThriftServer  : 初始状态是NOT_STARTED  public synchronized ThriftServer start(){     checkState(state != State.CLOSED, "Thrift server is closed");     if (state == State.NOT_STARTED) {         transport.start(serverChannelFactory);         state = State.RUNNING;     }     return this; }
  NettyServerTransport:   标准的netty服务端创建过程,其中pipelineFactory  就是在前面NettyServerTransport  构造方法中所创建的。public void start(ServerChannelFactory serverChannelFactory){     bootstrap = new ServerBootstrap(serverChannelFactory);     bootstrap.setOptions(nettyServerConfig.getBootstrapOptions());     bootstrap.setPipelineFactory(pipelineFactory);      serverChannel = bootstrap.bind(new InetSocketAddress(port));     // ...  }
  到这里Thrift  的服务端启动就介绍完了,下一部分将会介绍服务端接受数据,处理数据和响应结果的流程。

28国联合调查,倪光南多次呼吁,美国芯片巨头美梦破碎去年九月份,美国芯片巨头英伟达放出消息,将豪掷400亿美元,从日本软银集团手中收购ARM公司,引起了国际市场的强烈反响。这一方面是因为400亿美元的天价,创下了全球半导体市场史上最华为地位飙升,欧洲多国解禁华为,美国又会做出什么疯狂的举动?华为老总任正非在一次采访做出了一项重大的全新决定,那就是把华为的核心科技毫无保留地上交给国家。为的就是能有朝一日带领其他国产企业打破美国的封锁,做自己真正的主人。说到这里,不免让人今日头条在你心中代表什么?充实。在今日头条可看新闻,不出门便知天下事。可以学习你需要的知识,而且都是免费的。可以交朋友,相互学习。可以领金币,玩玩。如此等等,不一而足。所以,有了今日头条,每天过的很充实。可商业保险是不是都是骗人的?如果是正规商业保险公司承保的保险,一般都不是骗局。但是大家要注意,保险销售人员可能会出于自身利益的角度考虑,对有些事情表述不清,让大家有一部分误解。真正等到出险了,由于符合保险的免如何对笔记本电脑进行真正有效的散热?笔记本电脑不同于台式机,它的机箱结构是封闭的,也就是说在使用的时候很难像用台式机那样拆开一面挡板直接怼风扇或者对它的内部进行改造,那么散热就是笔记本电脑使用所要面临的一个大问题,如你最喜欢的锁屏壁纸是什么?每个人的风格不一样,所选的锁屏壁纸也迥异。我以前喜欢一些风景类的锁屏壁纸,并且还特意去买了个单反去玩的时候会拍一些风景照当作壁纸,下面发几张,拍的不好大家别笑。以上这组是在广州与厦告别钴焦虑,蜂巢能源引领无钴时代?随着新能源汽车产业快速发展,对动力电池的需求愈发急切,无论是降钴增镍还是无钴化,都成为锂电行业备受关注的话题。特斯拉掀起的无钴风云没了下文,2021成都车展上,蜂巢能源的无钴电池包三星Galaxyzfilp3系列的出现,意味着折叠屏手机获得新生?今年折叠屏手机确实都有进步,当然三星的翻盖折叠屏手机看起来今年定价会创新低?Galaxyz系列手机,应该是未来折叠屏手机的发展思路?可能是这样吧!但如果明年主要手机厂商不推广折叠屏小东西也有大本领,向日葵开机棒,远程操作不求人记得上大学的时候,每天下课回到宿舍饭也不吃,就是为了能够和室友一起打游戏,有时候跑着回去也是为了能够早一些上线进入到游戏。那个时候我就在想,如果能在课堂上就把游戏上线了,那么是不是AppleWatchSeries7预计将采用新的表面设计以利用更大显示屏的优势随着即将推出的AppleWatchSeries7尺寸的增加,据传将采用更大的41毫米和45毫米尺寸,苹果计划包括新的手表表面,以利用更大的显示屏。彭博社的马克古尔曼在其最新出版的PPHP时间函数总结1checkdate()验证格利高里日期即日期是否存在。checkdate(month,day,year)month必需。一个从1到12的数字,规定月。day必需。一个从1到31的
小身材大能量!MOMAX100W氮化镓充电器体验Hi大家好,我是三重奏记得,最早在2019年的时候,市面上就出现了以新型半导体材料GaN(氮化镓),作为元件的快充产品,其充电功率直接飙升到60W,65W。要知道当时新发布的苹果1小个头大能量,AOHi65W氮化镓充电头体验Hi大家好,我是三重奏自苹果以环保名义取消附属充电头之后,相信很多人又气又恨。但是也不难发现,正是苹果的这个骚操作,给各大充电器品牌厂商带来了春天。目前市面上可选择的三方充电头越来韩国财阀不可战胜?三星太子李在镕抓了又放,连总统都不得不低头其实经常看韩国电视剧的粉丝,特别是爱看爱情片的人应该都对韩国财阀有着深刻的印象,在影视中韩国的财阀几乎都有着统一的爽朗外表,以及一颗玛丽苏的心。但在部分韩国媒体的笔下,韩国财阀却成女生为什么要侧着坐车?正着坐不是更安全吗?看完涨知识了不知道大家在骑行自行车摩托车电动车时有没有发现这样一个现象,那就是女生坐二轮交通工具时,往往选择侧着坐,而不是正着坐,正着坐不是更安全吗?相信直男是不会明白的,看完就会涨知识!优雅电子眼为罚而罚,一年抓拍60多万次违章,车主何时取消?汽车数量的增多,让交警的任务更重了,尽管有交规的限制,但是依旧有车主选择不遵守,这就需要交警来约束了,幸好电子警察出现了,在缓解交警压力的同时,也不会放过一个违章行为。交通规则的制头盔刚戴好,新禁令又出台?车主步行上班算了电动车发展迎来了寒冬,当然这是对于生产厂家而言的处境,就拿消费者来说,反而是有利的,尤其是一车一票出台之后,消费者在购买电动车后的维权变得简单。当然,随着新国标出台之后,一些著名的这类电动车将不能上路,查到会扣车罚款,月底开始实施如今,交通工具行业百花齐放,各种各样的出行工具占据了人们的生活,很多人就算是出门遛弯都会选择骑行,这并不是因为人们懒了,而是人们的生活方式得到了完全的改变。要问哪种交通工具最受人们在农村,一辆废旧摩托车可以卖到200多元,收购它干什么用呢?在经济发展的前期,人们外出基本依靠步行,这时候生活水平还没法得到保障,更不用说购买出行工具了,随着生活水平的提高,自行车进入了千家万户,当时结婚嫁妆里,自行车是大件。就在自行车占据女人说的这些话千万不能相信下第二十一句你给我一段时间考虑(不给我时间,我怎么溜啊)第二十二句你的条件真的很好(可是还没好到我想要的地步)第二十三句可是这样的感觉好怪(你这丑八怪,怪到这样还想吃天鹅肉?)第二十下载神器IDM的安装与使用,分分钟教你满速下载IDM(IntegratedDataMultiplexer)是一款国外的优秀下载工具,该软件提升你的下载速度最多达5倍,具有动态档案分割多重下载点技术,而且它会重复使用现有的联机,让女人超感动的100句温柔浪漫语下89。妳就是我最困难时的那位永远支持我的人!90。妳可知我百年的孤寂只为妳一人守候千夜的恋歌只为妳一人而唱91。白昼与黑夜将无法阻挡我俩的深深思念!92。我是那深深的大海,妳是那自