netty异步业务线程,netty多线程
目录
异线程回收对象跟到pushLater方法中跟到分配方法中回到pushLater方法中简单看下环的类的定义回到pushLater方法中前文传送门:Netty分布式高性能工具类同线程下回收对象解析
异线程回收对象
就是创建对象和回收对象不在同一条线程的情况下, 对象回收的逻辑
我们之前小节简单介绍过, 异线程回收对象, 是不会放在当前线程的堆中的, 而是放在一个WeakOrderQueue的数据结构中, 回顾我们之前的一个图:
8-6-1
相关的逻辑, 我们跟到源码中:
首先从回收对象的入口方法开始,默认手柄的使再循环方法:
public void recycle(Object Object){ if(Object!=value){ throw new IllegalArgumentException( object不属于句柄);} stack.push(这个);}这部分我们并不陌生, 跟到推方法中:
void push(DefaultHandle?item){线程当前线程=线程。当前线程();if(thread==当前线程){ push now(item);} else { pushLater(item,当前线程);}}上一小节分析过, 同线程会走到pushNow,有关具体逻辑也进行了分析
如果不是同线程, 则会走到pushLater方法, 传入处理对象和当前线程对象
跟到pushLater方法中
私有void pushLater(DefaultHandle?item,Thread thread) { MapStack?WeakOrderQueue delayedRecycled=DELAYED _ recycled。get();WeakOrderQueue queue=delayedrecycled。get(这个);if(queue==null){ if(delayedrecycled。size()=maxDelayedQueues){ delayedrecycled。把(这个,WeakOrderQueue .假人);返回;} if((queue=weakorderqueue。分配(this,thread))==null){ return;} delayedRecycled.put(this,queue);} else if (queue==WeakOrderQueue .哑){ return} queue.add(项目);}首先通过DELAYED_RECYCLED.get()获取一个延迟回收对象
我们跟到延迟_回收中:
private static final FastThreadLocalMapStack?WeakOrderQueue DELAYED _ RECYCLED=new FastThreadLocalMapStack?WeakOrderQueue(){ @覆盖受保护的MapStack?WeakOrderQueue initialValue() {返回新的WeakHashMapStack?WeakOrderQueue();}};这里我们看到延迟_回收是一个快速线程本地对象,初始值方法创建一个弱哈希映射对象,WeakHashMap是一个地图,钥匙为堆栈,值为我们刚才提到过的WeakOrderQueue
从中
我们可以分析到, 每个线程都维护了一个WeakHashMap对象
WeakHashMap中的元素, 是一个stack和WeakOrderQueue的映射, 说明了不同的stack, 对应不同的WeakOrderQueue
这里的映射关系可以举个例子说明:
比如线程1创建了一个对象, 在线程3进行了回收, 线程2创建了一个对象, 同样也在线程3进行了回收, 那么线程3对应的WeakHashMap中就会有两个元素:
线程1的stack和线程2的WeakOrderQueue, 线程2和stack和线程2的WeakOrderQueue
我们回到pushLater方法中:
继续往下看:
WeakOrderQueue queue = delayedRecycled.get(this)
拿到了当前线程的WeakHashMap对象delayedRecycled之后, 然后通过delayedRecycled创建对象的线程的stack, 拿到WeakOrderQueue
这里的this, 就是创建对象的那个线程所属的stack, 这个stack是绑定在handle中的, 创建handle对象时候进行的绑定
假设当前线程是线程2, 创建handle的线程是线程1, 这里通过handle的stack拿到线程1的WeakOrderQueue
if(queue ==null)说明线程2没有回收过线程1的对象, 则进入if块的逻辑:
首先看判断if(delayedRecycled.size() >= maxDelayedQueues)
delayedRecycled.size()表示当前线程回收其他创建对象的线程的线程个数, 也就是有几个其他的线程在当前线程回收对象
maxDelayedQueues表示最多能回收的线程个数, 这里如果朝超过这个值, 就表示当前线程不能在回收其他线程的对象了
通过delayedRecycled.put(this, WeakOrderQueue.DUMMY)标记, 创建对象的线程的stack, 所对应的WeakOrderQueue不可用, DUMMY我们可以理解为不可用
如果没有超过maxDelayedQueues, 则通过if判断中的WeakOrderQueue.allocate(this, thread)这种方式创建一个WeakOrderQueue
allocate传入this, 也就是创建对象的线程对应的stack, 假设是线程1, thread就是当前线程, 假设是线程2
跟到allocate方法中
static WeakOrderQueue allocate(Stack<?> stack, Thread thread) { return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY) ? new WeakOrderQueue(stack, thread) : null;}
reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)表示线程1的stack还能不能分配LINK_CAPACITY个元素, 如果可以, 则直接通过new的方式创建一个WeakOrderQueue对象
再跟到reserveSpace方法中:
private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) { assert space >= 0; for (;;) { int available = availableSharedCapacity.get(); if (available < space) { return false; } if (availableSharedCapacity.compareAndSet(available, available - space)) { return true; } }}
参数availableSharedCapacity表示线程1的stack允许外部线程给其缓存多少个对象, 之前我们分析过是16384, space默认是16
方法中通过一个cas操作, 将16384减去16, 表示stack可以给其他线程缓存的对象数为16384-16
而这16个元素, 将由线程2缓存
回到pushLater方法中
创建之后通过delayedRecycled.put(this, queue)将stack和WeakOrderQueue进行关联
最后通过queue.add(item), 将创建的WeakOrderQueue添加一个handle
讲解WeakOrderQueue之前, 我们首先了解下WeakOrderQueue的数据结构
WeakOrderQueue维护了多个link, link之间是通过链表进行连接, 每个link可以盛放16个handle,
我们刚才分析过, 在reserveSpace方法中将stack.availableSharedCapacity-16, 其实就表示了先分配16个空间放在link里, 下次回收的时候, 如果这16空间没有填满, 则可以继续往里盛放
如果16个空间都已填满, 则通过继续添加link的方式继续分配16个空间用于盛放handle
WeakOrderQueue和WeakOrderQueue之间也是通过链表进行关联
可以根据下图理解上述逻辑:
8-6-2
根据以上思路, 我们跟到WeakOrderQueue的构造方法中:
private WeakOrderQueue(Stack<?> stack, Thread thread) { head = tail = new Link(); owner = new WeakReference<Thread>(thread); synchronized (stack) { next = stack.head; stack.head = this; } availableSharedCapacity = stack.availableSharedCapacity;}
这里有个head和tail, 都指向一个link对象, 这里我们可以分析到, 其实在WeakOrderQueue中维护了一个链表, head分别代表头结点和尾节点, 初始状态下, 头结点和尾节点都指向同一个节点
简单看下link的类的定义
private static final class Link extends AtomicInteger { private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY]; private int readIndex; private Link next;}
每次创建一个Link, 都会创建一个DefaultHandle类型的数组用于盛放DefaultHandle对象, 默认大小是16个
readIndex是一个读指针, 我们之后小节会进行分析
next节点则指向下一个link
回到WeakOrderQueue的构造方法中:
owner是对向前线程进行一个包装, 代表了当前线程
接下来在一个同步块中, 将当前创建的WeakOrderQueue插入到stack指向的第一个WeakOrderQueue, 也就是stack的head属性, 指向我们创建的WeakOrderQueue, 如图所示
8-6-3
如果线程2创建一个和stack关联的WeakOrderQueue, stack的head节点就就会指向线程2创建WeakOrderQueue
如果之后线程3也创建了一个和stack关联的WeakOrderQueue, stack的head节点就会指向新创建的线程3的WeakOrderQueue
然后线程3的WeakOrderQueue再指向线程2的WeakOrderQueue
也就是无论哪个线程创建一个和同一个stack关联的WeakOrderQueue的时候, 都插入到stack指向的WeakOrderQueue列表的头部
这样就可以将stack和其他线程释放对象的容器WeakOrderQueue进行绑定
回到pushLater方法中
private void pushLater(DefaultHandle<?> item, Thread thread) { Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); WeakOrderQueue queue = delayedRecycled.get(this); if (queue == null) { if (delayedRecycled.size() >= maxDelayedQueues) { delayedRecycled.put(this, WeakOrderQueue.DUMMY); return; } if ((queue = WeakOrderQueue.allocate(this, thread)) == null) { return; } delayedRecycled.put(this, queue); } else if (queue == WeakOrderQueue.DUMMY) { return; } queue.add(item);}
根据之前分析的WeakOrderQueue的数据结构, 我们分析最后一步, 也就是WeakOrderQueue的add方法
我们跟进WeakOrderQueue的add方法:
void add(DefaultHandle<?> handle) { handle.lastRecycledId = id; Link tail = this.tail; int writeIndex; if ((writeIndex = tail.get()) == LINK_CAPACITY) { if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) { return; } this.tail = tail = tail.next = new Link(); writeIndex = tail.get(); } tail.elements[writeIndex] = handle; handle.stack = null; tail.lazySet(writeIndex + 1);}
首先, 看handle.lastRecycledId = id
lastRecycledId表示handle上次回收的id, 而id表示WeakOrderQueue的id, weakOrderQueue每次创建的时候, 会为自增一个唯一的id
Link tail =this.tail表示拿到当前WeakOrderQueue的中指向最后一个link的指针, 也就是尾指针
再看if((writeIndex = tail.get()) == LINK_CAPACITY)
tail.get()表示获取当前link中已经填充元素的个数, 如果等于16, 说明元素已经填充满
然后通过eserveSpace方法判断当前WeakOrderQueue是否还能缓存stack的对象, eserveSpace方法我们刚才已经分析过, 会根据stack的属性availableSharedCapacity-16的方式判断还能否缓存stack的对象, 如果不能再缓存stack的对象, 则返回
如果还能继续缓存, 则在创建一个link, 并将尾节点指向新创建的link, 并且原来尾节点的next的节点指向新创建的link
然后拿到当前link的writeIndex, 也就是写指针, 如果是新创建的link中没有元素, writeIndex为0
之后将尾部的link的elements属性, 也就是一个DefaultHandle类型的数组, 通过数组下标的方式将第writeIndex个节点赋值为要回收的handle
然后将handle的stack属性设置为null, 表示当前handle不是通过stack进行回收的
最后将tail节点的元素个数进行+1, 表示下一次将从writeIndex+1的位置往里写
以上就是异线程回收对象的逻辑
以上就是Netty分布式高性能工具类异线程下回收对象解析的详细内容,更多关于Netty分布式异线程回收对象的资料请关注盛行IT其它相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。