netty客户端并发写数据,netty实现客户端

  netty客户端并发写数据,netty实现客户端

  00-1010概述:节I 3360初始化NioSockectChannelConfig,创建一个通道,并跟随它到其父类DefaultChannelConfig的构造函数,然后返回到AdaptiveRecvByteBufAllocator的构造函数,并继续跟随ChannelMetadata的构造函数,返回到DefaultChannelConfig的构造函数。

  00-1010门户:NioEventLoop任务队列执行

  在前一章中,我们学习了服务器启动和与eventLoop相关的逻辑。eventLoop如何处理轮询后的客户端访问事件?在本章中,我们将一步一步地继续分析客户端访问后的相关逻辑。

  

目录

 

  00-1010在分析接入过程之前,我们先补充一下第一章关于渠道创建的知识。

  在第一章中,我们分析了通道的创建,在NioServerSocketChannel中有一个构造方法:

  public NioServerSocketChannel(ServerSocketChannel Channel){ super(null,channel,SelectionKey。OP _ ACCEPT);config=new NioServerSocketChannelConfig(this,javaChannel()。socket());}当时我们并没有分析config的相关知识。在这一章中,我们先对此做一个补充。这里我们看到每个NioServerSocketChannel都有一个config属性,存储了NioServerSocketChannel的相关配置,这里创建了一个NioServerSocketChannelConfig对象,传入了当前通道和通道对应的java底层的socket对象。NioServerSocketChannelConfig实际上是NioServerSocketChannel的内部类。

  让我们遵循NioServerSocketChannelConfig类:的构造方法。

  private NioServerSocketChannelConfig(NioServerSocketChannel channel channel,server socket javaSocket){ super(channel,javaSocket);}我们继续沿用其父类DefaultServerSocketChannelConfig :的构造方法

  public DefaultServerSocketChannelConfig(ServerSocketChannel channel channel,server socket javaSocket){ super(channel);if(Java socket==null){ throw new NullPointerException( Java socket );} this . Java socket=Java socket;}这里我们继续调用其父类的构造方法,保存jdk底部的socket对象,调用其父类DefaultChannelConfig的构造方法

  

前文概述:

public DefaultChannelConfig(Channel Channel){ this(Channel,new adapteverecvbytebufolucator());}这里调用了自己的构造函数,传入了channel和一个AdaptiveRecvByteBufAllocator对象。

 

  AdaptiveRecvByteBufAllocator是一个缓冲区分配器,用于分配一个缓冲区Bytebuf。Bytebuf的相关内容将在后续章节中详细讲解。这里可以简单介绍一下作为理解,就当是对后面知识的预习。

  Bytebuf相当于jdk的ByetBuffer,netty对其进行了重新封装,用于在channel中读写字节流。熟悉Nio的同学应该对这个比较熟悉。AdaptiveRecvByteBufAllocator是Netty中用来分配ByetBuffer的缓冲区分配器。根据它的名字,不难看出这个缓冲区是一个可变大小的字节缓冲区。

  我们遵循adaptiverecvbytebufolucator 3360的构造方法。

  java;">public AdaptiveRecvByteBufAllocator() { //DEFAULT_MINIMUM:最小缓冲区长度64字节 //DEFAULT_INITIAL:初始容量1024字节 //最大容量65536字节 this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);}这里调用自身的构造方法并且传入了三个属性, 这三个属性的含义分别为:

  DEFAULT_MINIMUM:代表要分配的缓冲区长度最少为64个字节

  DEFAULT_INITIAL:代表要分配的缓冲区的初始容量为1024个字节

  DEFAULT_MAXIMUM:代表要分配的缓冲区最大容量为65536个字节

  我们跟到this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM)方法中

  

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { //忽略验证代码 //最小容量在table中的下标 int minIndex = getSizeTableIndex(minimum); if (SIZE_TABLE[minIndex] < minimum) { this.minIndex = minIndex + 1; } else { this.minIndex = minIndex; } //最大容量在table中的下标 int maxIndex = getSizeTableIndex(maximum); if (SIZE_TABLE[maxIndex] > maximum) { this.maxIndex = maxIndex - 1; } else { this.maxIndex = maxIndex; } this.initial = initial;}

其中这里初始化了三个属性, 分别是:

 

  minIndex:最小容量在size_table中的下标

  maxIndex:最大容量在table中的下标

  initial:初始容量1024个字节

  这里的size_table就是一个数组, 里面盛放着byteBuf可分配的内存大小的集合, 分配的bytebuf无论是扩容还是收缩, 内存大小都属于size_table中的元素, 那么这个数组是如何初始化的, 我们跟到这个属性中:

  

private static final int[] SIZE_TABLE;

我们看到是一个final修饰的静态成员变量, 我们跟到static块中看它的初始化过程:

 

  

static { //List集合 List<Integer> sizeTable = new ArrayList<Integer>(); //从16开始, 每递增16添加到List中, 直到大于等于512 for (int i = 16; i < 512; i += 16) { sizeTable.add(i); } //从512开始, 倍增添加到List中, 直到内存溢出 for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); } //初始化数组 SIZE_TABLE = new int[sizeTable.size()]; //将list的内容放入数组中 for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); }}

首先创建一个Integer类型的list用于盛放内存元素

 

  这里通过两组循环为list添加元素

  首先看第一组循环:

  

for (int i = 16; i < 512; i += 16) { sizeTable.add(i);}

这里是通过16平移的方式, 直到512个字节, 将每次平移之后的内存大小添加到list中

 

  再看第二组循环

  

for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i);}

超过512之后, 再通过倍增的方式循环, 直到int类型内存溢出, 将每次倍增之后大小添加到list中

 

  最后初始化SIZE_TABLE数组, 将list中的元素按下表存放到数组中

  这样就初始化了内存数组

  

 

  

再回到AdaptiveRecvByteBufAllocator的构造方法中

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { //忽略验证代码 //最小容量在table中的下标 int minIndex = getSizeTableIndex(minimum); if (SIZE_TABLE[minIndex] < minimum) { this.minIndex = minIndex + 1; } else { this.minIndex = minIndex; } //最大容量在table中的下标 int maxIndex = getSizeTableIndex(maximum); if (SIZE_TABLE[maxIndex] > maximum) { this.maxIndex = maxIndex - 1; } else { this.maxIndex = maxIndex; } this.initial = initial;}

这里分别根据传入的最小和最大容量去SIZE_TABLE中获取其下标

 

  我们跟到getSizeTableIndex(minimum)中:

  

private static int getSizeTableIndex(final int size) { for (int low = 0, high = SIZE_TABLE.length - 1;;) { if (high < low) { return low; } if (high == low) { return high; } int mid = low + high >>> 1; int a = SIZE_TABLE[mid]; int b = SIZE_TABLE[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { return mid + 1; } }}

这里是通过二分查找去获取其下表

 

  if (SIZE_TABLE[minIndex] < minimum)这里判断最小容量下标所属的内存大小是否小于最小值, 如果小于最小值则下标+1

  最大容量的下标获取原理同上, 判断最大容量下标所属内存大小是否大于最大值, 如果是则下标-1

  我们回到DefaultChannelConfig的构造方法:

  

public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator());}

刚才我们剖析过了AdaptiveRecvByteBufAllocator()的创建过程, 我们继续跟到this()中:

 

  

protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) { setRecvByteBufAllocator(allocator, channel.metadata()); this.channel = channel;}

我们看到这里初始化了channel, 在channel初始化之前, 调用了setRecvByteBufAllocator(allocator, channel.metadata())方法, 顾名思义, 这是用于设置缓冲区分配器的方法, 第一个参数是我们刚刚分析过的新建的AdaptiveRecvByteBufAllocator对象, 第二个传入的是与channel绑定的ChannelMetadata对象, ChannelMetadata对象是什么?

 

  我们跟进到metadata()方法当中, 由于是channel是NioServerSocketChannel, 所以调用到了NioServerSocketChannel的metadata()方法:

  

public ChannelMetadata metadata() { return METADATA;}

这里返回了一个成员变量METADATA, 跟到这个成员变量中:

 

  

private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

这里创建了一个ChannelMetadata对象, 并在构造方法中传入false和16

 

  

 

  

继续跟到ChannelMetadata的构造方法中

public ChannelMetadata(boolean hasDisconnect, int defaultMaxMessagesPerRead) { //省略验证代码 //false this.hasDisconnect = hasDisconnect; //16 this.defaultMaxMessagesPerRead = defaultMaxMessagesPerRead;}

这里做的事情非常简单, 只初始化了两个属性:

 

  

hasDisconnect=false

 

  defaultMaxMessagesPerRead=16

  

defaultMaxMessagesPerRead=16代表在读取对方的链接或者channel的字节流时(无论server还是client), 最多只循环16次, 后面的讲解将会看到

 

  剖析完了ChannelMetadata对象的创建, 我们回到DefaultChannelConfig的构造方法:

  

protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) { setRecvByteBufAllocator(allocator, channel.metadata()); this.channel = channel;}

跟到setRecvByteBufAllocator(allocator, channel.metadata())方法中:

 

  

private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) { if (allocator instanceof MaxMessagesRecvByteBufAllocator) { ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead()); } else if (allocator == null) { throw new NullPointerException("allocator"); } rcvBufAllocator = allocator;}

首先会判断传入的缓冲区分配器是不是MaxMessagesRecvByteBufAllocator类型的, 因为AdaptiveRecvByteBufAllocator实现了MaxMessagesRecvByteBufAllocator接口, 所以此条件成立

 

  

之后将其转换成MaxMessagesRecvByteBufAllocator类型,

 

  然后调用其maxMessagesPerRead(metadata.defaultMaxMessagesPerRead())方法,

  这里会走到其子类DefaultMaxMessagesRecvByteBufAllocator的maxMessagesPerRead(int maxMessagesPerRead)方法中,

  其中参数metadata.defaultMaxMessagesPerRead()返回就是ChannelMetadata的属性defaultMaxMessagesPerRead,

  也就是16

  

跟到maxMessagesPerRead(int maxMessagesPerRead)方法中:

 

  

public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) { //忽略验证代码 //初始化为16 this.maxMessagesPerRead = maxMessagesPerRead; return this;}

这里将自身属性maxMessagesPerRead设置为16, 然后返回自身

 

  

 

  

回到DefaultChannelConfig的构造方法

private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) { if (allocator instanceof MaxMessagesRecvByteBufAllocator) { ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead()); } else if (allocator == null) { throw new NullPointerException("allocator"); } rcvBufAllocator = allocator;}

设置完了内存分配器的maxMessagesPerRead属性,最后将DefaultChannelConfig自身的成员变量rcvBufAllocator设置成我们初始化完毕的allocator对象

 

  至此,有关channelConfig有关的初始化过程剖析完成

  以上就是Netty分布式客户端接入流程初始化源码分析的详细内容,更多关于Netty分布式客户端接入流程初始化的资料请关注盛行IT其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: