netty 分布式,

  netty 分布式,

  00-1010 AbstractByteBuf属性和构造方法概述。首先看这个类的属性和构造方法。我们来看几个最简单的方法。我们重点介绍第二种验证方法,保证ensureWritable(长度)。我们遵循扩展方法,最后将写指针移回长度字节。

  00-1010熟悉Nio的朋友应该熟悉jdk底层的byteBuffer,即字节缓冲区,主要用于网络底层的io读写。当通道中有数据时,将通道中的数据读入字节缓冲区,当要向对方写入数据时,将字节缓冲区中的数据写入通道。

  但是jdk的byteBuffer用起来不方便。例如,标记位置的指针位置只有一个。读写时需要通过flip()方法频繁移动指针位置,容易出错。而且byteBuffer的内存一旦分配就不能更改,不支持动态扩展。当读写内容大于缓冲内存时,会发生索引越界异常。

  Netty的ByteBuf重新定义了jdk的byteBuffer,它也是一个用于在网络io中读取数据的字节缓冲区,但使用起来大大简化,支持自动扩容,所以不用担心读写数据大小超过初始分配的大小。

  ByteBuf根据其分类有不同的实现方式。有些是直接基于jdk底层byteBuffer实现的,有些是基于字节数组实现的。对于byteBuf的分类,我们会在下一节讲到。

  ByteBuf维护两个指针,一个是读指针,另一个是写指针。这两个指针相互独立。在读操作期间,只有读指针移动,读的字节数由指针位置记录。

  在同一个写操作过程中,只会移动写指针,写指针的位置会记录写入的字节数。

  每次读写操作时,都会检查指针的位置,读指针的位置不能超过写指针的位置,否则会抛出异常。

  类似地,如果写指针不能超过缓冲区分配的内存,缓冲区将被扩展。

  具体的指针操作如下图所示:

  00-1010在讲AbstractByteBuf之前,我们先来了解一下ByteBuf这个类,它是所有ByteBuf的顶层抽象,它定义了大量ByteBuf操作的抽象方法,供子类实现。

  AbstractByteBuf也是buffer的抽象类,定义了ByteBuf的骨架操作,比如参数检查、自动扩展、一些读写操作的指针移动等。但是不同的bytebuf具体实现是不一样的,这个情况给它的子类。

  AbstractByteBuf继承了这个类并实现了它的大部分方法。

  00-1010//读指针int readerIndex//写指针int writerIndex//保存读指针private int markedReaderIndex//保存写指针private int markedWriterIndex//最大分配容量private int maxCapacityprotected AbstractByteBuf(int max capacity){ if(max capacity 0){ throw new IllegalArgumentException( maxCapacity : max capacity (expected :=0));} this . max capacity=max capacity;}我们可以看到读写指针的成员标量是在属性中定义的,读写指针的位置是保存的。

  在构造函数中,可以传入可以分配的最大内存,然后将其赋给成员变量。

  

目录

@ override public int max capacity(){ return max capacity;} @ override public int reader index(){ return reader index;} @ override public int writer index(){ return writer index;}这些获取最大内存和读写指针的方法是所有bytebuf共有的,所以可以在AbstractByteBuf中定义。

 

  我们以一个writeBytes方法为例,让学生熟悉AbstractByteBuf的哪些部分是自己实现的,哪些部分交给子类实现:

  @ override public byte buf write bytes(byte buf src){ write bytes(src,src.readableBytes()

  ); return this;}这个方法是将源的ByteBuf(参数)中的字节写入到自身ByteBuf中

  首先这里调用了自身的writeBytes方法, 并传入参数ByteBuf本身, 以及Bytebuf的可读字节数, 我们跟到readbleBytes()方法中, 其实就是调用了自身的方法:

  

@Overridepublic int readableBytes() { return writerIndex - readerIndex;}

我们看到, 这里可读字节数就是返回了写指针到读指针之间的长度

 

  我们再继续跟到writeBytes(src, src.readableBytes())中:

  

@Overridepublic ByteBuf writeBytes(ByteBuf src, int length) { if (length > src.readableBytes()) { throw new IndexOutOfBoundsException(String.format( "length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src)); } writeBytes(src, src.readerIndex(), length); src.readerIndex(src.readerIndex() + length); return this;}

这里同样调用了自身的方法首先会对参数进行验证, 就是写入自身的长度不能超过源ByteBuf的可读字节数

 

  这里又调用了一个wirte方法, 参数传入源Bytebuf, 其可读字节数, 写入的长度, 这里写入的长度我们知道就是源ByteBuf的可读字节数

  我们再跟到writeBytes(src, src.readerIndex(), length);

  

public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { ensureAccessible(); ensureWritable(length); setBytes(writerIndex, src, srcIndex, length); writerIndex += length; return this;}

 

  

我们重点关注第二个校验方法ensureWritable(length)

public ByteBuf ensureWritable(int minWritableBytes) { if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (expected: >= 0)", minWritableBytes)); } ensureWritable0(minWritableBytes); return this;}

然后我们再跟到ensureWritable0(minWritableBytes)方法中:

 

  

private void ensureWritable0(int minWritableBytes) { if (minWritableBytes <= writableBytes()) { return; } if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } //自动扩容 int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity); capacity(newCapacity);}

开始做了两个参数校验, 第一个表示当前ByteBuf写入的长度如果要小于可写字节数, 则返回

 

  第二个可以换种方式去看minWritableBytes+ writerIndex> maxCapacity 也就是需要写入的长度+写指针必须要小于最大分配的内存, 否则报错, 注意这里最大分配内存不带表当前内存, 而是byteBuf所能分配的最大内存

  如果需要写入的长度超过了可写字节数, 并且需要写入的长度+写指针不超过最大内存, 则就开始了ByteBuf非常经典也非常重要的操作, 也就是自动扩容

  

int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

其中alloc()返回的是当前bytebuf返回的缓冲区分配器对象, 我们之后的小节会讲到, 这里调用了其calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity)方法为其扩容, 其中传入的参数writerIndex + minWritableBytes代表所需要的容量, maxCapacity为最大容量

 

  

 

  

我们跟到扩容的方法里面去

public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { //合法性校验 if (minNewCapacity < 0) { throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)"); } if (minNewCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity)); } //阈值为4mb final int threshold = 1048576 * 4; //最小需要扩容内存(总内存) == 阈值 if (minNewCapacity == threshold) { //返回阈值 return threshold; } //最小扩容内存>阈值 if (minNewCapacity > threshold) { //newCapacity为需要扩容内存 int newCapacity = minNewCapacity / threshold * threshold; //目标容量+阈值>最大容量 if (newCapacity > maxCapacity - threshold) { //将最大容量作为新容量 newCapacity = maxCapacity; } else { //否则, 目标容量+阈值 newCapacity += threshold; } return newCapacity; } //如果小于阈值 int newCapacity = 64; //目标容量<需要扩容的容量 while (newCapacity < minNewCapacity) { //倍增 newCapacity <<= 1; } //目标容量和最大容量返回一个最小的 return Math.min(newCapacity, maxCapacity);}

扩容相关的逻辑注释也写的非常清楚, 如果小于阈值(4mb), 采用倍增的方式, 如果大于阈值(4mb), 采用平移4mb的方式

 

  我们回到writeBytes(ByteBuf src, int srcIndex, int length):

  

public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { ensureAccessible(); ensureWritable(length); setBytes(writerIndex, src, srcIndex, length); writerIndex += length; return this;}

再往下看setBytes(writerIndex, src, srcIndex, length), 这里的参数的意思是从当前byteBuf的writerIndex节点开始写入, 将源缓冲区src的读指针位置, 写lenght个字节, 这里的方法中AbstractByteBuf类并没有提供实现, 因为不同类型的BtyeBuf实现的方式是不一样的, 所以这里交给了子类去实现

 

  

 

  

最后将写指针后移length个字节

最后我们回到writeBytes(ByteBuf src, int length)方法中:

 

  

public ByteBuf writeBytes(ByteBuf src, int length) { if (length > src.readableBytes()) { throw new IndexOutOfBoundsException(String.format( "length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src)); } writeBytes(src, src.readerIndex(), length); src.readerIndex(src.readerIndex() + length); return this;}

当writeBytes(src, src.readerIndex(), length)写完之后, 通过src.readerIndex(src.readerIndex() + length)将源缓冲区的读指针后移lenght个字节

 

  以上对AbstractByteBuf的简单介绍和其中写操作的方法的简单剖析

  以上就是Netty分布式ByteBuf使用的底层实现方式源码解析的详细内容,更多关于Netty分布式ByteBuf使用底层实现的资料请关注盛行IT其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: