netty eventloop源码分析,netty 队列

  netty eventloop源码分析,netty 队列

  

目录

执行任务队列跟进runAllTasks方法:我们跟进fetchFromScheduledTaskQueue()方法回到runAllTasks(长时间超时)方法中回到runAllTasks(长时间超时)方法章节小结前文传送门:NioEventLoop处理超正析象管(图片Orthicon)事件

 

  

执行任务队列

继续回到NioEventLoop的运行()方法:

 

  受保护的void run(){ for(;){尝试{切换(选择策略。calculate strategy(select now supplier,hasTasks()){ case select strategy .继续:继续;案例选择策略。选择: //轮询超正析象管事件(1)选择(唤醒。getandset(false));如果(醒来。get()){ selector。醒来();}默认: }取消键=0;needsToSelectAgain=false//默认是50 final int io=this。io比率;if(io ratio==100){ try { processSelectedKeys();}最后{ run all tasks();} } else { //记录下开始时间最终长ioStartTime=system。纳米时间();尝试{ //处理轮询到的key(2)processSelectedKeys();}最后{ //计算耗时最终长io时间=系统。nano time()-ioStartTime;//执行task(3)runall tasks(io时间*(100-io比)/io比);} } } catch(Throwable t){ handleLoopException(t);} //代码省略}}我们看到处理完轮询到的键之后,首先记录下耗时,然后通过运行所有任务(io时间*(100-io比率)/io比率)执行任务队列中的任务

  我们知道ioRatio默认是50,所以执行完ioTime * (100 - ioRatio)/ioRatio后,方法传入的值为很多时候,也就是processSelectedKeys()的执行时间:

  

跟进runAllTasks方法:

受保护的布尔运行所有任务(长时间超时nanos){//定时任务队列中聚合任务fetchFromScheduledTaskQueue();//从普通taskQ里面拿一个任务runnable task=轮询task();//任务为空,则直接返回if (task==null) { //跑完所有的任务执行收尾的操作afterrunningtasks();返回false} //如果队列不为空//首先算一个截止时间(50毫秒,因为执行任务,不要超过这个时间)最终长截止时间

 

   = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //执行每一个任务 for (;;) { safeExecute(task); //标记当前跑完的任务 runTasks ++; //当跑完64个任务的时候, 会计算一下当前时间 if ((runTasks & 0x3F) == 0) { //定时任务初始化到当前的时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超过截止时间则不执行(nanoTime()是耗时的) if (lastExecutionTime >= deadline) { break; } } //如果没有超过这个时间, 则继续从普通任务队列拿任务 task = pollTask(); //直到没有任务执行 if (task == null) { //记录下最后执行时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true;}首先会执行fetchFromScheduledTaskQueue()这个方法,这个方法的意思是从定时任务队列中聚合任务,也就是将定时任务中找到可以执行的任务添加到taskQueue中

  

 

  

我们跟进fetchFromScheduledTaskQueue()方法

private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //从定时任务队列中抓取第一个定时任务 //寻找截止时间为nanoTime的任务 Runnable scheduledTask = pollScheduledTask(nanoTime); //如果该定时任务队列不为空, 则塞到普通任务队列里面 while (scheduledTask != null) { //如果添加到普通任务队列过程中失败 if (!taskQueue.offer(scheduledTask)) { //则重新添加到定时任务队列中 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //继续从定时任务队列中拉取任务 //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中 scheduledTask = pollScheduledTask(nanoTime); } return true;}
longnanoTime = AbstractScheduledEventExecutor.nanoTime()

 

  

代表从定时任务初始化到现在过去了多长时间

 

  

Runnable scheduledTask= pollScheduledTask(nanoTime)

 

  

代表从定时任务队列中拿到小于nanoTime时间的任务,因为小于初始化到现在的时间,说明该任务需要执行了

 

  跟到其父类AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:

  

protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); //拿到定时任务队列 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; //peek()方法拿到第一个任务 ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos() <= nanoTime) { //从队列中删除 scheduledTaskQueue.remove(); //返回该任务 return scheduledTask; } return null;}

我们看到首先获得当前类绑定的定时任务队列的成员变量

 

  如果不为空,则通过scheduledTaskQueue.peek()弹出第一个任务

  如果当前任务小于传来的时间,说明该任务需要执行,则从定时任务队列中删除

  我们继续回到fetchFromScheduledTaskQueue()方法中:

  

private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //从定时任务队列中抓取第一个定时任务 //寻找截止时间为nanoTime的任务 Runnable scheduledTask = pollScheduledTask(nanoTime); //如果该定时任务队列不为空, 则塞到普通任务队列里面 while (scheduledTask != null) { //如果添加到普通任务队列过程中失败 if (!taskQueue.offer(scheduledTask)) { //则重新添加到定时任务队列中 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //继续从定时任务队列中拉取任务 //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中 scheduledTask = pollScheduledTask(nanoTime); } return true;}

弹出需要执行的定时任务之后,我们通过taskQueue.offer(scheduledTask)添加到taskQueue中,如果添加失败,则通过

 

  

scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)

 

  

重新添加到定时任务队列中

 

  如果添加成功,则通过pollScheduledTask(nanoTime)方法继续添加,直到没有需要执行的任务

  这样就将定时任务队列需要执行的任务添加到了taskQueue中

  

 

  

回到runAllTasks(long timeoutNanos)方法中

protected boolean runAllTasks(long timeoutNanos) { //定时任务队列中聚合任务 fetchFromScheduledTaskQueue(); //从普通taskQ里面拿一个任务 Runnable task = pollTask(); //task为空, 则直接返回 if (task == null) { //跑完所有的任务执行收尾的操作 afterRunningAllTasks(); return false; } //如果队列不为空 //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //执行每一个任务 for (;;) { safeExecute(task); //标记当前跑完的任务 runTasks ++; //当跑完64个任务的时候, 会计算一下当前时间 if ((runTasks & 0x3F) == 0) { //定时任务初始化到当前的时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超过截止时间则不执行(nanoTime()是耗时的) if (lastExecutionTime >= deadline) { break; } } //如果没有超过这个时间, 则继续从普通任务队列拿任务 task = pollTask(); //直到没有任务执行 if (task == null) { //记录下最后执行时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true;}

首先通过Runnable task = pollTask()从taskQueue中拿一个任务

 

  任务不为空,则通过

  

finallongdeadline = ScheduledFutureTask.nanoTime() + timeoutNanos

 

  

计算一个截止时间,任务的执行时间不能超过这个时间

 

  然后在for循环中通过safeExecute(task)执行task

  我们跟到safeExecute(task)中:

  

protected static void safeExecute(Runnable task) { try { //直接调用run()方法执行 task.run(); } catch (Throwable t) { //发生异常不终止 logger.warn("A task raised an exception. Task: {}", task, t); }}

这里直接调用task的run()方法进行执行,其中发生异常,只打印一条日志,代表发生异常不终止,继续往下执行

 

  

 

  

回到runAllTasks(long timeoutNanos)方法

protected boolean runAllTasks(long timeoutNanos) { //定时任务队列中聚合任务 fetchFromScheduledTaskQueue(); //从普通taskQ里面拿一个任务 Runnable task = pollTask(); //task为空, 则直接返回 if (task == null) { //跑完所有的任务执行收尾的操作 afterRunningAllTasks(); return false; } //如果队列不为空 //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //执行每一个任务 for (;;) { safeExecute(task); //标记当前跑完的任务 runTasks ++; //当跑完64个任务的时候, 会计算一下当前时间 if ((runTasks & 0x3F) == 0) { //定时任务初始化到当前的时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超过截止时间则不执行(nanoTime()是耗时的) if (lastExecutionTime >= deadline) { break; } } //如果没有超过这个时间, 则继续从普通任务队列拿任务 task = pollTask(); //直到没有任务执行 if (task == null) { //记录下最后执行时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true;}

每次执行完task, runTasks自增

 

  这里if((runTasks & 0x3F) == 0)代表是否执行了64个任务,如果执行了64个任务,则会通过lastExecutionTime = ScheduledFutureTask.nanoTime()记录定时任务初始化到现在的时间,如果这个时间超过了截止时间,则退出循环

  如果没有超过截止时间,则通过task = pollTask()继续弹出任务执行

  这里执行64个任务统计一次时间,而不是每次执行任务都统计,主要原因是因为获取系统时间是个比较耗时的操作,这里是netty的一种优化方式

  如果没有task需要执行,则通过afterRunningAllTasks()做收尾工作,最后记录下最后的执行时间

  以上就是有关执行任务队列的相关逻辑

  

 

  

章节小结

本章学习了有关NioEventLoopGroup的创建, NioEventLoop的创建和启动,以及多路复用器的轮询处理和task执行的相关逻辑,通过本章学习,我们应该掌握如下内容:

 

   1. NioEventLoopGroup如何选择分配NioEventLoop

   2. NioEventLoop如何开启

   3. NioEventLoop如何进行select操作

   4. NioEventLoop如何执行task

  以上就是Netty分布式NioEventLoop任务队列执行源码分析的详细内容,更多关于Netty分布式NioEventLoop执行任务队列的资料请关注盛行IT其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: