spring定时任务的实现,spring boot动态定时任务
目录
为什么需要定时任务爪哇定时任务的原理计时器TimerTaskScheduledThreadPoolExecutorTimer VS ScheduledThreadPoolExecutorSpring定时任务@已安排定时任务原理(源码)
为什么需要定时任务
定时任务的应用场景十分广泛,如定时清理文件、定时生成报表、定时数据同步备份等。
Java定时任务的原理
jdk自带的库中,有两种技术可以实现定时任务,一种是计时器,另一种是ScheduledThreadPoolExecutor
Timer+TimerTask
计时器是一个线程,控制执行任务所需要执行的内容
公共类计时器{ /** *计时器任务队列。这个数据结构与计时器*线程共享。计时器通过其各种调度调用*产生任务,计时器线程消耗任务,适当地执行计时器任务,*并在它们过时时从队列中删除它们* private最终任务队列队列=新任务队列();/** *计时器线程. private final TimerThread thread=new TimerThread(queue);}其中,需要注意,计时器类有几个方法创建不同的线程执行:
延时执行
//其中的耽搁是延时时间,表示多少毫秒后执行一次任务公共无效计划(TimerTask任务,长延迟){ if(延迟0)抛出新的IllegalArgumentException(负延迟。);sched(任务,系统。当前时间毫秒()延迟,0);}指定时间点执行
//到达指定时间时间的时候执行一次task public void schedule(TimerTask task,Date time) { sched(task,time.getTime(),0);}延时周期执行
//经过耽搁毫秒后按每时期毫秒执行一次的周期执行任务公共无效计划(TimerTask任务,长延迟,长周期){ if(延迟0)抛出新的IllegalArgumentException(负延迟。);if(周期=0)抛出新的IllegalArgumentException(非正句点。);sched(任务,系统。当前时间毫秒()延迟,-周期);}指定时间点后周期执行
//到达指定时间第一次之后按照每时期毫秒执行一次的周期执行task public void schedule(TimerTask task,Date firstTime,long period){ if(period=0)throw new IllegalArgumentException(非正句号.);sched(task,firstTime.getTime(),-period);}TimerTask是一个实现了可运行的接口的类,所以能够放到线程去执行:
公共摘要
class TimerTask implements Runnable { /** * This object is used to control access to the TimerTask internals. */ final Object lock = new Object(); 。。。。。。}示例:
public class JavaTimerJob { public static void main(String[] args) { Timer timer = new Timer(); Task task = new Task(); //当前时间开始,每1秒执行一次 timer.schedule(task, new Date(),1000); } }class Task extends TimerTask { @Override public void run() { System.out.println(new Date()+": This is my job..."); }}
执行结果:
Tue May 30 13:45:47 CST 2022: This is my job...Tue May 30 13:45:48 CST 2022: This is my job...Tue May 30 13:45:49 CST 2022: This is my job...Tue May 30 13:45:50 CST 2022: This is my job...。。。。
弊端:Timer是单线程的,一旦定时任务中某一过程时刻抛出异常,将会导致整体线程停止,定时任务停止。
ScheduledThreadPoolExecutor
继承了ThreadPoolExecutor
,,是一个基于线程池的调度器 通过实现ScheduledExecutorService
接口方法去实现任务调度,主要方法如下:
延时执行
//command是待执行的线程,delay表示延时时长,unit代表时间单位public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t;}
延时周期执行
//command是待执行的线程,initialDelay表示延时时长,period代表执行间隔时长,unit代表时间单位public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t;}
每段延时间隔执行
//command是待执行的线程,initialDelay表示延时时长,delay代表每次执行线程前的延时时长,unit代表时间单位public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t;}
示例:
public class JavaScheduledThreadPoolExecutor { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(8); //延时1秒后开始执行,每3秒执行一次 scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println(new Date()+": This is my job..."); } }, 1, 3, TimeUnit.SECONDS); }}
执行结果:
Tue May 30 15:05:16 CST 2022: This is my job...Tue May 30 15:05:19 CST 2022: This is my job...Tue May 30 15:05:22 CST 2022: This is my job...Tue May 30 15:05:25 CST 2022: This is my job...。。。。。
Timer VS ScheduledThreadPoolExecutor
Timer
是单线程,如果开启多个线程服务,将会出现竞争,一旦出现异常,线程停止,定时任务停止;兼容性更高,jdk1.3后使用ScheduledThreadPoolExecutor
基于线程池实现多线程,且自动调整线程数,线程出错并不会影响整体定时任务执行。在jdk1.5后可使用
Spring定时任务
Spring原生定时任务主要依靠@Scheduled
注解实现:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})@Retention(RetentionPolicy.RUNTIME)@Documented@Repeatable(Schedules.class)public @interface Scheduled { String CRON_DISABLED = "-"; String cron() default ""; //类似于corn表达式,可以指定定时任务执行的延迟及周期规则 String zone() default ""; //指明解析cron表达式的时区。 long fixedDelay() default -1; //在最后一次调用结束和下一次调用开始之间以固定周期(以毫秒为单位)执行带注解的方法。(要等待上次任务完成后) String fixedDelayString() default ""; //同上面作用一样,只是String类型 long fixedRate() default -1; //在调用之间以固定的周期(以毫秒为单位)执行带注解的方法。(不需要等待上次任务完成) String fixedRateString() default ""; //同上面作用一样,只是String类型 long initialDelay() default -1; //第一次执行fixedRate()或fixedDelay()任务之前延迟的毫秒数 。 String initialDelayString() default ""; //同上面作用一样,只是String类型}
Spring静态定时任务示例:
@Slf4j@Componentpublic class TestJob { //每40秒执行一次 @Scheduled(cron = "0/40 * * * * ?") public void logJob(){ if(log.isDebugEnabled()){ log.debug("现在是:{}",LocalDateTime.now()); } }}
执行结果:
现在是:2022-05-30T16:03:40.006现在是:2022-05-30T16:04现在是:2022-05-30T16:04:40.003
@Scheduled定时任务原理(源码)
①项目启动扫描带有注解@Scheduled
的所有方法信息由ScheduledAnnotationBeanPostProcessor
的postProcessAfterInitialization
方法实现功能:
public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof AopInfrastructureBean bean instanceof TaskScheduler bean instanceof ScheduledExecutorService) { // Ignore AOP infrastructure such as scoped proxies. return bean; } Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass)) { //获取定时任务的方法 Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { // Non-empty set of methods annotatedMethods.forEach((method, scheduledMethods) -> //调用processScheduled方法将定时任务方法存放到任务队列中 scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean " + beanName + ": " + annotatedMethods); } } } return bean;}
②调用processScheduled
方法将定时任务方法存放到任务队列中
protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { //创建任务线程 Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; String errorMessage = "Exactly one of the cron, fixedDelay(String), or fixedRate(String) attributes is required"; Set<ScheduledTask> tasks = new LinkedHashSet<>(4); //解析任务执行初始延迟 long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify initialDelay or initialDelayString, not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value "" + initialDelayString + "" - cannot parse into long"); } } } //解析cron表达式 String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this.embeddedValueResolver != null) { cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1, "initialDelay not supported for cron triggers"); processedSchedule = true; if (!Scheduled.CRON_DISABLED.equals(cron)) { TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } } // At this point we dont need to differentiate between initial delay set or not anymore if (initialDelay < 0) { initialDelay = 0; } //解析fixedDelay参数 long fixedDelay = scheduled.fixedDelay(); if (fixedDelay >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; //存放任务到任务队列中 tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { if (this.embeddedValueResolver != null) { fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString); } if (StringUtils.hasLength(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedDelay = parseDelayAsLong(fixedDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value "" + fixedDelayString + "" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } } //解析fixedRate参数 long fixedRate = scheduled.fixedRate(); if (fixedRate >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { if (this.embeddedValueResolver != null) { fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString); } if (StringUtils.hasLength(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedRate = parseDelayAsLong(fixedRateString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value "" + fixedRateString + "" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } } // 断言检查 Assert.isTrue(processedSchedule, errorMessage); //并发控制将任务队列存入注册任务列表 synchronized (this.scheduledTasks) { Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method " + method.getName() + ": " + ex.getMessage()); }}
③将任务解析并添加到任务队列后,交由ScheduledTaskRegistrar
类的scheduleTasks
方法添加(注册)定时任务到环境中:
protected void scheduleTasks() { if (this.taskScheduler == null) { //获取ScheduledExecutorService对象,实际上都是使用ScheduledThreadPoolExecutor执行定时任务调度 this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } }}private void addScheduledTask(@Nullable ScheduledTask task) { if (task != null) { this.scheduledTasks.add(task); }}
由上述源码可以看出,Spring原生定时任务的大概步骤如下:
1.扫描带@Scheduled注解的类和方法(ScheduledAnnotationBeanPostProcessor.postProcessAfterInitialization(........))
2.将定时任务解析完成后加入任务队列(ScheduledAnnotationBeanPostProcessor.processScheduled(........))
3.将定时任务注册到当前运行环境,等待执行(ScheduledTaskRegistrar.scheduleTasks(.......)) 且@Scheduled的底层调度实现是ScheduledThreadPoolExecutor
以上就是一文搞懂如何实现Java,Spring动态启停定时任务的详细内容,更多关于Java Spring启停定时任务的资料请关注盛行IT其它相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。