springboot定时任务实现的几种方式,springboot定时任务手动调用

  springboot定时任务实现的几种方式,springboot定时任务手动调用

  

目录

一背景二动态定时任务调度三多节点任务执行问题四后记

 

  

一 背景

项目中需要一个可以动态新增定时定时任务的功能,现在项目中使用的是xxl-工单定时任务调度系统,但是经过一番对xxl-工单功能的了解,发现xxl-工单对项目动态新增定时任务,动态删除定时任务的支持并不是那么好,所以需要自己手动实现一个定时任务的功能

 

  

二 动态定时任务调度

1 技术选择

 

  定时器或调度执行服务

  这两个都能实现定时任务调度,先看下计时器的定时任务调度

  公共类MyTimerTask扩展TimerTask {私有字符串名称公共MyTimerTask(字符串名称){ this . name=name } public String getName(){ return name;} public void set name(String name){ this。name=名称;} @覆盖公共void run(){//任务日历实例=日历。getinstance();System.out.println(新的简单日期格式( yyyy-MM-dd HH:mm:ss ).格式(实例。gettime());} }定时器Timer=新计时器();MyTimerTask timerTask=new MyTimerTask( no。1 );//首次执行,在当前时间的数字一(一)秒以后,之后每隔两秒钟执行一次timer.schedule(timerTask,1000L,2000 l);在看下ScheduledThreadPoolExecutor的实现

  //org。阿帕奇。公地。郎3。并发。basicthreadfactoryscheduledexecutorservice executorService=new ScheduledThreadPoolExecutor(1,new BasicThreadFactory .构建器()。命名模式(“示例-计划-池-% d”).守护进程(真)。build());执行服务。scheduleatfixedrate(new Runnable(){ @ Override public void run(){//do something } },initialDelay,period,TimeUnit .小时);两个都能实现定时任务,那他们的区别呢,使用阿里p3c会给出建议和区别

  多线程并行处理定时任务时,计时器运行多个时间任务时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。从建议上来看,是一定要选择ScheduledExecutorService了,我们看看源码看看为什么计时器出现问题会终止执行

  /** *计时器线程. private final TimerThread thread=new TimerThread(queue);公共计时器(){ this( Timer-序列号());}公共计时器(字符串名){ thread。setname(名称);线程。start();}新建对象时,我们看到开启了一个线程,那么这个线程在做什么呢?一起看看

  类定时器线程扩展线程{ boolean newTasksMayBeScheduled=true;/** * 每一件一个任务都是一个quene */私有任务队列队列;定时器线程(任务队列队列){ this.queu

  e = queue; } public void run() { try { mainLoop(); } finally { // Someone killed this Thread, behave as if Timer cancelled synchronized(queue) { newTasksMayBeScheduled = false; queue.clear(); // 清除所有任务信息 } } } /** * The main timer loop. (See class comment.) */ private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) // Task hasnt yet fired; wait queue.wait(executionTime - currentTime); } if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } }}我们看到,执行了 mainLoop(),里面是 while (true)方法无限循环,获取程序中任务对象中的时间和当前时间比对,相同就执行,但是一旦报错,就会进入finally中清除掉所有任务信息。

  这时候我们已经找到了答案,timer是在被实例化后,启动一个线程,不间断的循环匹配,来执行任务,他是单线程的,一旦报错,线程就终止了,所以不会执行后续的任务,而ScheduledThreadPoolExecutor是多线程执行的,就算其中有一个任务报错了,并不影响其他线程的执行。

  2 使用ScheduledThreadPoolExecutor

  从上面看,使用ScheduledThreadPoolExecutor还是比较简单的,但是我们要实现的更优雅一些,所以选择 TaskScheduler来实现

  

@Componentpublic class CronTaskRegistrar implements DisposableBean { private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16); @Autowired private TaskScheduler taskScheduler; public TaskScheduler getScheduler() { return this.taskScheduler; } public void addCronTask(Runnable task, String cronExpression) { addCronTask(new CronTask(task, cronExpression)); } private void addCronTask(CronTask cronTask) { if (cronTask != null) { Runnable task = cronTask.getRunnable(); if (this.scheduledTasks.containsKey(task)) { removeCronTask(task); } this.scheduledTasks.put(task, scheduleCronTask(cronTask)); } } public void removeCronTask(Runnable task) { Set<Runnable> runnables = this.scheduledTasks.keySet(); Iterator it1 = runnables.iterator(); while (it1.hasNext()) { SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next(); Long taskId = schedulingRunnable.getTaskId(); SchedulingRunnable cancelRunnable = (SchedulingRunnable) task; if (taskId.equals(cancelRunnable.getTaskId())) { ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable); if (scheduledTask != null){ scheduledTask.cancel(); } } } } public ScheduledTask scheduleCronTask(CronTask cronTask) { ScheduledTask scheduledTask = new ScheduledTask(); scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger()); return scheduledTask; } @Override public void destroy() throws Exception { for (ScheduledTask task : this.scheduledTasks.values()) { task.cancel(); } this.scheduledTasks.clear(); }}

TaskScheduler是本次功能实现的核心类,但是他是一个接口

 

  

public interface TaskScheduler { /** * Schedule the given {@link Runnable}, invoking it whenever the trigger * indicates a next execution time. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param trigger an implementation of the {@link Trigger} interface, * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object * wrapping a cron expression * @return a {@link ScheduledFuture} representing pending completion of the task, * or {@code null} if the given Trigger object never fires (i.e. returns * {@code null} from {@link Trigger#nextExecutionTime}) * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) * @see org.springframework.scheduling.support.CronTrigger */ @Nullable ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

前面的代码可以看到,我们在类中注入了这个类,但是他是接口,我们怎么知道是那个实现类呢,以往出现这种情况要在类上面加@Primany或者@Quality来执行实现的类,但是我们看到我的注入上并没有标记,因为是通过另一种方式实现的

 

  

@Configurationpublic class SchedulingConfig { @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); // 定时任务执行线程池核心线程数 taskScheduler.setPoolSize(4); taskScheduler.setRemoveOnCancelPolicy(true); taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-"); return taskScheduler; }}

在spring初始化时就注册了Bean TaskScheduler,而我们可以看到他的实现是ThreadPoolTaskScheduler,在网上的资料中有人说ThreadPoolTaskScheduler是TaskScheduler的默认实现类,其实不是,还是需要我们去指定,而这种方式,当我们想替换实现时,只需要修改配置类就行了,很灵活。

 

  而为什么说他是更优雅的实现方式呢,因为他的核心也是通过ScheduledThreadPoolExecutor来实现的

  

public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException { Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized"); return this.scheduledExecutor;}

 

  

三 多节点任务执行问题

这次的实现过程中,我并没有选择xxl-job来进行实现,而是采用了TaskScheduler来实现,这也产生了一个问题,xxl-job是分布式的程序调度系统,当想要执行定时任务的应用使用xxl-job时,无论应用程序中部署多少个节点,xxl-job只会选择其中一个节点作为定时任务执行的节点,从而不会产生定时任务在不同节点上同时执行,导致重复执行问题,而使用TaskScheduler来实现,就要考虑多节点重复执行问题。当然既然有问题,就有解决方案

 

  · 方案一 将定时任务功能拆出来单独部署,且只部署一个节点 · 方案二 使用redis setNx的形式,保证同一时间只有一个任务在执行

  我选择的是方案二来执行,当然还有一些方式也能保证不重复执行,这里就不多说了,一下是我的实现

  

public void executeTask(Long taskId) { if (!redisService.setIfAbsent(String.valueOf(taskId),"1",2L, TimeUnit.SECONDS)) { log.info("已有执行中定时发送短信任务,本次不执行!"); return; }

 

  

四 后记

其实定时任务应该每一个开发都会用到的工具,以前并没有了解其中的实现,这次的功能开发过程中也算是对其内涵的进一步了解,以后遇到定时任务的处理也更清晰,更有效率了。

 

  到此这篇关于SpringBoot定时任务功能详细解析的文章就介绍到这了,更多相关SpringBoot定时任务内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: