我为 Netty 贡献源码()

  本篇文章为你整理了我为 Netty 贡献源码()的详细内容,包含有 我为 Netty 贡献源码,希望能帮助你了解 我为 Netty 贡献源码。

  接下来,笔者就带大家从这三个连接关闭场景来全面分析下 Netty 是如何处理连接关闭的。

  首先我们来看下最简单的场景 --- 正常的TCP连接关闭。

  1. 正常 TCP 连接关闭

  在进入源码实现之前,我们先来回顾下 TCP 连接关闭的整个流程,其实 Netty 中针对连接关闭的整个源码实现流程也是按照图中 TCP 连接关闭的四次挥手步骤进行的。

  首先 Netty 客户端在对应的 ChannelHandler 中调用 ctx.channel().close() 方法主动关闭连接,内核会向服务端发送一个 FIN 包,随即客户端连接进入 FIN_WAIT1 状态。

  

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

 

   @Override

   public void channelReadComplete(ChannelHandlerContext ctx) {

   // 客户端连接进入 FIN_WAIT1 状态

   ctx.channel().close();

  

 

  
服务端内核协议栈在接收到客户端发送过来的 FIN 包后,会自动回复客户端一个 ACK 包,随后会将文件结束符 EOF 插入到 Socket 接收缓冲区中的末尾。服务端连接状态进入 CLOSE_WAIT ,客户端接收到 ACK 包后进入FIN_WAIT2 状态。

  
当服务端内核协议栈将 EOF 插入到 Socket 的接收缓冲区时,这时 OP_READ 事件活跃,Reactor 线程随即会处理 channel 上的 OP_READ 事件,只不过此时从 channel 中读取到的字节数为 -1 ,表示对端发起了 channel 关闭请求。服务端开始执行连接关闭流程。

  
由于客户端调用的是 ctx.channel().close() 方法来关闭连接,相当于将 TCP 连接的读写通道同时关闭,所以客户端在 FIN_WAIT2 状态下无法在接收服务端发送的数据,但此时服务端处于 CLOSE_WAIT 状态下仍可向客户端发送数据,只不过客户端在接收到数据后会丢弃并发送 RST 报文给服务端。

  
服务端在 CLOSE_WAIT 状态下,调用 ctx.channel().close() 向客户端发送 FIN 包,随即进入 LAST_ACK 状态。

  
客户端在收到来自服务端的 FIN 包后,回复 ACK 包给服务端,完成四次挥手,随即进入 TIME_WAIT 状态,服务端在收到客户端的 ACK 包后结束 LAST_ACK 状态进入 CLOSE 状态。

  
Netty 中对于连接关闭的处理主要在第 3 步和第 5 步,剩下的逻辑均由内核协议栈处理完成。

  从上述 TCP 关闭连接的四次挥手步骤中,我们可以看出 Netty 对于关闭连接的响应是通过处理 OP_READ 事件来完成的,而对于 OP_READ 事件的处理,笔者已经在 Netty如何高效接收网络数据 一文中详细介绍过了,这里我们直接来到 OP_READ 事件的处理函数中,聚焦于连接关闭逻辑的处理。

  当 Reactor 线程轮询到 Channel 上有 OP_READ 事件活跃时,就会来到 NioEventLoop#processSelectedKey 函数中去处理活跃的 IO 事件,在本文的语义中 OP_READ 事件就表示连接关闭事件。

  

public final class NioEventLoop extends SingleThreadEventLoop {

 

   private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

   final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

   .................省略..............

   try {

   int readyOps = k.readyOps();

   .................省略..............

   if ((readyOps (SelectionKey.OP_READ SelectionKey.OP_ACCEPT)) != 0 readyOps == 0) {

   //处理 OP_READ 事件,本文中表示连接关闭事件

   unsafe.read();

   } catch (CancelledKeyException ignored) {

   unsafe.close(unsafe.voidPromise());

  

 

  最终会在 AbstractNioByteChannel#read 方法中完成对 OP_READ 事件的处理,下图中置灰的逻辑处理模块即为 Netty 在整个 OP_READ 事件处理中关于连接关闭事件的处理位置。

  Netty 中关于 OP_READ 事件的处理一共分为两大模块,一块是针对接收连接上网络数据的处理。另一块则是本文的主题,针对连接关闭事件的处理。

  

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

 

   @Override

   public final void read() {

   final ChannelConfig config = config();

   ..........省略连接半关闭处理........

   ..........省略获取allocHandle过程.......

   ByteBuf byteBuf = null;

   boolean close = false;

   try {

   do {

   byteBuf = allocHandle.allocate(allocator);

   //记录本次读取了多少字节数

   allocHandle.lastBytesRead(doReadBytes(byteBuf));

   //如果本次没有读取到任何字节,则退出循环 进行下一轮事件轮询

   // -1 表示客户端主动关闭了连接close或者shutdownOutput 这里均会返回-1

   if (allocHandle.lastBytesRead() = 0) {

   // nothing was read. release the buffer.

   byteBuf.release();

   byteBuf = null;

   //当客户端主动关闭连接时(客户端发送fin1),会触发read就绪事件,这里从channel读取的数据会是-1

   close = allocHandle.lastBytesRead()

   if (close) {

   // There is nothing left to read as we received an EOF.

   readPending = false;

   break;

   .........省略.............

   } while (allocHandle.continueReading());

   allocHandle.readComplete();

   pipeline.fireChannelReadComplete();

   if (close) {

   //此时客户端发送fin1(fi_wait_1状态)主动关闭连接,服务端接收到fin,并回复ack进入close_wait状态

   //在服务端进入close_wait状态 需要调用close 方法向客户端发送fin_ack,服务端才能结束close_wait状态

   closeOnRead(pipeline);

   } catch (Throwable t) {

   ............省略...............

   } finally {

   ............省略...............

  

 

  在前边 TCP 连接关闭的步骤 3 中我们提到,当服务端的内核协议栈接收到来自客户端的 FIN 包后,内核协议栈会向 Socket 的接收缓冲区插入文件结束符 EOF ,表示客户端已经主动发起了关闭连接流程,这时 NioSocketChannel 上的 OP_READ 事件活跃,随即 Reactor 线程会在 AbstractNioByteChannel#read 方法中处理 OP_READ 事件。

  

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

 

   @Override

   protected int doReadBytes(ByteBuf byteBuf) throws Exception {

   final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

   allocHandle.attemptedBytesRead(byteBuf.writableBytes());

   //读到EOF后,这里会返回-1

   return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());

  

 

  Reactor 线程会通过 ByteBuf#writeBytes 方法读取 NioSocketChannel 中的数据,由于此时底层 Socket 接收缓冲区中只有一个 EOF 并没有其他接收数据,所以这里的 ByteBuf#writeBytes 方法会返回 -1。表示客户端已经发起了连接关闭流程,此时服务端连接状态为 CLOSE_WAIT ,客户端连接状态为 FIN_WAIT2 。

  

 boolean close = false;

 

   close = allocHandle.lastBytesRead()

   if (close) {

   closeOnRead(pipeline);

  

 

  当本次 read loop 从 Channel 中读取到的字节数为 -1 时,则进入 closeOnRead 方法,服务端开始关闭连接流程。

  从上述 Netty 处理 TCP 正常关闭流程( Socket 接收缓冲区中只有 EOF ,没有其他正常接收数据)可以看出,这种情况下只会触发 ChannelReadComplete 事件而不会触发 ChannelRead 事件。

  2. Netty 对 TCP 连接正常关闭的处理

  

 private void closeOnRead(ChannelPipeline pipeline) {

 

   //判断服务端连接接收方向是否关闭,这里肯定是没有关闭的

   if (!isInputShutdown0()) {

   if (isAllowHalfClosure(config())) {

   .....省略TCP连接半关闭处理逻辑.......

   } else {

   //如果不支持半关闭,则服务端直接调用close方法向客户端发送fin,结束close_wait状态进如last_ack状态

   close(voidPromise());

   } else {

   .....省略TCP连接半关闭处理逻辑.......

  

 

  众所周知 TCP 是一个面向连接的、可靠的、基于字节流的全双工传输层通信协议,既然它是全双工的,那就意味着 TCP 连接同时有一个读通道和写通道。

  这里的 isInputShutdown0 方法是用来判断 TCP 连接上的读通道是否关闭,那么在当前情况下,服务端的读通道肯定还没有关闭,因为目前 Netty 还没有调用任何关闭连接的系统调用。

  

 @Override

 

   protected boolean isInputShutdown0() {

   return isInputShutdown();

   @Override

   public boolean isInputShutdown() {

   return javaChannel().socket().isInputShutdown() !isActive();

  

 

  至于这里为什么要对读通道是否关闭进行判断,笔者会在本文 TCP 连接半关闭相关处理章节为大家详细解释。

  由于本小节介绍的是 TCP 连接正常关闭的场景,并不是半关闭,所以这里的 isAllowHalfClosure = false 。Reactor 线程进入 close 方法,执行真正的关闭流程。

  2.1 close 方法发起 TCP 连接关闭流程

  

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

 

   @Override

   public void close(final ChannelPromise promise) {

   assertEventLoop();

   ClosedChannelException closedChannelException =

   StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");

   close(promise, closedChannelException, closedChannelException, false);

   private void close(final ChannelPromise promise, final Throwable cause,

   final ClosedChannelException closeCause, final boolean notify) {

   .........省略...........

  

 

  这里正是 netty 关闭 channel 的核心逻辑所在,而关闭 channel 的行为又分为主动关闭和被动关闭,如本例中,客户端主动调用 ctx.channel().close() 发起关闭流程为主动关闭方,而服务端则是被动关闭方。

  而主动关闭方和被动关闭方在这里的传参是不一样的,我们先来看被动关闭方也就是本例中服务端在调用 close 方法的传参。

  

 @Override

 

   public void close(final ChannelPromise promise) {

   assertEventLoop();

   ClosedChannelException closedChannelException =

   StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");

   close(promise, closedChannelException, closedChannelException, false);

  

 

  ChannelPromise promise:服务端作为被动关闭方,这里传入的 ChannelPromise 类型为 VoidChannelPromise ,表示调用方对处理结果并不关心,VoidChannelPromise 不可添加 Listener ,不可修改操作结果状态。

  

public final class VoidChannelPromise extends AbstractFuture Void implements ChannelPromise {

 

   @Override

   public VoidChannelPromise addListener(GenericFutureListener ? extends Future ? super Void listener) {

   fail();

   return this;

   @Override

   public boolean isDone() {

   return false;

   @Override

   public boolean setUncancellable() {

   return true;

   @Override

   public VoidChannelPromise setFailure(Throwable cause) {

   fireException0(cause);

   return this;

   @Override

   public boolean trySuccess() {

   return false;

  

 

  而作为主动关闭方的客户端则需要监听 Channel 关闭的结果,所以这里传递的 ChannelPromise 参数为 DefaultChannelPromise 。

  

 ChannelFuture channelFuture = ctx.channel().close();

 

   channelFuture.addListener(new ChannelFutureListener() {

   @Override

   public void operationComplete(ChannelFuture future) throws Exception {

   ...........省略.......

  

 

  

 @Override

 

   public ChannelFuture close() {

   return close(newPromise());

   @Override

   public ChannelPromise newPromise() {

   return new DefaultChannelPromise(channel(), executor());

  

 

  Throwable cause:当 Channel 关闭之后,需要清理 Channel 写入缓冲队列 ChannelOutboundBuffer 中的待发送数据,这里会将异常 cause 传递给用户的 writePromise ,通知用户 Channel 已经关闭,write 操作失败。这里传入的异常类型为 StacklessClosedChannelException 。

  如图中所示,当用户调用 ctx.writeAndFlush(msg) 发送数据时,由于是异步发送 Netty 会在图中的第 2 步直接返回一个 ChannelFuture 给用户,发送成功或者发送失败都会通知这个 ChannelFuture 。如果在数据发送之前连接就关闭了,那么 Netty 就会把 StacklessClosedChannelException 异常通知给用户持有的这个 ChannelFuture。相关数据的发送细节,感兴趣的读者可以在回顾下笔者的 一文搞懂 Netty 发送数据全流程 这篇文章。

  
ClosedChannelException closeCause:这个参数和 Throwable cause 参数的作用差不多,都是用于在连接关闭的时候如果此时还有待发送数据未发送。就通知用户这里在参数中指定的异常。唯一不同的是 Throwable cause 负责通知给 Channel 发送数据缓冲队列 ChannelOutboundBuffer 中的 flushedEntry 队列。ClosedChannelException closeCause 负责通知给 ChannelOutboundBuffer 中的 unflushedEntry 队列。

  这里大家只需要理解个大概,稍微有个印象就行,笔者后面还会详细介绍。

  
boolean notify:由于在关闭 Channel 之后,会清理 Channel 对应的发送缓冲队列 ChannelOutboundBuffer 中存储的待发送数据,同时也会释放其中用于存储待发送数据用的 ByteBuffer ,当 ChannelOutboundBuffer 中的内存占用低于低水位线的时候,会触发 ChannelWritabilityChanged 事件。这里的参数 boolean notify 决定是否触发 ChannelWritabilityChanged 事件,由于当前是关闭操作,所以 notify = false ,不需要触发 ChannelWritabilityChanged 事件。

  在介绍完 close 方法的各个参数之后,接下来我们来看一下具体的关闭逻辑:

  2.1.1 连接关闭之前的校验工作

  

 // channel的关闭流程是否已经开始

 

   private boolean closeInitiated;

   // 关闭channel操作的指定future,来判断关闭流程进度 每个channel对应一个CloseFuture

   // 连接关闭之后,netty 会通知这个CloseFuture

   private final CloseFuture closeFuture = new CloseFuture(this);

   private void close(final ChannelPromise promise, final Throwable cause,

   final ClosedChannelException closeCause, final boolean notify) {

   if (!promise.setUncancellable()) {

   //关闭操作如果被取消则直接返回

   return;

   if (closeInitiated) {

   //如果此时channel已经开始关闭流程,则进入这里

   if (closeFuture.isDone()) {

   //如果channel已经关闭 则设置promise为success,如果promise是voidPromise类型则会跳过

   safeSetSuccess(promise);

   } else if (!(promise instanceof VoidChannelPromise)) {

   //如果promise不是voidPromise,则会在关闭完成后 通过closeFuture设置promise success

   closeFuture.addListener(new ChannelFutureListener() {

   @Override

   public void operationComplete(ChannelFuture future) throws Exception {

   promise.setSuccess();

   // 直接返回,防止重复关闭

   return;

   //当前channel现在开始进入正在关闭状态

   closeInitiated = true;

   .......关闭channel.........

  

 

  Netty 这里使用一个 boolean closeInitiated 变量来防止 Reactor 线程来重复执行关闭流程,因为 Channel 的关闭操作可以在多个业务线程中发起,这样就会导致多个业务线程向 Reactor 线程提交多个关闭 Channel 的任务。

  除此之外,Netty 还为每一个 Channel 创建了一个 CloseFuture closeFuture,用来表示 Channel 关闭的相关进度状态。当 Channel 完成关闭后,Netty 会设置 closeFuture 为 success 状态,并通知 closeFuture 上注册的 listener 。

  如果 closeInitiated == true 说明当前 Channel 的关闭操作已经开始,如果有多个业务线程先后提交过来多个关闭任务,Reactor 线程则会首先通过 closeFuture.isDone() 判断当前 Channel 是否已经完成关闭 ,如果 Channel 已经关闭,则会在 closeFuture 上注册的 listener 中设置关闭任务对应的 Promie 为 success ,进而通知到业务线程。

  

 protected final void safeSetSuccess(ChannelPromise promise) {

 

   if (!(promise instanceof VoidChannelPromise) !promise.trySuccess()) {

   logger.warn("Failed to mark a promise as success because it is done already: {}", promise);

  

 

  从这里也可以看出 VoidChannelPromise 表示一个空的 Promise ,不能对其设置 success 或者 fail , 更不能对其添加 listener 。一般用于不关心操作结果的场景。

  如果此时 Channel 的关闭流程虽然已经开始但还未完成的情况下,则将关闭任务对应 Promise (在业务线程中持有)的通知动作封装成 ChannelFutureListener 并添加到 closeFuture 中。当 Channel 关闭后,closeFuture 会被设置为 success ,并通知其中注册的 ChannelFutureListener 。

  2.1.2 Channel关闭前的准备工作

  

 private void close(final ChannelPromise promise, final Throwable cause,

 

   final ClosedChannelException closeCause, final boolean notify) {

   ...........省略连接关闭之前的校验工作........

   //当前channel是否active,这里肯定是active的

   final boolean wasActive = isActive();

   final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

   //将channel对应的写缓冲区channelOutboundBuffer设置为null 表示channel要关闭了,不允许继续发送数据

   //此时如果还在write数据,则直接释放bytebuffer,并立马 fail 相关writeFuture 并抛出newClosedChannelException异常

   //此时如果执行flush,则会直接返回

   this.outboundBuffer = null;

   //如果开启了SO_LINGER,则需要先将channel从reactor中取消掉。避免reactor线程空转浪费cpu

   Executor closeExecutor = prepareToClose();

   .............省略关闭Channel逻辑流程.......

  

 

  通过 isActive() 获取 Channel 的状态 boolean wasActive ,由于此时我们还没有关闭 Channel,所以 Channel 现在的状态肯定是 active 的。之所以在关闭流程的一开始就获取 Channel 是否 active 的状态,是因为当我们关闭 Channel 之后,需要通过这个状态来判断 channel 是否是第一次从 active 变为 inactive ,如果是第一次,则会触发 ChannelInactive 事件在 Channel 对应的 pipeline 中传播。

  在 Channel 关闭之前,还会将 Channel 对应的写入缓冲队列 ChannelOutboundBuffer 设置为 null ,表示 Channel 即将要关闭了,不允许业务线程在继续发送数据。

  在 一文搞懂 Netty 发送数据全流程 一文中我们提到过,如果 Channel 准备关闭的时候,用户还在向 Channel 写入数据,则直接释放 bytebuffer ,并立马 fail 掉相关 ChannelPromise 并抛出 newClosedChannelException 异常。

  

 @Override

 

   public final void write(Object msg, ChannelPromise promise) {

   assertEventLoop();

   //获取当前channel对应的待写入数据缓冲队列(支持用户异步写入的核心关键)

   ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

   // outboundBuffer == null说明channel准备关闭了,直接标记发送失败。

   if (outboundBuffer == null) {

   try {

   ReferenceCountUtil.release(msg);

   } finally {

   safeSetFailure(promise,

   newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));

   return;

   .............省略.........

  

 

  如果此时用户还在执行 Channel 的 flush 操作发送数据,那么发送流程直接会 return 掉,停止发送数据。

  

 @Override

 

   public final void flush() {

   assertEventLoop();

   ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

   //channel以关闭

   if (outboundBuffer == null) {

   return;

   .........省略........

  

 

  2.1.3 针对 SO_LINGER 选项的处理

  

 @Override

 

   protected Executor prepareToClose() {

   try {

   if (javaChannel().isOpen() config().getSoLinger() 0) {

   //在设置SO_LINGER后,channel会延时关闭,在延时期间我们仍然可以进行读写,这样会导致io线程eventloop不断的循环浪费cpu资源

   //所以需要在延时关闭期间 将channel注册的事件全部取消。

   doDeregister();

   * 设置了SO_LINGER,不管是阻塞socket还是非阻塞socket,在关闭的时候都会发生阻塞,所以这里不能使用Reactor线程来

   * 执行关闭任务,否则Reactor线程就会被阻塞。

   * */

   return GlobalEventExecutor.INSTANCE;

   } catch (Throwable ignore) {

   //在没有设置SO_LINGER的情况下,可以使用Reactor线程来执行关闭任务

   return null;

  

 

  要理解这段逻辑,首先我们需要理解 SO_LINGER 这个 Socket 选项,他会影响 Socket 的关闭行为。

  在默认情况下,当我们调用 Socket 的 close 方法后 ,close 方法会立即返回,剩下的事情会交给内核协议栈帮助我们处理,如果此时 Socket 对应的发送缓冲区还有数据待发送,接下来内核协议栈会将 Socket 发送缓冲区的数据发送出去,随后会向对端发送 FIN 包关闭连接。注意:此时应用程序是无法感知到这些数据是否已经发送到对端的,因为应用程序在调用 close 方法后就立马返回了,剩下的这些都是内核在替我们完成。接着主动关闭方就进入了 TCP 四次挥手的关闭流程最后进入TIME_WAIT状态。

  而 SO_LINGER 选项会控制调用 close 方法关闭 Socket 的行为。

  

 struct linger {

 

   int l_onoff; // linger active

   int l_linger; // how many seconds to linger for

  

 

  
int l_linger:如果开启了 SO_LINGER 选项,则该参数表示应用程序调用 close 方法后需要阻塞等待多长时间。单位为秒。

  
l_onoff = 1,l_linger 0:这种情况下,应用程序调用 close 方法后就不会立马返回,无论 Socket 是阻塞模式还是非阻塞模式,应用程序都会阻塞在这里。直到以下两个条件其中之一发生,才会解除阻塞返回。随后进行正常的四次挥手关闭流程。

  当 Socket 发送缓冲区的数据全部发送出去,并等到对端 ACK 后,close 方法返回。

  应用程序在 close 方法上的阻塞时间到达 l_linger 设置的值后,close 方法返回。

  
 

  l_onoff = 1,l_linger = 0:这种情况下,当应用程序调用 close 方法后会立即返回,随后内核直接清空 Socket 的发送缓冲区,并向对端发送 RST 包,主动关闭方直接跳过四次挥手进入 CLOSE 状态,注意这种情况下是不会有 TIME_WAIT 状态的。

  Netty 也提供了 SO_LINGER 选项的设置,由于一般关闭连接的行为都是由客户端发起,我们以 Netty 客户端代码为例说明:

  

public final class EchoClient {

 

   EventLoopGroup group = new NioEventLoopGroup();

   try {

   Bootstrap b = new Bootstrap();

   b.group(group)

   .channel(NioSocketChannel.class)

   .option(ChannelOption.SO_LINGER, 2)

   ..........省略........

  

 

  

public class DefaultSocketChannelConfig extends DefaultChannelConfig

 

   implements SocketChannelConfig {

   @Override

   public SocketChannelConfig setSoLinger(int soLinger) {

   try {

   if (soLinger 0) {

   javaSocket.setSoLinger(false, 0);

   } else {

   javaSocket.setSoLinger(true, soLinger);

   } catch (SocketException e) {

   throw new ChannelException(e);

   return this;

  

 

  默认情况下 SO_LINGER 选项是关闭的,在 JDK 底层设置 SO_LINGER 选项的方法 setSoLinger 中,参数 on 对应 l_onoff ,参数 linger 对应 l_linger ,单位为秒。

  

public void setSoLinger(boolean on, int linger) throws SocketException 

 

  

 

  当我们理解了 SO_LINGER 选项的工作原理及其应用之后,现在回过头来在看 prepareToClose 方法的逻辑就很容易理解了。

  

 @Override

 

   protected Executor prepareToClose() {

   try {

   if (javaChannel().isOpen() config().getSoLinger() 0) {

   //在设置SO_LINGER后,channel会延时关闭,在延时期间我们仍然可以进行读写,这样会导致io线程eventloop不断的循环浪费cpu资源

   //所以需要在延时关闭期间 将channel注册的事件全部取消。

   doDeregister();

   * 设置了SO_LINGER,不管是阻塞socket还是非阻塞socket,在关闭的时候都会发生阻塞,所以这里不能使用Reactor线程来

   * 执行关闭任务,否则Reactor线程就会被阻塞。

   * */

   return GlobalEventExecutor.INSTANCE;

   } catch (Throwable ignore) {

   //在没有设置SO_LINGER的情况下,可以使用Reactor线程来执行关闭任务

   return null;

  

 

  首先我们来关注下 prepareToClose 方法的返回值,它会返回一个 Executor ,这个 Executor 用于执行真正的 Channel 关闭任务。

  大家这里可能会有疑问,Channel 上的 IO 操作之前不都是由 Reactor 线程负责执行吗?为什么这里需要用一个单独的 Executor 来执行呢?

  原因就是如果我们设置了 SO_LINGER 选项 config().getSoLinger() 0 ,如果继续采用 Reactor 线程执行 Channel 关闭的动作,那么在这种情况下底层Socket 的 close 方法会阻塞 Reactor 线程,直到 Socket 发送缓冲区中的数据全部发送出去并收到对端 ACK ,或者 linger 指定的超时时间到达。

  由于 Reactor 线程负责多个 Channel 上的 IO 处理,如果被阻塞在这里,就会影响其他 Channel 上的 IO 处理,降低吞吐。所以当我们设置了 SO_LINGER 选项时,就不能使用 Reactor 线程来执行 Channel 关闭的动作,而是用GlobalEventExecutor.INSTANCE来负责执行 Channel 的关闭动作。

  如果我们没有设置 SO_LINGER 选项,底层 Socket 的 close 方法会立即返回并不会阻塞,所以这种情况下,依然会使用 Reactor 线程来执行 Channel 的关闭动作。

  prepareToClose 方法这种情况下会返回 null ,表示默认采用 Reactor 线程来执行 Channel 的关闭。

  这里还有一个重要的点需要和大家强调的是,当我们设置了 SO_LINGER 选项之后,Channel 的关闭动作会被阻塞并延时关闭,在延时关闭期间,Reactor 线程依然可以响应 OP_READ 事件和 OP_WRITE 事件,这可能会导致 Reactor 线程不断的自旋循环浪费 CPU 资源,所以基于这个原因,netty 这里需要将 Channel 从 Reactor 上注销掉。这样 Reactor 线程就不会在响应 Channel 上的 IO 事件了。

  2.1.4 doDeregister 注销 Channel

  

public abstract class AbstractNioChannel extends AbstractChannel {

 

   //channel注册到Selector后获得的SelectKey

   volatile SelectionKey selectionKey;

   @Override

   protected void doDeregister() throws Exception {

   eventLoop().cancel(selectionKey());

   protected SelectionKey selectionKey() {

   assert selectionKey != null;

   return selectionKey;

  

 

  

public final class NioEventLoop extends SingleThreadEventLoop {

 

   //记录socketChannel从Selector上注销的个数 达到256个 则需要将无效selectKey从SelectedKeys集合中清除掉

   private int cancelledKeys;

   private static final int CLEANUP_INTERVAL = 256;

   * 将socketChannel从selector中注销 取消监听IO事件

   * */

   void cancel(SelectionKey key) {

   key.cancel();

   cancelledKeys ++;

   // 当从selector中注销的socketChannel数量达到256个,设置needsToSelectAgain为true

   // 在io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain 中重新做一次轮询,将失效的selectKey移除,

   // 以保证selectKeySet的有效性

   if (cancelledKeys = CLEANUP_INTERVAL) {

   cancelledKeys = 0;

   needsToSelectAgain = true;

  

 

  Channel 在向 Reactor 中的 Selector 注册成功后,会得到一个 SelectionKey 。这个 SelectionKey 可以理解成 Channel 在 Selector 中的模型。

  当 Channel 需要将自己从 Selector 中注销掉时,直接可以通过调用对应的 SelectionKey#cancel 方法。此时调用 SelectionKey#isValid 将会返回 false 。

  SelectionKey#cancel 方法调用后,Selector 会将要取消的这个 SelectionKey 加入到 Selector 中的 cancelledKeys 集合中。

  

public abstract class AbstractSelector extends Selector {

 

   private final Set SelectionKey cancelledKeys = new HashSet SelectionKey

   void cancel(SelectionKey k) {

   synchronized (cancelledKeys) {

   cancelledKeys.add(k);

  

 

  随后在 Selector 的下一次轮询过程中,会将 cancelledKeys 集合中的 SelectionKey 从 Selector 中所有的 KeySet 中移除。这里的 KeySet 包括Selector用于存放 IO 就绪 SelectionKey 的 selectedKeys 集合,以及用于存放所有在 Selector 上注册的 Channel 对应 SelectionKey 的 keys 集合。

  

public abstract class SelectorImpl extends AbstractSelector {

 

   protected Set SelectionKey selectedKeys = new HashSet();

   protected HashSet SelectionKey keys = new HashSet();

   .....................省略...............

  

 

  这里需要注意的是当我们调用 SelectionKey#cancel 方法后,该 SelectionKey 并不会立马从 Selector 中删除,只不过此时调用 SelectionKey#isValid 方法会返回 false 。需要等到下次轮询 selector.selectNow() 的时候,被取消掉的 SelectionKey 才会从 Selector 中被删除掉。

  当在本次轮询期间,假如有大量的 Channel 从 Selector 中注销,就绪集合 selectedKeys 中依然会保存这些 Channel 对应 SelectionKey 直到下次轮询。那么当然会影响本次轮询结果 selectedKeys 的有效性,增加了许多不必要的遍历开销。

  所以 netty 在 NioEventLoop#cancel 方法中做了一个优化来保证 Selector 中的 IO 就绪集合 selectedKeys 的有效性,当 Selector 中注销的 Channel 数量 cancelledKeys 超过 CLEANUP_INTERVAL = 256 个时,就会将 needsToSelectAgain 标志设置为 true 。

  

 private void processSelectedKeysOptimized() {

 

   for (int i = 0; i selectedKeys.size; ++i) {

   ......循环处理Selector中的IO就绪集合selectedKeys.....

   if (needsToSelectAgain) {

   selectedKeys.reset(i + 1);

   selectAgain();

   i = -1;

  

 

  当 Reactor 线程在循环遍历处理 Selector 中的 IO 活跃 Channel 时,如果
 

  needsToSelectAgain = true ,那么就会立马执行一次 selector.selectNow() ,目的就是为了清除 Selector 中已经注销的 Selectionkey ,从而保证IO就绪集合 selectedKeys 的有效性。

  

 private void selectAgain() {

 

   needsToSelectAgain = false;

   try {

   selector.selectNow();

   } catch (Throwable t) {

   logger.warn("Failed to update SelectionKeys.", t);

  

 

  2.1.5 Channel 的关闭

  prepareToClose 方法返回的 closeExecutor 是用来执行 Channel 关闭操作的,当我们开启了 SO_LINGER 选项时,closeExecutor = GlobalEventExecutor.INSTANCE ,避免了 Reactor 线程的阻塞。

  由 GlobalEventExecutor 负责执行 doClose0 方法关闭 Channel 底层的 Socket,并通知 closeFuture 关闭结果。

  

 private void close(final ChannelPromise promise, final Throwable cause,

 

   final ClosedChannelException closeCause, final boolean notify) {

   ...........省略重进入关闭流程处理........

   ...........省略Channel关闭前的准备工作........

   Executor closeExecutor = prepareToClose();

   if (closeExecutor != null) {

   closeExecutor.execute(new Runnable() {

   @Override

   public void run() {

   try {

   // 在GlobalEventExecutor中执行channel的关闭任务,设置closeFuture,promise success

   doClose0(promise);

   } finally {

   // reactor线程中执行

   invokeLater(new Runnable() {

   @Override

   public void run() {

   if (outboundBuffer != null) {

   // cause = closeCause = ClosedChannelException, notify = false

   // 此时channel已经关闭,需要清理对应channelOutboundBuffer中的待发送数据flushedEntry

   outboundBuffer.failFlushed(cause, notify);

   //循环清理channelOutboundBuffer中的unflushedEntry

   outboundBuffer.close(closeCause);

   //这里的active = true

   //关闭channel后,会将channel从reactor中注销,首先触发ChannelInactive事件,然后触发ChannelUnregistered

   fireChannelInactiveAndDeregister(wasActive);

   } else {

   ...........省略在Reactor中Channel关闭的逻辑........

  

 

  当 Channel 的关闭操作在 closeExecutor 线程中执行完毕之后,此时 Channel 从物理上就已经关闭了,但是 Channel 中还有一些遗留的东西需要清理,比如 Channel 对应的写入缓冲队列 ChannelOutboundBuffer 中的待发送数据需要被清理掉,并通知用户线程由于 Channel 已经关闭,导致数据发送失败。

  同时 Netty 也需要让用户感知到 Channel 已经关闭的事件,所以还需要在关闭 Channel 对应的 pipeline 中触发 ChannelInactive 事件和 ChannelUnregistered 事件。

  而以上列举的这两点清理 Channel 的相关工作则需要在 Reactor 线程中完成,不能在 closeExecutor 线程中完成。这是处于线程安全的考虑,因为在 Channel 关闭之前,对于 ChannelOutboundBuffer 以及 pipeline 的操作均是由 Reactor 线程来执行的,Channel 关闭之后相关的清理工作理应继续由 Reactor 线程负责,避免多线程执行产生线程安全问题。

  2.1.5.1 doClose0 关闭 Channel

  

 // 关闭channel操作的指定future,来判断关闭流程进度 每个channel一个

 

   private final CloseFuture closeFuture = new CloseFuture(this);

   private void doClose0(ChannelPromise promise) {

   try {

   // 关闭channel,此时服务端向客户端发送fin2,服务端进入last_ack状态,客户端收到fin2进入time_wait状态

   doClose();

   // 设置clostFuture的状态为success,表示channel已经关闭

   // 调用shutdownOutput则不会通知closeFuture

   closeFuture.setClosed();

   // 通知用户promise success,关闭操作已经完成

   safeSetSuccess(promise);

   } catch (Throwable t) {

   closeFuture.setClosed();

   // 通知用户线程关闭失败

   safeSetFailure(promise, t);

  

 

  首先调用 doClose() 方法关闭底层 JDK 中的 SocketChannel 。

  

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

 

   @Override

   protected void doClose() throws Exception {

   super.doClose();

   javaChannel().close();

  

 

  这里大家需要注意的一个点是,在 JDK 底层 SocketChannel 的关闭方法中,同样也会将该 Channel 关联的所有 SelectionKey 取消掉。因为在 prepareToClose 方法中我们提到,只有我们设置了 SO_LINGER 选项时,才会在 prepareToClose 方法中调用 doDeregister 方法将 Channel 关联的 SelectionKey 从 Selector 中取消掉。

  而当我们没有设置 SO_LINGER 选项时,则不会提前调用 doDeregister 方法取消。所以需要在这里真正关闭 Channel 的地方,将其关联的所有 SelectionKey 取消掉。

  

 public final void close() throws IOException {

 

   synchronized (closeLock) {

   if (!open)

   return;

   open = false;

   implCloseChannel();

   protected final void implCloseChannel() throws IOException {

   implCloseSelectableChannel();

   synchronized (keyLock) {

   int count = (keys == null) ? 0 : keys.length;

   //关闭与该Channel相关的所有SelectionKey

   for (int i = 0; i count; i++) {

   SelectionKey k = keys[i];

   if (k != null)

   k.cancel();

  

 

  当我们调用了 doClose() 方法后,此时服务端的内核协议栈就会向客户端发出 FIN 包,服务端结束 CLOSE_WAIT 状态进入 LAST_ACK 状态。客户端收到服务端的 FIN 包后,向服务端回复 ACK 包,随后客户端进入 TIME_WAIT 状态。服务端收到客户端的 ACK 包后结束 LAST_ACK 状态进入 CLOSE 状态。

  当调用 doClose() 完成 Channel 的关闭后,就会调用 closeFuture.setClosed() 通知 Channel 的 closeFuture 关闭成功。

  

static final class CloseFuture extends DefaultChannelPromise {

 

   boolean setClosed() {

   return super.trySuccess();

  

 

  随后调用 safeSetSuccess(promise) 通知用户的 promise 关闭成功。

  2.1.5.2 清理 ChannelOutboundBuffer

  这里大家需要注意:清空 ChannelOutboundBuffer 的操作是在 Reactor 线程中执行的。

  

 if (outboundBuffer != null) {

 

   // Fail all the queued messages

   // cause = closeCause = ClosedChannelException, notify = false

   // 此时channel已经关闭,需要清理对应channelOutboundBuffer中的待发送数据flushedEntry

   outboundBuffer.failFlushed(cause, notify);

   //循环清理channelOutboundBuffer中的unflushedEntry

   outboundBuffer.close(closeCause);

  

 

  当 Channel 关闭之后,此时 Channel 中的写入缓冲队列 ChannelOutboundBuffer 中可能会有一些待发送数据,这时就需要将这些待发送数据从 ChannelOutboundBuffer 中清除掉。

  通过调用 ChannelOutboundBuffer#failFlushed 方法,循环遍历 flushedEntry 指针到 tailEntry 指针之间的 Entry 对象,将其从 ChannelOutboundBuffer 链表中删除,并释放 Entry 对象中封装的 byteBuffer ,通知用户的 promise 写入失败。并回收 Entry 对象实例。

  

public final class ChannelOutboundBuffer {

 

   void failFlushed(Throwable cause, boolean notify) {

   if (inFail) {

   return;

   try {

   inFail = true;

   for (;;) {

   // 循环清除channelOutboundBuffer中的待发送数据

   // 将entry从buffer中删除,并释放entry中的bytebuffer,通知promise failed

   if (!remove0(cause, notify)) {

   break;

   } finally {

   inFail = false;

   private boolean remove0(Throwable cause, boolean notifyWritability) {

   Entry e = flushedEntry;

   if (e == null) {

   //清空当前reactor线程缓存的所有待发送数据

   clearNioBuffers();

   return false;

   Object msg = e.msg;

   ChannelPromise promise = e.promise;

   int size = e.pendingSize;

   //从channelOutboundBuffer中删除该Entry节点

   removeEntry(e);

   if (!e.cancelled) {

   // only release message, fail and decrement if it was not canceled before.

   //释放msg所占用的内存空间

   ReferenceCountUtil.safeRelease(msg);

   //编辑promise发送失败,并通知相应的Lisener

   safeFail(promise, cause);

   //由于msg得到释放,所以需要降低channelOutboundBuffer中的内存占用水位线,并根据notifyWritability决定是否触发ChannelWritabilityChanged事件

   decrementPendingOutboundBytes(size, false, notifyWritability);

   // recycle the entry

   //回收Entry实例对象

   e.recycle();

   return true;

  

 

  在 remove0 方法中 netty 会将已经关闭的 Channel 对应的 ChannelOutboundBuffer 中还没来得及 flush 进 Socket 发送缓存区中的数据全部清除掉。这部分数据就是上图中 flushedEntry 指针到 tailEntry 指针之间的 Entry对象。

  Entry 对象中封装了用户待发送数据的 ByteBuffer,以及用于通知用户发送结果的 promise 实例。

  这里需要将这些还未来得及 flush 的 Entry 节点从 ChannelOutboundBuffer 中全部清除,并释放这些 Entry 节点中包裹的发送数据 msg 所占用的内存空间。并标记对应的 promise 为失败同时通知对应的用户 listener 。

  以上的清理逻辑主要是应对在 Channel 即将关闭之前,用户极限调用 flush 操作想要发送数据的情况。

  另外还有一种情况 Netty 这里需要考虑处理,由于在关闭 Channel 之前,用户可能还会向 ChannelOutboundBuffer 中 write 数据,但还未来得及调用 flush 操。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: