netty 释放bytebuf,netty bytebuf使用

  netty 释放bytebuf,netty bytebuf使用

  00-1010在分析先出逻辑之前,首先我们介绍一下cache对象的数据结构。以acheForTiny类型为例,我们遵循createSubPageCaches方法,返回到PoolArena的allocate方法。我们遵循normalizeCapacity方法并返回到allocate方法。allocateTiny是缓存分配的条目。让我们简单看一下缓存分配方法。这个类继承了init方法。上一节简单分析了directArena内存分配的一般流程,知道先命中缓存,不命中就分配一个连续内存。本节带您分析命中缓存的相关逻辑。

  00-1010回顾上一节的内容,我们说PoolThreadCache中维护了三个缓存数组(实际上是六个缓存数组,这里只是以Direct为例,堆类型的逻辑是一样的)。3360 tinySubpageDirectCache、smallSubpageDirectCache和normalDirectCaches分别代表Tiny类型、Small类型和normal类型的缓存数组。

  这三个数组保存在PoolThreadCache 3360的成员变量中。

  private final MemoryRegionCacheByteBuffer[]tinySubPageDirectCaches;private final memory regioncachebytebuffer[]smallSubPageDirectCaches;private final MemoryRegionCacheByteBuffer[]normalDirectCaches;其中在构造方法中执行初始化3360。

  tinySubPageDirectCaches=createSubPageCaches(tinyCacheSize,PoolArena.numTinySubpagePools,SizeClass。微小);smallSubPageDirectCaches=createSubPageCaches(smallCacheSize,directarena . numsmallsubpagepools,SizeClass。小);normalDirectCaches=createNormalCaches(normalCacheSize,maxCachedBufferCapacity,directArena);

  

目录

private static T MemoryRegionCacheT[]createSubPageCaches(int cacheSize,int numCaches,size class size class){ if(cacheSize 0){ @ suppress warnings( unchecked )MemoryRegionCacheT[]cache=new memoryregioncaches[num caches];for(int I=0;i cache.lengthi ) { cache[i]=新子页面内存区域缓存(cacheSize,size class);}返回缓存;} else {返回null}}上面的小节这里已经分析过了。这里,创建了一个缓存阵列。这种缓存数组的长度,即numCaches,在不同的类型中是不同的。微型的长度是32,小型的长度是4,普通的长度是3。

 

  众所周知,缓存数组中的每个节点代表一个缓存对象,其中维护着一个队列。队列的大小由PooledByteBufAllocator类中的TinyCache、SmallCache和Normal Cache属性决定,这在上一节中已经分析过。

  对于每个缓存对象,队列中缓存的ByteBuf的大小是固定的,netty将每个缓冲区类型分为不同的长度规格,每个缓存队列中缓存的ByteBuf的长度是相同的规格,而缓冲区数组的长度是规格的个数。

  比如tiny型,netty将其长度分为32种规格,每种规格都是16的整数倍,即包含0b、16B、32B、48B、64B、80B、96b、96B。

  ..496B总共32种规格, 而在其缓存数组tinySubPageDirectCaches中, 这每一种规格代表数组中的一个缓存对象缓存的ByteBuf的大小, 我们以tinySubPageDirectCaches[1]为例(这里下标选择1是因为下标为0代表的规格是0B, 其实就代表一个空的缓存, 这里不进行举例), 在tinySubPageDirectCaches[1]的缓存对象中所缓存的ByteBuf的缓冲区长度是16B, 在tinySubPageDirectCaches[2]中缓存的ByteBuf长度都为32B, 以此类推, tinySubPageDirectCaches[31]中缓存的ByteBuf长度为496B

  有关类型规则的分配如下:

  tiny:总共32个规格, 均是16的整数倍, 0B, 16B, 32B, 48B, 64B, 80B, 96B......496B

  small:4种规格, 512b, 1k, 2k, 4k

  nomal:3种规格, 8k, 16k, 32k

  这样, PoolThreadCache中缓存数组的数据结构为

  

 

  大概了解缓存数组的数据结构, 我们再继续剖析在缓冲中分配内存的逻辑

  

 

  

回到PoolArena的allocate方法中

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { //规格化 final int normCapacity = normalizeCapacity(reqCapacity); if (isTinyOrSmall(normCapacity)) { int tableIdx; PoolSubpage<T>[] table; //判断是不是tinty boolean tiny = isTiny(normCapacity); if (tiny) { // < 512 //缓存分配 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { return; } //通过tinyIdx拿到tableIdx tableIdx = tinyIdx(normCapacity); //subpage的数组 table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } //拿到对应的节点 final PoolSubpage<T> head = table[tableIdx]; synchronized (head) { final PoolSubpage<T> s = head.next; //默认情况下, head的next也是自身 if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, handle, reqCapacity); if (tiny) { allocationsTiny.increment(); } else { allocationsSmall.increment(); } return; } } allocateNormal(buf, reqCapacity, normCapacity); return; } if (normCapacity <= chunkSize) { //首先在缓存上进行内存分配 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { //分配成功, 返回 return; } //分配不成功, 做实际的内存分配 allocateNormal(buf, reqCapacity, normCapacity); } else { //大于这个值, 就不在缓存上分配 allocateHuge(buf, reqCapacity); }}

首先通过normalizeCapacity方法进行内存规格化

 

  

 

  

我们跟到normalizeCapacity方法中

int normalizeCapacity(int reqCapacity) { if (reqCapacity < 0) { throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)"); } if (reqCapacity >= chunkSize) { return reqCapacity; } //如果>tiny if (!isTiny(reqCapacity)) { // >= 512 //找一个2的幂次方的数值, 确保数值大于等于reqCapacity int normalizedCapacity = reqCapacity; normalizedCapacity --; normalizedCapacity = normalizedCapacity >>> 1; normalizedCapacity = normalizedCapacity >>> 2; normalizedCapacity = normalizedCapacity >>> 4; normalizedCapacity = normalizedCapacity >>> 8; normalizedCapacity = normalizedCapacity >>> 16; normalizedCapacity ++; if (normalizedCapacity < 0) { normalizedCapacity >>>= 1; } return normalizedCapacity; } //如果是16的倍数 if ((reqCapacity & 15) == 0) { return reqCapacity; } //不是16的倍数, 变成最大小于当前值的值+16 return (reqCapacity & ~15) + 16;}

if(!isTiny(reqCapacity))代表如果大于tiny类型的大小, 也就是512, 则会找一个2的幂次方的数值, 确保这个数值大于等于reqCapacity

 

  如果是tiny, 则继续往下

  if((reqCapacity & 15) == 0)这里判断如果是16的倍数, 则直接返回

  如果不是16的倍数, 则返回(reqCapacity & ~15) + 16, 也就是变成最小大于当前值的16的倍数值

  从上面规格化逻辑看出, 这里将缓存大小规格化成固定大小, 确保每个缓存对象缓存的ByteBuf容量统一

  

 

  

回到allocate方法中

if(isTinyOrSmall(normCapacity))这里是根据规格化后的大小判断是否tiny或者small类型, 我们跟到方法中:

 

  

boolean isTinyOrSmall(int normCapacity) { return (normCapacity & subpageOverflowMask) == 0;}

这里是判断如果normCapacity小于一个page的大小, 也就是8k代表其实tiny或者small

 

  继续看allocate方法:

  如果当前大小是tiny或者small, 则isTiny(normCapacity)判断是否是tiny类型, 跟进去:

  

static boolean isTiny(int normCapacity) { return (normCapacity & 0xFFFFFE00) == 0;}

这里是判断如果小于512, 则认为是tiny

 

  再继续看allocate方法:

  如果是tiny, 则通过cache.allocateTiny(this, buf, reqCapacity, normCapacity)在缓存上进行分配

  我们就以tiny类型为例, 分析在缓存上分配ByteBuf的流程

  

 

  

allocateTiny是缓存分配的入口

我们跟进去, 进入到了PoolThreadCache的allocateTiny方法中:

 

  

boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);}

这里有个方法cacheForTiny(area, normCapacity), 这个方法的作用是根据normCapacity找到tiny类型缓存数组中的一个缓存对象

 

  我们跟进cacheForTiny:

  

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) { int idx = PoolArena.tinyIdx(normCapacity); if (area.isDirect()) { return cache(tinySubPageDirectCaches, idx); } return cache(tinySubPageHeapCaches, idx);}

PoolArena.tinyIdx(normCapacity)是找到tiny类型缓存数组的下标

 

  继续跟tinyIdx:

  

static int tinyIdx(int normCapacity) { return normCapacity >>> 4;}

这里直接将normCapacity除以16, 通过前面的内容我们知道, tiny类型缓存数组中每个元素规格化的数据都是16的倍数, 所以通过这种方式可以找到其下标, 参考图5-2, 如果是16B会拿到下标为1的元素, 如果是32B则会拿到下标为2的元素

 

  

 

  

回到acheForTiny方法中

if(area.isDirect())这里判断是否是分配堆外内存, 因为我们是按照堆外内存进行举例, 所以这里为true

 

  再继续跟到cache(tinySubPageDirectCaches, idx)方法中:

  

private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) { if (cache == null idx > cache.length - 1) { return null; } return cache[idx];}

这里我们看到直接通过下标的方式拿到了缓存数组中的对象

 

  回到PoolThreadCache的allocateTiny方法中:

  

boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);}

拿到了缓存对象之后, 我们跟到allocate(cacheForTiny(area, normCapacity), buf, reqCapacity)方法中:

 

  

private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) { if (cache == null) { return false; } boolean allocated = cache.allocate(buf, reqCapacity); if (++ allocations >= freeSweepAllocationThreshold) { allocations = 0; trim(); } return allocated;}

这里通过cache.allocate(buf, reqCapacity)进行继续进行分配

 

  再继续往里跟, 跟到内部类MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法中:

  

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { Entry<T> entry = queue.poll(); if (entry == null) { return false; } initBuf(entry.chunk, entry.handle, buf, reqCapacity); entry.recycle(); ++ allocations; return true;}

这里首先通过queue.poll()这种方式弹出一个entry, 我们之前的小节分析过, MemoryRegionCache维护着一个队列, 而队列中的每一个值是一个entry

 

  

 

  

我们简单看下Entry这个类

static final class Entry<T> { final Handle<Entry<?>> recyclerHandle; PoolChunk<T> chunk; long handle = -1; //代码省略}

这里重点关注chunk和handle的这两个属性, chunk代表一块连续的内存, 我们之前简单介绍过, netty是通过chunk为单位进行内存分配的, 我们之后会对chunk进行剖析

 

  handle相当于一个指针, 可以唯一定位到chunk里面的一块连续的内存, 之后也会详细分析

  这样, 通过chunk和handle就可以定位ByteBuf中指定一块连续内存, 有关ByteBuf相关的读写, 都会在这块内存中进行

  我们回到MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法:

  

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { Entry<T> entry = queue.poll(); if (entry == null) { return false; } initBuf(entry.chunk, entry.handle, buf, reqCapacity); entry.recycle(); ++ allocations; return true;}

弹出entry之后, 通过initBuf(entry.chunk, entry.handle, buf, reqCapacity)这种方式给ByteBuf初始化, 这里参数传入我们刚才分析过的当前Entry的chunk和hanle

 

  因为我们分析的tiny类型的缓存对象是SubPageMemoryRegionCache类型,所以我们继续跟到SubPageMemoryRegionCache类的initBuf(entry.chunk, entry.handle, buf, reqCapacity)方法中:

  

protected void initBuf( PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) { chunk.initBufWithSubpage(buf, handle, reqCapacity);}

这里的chunk调用了initBufWithSubpage(buf, handle, reqCapacity)方法, 其实就是PoolChunk类中的方法

 

  我们继续跟initBufWithSubpage:

  

void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) { initBufWithSubpage(buf, handle, bitmapIdx(handle), reqCapacity);}

这里有关bitmapIdx(handle)相关的逻辑, 会在后续的章节进行剖析, 这里继续往里跟:

 

  

private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) { assert bitmapIdx != 0; int memoryMapIdx = memoryMapIdx(handle); PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)]; assert subpage.doNotDestroy; assert reqCapacity <= subpage.elemSize; buf.init( this, handle, runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize, arena.parent.threadCache());}

这里我们先关注init方法, 因为我们是以PooledUnsafeDirectByteBuf为例, 所以这里走的是PooledUnsafeDirectByteBuf的init方法

 

  

 

  

跟进init方法

void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { super.init(chunk, handle, offset, length, maxLength, cache); initMemoryAddress();}

首先调用了父类的init方法, 再跟进去:

 

  

void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { //初始化 assert handle >= 0; assert chunk != null; //在哪一块内存上进行分配的 this.chunk = chunk; //这一块内存上的哪一块连续内存 this.handle = handle; memory = chunk.memory; this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; this.cache = cache;}

这里将PooledUnsafeDirectByteBuf的各个属性进行了初始化

 

  this.chunk = chunk这里初始化了chunk, 代表当前的ByteBuf是在哪一块内存中分配的

  this.handle = handle这里初始化了handle, 代表当前的ByteBuf是这块内存的哪个连续内存

  有关offset和length, 我们会在之后的小节进行分析, 在这里我们只需要知道, 通过缓存分配ByteBuf, 我们只需要通过一个chunk和handle, 就可以确定一块内存

  以上就是通过缓存分配ByteBuf对象的过程

  我们回到MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法:

  

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { Entry<T> entry = queue.poll(); if (entry == null) { return false; } initBuf(entry.chunk, entry.handle, buf, reqCapacity); entry.recycle(); ++ allocations; return true;}

分析完了initBuf方法, 再继续往下看

 

  entry.recycle()这步是将entry对象进行回收, 因为entry对象弹出之后没有再被引用, 可能gc会将entry对象回收, netty为了将对象进行循环利用, 就将其放在对象回收站进行回收

  我们跟进recycle方法

  

void recycle() { chunk = null; handle = -1; recyclerHandle.recycle(this);}

chunk = null和handle = -1表示当前Entry不指向任何一块内存

 

  recyclerHandle.recycle(this)将当前entry回收, 有关对象回收站, 我们会在后面的章节详细剖析

  以上就是命中缓存的流程, 因为这里我们是假设缓中有值的情况下进行分配的, 如果第一次分配, 缓存中是没有值的, 那么在缓存中没有值的情况下, netty是如何进行分配的呢?我们再之后的小节会进行剖析

  更多关于Netty分布式ByteBuf使用命中缓存分配的资料请关注盛行IT其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: