详细图解 Netty Reactor 启动全流程(reactive netty)

  本篇文章为你整理了详细图解 Netty Reactor 启动全流程(reactive netty)的详细内容,包含有reactor-netty reactive netty netty启动过程源码分析 netty starter 详细图解 Netty Reactor 启动全流程,希望能帮助你了解 详细图解 Netty Reactor 启动全流程。

   本文我们通过图解源码的方式完整地介绍了整个Netty服务端启动流程,并介绍了在启动过程中涉及到的ServerBootstrap相关的属性以及配置方式。NioServerSocketChannel的创建初始化过程以及类的继承结构。其中重点介绍了NioServerSocketChannel向Reactor的注册过程以及Reactor线程的启动时机和pipeline的初始化时机。最后介绍了NioServerSocketChannel绑定端口地址的整个流程。

  
 

  大家第一眼看到这幅流程图,是不是脑瓜子嗡嗡的呢?

  大家先不要惊慌,问题不大,本文笔者的目的就是要让大家清晰的理解这幅流程图,从而深刻的理解Netty Reactor的启动全流程,包括其中涉及到的各种代码设计实现细节。

  在上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》中我们详细介绍了Netty服务端核心引擎组件主从Reactor组模型 NioEventLoopGroup以及Reactor模型 NioEventLoop的创建过程。最终我们得到了netty Reactor模型的运行骨架如下:

  现在Netty服务端程序的骨架是搭建好了,本文我们就基于这个骨架来深入剖析下Netty服务端的启动过程。

  我们继续回到上篇文章提到的Netty服务端代码模板中,在创建完主从Reactor线程组:bossGroup,workerGroup后,接下来就开始配置Netty服务端的启动辅助类ServerBootstrap 了。

  

public final class EchoServer {

 

   static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

   public static void main(String[] args) throws Exception {

   // Configure the server.

   //创建主从Reactor线程组

   EventLoopGroup bossGroup = new NioEventLoopGroup(1);

   EventLoopGroup workerGroup = new NioEventLoopGroup();

   final EchoServerHandler serverHandler = new EchoServerHandler();

   try {

   ServerBootstrap b = new ServerBootstrap();

   b.group(bossGroup, workerGroup)//配置主从Reactor

   .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型

   .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项

   .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel- pipline- handler

   .childHandler(new ChannelInitializer SocketChannel () {//设置从Reactor中注册channel的pipeline

   @Override

   public void initChannel(SocketChannel ch) throws Exception {

   ChannelPipeline p = ch.pipeline();

   //p.addLast(new LoggingHandler(LogLevel.INFO));

   p.addLast(serverHandler);

   // Start the server. 绑定端口启动服务,开始监听accept事件

   ChannelFuture f = b.bind(PORT).sync();

   // Wait until the server socket is closed.

   f.channel().closeFuture().sync();

   } finally {

   // Shut down all event loops to terminate all threads.

   bossGroup.shutdownGracefully();

   workerGroup.shutdownGracefully();

  

 

  在上篇文章中我们对代码模板中涉及到ServerBootstrap 的一些配置方法做了简单的介绍,大家如果忘记的话,可以在返回去回顾一下。

  ServerBootstrap类其实没有什么特别的逻辑,主要是对Netty启动过程中需要用到的一些核心信息进行配置管理,比如:

  
Netty的核心引擎组件主从Reactor线程组: bossGroup,workerGroup。通过ServerBootstrap#group方法配置。

  
Netty服务端使用到的Channel类型:NioServerSocketChannel ,通过ServerBootstrap#channel方法配置。
 

  以及配置NioServerSocketChannel时用到的SocketOption。SocketOption用于设置底层JDK NIO Socket的一些选项。通过ServerBootstrap#option方法进行配置。

  
主ReactorGroup中的MainReactor管理的Channel类型为NioServerSocketChannel,如图所示主要用来监听端口,接收客户端连接,为客户端创建初始化NioSocketChannel,然后采用round-robin轮询的方式从图中从ReactorGroup中选择一个SubReactor与该客户端NioSocketChannel进行绑定。

  
从ReactorGroup中的SubReactor管理的Channel类型为NioSocketChannel,它是netty中定义客户端连接的一个模型,每个连接对应一个。如图所示SubReactor负责监听处理绑定在其上的所有NioSocketChannel上的IO事件。

  
保存服务端NioServerSocketChannel和客户端NioSocketChannel对应pipeline中指定的ChannelHandler。用于后续Channel向Reactor注册成功之后,初始化Channel里的pipeline。

  
不管是服务端用到的NioServerSocketChannel还是客户端用到的NioSocketChannel,每个Channel实例都会有一个Pipeline,Pipeline中有多个ChannelHandler用于编排处理对应Channel上感兴趣的IO事件。

  ServerBootstrap结构中包含了netty服务端程序启动的所有配置信息,在我们介绍启动流程之前,先来看下ServerBootstrap的源码结构:

  ServerBootstrap

  ServerBootstrap的继承结构比较简单,继承层次的职责分工也比较明确。

  ServerBootstrap主要负责对主从Reactor线程组相关的配置进行管理,其中带child前缀的配置方法是对从Reactor线程组的相关配置管理。从Reactor线程组中的Sub Reactor负责管理的客户端NioSocketChannel相关配置存储在ServerBootstrap结构中。

  父类AbstractBootstrap则是主要负责对主Reactor线程组相关的配置进行管理,以及主Reactor线程组中的Main Reactor负责处理的服务端ServerSocketChannel相关的配置管理。

  1. 配置主从Reactor线程组

  

ServerBootstrap b = new ServerBootstrap();

 

  b.group(bossGroup, workerGroup)//配置主从Reactor

  

 

  

public class ServerBootstrap extends AbstractBootstrap ServerBootstrap, ServerChannel {

 

   //Main Reactor线程组

   volatile EventLoopGroup group;

   //Sub Reactor线程组

   private volatile EventLoopGroup childGroup;

   public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {

   //父类管理主Reactor线程组

   super.group(parentGroup);

   if (this.childGroup != null) {

   throw new IllegalStateException("childGroup set already");

   this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");

   return this;

  

 

  2. 配置服务端ServerSocketChannel

  

ServerBootstrap b = new ServerBootstrap();

 

  b.channel(NioServerSocketChannel.class);

  

 

  

public class ServerBootstrap extends AbstractBootstrap ServerBootstrap, ServerChannel {

 

   //用于创建ServerSocketChannel ReflectiveChannelFactory

   private volatile ChannelFactory ? extends C channelFactory;

   public B channel(Class ? extends C channelClass) {

   return channelFactory(new ReflectiveChannelFactory C (

   ObjectUtil.checkNotNull(channelClass, "channelClass")

   @Deprecated

   public B channelFactory(ChannelFactory ? extends C channelFactory) {

   ObjectUtil.checkNotNull(channelFactory, "channelFactory");

   if (this.channelFactory != null) {

   throw new IllegalStateException("channelFactory set already");

   this.channelFactory = channelFactory;

   return self();

  

 

  在向ServerBootstrap配置服务端ServerSocketChannel的channel 方法中,其实是创建了一个ChannelFactory工厂实例ReflectiveChannelFactory,在Netty服务端启动的过程中,会通过这个ChannelFactory去创建相应的Channel实例。

  我们可以通过这个方法来配置netty的IO模型,下面为ServerSocketChannel在不同IO模型下的实现:

  
我们只需要将IO模型的这些核心接口对应的实现类前缀改为对应IO模型的前缀,就可以轻松在Netty中完成对IO模型的切换。

  2.1 ReflectiveChannelFactory

  

public class ReflectiveChannelFactory T extends Channel implements ChannelFactory T {

 

   //NioServerSocketChannelde 构造器

   private final Constructor ? extends T constructor;

   public ReflectiveChannelFactory(Class ? extends T clazz) {

   ObjectUtil.checkNotNull(clazz, "clazz");

   try {

   //反射获取NioServerSocketChannel的构造器

   this.constructor = clazz.getConstructor();

   } catch (NoSuchMethodException e) {

   throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +

   " does not have a public non-arg constructor", e);

   @Override

   public T newChannel() {

   try {

   //创建NioServerSocketChannel实例

   return constructor.newInstance();

   } catch (Throwable t) {

   throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);

  

 

  从类的签名我们可以看出,这个工厂类是通过泛型加反射的方式来创建对应的Channel实例。

  泛型参数T extends Channel表示的是要通过工厂类创建的Channel类型,这里我们初始化的是NioServerSocketChannel。

  在ReflectiveChannelFactory 的构造器中通过反射的方式获取NioServerSocketChannel的构造器。

  在newChannel 方法中通过构造器反射创建NioServerSocketChannel实例。

  注意这时只是配置阶段,NioServerSocketChannel此时并未被创建。它是在启动的时候才会被创建出来。

  3. 为NioServerSocketChannel配置ChannelOption

  

ServerBootstrap b = new ServerBootstrap();

 

  //设置被MainReactor管理的NioServerSocketChannel的Socket选项

  b.option(ChannelOption.SO_BACKLOG, 100)

  

 

  

public abstract class AbstractBootstrap B extends AbstractBootstrap B, C , C extends Channel implements Cloneable {

 

   //serverSocketChannel中的ChannelOption配置

   private final Map ChannelOption ? , Object options = new LinkedHashMap ChannelOption ? , Object

   public T B option(ChannelOption T option, T value) {

   ObjectUtil.checkNotNull(option, "option");

   synchronized (options) {

   if (value == null) {

   options.remove(option);

   } else {

   options.put(option, value);

   return self();

  

 

  无论是服务端的NioServerSocketChannel还是客户端的NioSocketChannel它们的相关底层Socket选项ChannelOption配置全部存放于一个Map类型的数据结构中。

  由于客户端NioSocketChannel是由从Reactor线程组中的Sub Reactor来负责处理,所以涉及到客户端NioSocketChannel所有的方法和配置全部是以child前缀开头。

  

ServerBootstrap b = new ServerBootstrap();

 

  .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)

  

 

  

public class ServerBootstrap extends AbstractBootstrap ServerBootstrap, ServerChannel {

 

   //客户端SocketChannel对应的ChannelOption配置

   private final Map ChannelOption ? , Object childOptions = new LinkedHashMap ChannelOption ? , Object

   public T ServerBootstrap childOption(ChannelOption T childOption, T value) {

   ObjectUtil.checkNotNull(childOption, "childOption");

   synchronized (childOptions) {

   if (value == null) {

   childOptions.remove(childOption);

   } else {

   childOptions.put(childOption, value);

   return this;

  

 

  相关的底层Socket选项,netty全部枚举在ChannelOption类中,笔者这里就不一一列举了,在本系列后续相关的文章中,笔者还会为大家详细的介绍这些参数的作用。

  

public class ChannelOption T extends AbstractConstant ChannelOption T {

 

   ..................省略..............

   public static final ChannelOption Boolean SO_BROADCAST = valueOf("SO_BROADCAST");

   public static final ChannelOption Boolean SO_KEEPALIVE = valueOf("SO_KEEPALIVE");

   public static final ChannelOption Integer SO_SNDBUF = valueOf("SO_SNDBUF");

   public static final ChannelOption Integer SO_RCVBUF = valueOf("SO_RCVBUF");

   public static final ChannelOption Boolean SO_REUSEADDR = valueOf("SO_REUSEADDR");

   public static final ChannelOption Integer SO_LINGER = valueOf("SO_LINGER");

   public static final ChannelOption Integer SO_BACKLOG = valueOf("SO_BACKLOG");

   public static final ChannelOption Integer SO_TIMEOUT = valueOf("SO_TIMEOUT");

   ..................省略..............

  

 

  4. 为服务端NioServerSocketChannel中的Pipeline配置ChannelHandler

  

 //serverSocketChannel中pipeline里的handler(主要是acceptor)

 

   private volatile ChannelHandler handler;

   public B handler(ChannelHandler handler) {

   this.handler = ObjectUtil.checkNotNull(handler, "handler");

   return self();

  

 

  向NioServerSocketChannel中的Pipeline添加ChannelHandler分为两种方式:

  显式添加: 显式添加的方式是由用户在main线程中通过ServerBootstrap#handler的方式添加。如果需要添加多个ChannelHandler,则可以通过ChannelInitializer向pipeline中进行添加。

  
关于ChannelInitializer后面笔者会有详细介绍,这里大家只需要知道ChannelInitializer是一种特殊的ChannelHandler,用于初始化pipeline。适用于向pipeline中添加多个ChannelHandler的场景。

  

 ServerBootstrap b = new ServerBootstrap();

 

   b.group(bossGroup, workerGroup)//配置主从Reactor

   .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型

   .handler(new ChannelInitializer NioServerSocketChannel () {

   @Override

   protected void initChannel(NioServerSocketChannel ch) throws Exception {

   ChannelPipeline p = ch.pipeline();

   p.addLast(channelhandler1)

   .addLast(channelHandler2)

   ......

   .addLast(channelHandler3);

  

 

  隐式添加:隐式添加主要添加的就是主ReactorGroup的核心组件也就是下图中的acceptor,Netty中的实现为ServerBootstrapAcceptor,本质上也是一种ChannelHandler,主要负责在客户端连接建立好后,初始化客户端NioSocketChannel,在从Reactor线程组中选取一个Sub Reactor,将客户端NioSocketChannel 注册到Sub Reactor中的selector上。

  
隐式添加ServerBootstrapAcceptor是由Netty框架在启动的时候负责添加,用户无需关心。

  在本例中,NioServerSocketChannel的PipeLine中只有两个ChannelHandler,一个由用户在外部显式添加的LoggingHandler,另一个是由Netty框架隐式添加的ServerBootstrapAcceptor。

  其实我们在实际项目使用的过程中,不会向netty服务端NioServerSocketChannel添加额外的ChannelHandler,NioServerSocketChannel只需要专心做好自己最重要的本职工作接收客户端连接就好了。这里额外添加一个LoggingHandler只是为了向大家展示ServerBootstrap的配置方法。

  5. 为客户端NioSocketChannel中的Pipeline配置ChannelHandler

  

 final EchoServerHandler serverHandler = new EchoServerHandler();

 

   serverBootstrap.childHandler(new ChannelInitializer SocketChannel () {//设置从Reactor中注册channel的pipeline

   @Override

   public void initChannel(SocketChannel ch) throws Exception {

   ChannelPipeline p = ch.pipeline();

   p.addLast(new LoggingHandler(LogLevel.INFO));

   p.addLast(serverHandler);

  

 

  

 //socketChannel中pipeline中的处理handler

 

   private volatile ChannelHandler childHandler;

   public ServerBootstrap childHandler(ChannelHandler childHandler) {

   this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");

   return this;

  

 

  向客户端NioSocketChannel中的Pipeline里添加ChannelHandler完全是由用户自己控制显式添加,添加的数量不受限制。

  由于在Netty的IO线程模型中,是由单个Sub Reactor线程负责执行客户端NioSocketChannel中的Pipeline,一个Sub Reactor线程负责处理多个NioSocketChannel上的IO事件,如果Pipeline中的ChannelHandler添加的太多,就会影响Sub Reactor线程执行其他NioSocketChannel上的Pipeline,从而降低IO处理效率,降低吞吐量。

  所以Pipeline中的ChannelHandler不易添加过多,并且不能再ChannelHandler中执行耗时的业务处理任务。

  在我们通过ServerBootstrap配置netty服务端启动信息的时候,无论是向服务端NioServerSocketChannel的pipeline中添加ChannelHandler,还是向客户端NioSocketChannel的pipeline中添加ChannelHandler,当涉及到多个ChannelHandler添加的时候,我们都会用到ChannelInitializer,那么这个ChannelInitializer究竟是何方圣神,为什么要这样做呢?我们接着往下看~~

  ChannelInitializer

  首先ChannelInitializer它继承于ChannelHandler,它自己本身就是一个ChannelHandler,所以它可以添加到childHandler中。

  其他的父类大家这里可以不用管,后面文章中笔者会一一为大家详细介绍。

  那为什么不直接添加ChannelHandler而是选择用ChannelInitializer呢?

  这里主要有两点原因:

  
前边我们提到,客户端NioSocketChannel是在服务端accept连接后,在服务端NioServerSocketChannel中被创建出来的。但是此时我们正处于配置ServerBootStrap阶段,服务端还没有启动,更没有客户端连接上来,此时客户端NioSocketChannel还没有被创建出来,所以也就没办法向客户端NioSocketChannel的pipeline中添加ChannelHandler。

  
客户端NioSocketChannel中Pipeline里可以添加任意多个ChannelHandler,但是Netty框架无法预知用户到底需要添加多少个ChannelHandler,所以Netty框架提供了回调函数ChannelInitializer#initChannel,使用户可以自定义ChannelHandler的添加行为。

  
当客户端NioSocketChannel注册到对应的Sub Reactor上后,紧接着就会初始化NioSocketChannel中的Pipeline,此时Netty框架会回调ChannelInitializer#initChannel执行用户自定义的添加逻辑。

  

public abstract class ChannelInitializer C extends Channel extends ChannelInboundHandlerAdapter {

 

   @Override

   @SuppressWarnings("unchecked")

   public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {

   //当channelRegister事件发生时,调用initChannel初始化pipeline

   if (initChannel(ctx)) {

   .................省略...............

   } else {

   .................省略...............

   private boolean initChannel(ChannelHandlerContext ctx) throws Exception {

   if (initMap.add(ctx)) { // Guard against re-entrance.

   try {

   //此时客户单NioSocketChannel已经创建并初始化好了

   initChannel((C) ctx.channel());

   } catch (Throwable cause) {

   .................省略...............

   } finally {

   .................省略...............

   return true;

   return false;

   protected abstract void initChannel(C ch) throws Exception;

   .................省略...............

  

 

  这里由netty框架回调的ChannelInitializer#initChannel方法正是我们自定义的添加逻辑。

  

 final EchoServerHandler serverHandler = new EchoServerHandler();

 

   serverBootstrap.childHandler(new ChannelInitializer SocketChannel () {//设置从Reactor中注册channel的pipeline

   @Override

   public void initChannel(SocketChannel ch) throws Exception {

   ChannelPipeline p = ch.pipeline();

   p.addLast(new LoggingHandler(LogLevel.INFO));

   p.addLast(serverHandler);

  

 

  到此为止,Netty服务端启动所需要的必要配置信息,已经全部存入ServerBootStrap启动辅助类中。

  接下来要做的事情就是服务端的启动了。

  

// Start the server. 绑定端口启动服务,开始监听accept事件

 

  ChannelFuture f = serverBootStrap.bind(PORT).sync();

  

 

  Netty服务端的启动

  经过前面的铺垫终于来到了本文的核心内容----Netty服务端的启动过程。

  如代码模板中的示例所示,Netty服务端的启动过程封装在io.netty.bootstrap.AbstractBootstrap#bind(int)函数中。

  接下来我们看一下Netty服务端在启动过程中究竟干了哪些事情?

  大家看到这副启动流程图先不要慌,接下来的内容笔者会带大家各个击破它,在文章的最后保证让大家看懂这副流程图。

  我们先来从netty服务端启动的入口函数开始我们今天的源码解析旅程:

  

 public ChannelFuture bind(int inetPort) {

 

   return bind(new InetSocketAddress(inetPort));

   public ChannelFuture bind(SocketAddress localAddress) {

   //校验Netty核心组件是否配置齐全

   validate();

   //服务端开始启动,绑定端口地址,接收客户端连接

   return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));

   private ChannelFuture doBind(final SocketAddress localAddress) {

   //异步创建,初始化,注册ServerSocketChannel到main reactor上

   final ChannelFuture regFuture = initAndRegister();

   final Channel channel = regFuture.channel();

   if (regFuture.cause() != null) {

   return regFuture;

   if (regFuture.isDone()) {

   ........serverSocketChannel向Main Reactor注册成功后开始绑定端口....,

   } else {

   //如果此时注册操作没有完成,则向regFuture添加operationComplete回调函数,注册成功后回调。

   regFuture.addListener(new ChannelFutureListener() {

   @Override

   public void operationComplete(ChannelFuture future) throws Exception {

   ........serverSocketChannel向Main Reactor注册成功后开始绑定端口....,

   return promise;

  

 

  Netty服务端的启动流程总体如下:

  
注册成功后,开始初始化NioServerSocketChannel中的pipeline,然后在pipeline中触发channelRegister事件。

  
绑定端口地址成功后,向NioServerSocketChannel对应的Pipeline中触发传播ChannelActive事件,在ChannelActive事件回调中向Main Reactor注册OP_ACCEPT事件,开始等待客户端连接。服务端启动完成。

  
当netty服务端启动成功之后,最终我们会得到如下结构的阵型,开始枕戈待旦,准备接收客户端的连接,Reactor开始运转。

  接下来,我们就来看下Netty源码是如何实现以上步骤的~~

  1. initAndRegister

  

 final ChannelFuture initAndRegister() {

 

   Channel channel = null;

   try {

   //创建NioServerSocketChannel

   //ReflectiveChannelFactory通过泛型,反射,工厂的方式灵活创建不同类型的channel

   channel = channelFactory.newChannel();

   //初始化NioServerSocketChannel

   init(channel);

   } catch (Throwable t) {

   ..............省略.................

   //向MainReactor注册ServerSocketChannel

   ChannelFuture regFuture = config().group().register(channel);

   ..............省略.................

   return regFuture;

  

 

  从函数命名中我们可以看出,这个函数主要做的事情就是首先创建NioServerSocketChannel ,并对NioServerSocketChannel 进行初始化,最后将NioServerSocketChannel 注册到Main Reactor中。

  1.1 创建NioServerSocketChannel

  还记得我们在介绍ServerBootstrap启动辅助类配置服务端ServerSocketChannel类型的时候提到的工厂类ReflectiveChannelFactory 吗?

  因为当时我们在配置ServerBootstrap启动辅助类的时候,还没到启动阶段,而配置阶段并不是创建具体ServerSocketChannel的时机。

  所以Netty通过工厂模式将要创建的ServerSocketChannel的类型(通过泛型指定)以及 创建的过程(封装在newChannel函数中)统统先封装在工厂类ReflectiveChannelFactory中。

  ReflectiveChannelFactory通过泛型,反射,工厂的方式灵活创建不同类型的channel

  等待创建时机来临,我们调用保存在ServerBootstrap中的channelFactory直接进行创建。

  

public class ReflectiveChannelFactory T extends Channel implements ChannelFactory T {

 

   private final Constructor ? extends T constructor;

   @Override

   public T newChannel() {

   try {

   return constructor.newInstance();

   } catch (Throwable t) {

   throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);

  

 

  下面我们来看下NioServerSocketChannel的构建过程:

  1.1.1 NioServerSocketChannel

  

public class NioServerSocketChannel extends AbstractNioMessageChannel

 

   implements io.netty.channel.socket.ServerSocketChannel {

   //SelectorProvider(用于创建Selector和Selectable Channels)

   private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

   public NioServerSocketChannel() {

   this(newSocket(DEFAULT_SELECTOR_PROVIDER));

   //创建JDK NIO ServerSocketChannel

   private static ServerSocketChannel newSocket(SelectorProvider provider) {

   try {

   return provider.openServerSocketChannel();

   } catch (IOException e) {

   throw new ChannelException(

   "Failed to open a server socket.", e);

   //ServerSocketChannel相关的配置

   private final ServerSocketChannelConfig config;

   public NioServerSocketChannel(ServerSocketChannel channel) {

   //父类AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT

   super(null, channel, SelectionKey.OP_ACCEPT);

   //DefaultChannelConfig中设置用于Channel接收数据用的buffer- AdaptiveRecvByteBufAllocator

   config = new NioServerSocketChannelConfig(this, javaChannel().socket());

  

 

  
首先调用newSocket 创建JDK NIO 原生ServerSocketChannel,这里调用了SelectorProvider#openServerSocketChannel 来创建JDK NIO 原生ServerSocketChannel,我们在上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》中详细的介绍了SelectorProvider相关内容,当时是用SelectorProvider来创建Reactor中的Selector。大家还记得吗??

  
通过父类构造器设置NioServerSocketChannel感兴趣的IO事件,这里设置的是SelectionKey.OP_ACCEPT事件。并将JDK NIO 原生ServerSocketChannel封装起来。

  
创建Channel的配置类NioServerSocketChannelConfig,在配置类中封装了对Channel底层的一些配置行为,以及JDK中的ServerSocket。以及创建NioServerSocketChannel接收数据用的Buffer分配器AdaptiveRecvByteBufAllocator。

  
NioServerSocketChannelConfig没什么重要的东西,我们这里也不必深究,它就是管理NioServerSocketChannel相关的配置,这里唯一需要大家注意的是这个用于Channel接收数据用的Buffer分配器AdaptiveRecvByteBufAllocator,我们后面在介绍Netty如何接收连接的时候还会提到。

  NioServerSocketChannel 的整体构建过程介绍完了,现在我们来按照继承层次再回过头来看下NioServerSocketChannel 的层次构建,来看下每一层都创建了什么,封装了什么,这些信息都是Channel的核心信息,所以有必要了解一下。

  在NioServerSocketChannel 的创建过程中,我们主要关注继承结构图中红框标注的三个类,其他的我们占时先不用管。

  其中AbstractNioMessageChannel类主要是对NioServerSocketChannel底层读写行为的封装和定义,比如accept接收客户端连接。这个我们后续会介绍到,这里我们并不展开。

  1.1.2 AbstractNioChannel

  

public abstract class AbstractNioChannel extends AbstractChannel {

 

   //JDK NIO原生Selectable Channel

   private final SelectableChannel ch;

   // Channel监听事件集合 这里是SelectionKey.OP_ACCEPT事件

   protected final int readInterestOp;

   protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {

   super(parent);

   this.ch = ch;

   this.readInterestOp = readInterestOp;

   try {

   //设置Channel为非阻塞 配合IO多路复用模型

   ch.configureBlocking(false);

   } catch (IOException e) {

   .............省略................

  

 

  
封装Channel在创建时指定感兴趣的IO事件,对于NioServerSocketChannel来说感兴趣的IO事件为OP_ACCEPT事件。

  
1.1.3 AbstractChannel

  

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

 

   //channel是由创建层次的,比如ServerSocketChannel 是 SocketChannel的 parent

   private final Channel parent;

   //channel全局唯一ID machineId+processId+sequence+timestamp+random

   private final ChannelId id;

   //unsafe用于封装对底层socket的相关操作

   private final Unsafe unsafe;

   //为channel分配独立的pipeline用于IO事件编排

   private final DefaultChannelPipeline pipeline;

   protected AbstractChannel(Channel parent) {

   this.parent = parent;

   //channel全局唯一ID machineId+processId+sequence+timestamp+random

   id = newId();

   //unsafe用于定义实现对Channel的底层操作

   unsafe = newUnsafe();

   //为channel分配独立的pipeline用于IO事件编排

   pipeline = newChannelPipeline();

  

 

  
Netty中的Channel创建是有层次的,这里的parent属性用来保存上一级的Channel,比如这里的NioServerSocketChannel是顶级Channel,所以它的parent = null。客户端NioSocketChannel是由NioServerSocketChannel创建的,所以它的parent = NioServerSocketChannel。

  
为Channel分配全局唯一的ChannelId。ChannelId由机器Id(machineId),进程Id(processId),序列号(sequence),时间戳(timestamp),随机数(random)构成

  


 private DefaultChannelId() {

 

   data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];

   int i = 0;

   // machineId

   System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);

   i += MACHINE_ID.length;

   // processId

   i = writeInt(i, PROCESS_ID);

   // sequence

   i = writeInt(i, nextSequence.getAndIncrement());

   // timestamp (kind of)

   i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());

   // random

   int random = PlatformDependent.threadLocalRandom().nextInt();

   i = writeInt(i, random);

   assert i == data.length;

   hashCode = Arrays.hashCode(data);

  

 

  创建NioServerSocketChannel的底层操作类Unsafe 。这里创建的是io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe。

  
Unsafe为Channel接口的一个内部接口,用于定义实现对Channel底层的各种操作,Unsafe接口定义的操作行为只能由Netty框架的Reactor线程调用,用户线程禁止调用。

  

interface Unsafe {

 

   //分配接收数据用的Buffer

   RecvByteBufAllocator.Handle recvBufAllocHandle();

   //服务端绑定的端口地址

   SocketAddress localAddress();

   //远端地址

   SocketAddress remoteAddress();

   //channel向Reactor注册

   void register(EventLoop eventLoop, ChannelPromise promise);

   //服务端绑定端口地址

   void bind(SocketAddress localAddress, ChannelPromise promise);

   //客户端连接服务端

   void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

   //关闭channle

   void close(ChannelPromise promise);

   //读数据

   void beginRead();

   //写数据

   void write(Object msg, ChannelPromise promise);

  

 

  为NioServerSocketChannel分配独立的pipeline用于IO事件编排。pipeline其实是一个ChannelHandlerContext类型的双向链表。头结点HeadContext,尾结点TailContext。ChannelHandlerContext中包装着ChannelHandler。

  
ChannelHandlerContext 保存 ChannelHandler上下文信息,用于事件传播。后面笔者会单独开一篇文章介绍,这里我们还是聚焦于启动主线。

  这里只是为了让大家简单理解pipeline的一个大致的结构,后面会写一篇文章专门详细讲解pipeline。

  

 protected DefaultChannelPipeline(Channel channel) {

 

   this.channel = ObjectUtil.checkNotNull(channel, "channel");

   succeededFuture = new SucceededChannelFuture(channel, null);

   voidPromise = new VoidChannelPromise(channel, true);

   tail = new TailContext(this);

   head = new HeadContext(this);

   head.next = tail;

   tail.prev = head;

  

 

  到了这里NioServerSocketChannel就创建完毕了,我们来回顾下它到底包含了哪些核心信息。

  1.2 初始化NioServerSocketChannel

  

 void init(Channel channel) {

 

   //向NioServerSocketChannelConfig设置ServerSocketChannelOption

   setChannelOptions(channel, newOptionsArray(), logger);

   //向netty自定义的NioServerSocketChannel设置attributes

   setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

   ChannelPipeline p = channel.pipeline();

   //获取从Reactor线程组

   final EventLoopGroup currentChildGroup = childGroup;

   //获取用于初始化客户端NioSocketChannel的ChannelInitializer

   final ChannelHandler currentChildHandler = childHandler;

   //获取用户配置的客户端SocketChannel的channelOption以及attributes

   final Entry ChannelOption ? , Object [] currentChildOptions;

   synchronized (childOptions) {

   currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);

   final Entry AttributeKey ? , Object [] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

   //向NioServerSocketChannel中的pipeline添加初始化ChannelHandler的逻辑

   p.addLast(new ChannelInitializer Channel () {

   @Override

   public void initChannel(final Channel ch) {

   final ChannelPipeline pipeline = ch.pipeline();

   //ServerBootstrap中用户指定的channelHandler

   ChannelHandler handler = config.handler();

   if (handler != null) {

   //LoggingHandler

   pipeline.addLast(handler);

   //添加用于接收客户端连接的acceptor

   ch.eventLoop().execute(new Runnable() {

   @Override

   public void run() {

   pipeline.addLast(new ServerBootstrapAcceptor(

   ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

  

 

  
Netty自定义的SocketChannel类型均继承AttributeMap接口以及DefaultAttributeMap类,正是它们定义了ChannelAttributes。用于向Channel添加用户自定义的一些信息。

  这个ChannelAttributes的用处大有可为,Netty后边的许多特性都是依靠这个ChannelAttributes来实现的。这里先卖个关子,大家可以自己先想一下可以用这个ChannelAttributes做哪些事情?

  
获取从Reactor线程组childGroup,以及用于初始化客户端NioSocketChannel的ChannelInitializer,ChannelOption,ChannelAttributes,这些信息均是由用户在启动的时候向ServerBootstrap添加的客户端NioServerChannel配置信息。这里用这些信息来初始化ServerBootstrapAcceptor。因为后续会在ServerBootstrapAcceptor中接收客户端连接以及创建NioServerChannel。

  
向NioServerSocketChannel中的pipeline添加用于初始化pipeline的ChannelInitializer。

  
问题来了,这里为什么不干脆直接将ChannelHandler添加到pipeline中,而是又使用到了ChannelInitializer呢?

  其实原因有两点:

  
为了保证线程安全地初始化pipeline,所以初始化的动作需要由Reactor线程进行,而当前线程是用户程序的启动Main线程 并不是Reactor线程。这里不能立即初始化。

  
初始化Channel中pipeline的动作,需要等到Channel注册到对应的Reactor中才可以进行初始化,当前只是创建好了NioServerSocketChannel,但并未注册到Main Reactor上。

  
初始化NioServerSocketChannel中pipeline的时机是:当NioServerSocketChannel注册到Main Reactor之后,绑定端口地址之前。

  
前边在介绍ServerBootstrap配置childHandler时也用到了ChannelInitializer,还记得吗??

  问题又来了,大家注意下ChannelInitializer#initChannel方法,在该初始化回调方法中,添加LoggingHandler是直接向pipeline中添加,而添加Acceptor为什么不是直接添加而是封装成异步任务呢?

  这里先给大家卖个关子,笔者会在后续流程中为大家解答~~~~~

  此时NioServerSocketChannel中的pipeline结构如下图所示:

  1.3 向Main Reactor注册NioServerSocketChannel

  从ServerBootstrap获取主Reactor线程组NioEventLoopGrou。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: