netty eventloop源码分析,netty的eventloop
目录
EventLoopGroup介绍函数1:我们先来看看注册的通道函数2:执行一些可运行的任务EventLoop介绍NioEventLoop介绍EpollEventLoop介绍后续
00-1010上一篇文章提到EventLoopGroup主要负责2件事,这里再重复一遍:
它主要包括两个功能:注册通道和执行一些可运行的任务。
00-1010通道在选择器上注册,选择器会调度通道的相关事件,如读、写、接受等事件。
EventLoopGroup的设计是它包含多个EventLoop(每个event loop内部通常包含一个线程)。在执行上述注册的过程中,需要选择EventLoopGroup中的一个来执行上述注册行为。这里有一个选择政策的问题。选择策略的界面是EventExecutorChooser,您也可以自定义一个实现。
从上面可以看出,EventLoopGroup所做的大部分工作都是初始化上述多个Eventloops、EventExecutorChooser等一般性工作。具体的注册通道还是交给它内部的EventLoop来实现。
EventLoopGroup介绍
EventLoopGroup继承了EventExecutorGroup,它也是EventExecutors的集合。EventExecutorGroup还负责EventExecutor的初始化。EventExecutorGroup还选择它的一个内部EventExecutors来执行Runnable任务的具体执行。
netty中的许多任务都是异步执行的。一旦当前线程要对EventLoop进行相关操作,比如向EventLoop注册通道,如果当前线程和要操作的EventLoop内部的线程不一样,当前线程只是向EventLoop提交一个注册任务,对外返回一个ChannelFuture。
总结:EventLoopGroup包含了以上两个函数,更多的是一个集合,但是具体的函数实现是选择一个内部的item元素来执行相关的任务。这里的内部item元素通常同时实现EventLoop和EventExecutor,比如NioEventLoop等。
我们继续看EventLoopGroup的整体类图。
从图中可以看出,有两个分支:
1 MultithreadEventLoopGroup:用于封装多线程的初始化逻辑,指定线程数量等。即初始化相应数量的EventLoops,并将每个eventloop分配给一个线程。
上图中的newChild方法,NioEventLoopGroup由NioEventLoop实现,EpollEventLoopGroup由EpollEventLoop实现。
如NioEventLoopGroup的实现:
受保护的EventLoop newChild(执行器执行器,对象.args)抛出异常{ return new NioEventLoop(this,executor,(SelectorProvider) args[0],((SelectStrategyFactory) args[1])。newSelectStrategy(),(RejectedExecutionHandler)args[2]);}2 EventLoop接口实现EventLoopGroup接口,主要是因为EventLoopGroup中的函数接口仍然依赖于内部的EventLoop来完成具体的操作。
功能1:先来看看注册Channel
EventLoop的主要工作是注册通道,对通道的读写等事件进行监控和管理,涉及到不同的监控方法。在linux下,有三种方法可以监控事件。
选择、轮询、epoll
java选择器接口的当前实现如下:
PollSelectorImpl:已实现轮询模式。
EPollSelectorImpl:执行epoll模式。
Netty使用以下方法:
NioEventLoop:采用jdk。
Selector接口(使用PollSelectorImpl的poll方式)来实现对Channel的事件检测
EpollEventLoop
:没有采用jdk Selector的接口实现EPollSelectorImpl,而是Netty自己实现的epoll方式来实现对Channel的事件检测,所以在EpollEventLoop中就不存在jdk的Selector。
NioEventLoop介绍
对于NioEventLoopGroup的功能,NioEventLoop都要做实际的实现,NioEventLoop既要实现注册功能,又要实现运行Runnable任务
对于注册Channel:NioEventLoop将Channel注册到NioEventLoop内部的PollSelectorImpl上,来监听该Channel的读写事件
对于运行Runnable任务:NioEventLoop的父类的父类SingleThreadEventExecutor实现了运行Runnable任务,在SingleThreadEventExecutor中,有一个任务队列还有一个分配的线程
private final Queue<Runnable> taskQueue;private volatile Thread thread;
NioEventLoop在该线程中不仅要执行Selector带来的IO事件,还要不断的从上述taskQueue中取出任务来执行这些非IO事件。下面我们来详细看下这个过程
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: // fallthrough } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { ... } }}
来详细说下这个过程:
1 计算当前是否需要执行select过程如果当前没有Runnable任务,则执行select(这个select过程稍后详细来说)。
如果当前有Runnable任务,则要去执行处理流程,此时顺便执行下selector.selectNow(),万一有事件发生那就赚了,没有白走这次处理流程
2 根据IO任务的时间占比设置来执行IO任务和非IO任务,即上面提到的Runnable任务如果ioRatio=100则每次都是执行全部的IO任务,执行全部的非IO任务 默认ioRatio=50,即一半时间用于处理IO任务,另一半时间用于处理非IO任务。怎么去控制非IO任务所占用时间呢?
这里是每执行64个非IO任务(这里可能是每个非IO任务比较短暂,减少一些判断带来的消耗)就判断下占用时间是否超过了上述时间限制
接下来详细看下上述select过程
Selector selector = this.selector;try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // If a task was submitted when wakenUp value was true, the task didnt get a chance to call // Selector#wakeup. So we need to check task queue again before executing select operation. // If we dont, the task might be pended until select operation was timed out. // It might be pended until idle timeout if IdleStateHandler existed in pipeline. if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 oldWakenUp wakenUp.get() hasTasks() hasScheduledTasks()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; } if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or its client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; }} catch (CancelledKeyException e) {...}
1 首先计算此次select过程的截止时间
protected long delayNanos(long currentTimeNanos) { ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); if (scheduledTask == null) { return SCHEDULE_PURGE_INTERVAL; } return scheduledTask.delayNanos(currentTimeNanos); }
这里其实就是从一个定时 任务队列中取出定时任务,如果有则计算出离当前定时任务的下一次执行时间之差,如果没有则按照固定的1s作为select过程的时间
2 将当前时间差转化成ms如果当前时间差不足0.5ms的话,即timeoutMillis<=0,并且是第一次执行,则认为时间太短执行执行一次selectNow
3 如果有任务,则立即执行一次selectNow,跳出for循环4 然后就是普通的selector.select(timeoutMillis)在这段时间内如果有事件则跳出for循环,如果没有事件则已经花费对应的时间差了,再次执行for循环,计算的timeoutMillis就会小于0,也会跳出for循环
在上述逻辑中,基本selectCnt都是1,不会出现很多次,而这里针对selectCnt有很多次的处理是基于一个情况:
selector.select(timeoutMillis)
Selector的正常逻辑是一旦有事件就返回,没有事件则最多等待timeoutMillis时间。 然而底层操作系统实现可能有bug,会出现:即使没有产生事件就直接返回了,并没有按照要求等待timeoutMillis时间。
现在的解决办法就是: 记录上述出现的次数,一旦超过512这个阈值(可设置),就重新建立新的Selector,并将之前的Channel也全部迁移到新的Selector上
至此,NioEventLoop的主逻辑流程就介绍完了,之后就该重点介绍其中对于IO事件的处理了。然后就会引出来ChannelPipeline的处理流程
EpollEventLoop介绍
EpollEventLoop和NioEventLoop的主流程逻辑基本上是差不多的,不同之处就在于EpollEventLoop用epoll方式替换NioEventLoop中的PollSelectorImpl的poll方式。
这里不再详细说明了,之后会详细的说明Netty的epoll方式和jdk中的epoll方式的区别。
后续
下一篇就要详细描述下NioEventLoop对于IO事件的处理,即ChannelPipeline的处理流程。
以上就是分布式Netty源码分析EventLoopGroup及介绍的详细内容,更多关于分布式Netty EventLoopGroup源码分析的资料请关注盛行IT其它相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。