netty eventloop源码分析,
目录
选择操作的入口NioEventLoop的奔跑方法轮询超正析象管事件重建选择器()方法分析完了选择器的创建和优化的过程,这一小节分析挑选相关操作
select操作的入口
NioEventLoop的run方法
受保护的void run(){ for(;){尝试{切换(选择策略。calculate strategy(select now supplier,hasTasks()){ case select strategy .继续:继续;案例选择策略。选择: //轮询超正析象管事件(1)选择(唤醒。getandset(false));如果(醒来。get()){ selector。醒来();}默认: }取消键=0;needsToSelectAgain=false//默认是50 final int io=this。io比率;if(io ratio==100){ try { processSelectedKeys();}最后{ run all tasks();} } else { //记录下开始时间最终长ioStartTime=system。纳米时间();尝试{ //处理轮询到的key(2)processSelectedKeys();}最后{ //计算耗时最终长io时间=系统。nano time()-ioStartTime;//执行task(3)runall tasks(io时间*(100-io比)/io比);} } } catch(Throwable t){ handleLoopException(t);} //代码省略}}代码比较长,其实主要分为三部分:
1.轮询超正析象管事件
2.处理轮询到的键
3.执行工作
这一小节,主要剖析第一部分
轮询io事件
首先转换块中默认会走到选择策略。挑选中,执行选择(唤醒。getandset(false))方法
参数醒醒吧。getandset(false)代表当前挑选操作是未唤醒状态
进入到选择(唤醒。getandset(false))方法中
私有空选择(布尔旧唤醒)引发io异常{选择器选择器=this。选择器;请尝试{ int select CNT=0;//当前系统的纳秒数长电流时间毫微秒=系统。纳米时间();//截止时间=当前时间队列第一个任务剩余执行时间long selectDeadLineNanos=当前时间毫微秒延迟毫微秒(当前时间毫微秒);for(;) {
//阻塞时间(毫秒)=(截止时间-当前时间+0.5毫秒) long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } //进行阻塞式的select操作 int selectedKeys = selector.select(timeoutMillis); //轮询次数 selectCnt ++; //如果轮询到一个事件(selectedKeys != 0), 或者当前select操作需要唤醒(oldWakenUp), //或者在执行select操作时已经被外部线程唤醒(wakenUp.get()), //或者任务队列已经有任务(hasTask), 或者定时任务队列中有任务(hasScheduledTasks()) if (selectedKeys != 0 oldWakenUp wakenUp.get() hasTasks() hasScheduledTasks()) { break; } //省略 //记录下当前时间 long time = System.nanoTime(); //当前时间-开始时间>=超时时间(条件成立, 执行过一次select操作, 条件不成立, 有可能发生空轮询) if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { //代表已经进行了一次阻塞式select操作, 操作次数重置为1 selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { //省略日志代码 //如果空轮询的次数大于一个阈值(512), 解决空轮询的bug rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } //代码省略 } catch (CancelledKeyException e) { //省略 }}首先通过longcurrentTimeNanos = System.nanoTime()获取系统的纳秒数
继续往下看:
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
delayNanos(currentTimeNanos)代表距定时任务中第一个任务剩余多长时间, 这个时间+当前时间代表这次操作不能超过的时间, 因为超过之后定时任务不能严格按照预定时间执行, 其中定时任务队列是已经按照执行时间有小到大排列好的队列, 所以第一个任务则是最近需要执行的任务, selectDeadLineNanos就代表了当前操作不能超过的时间
然后就进入到了无限for循环
for循环中我们关注:
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L
selectDeadLineNanos - currentTimeNanos+500000L代表截止时间-当前时间+0.5毫秒的调整时间,除以1000000表示将计算的时间转化为毫秒数
最后算出的时间就是selector操作的阻塞时间,并赋值到局部变量的timeoutMillis中
后面有个判断if(imeoutMillis<0),代表当前时间已经超过了最后截止时间+0.5毫秒, selectCnt == 0代表没有进行select操作,满足这两个条件,则执行selectNow()之后,将selectCnt赋值为1之后跳出循环
如果没超过截止时间,就进行了if(hasTasks() && wakenUp.compareAndSet(false,true))判断
这里我们关注hasTasks()方法,这里是判断当前NioEventLoop所绑定的taskQueue是否有任务,如果有任务,则执行selectNow()之后,将selectCnt赋值为1之后跳出循环(跳出循环之后去执行任务队列中的任务)
hasTasks()方法可以自己跟一下,非常简单
如果没有满足上述条件,就会执行intselectedKeys = selector.select(timeoutMillis)进行阻塞式轮询,并且自增轮询次数,而后会进行如下判断:
if (selectedKeys != 0 oldWakenUp wakenUp.get() hasTasks() hasScheduledTasks()) { break;}
selectedKeys != 0代表已经有轮询到的事件, oldWakenUp代表当前select操作是否需要唤醒, wakenUp.get()说明已经被外部线程唤醒, hasTasks()代表任务队列是否有任务, hasScheduledTasks()代表定时任务队列是否任务,满足条件之一,就跳出循环
longtime = System.nanoTime()记录了当前的时间,之后有个判断:
if(time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos)
这里的意思是当前时间-阻塞时间>方法开始执行的时间,这里说明已经完整的执行完成了一个阻塞的select()操作,将selectCnt设置成1
如果此条件不成立,说明没有完整执行select()操作,可能触发了一次空轮询,根据前一个selectCnt++这步我们知道,每触发一次空轮询selectCnt都会自增
之后会进入第二个判断
SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD
其中SELECTOR_AUTO_REBUILD_THRESHOLD默认是512,这个判断意思就是空轮询的次数如果超过512次,则会认为是发生了epoll bug,这样会通过rebuildSelector()方法重新构建selector,然后将重新构建的selector赋值到局部变量selector,执行一次selectNow(),将selectCnt初始化1,跳出循环
rebuildSelector()方法
rebuildSelector()方法中,看netty是如何解决epoll bug的
public void rebuildSelector() { //是否是由其他线程发起的 if (!inEventLoop()) { //如果是其他线程发起的, 将rebuildSelector()封装成任务队列, 由NioEventLoop进行调用 execute(new Runnable() { @Override public void run() { rebuildSelector(); } }); return; } final Selector oldSelector = selector; final Selector newSelector; if (oldSelector == null) { return; } try { //重新创建一个select newSelector = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } int nChannels = 0; for (;;) { try { //拿到旧select中所有的key for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { Object a = key.attachment(); //代码省略 //获取key注册的事件 int interestOps = key.interestOps(); //将key注册的事件取消 key.cancel(); //注册到重新创建的新的selector中 SelectionKey newKey = key.channel().register(newSelector, interestOps, a); //如果channel是NioChannel if (a instanceof AbstractNioChannel) { //重新赋值 ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { //代码省略 } } } catch (ConcurrentModificationException e) { continue; } break; } selector = newSelector; //代码省略}
首先会判断是不是当前NioEventLoop线程执行的,如果不是,则将构建方法封装成task由当前NioEventLoop执行
finalSelector oldSelector = selector表示拿到旧的selector
然后通过newSelector = openSelector()创建新的selector
通过for循环遍历所有注册在selector中的key
Object a = key.attachment()是获取channel,第一章讲过,在注册时,将自身作为属性绑定在key上
for循环体中,通过intinterestOps = key.interestOps()获取其注册的事件
key.cancel()将注册的事件进行取消
SelectionKey newKey = key.channel().register(newSelector, interestOps, a)
将channel以及注册的事件注册在新的selector中
if(ainstanceofAbstractNioChannel)判断是不是NioChannel
如果是NioChannel,则通过((AbstractNioChannel) a).selectionKey = newKey将自身的属性selectionKey赋值为新返回的key
selector = newSelector将自身NioEventLoop属性selector赋值为新创建的newSelector
至此,就是netty解决epoll bug的步骤,其实就是创建一个新的selector,将旧selector中注册的channel和事件重新注册到新的selector中,然后将自身selector属性替换成新创建的selector
以上就是Netty源码分析NioEventLoop执行select操作入口的详细内容,更多关于Netty分布式NioEventLoop selector操作的资料请关注盛行IT其它相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。