分布式netty服务端,netty handler传递数据
目录
处理接入事件创建处理我们看其RecvByteBufAllocator接口跟进newHandle()方法中继续回到阅读()方法我们跟进重置中前文传送门:客户端接入流程初始化源码分析
上一小节我们剖析完成了与引导绑定的频道配置初始化相关的流程,
这一小节继续剖析客户端连接事件的处理
处理接入事件创建handle
回到上一章NioEventLoop的processSelectedKey()方法
private void processSelectedKey(选择密钥k,AbstractNioChannel ch) { //获取到引导中的不安全的最终摘要频道.nio unsafe。unsafe();//如果这个键不是合法的,说明这个引导可能有问题如果(!k.isValid()) { //代码省略}试试{ //如果是合法的,拿到键的超正析象管事件int ready ops=k . ready ops();//链接事件if ((readyOps SelectionKey .OP_CONNECT)!=0){ int ops=k . interest ops();ops=~SelectionKey .OP _ CONNECTk。利息运营(ops);不安全。完成连接();} //写事件if ((readyOps SelectionKey .OP_WRITE)!=0) { ch.unsafe().强制刷新();} //读事件和接受链接事件//如果当前NioEventLoop是工作线程的话,这里就是操作_读取事件//如果是当前NioEventLoop是老板线程的话,这里就是op _接受事件if ((readyOps (SelectionKey .OP_READ 选择键. OP_ACCEPT))!=0 就绪操作==0){不安全。read();如果(!栗色iso pen()){ return;} } } catch (CancelledKeyException忽略){不安全。关闭(不安全。void无极());}}我们看其中的如果判断:
if ((readyOps (SelectionKey .OP_READ 选择键. OP_ACCEPT))!=0 readyOps==0)上一小节我们分析过,如果当前NioEventLoop是工作线程的话,这里就是操作_读取事件,如果是当前NioEventLoop是老板线程的话,这里就是op _接受事件,这里我们以老板线程为例进行分析
之前我们讲过,无论处理操作_读取事件还是op _接受事件,都走的危险的的阅读()方法,这里危险的是通过引导拿到,我们知道如果是处理接受事件,这里的引导是NioServerSocketChannel,这里与之绑定的危险的是NioMessageUnsafe
我们跟到NioMessageUnsafe的阅读()方法:
public void read() { //必须是NioEventLoop方法调用的,不能通过外部线程调用断言事件循环().inEventLoop();//服务端引导的config最终通道配置config=config();//服务端引导的管道最终通道管道管道=管道();//处理服务端接入的速率最终RecvByteBufAllocator .Handle allocHandle=unsafe().recvBufAllocHandle();//设置配置全部
ocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //创建jdk底层的channel //readBuf用于临时承载读到链接 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器将读到的链接进行计数 allocHandle.incMessagesRead(localRead); //连接数是否超过最大值 } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍历每一条客户端连接 for (int i = 0; i < size; i ++) { readPending = false; //传递事件, 将创建NioSokectChannel进行传递 //最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代码省略 } finally { //代码省略 }}首先获取与NioServerSocketChannel绑定config和pipeline, config我们上一小节进行分析过, pipeline我们将在下一章进行剖析
我们看这一句:
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
这里通过RecvByteBufAllocator接口调用了其内部接口Handler
我们看其RecvByteBufAllocator接口
public interface RecvByteBufAllocator { Handle newHandle(); interface Handle { int guess(); void reset(ChannelConfig config); void incMessagesRead(int numMessages); void lastBytesRead(int bytes); int lastBytesRead(); void attemptedBytesRead(int bytes); int attemptedBytesRead(); boolean continueReading(); void readComplete(); }}
我们看到RecvByteBufAllocator接口只有一个方法newHandle(), 顾名思义就是用于创建Handle对象的方法, 而Handle中的方法, 才是实际用于操作的方法
在RecvByteBufAllocator实现类中包含Handle的子类, 具体实现关系如下:
回到read()方法中再看这段代码:
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
unsafe()返回当前channel绑定的unsafe对象, recvBufAllocHandle()最终会调用AbstractChannel内部类AbstractUnsafe的recvBufAllocHandle()方法
跟进AbstractUnsafe的recvBufAllocHandle()方法:
public RecvByteBufAllocator.Handle recvBufAllocHandle() { //如果不存在, 则创建一个recvHandle的实例 if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle;}
如果如果是第一次执行到这里, 自身属性recvHandle为空, 会创建一个recvHandle实例, config()返回NioServerSocketChannel绑定的ChannelConfig, getRecvByteBufAllocator()获取其RecvByteBufAllocator对象, 这两部分上一小节剖析过了, 这里通过newHandle()创建一个Handle, 这里会走到AdaptiveRecvByteBufAllocator类中的newHandle()方法中
跟进newHandle()方法中
public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial);}
这里创建HandleImpl传入了三个参数, 这三个参数我们上一小节剖析过, minIndex为最小内存在SIZE_TABLE中的下标, maxIndex为最大内存在SEIZE_TABEL中的下标, initial是初始内存, 我们跟到HandleImpl的构造方法中:
public HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; index = getSizeTableIndex(initial); nextReceiveBufferSize = SIZE_TABLE[index];}
初始化minIndex和maxIndex, 根据initial找到当前的下标, nextReceiveBufferSize是根据当前的下标找到对应的内存
这样, 我们就创建了个Handle对象
在这里我们需要知道, 这个handle, 是和channel唯一绑定的属性, 而AdaptiveRecvByteBufAllocator对象是和ChannelConfig对象唯一绑定的, 间接也是和channel进行唯一绑定
继续回到read()方法
public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPipeline pipeline = pipeline(); //处理服务端接入的速率 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //设置配置 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //创建jdk底层的channel //readBuf用于临时承载读到链接 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器将读到的链接进行计数 allocHandle.incMessagesRead(localRead); //连接数是否超过最大值 } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍历每一条客户端连接 for (int i = 0; i < size; i ++) { readPending = false; //传递事件, 将创建NioSokectChannel进行传递 //最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代码省略 } finally { //代码省略 }}
继续往下跟:
allocHandle.reset(config);
这个段代码是重新设置配置, 也就是将之前的配置信息进行初始化, 最终会走到, DefaultMaxMessagesRecvByteBufAllocator中的内部类MaxMessageHandle的reet中
我们跟进reset中
public void reset(ChannelConfig config) { this.config = config; maxMessagePerRead = maxMessagesPerRead(); totalMessages = totalBytesRead = 0;}
这里仅仅对几个属性做了赋值, 简单介绍下这几个属性:
config
:当前channelConfig对象
maxMessagePerRead
:表示读取消息的时候可以读取几次(循环次数), maxMessagesPerRead()返回的是RecvByteBufAllocator的maxMessagesPerRead属性, 上一小节已经做过剖析
totalMessages
:代表目前读循环已经读取的消息个数, 在NIO传输模式下也就是已经执行的循环次数, 这里初始化为0
totalBytesRead
:代表目前已经读取到的消息字节总数, 这里同样也初始化为0
我们继续往下走, 这里首先是一个do-while循环, 循环体里通过int localRead = doReadMessages(readBuf)这种方式将读取到的连接数放入到一个List集合中, 这一步我们下一小节再分析, 我们继续往下走:
我们首先看allocHandle.incMessagesRead(localRead)这一步, 这里的localRead表示这次循环往readBuf中放入的连接数, 在Nio模式下这, 如果读取到一条连接会返回1
跟到中的MaxMessageHandle的incMessagesRead(int amt)方法中:
public final void incMessagesRead(int amt) { totalMessages += amt;}
这里将totalMessages增加amt, 也就是+1
这里totalMessage, 刚才已经剖析过, 在NIO传输模式下也就是已经执行的循环次数, 这里每次执行一次循环都会加一
再去看循环终止条件allocHandle.continueReading()
跟到MaxMessageHandle的continueReading()方法中:
public boolean continueReading() { //config.isAutoRead()默认返回true // totalMessages < maxMessagePerRead //totalMessages代表当前读到的链接, 默认是1 //maxMessagePerRead每一次最大读多少链接(默认16) return config.isAutoRead() && attemptedBytesRead == lastBytesRead && totalMessages < maxMessagePerRead && totalBytesRead < Integer.MAX_VALUE;}
我们逐个分析判断条件:
config.isAutoRead(): 这里默认为true
attemptedBytesRead == lastBytesRead: 表示本次读取的字节数和最后一次读取的字节数相等, 因为到这里都没有进行字节数组的读取操作, 所以默认都为0, 这里也返回true
totalMessages < maxMessagePerRead
表示当前读取的次数是否小于最大读取次数, 我们知道totalMessages每次循环都会自增, 而maxMessagePerRead默认值为16, 所以这里会限制循环不能超过16次, 也就是最多一次只能读取16条连接
totalBytesRead < Integer.MAX_VALUE
表示读取的字节数不能超过int类型的最大值
这里就剖析完了Handle的创建和初始化过程, 并且剖析了循环终止条件等相关的逻辑
以上就是Netty分布式客户端处理接入事件handle源码解析的详细内容,更多关于Netty分布式客户端接入事件handle的资料请关注盛行IT其它相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。