Spring Scheduler实现解析
Spring3.0之后, 增加了调度器功能, 提供的@Schedule注解, 想必大家都用过, 有了它, 基本上Quartz就可以下岗了. 那么它内部是如何实现的呢?
首先给出几个结论:
- 调度器本质上还是通过juc的ScheduledExecutorService进行的
- 调度器启动后你无法通过修改系统时间达到让它马上执行的效果
- 被@Schedule注解的方法如果有任何Throwable出现, 不会中断后续Task, 只会打印Error日志
主要与这几个类相关: ScheduledAnnotationBeanPostProcessor, ScheduledTaskRegistrar, TaskScheduler, ReschedulingRunnable
1. ScheduledAnnotationBeanPostProcessor
写在前面:
BeanPostProcessor 是Spring中的基础组件, 可以帮助我们进行Bean的预处理和后处理, 请注意, 它是对容器中所有的bean都生效的, 所以如果自己实现的话必须包含过滤条件. Spring组件中, 一共有不到20个BeanPostProcessor, 需要顺序的时候则实现Ordered接口自定义优先级即可
功能介绍:
负责@Schedule注解的扫描, 构建ScheduleTask向ScheduledTaskRegistrar中注册, 调用ScheduledTaskRegistrar.afterPropertiesSet()
核心代码:
public Object postProcessAfterInitialization(final Object bean, String beanName) {
final Class<?> targetClass = AopUtils.getTargetClass(bean);
ReflectionUtils.doWithMethods(targetClass, new MethodCallback() {
public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
Scheduled annotation = AnnotationUtils.getAnnotation(method, Scheduled.class);//查找注解
if (annotation != null) {
//省略
//构建CronTask并注册到ScheduledTaskRegistrar
String cron = annotation.cron();
if (!"".equals(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (embeddedValueResolver != null) {
cron = embeddedValueResolver.resolveStringValue(cron);
}
registrar.addCronTask(new CronTask(runnable, cron));
}
//省略
}
}
}
}
public void onApplicationEvent(ContextRefreshedEvent event) {
//省略
this.registrar.afterPropertiesSet();
}
}
2. ScheduledTaskRegistrar
功能介绍:
它是Schedule中所支持的三种Task的一个容器, 内部维护了这些Task List和executor的引用, 并负责将Task置入executor中执行
核心代码:
protected void scheduleTasks() {
if (this.cronTasks != null) {
for (CronTask task : cronTasks) {
this.scheduledFutures.add(this.taskScheduler.schedule(
task.getRunnable(), task.getTrigger()));
}
}
}
3. TaskScheduler
功能介绍:
TaskScheduler是Spring中专门用于进行定时操作的接口, 主要的实现类有三个 ThreadPoolTaskScheduler, ConcurrentTaskScheduler, TimerManagerTaskScheduler, 前两个delegate juc里面的ScheduledExecutor, 最后一个delegate commonj.timers.TimerManager. 这个类的作用主要是将task和executor用ReschedulingRunnable包装起来进行生命周期管理
核心代码:
public ScheduledFuture schedule(Runnable task, Trigger trigger) {
ScheduledExecutorService executor = getScheduledExecutor();
try {
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));//无默认handler, 则只打印LOG 不进行rethrow
return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
4. ReschedulingRunnable
写在前面:
DelegatingErrorHandlingRunnable将Runnable代理, 进行异常捕获并可以使用自己的handler进行处理
handler有两种, 通过TaskUtils.getDefaultErrorHandler(true)可以看到, 如果是true, 则说明是可重复任务, 则有异常捕获后只进行Log, false则Log之后rethrow Exception
功能介绍:
主要进行task的提交, 执行, 重提交, 取消, 这一套生命周期管理
public ScheduledFuture schedule() {
synchronized (this.triggerContextMonitor) {
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);//根据当前时间和Crontab的格式, 找出下一次执行的时间点, 比如 2015-01-02 23:00:00 CST, 注意这里的时区使用的是Default的
if (this.scheduledExecutionTime == null) {
return null;
}
long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();//将下一次执行时间点与当前时间相减, 得到等待时间, 所以在task启动的时候就已经定了下一次执行还有多少ms了
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS); //调用ScheduledExecutor执行Task, 进行Schedule
return this;
}
}
@Override
public void run() {
Date actualExecutionTime = new Date();
super.run();//执行被@Schedule注解的方法
Date completionTime = new Date();
synchronized (this.triggerContextMonitor) {
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
if (!this.currentFuture.isCancelled()) {
schedule();//如果未被取消则重新调度
}
}
}
整个调度器的结构就是这样, 主要要熟悉 DelegatingErrorHandlingRunnable, BeanPostProcessor, Ordered, EmbeddedValueResolverAware, ApplicationContextAware 这些Spring的基本组件, 万变不离其宗