Netty线程模型源码剖析(文章尾附带全流程图)
Netty服务端设置参数
首先创建bossGroup和workerGroup(Netty客户端和服务端启动源码在上篇文章中), bossGroup只是处理客户端连接请求 ,真正和客户端业务处理会交给workerGroup完成。new NioEventLoopGroup的源码如下(其中有很多调this和super的地方就不一一列出来了,流程图中都包含,只对重要代码做注释分析)://bossGroup和workerGroup都属于MultithreadEventLoopGroup,相当于线程池 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { //如果线程数为0,则取默认值,有io.netty.eventLoopThreads取这个值,没有的话默认是cpu核数*2 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } //进入super到MultithreadEventExecutorGroup,然后会给每个children赋值 children[i] = newChild(executor, args);代码如下 protected EventLoop newChild(Executor executor, Object... args) throws Exception { //创建NioEventLoop,相当于线程池中的每个线程 return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector();//调openSelector()方法,打开Selector处理Channel,就是NIO代码 selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
创建务器端的启动对象ServerBootstrap(就是一个空对象),然后再对其设置group,设置channel,设置option及在pipeline中加入自己写的Handler。public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup);//点进去就是给group属性赋值 if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup;//给childGroup属性赋值,记住这两个属性,下面会用 return this; }//给ServerBootstrap设置channel,最后会跟到下面这段代码 public ReflectiveChannelFactory(Class<? extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz"); try { //通过传过来的类对象NioServerSocketChannel.class,得到构造参数,下面函数会用 //会在下面一节中分析这个NioServerSocketChannel.class的构造方法 this.constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", e); } } //将上述方法得到的constructor,传到本方法内 public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory;//给channelFactory属性赋值,用来创建对象,后续会用来创建对象 return self(); }//设置options,就是一个map,可以初始化服务器连接队列大小, //服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接 //多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理 public B option(ChannelOption option, T value) { if (option == null) { throw new NullPointerException("option"); } if (value == null) { synchronized (options) { options.remove(option); } } else { synchronized (options) { options.put(option, value); } } return self(); }//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行 ////对workerGroup的SocketChannel设置自己写的处理器,把自己写的处理器加到管道中 public ServerBootstrap childHandler(ChannelHandler childHandler) { if (childHandler == null) { throw new NullPointerException("childHandler"); } this.childHandler = childHandler; return this; }Netty服务端绑定端口
Netty服务端绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况;启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕(参考上一篇文章中服务端绑定端口代码),绑定源码如下://调用ServerBootstrap的bind函数一直跟最后会到AbstractBootstrap的doBind函数,此函数主要两步 //第一初始化并注册 initAndRegister(),第二 把ServerChannel绑定到网络端口 doBind0,第二步源码就不再分析了 final ChannelFuture initAndRegister() { Channel channel = null; try { //就是用constructor.newInstance()即上一节中NioServerSocketChannel.class的构造方法来初始化 //参考下面源码分析初始化NioServerSocketChannel channel = channelFactory.newChannel(); //初始化channel,获取管道ChannelPipeline,是在初始化NioServerSocketChannel产生的管道对象, //向管道中添加ChannelInitializer对象,当channel注册时会调用andlerAdded从而调用initChannel init(channel);//具体源码参考下面 } catch (Throwable t) { if (channel != null) { channel.unsafe().closeForcibly(); return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } //注册channel对象 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; } //向管道中添加ChannelInitializer对象,当channel注册时会调用andlerAdded从而调用initChannel void init(Channel channel) throws Exception { final Map, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map, Object> attrs = attrs0(); synchronized (attrs) { for (Entry, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey