最近在做项目的时候,需要用到spring的定时任务模块做任务的调度。之前看网上的文章说,spring在默认的配置中都是使用的单线程来跑任务,如果某一个任务执行时间比较长,那么将会影响后续的任务执行,因此,大致看了下对应的源码,记录下来。
@EnableScheduling
该注解是用于开启spring定时任务的,具体的使用代码如下:
@EnableScheduling @Configuration public class ScheduledConfiguration { @Bean(destroyMethod = "destroy") public ThreadPoolTaskScheduler threadPoolTaskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(10); scheduler.setThreadNamePrefix("schedule-job-trigger-"); scheduler.initialize(); return scheduler; } }
这段代码主要是自定义了执行的线程池信息,包括了核心线程数,线程的名称已经初始化的操作。
那接下就看下这个注解的源码信息吧,
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Import(SchedulingConfiguration.class) @Documented public @interface EnableScheduling { }
从这个注解中可以看出,该注解引入了ShedulingConfiguration配置类。
ShedulingConfiguration
@Configuration(proxyBeanMethods = false) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class SchedulingConfiguration { @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() { return new ScheduledAnnotationBeanPostProcessor(); } }
这个配置类也是比较简单的,就是创建了SheduledAnnotationPostProcessor对象,通过名称可以看出来,这个类其实就是spring的BeanPostProcessor的实现,主要用于业务的扩展。
SheduledAnnotationPostProcessor
该类可以看下对应的类型的集成关系:
从继承关系来看,主要还是基础的一些bean的初始化销毁的逻辑、时间监听等。
因为我们关注点主要在于线程池的使用,因此,我们就直接看初始化的代码,查看是怎么匹配对应的线程池的逻辑的。
从上面代码中,可以看出,在容器启动完成或者刷新之后或者bean在初始化完成后,都会调用finishRegistration()方法。上面两个方法其实完成的事情是一样的,唯一的区别在于是否使用spring进行启动和对bean的管理。
finishRegistration()
private void finishRegistration() { // 如果显式设置了scheduler, 这直接设置 if (this.scheduler != null) { this.registrar.setScheduler(this.scheduler); } // 查找SchedulingConfigurer配置类,并调用 if (this.beanFactory instanceof ListableBeanFactory) { Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values()); AnnotationAwareOrderComparator.sort(configurers); for (SchedulingConfigurer configurer : configurers) { configurer.configureTasks(this.registrar); } } // 如果程序中配置的有任务,并且scheduler为空,则查找TaskScheduler的实例 if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) { Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type"); try { // Search for TaskScheduler bean... this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false)); } catch (NoUniqueBeanDefinitionException ex) { if (logger.isTraceEnabled()) { logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " + ex.getMessage()); } try { // 如果TaskScheudler实例不止一个,则按照名称查找 this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true)); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskScheduler bean exists within the context, and " + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { if (logger.isTraceEnabled()) { logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " + ex.getMessage()); } // Search for ScheduledExecutorService bean next... try { // 如果TaskScheudler实例没有配置,则查找ScheduledExecutorService的bean this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false)); } catch (NoUniqueBeanDefinitionException ex2) { if (logger.isTraceEnabled()) { logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " + ex2.getMessage()); } try { // 如果ScheduledExecutorService的bean有多个,则按照名称查找 this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true)); } catch (NoSuchBeanDefinitionException ex3) { if (logger.isInfoEnabled()) { logger.info("More than one ScheduledExecutorService bean exists within the context, and " + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex2.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex2) { if (logger.isTraceEnabled()) { logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " + ex2.getMessage()); } // Giving up -> falling back to default scheduler within the registrar... logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing"); } } } // 初始化 this.registrar.afterPropertiesSet(); }
这上面的逻辑中,实际上就是从spring容器中查找ScheduledExecutorService和TaskScheduler两个实现类,如果在spring中能够找到则设置执行线程池即可。
接下来最关键的一步,就是当我们没有配置时,spring会为我们自动的创建一个线程池,主要是通过registrar.afterPropertiesSet()方法来实现的,具体代码如下:
protected void scheduleTasks() { if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } }
嗯,从上面可以看出,当我们没有设置自定义的线程池时,是通过Executors.newSingleThreadScheduledExecutor()方法来创建线程池,这个线程池只有一个线程。因此当某一个任务执行时间太长的时候,就会导致其他任务被推迟执行。
因此,在默认情况下Executors.newSingleThreadScheduledExecutor()创建的是一个单线程的线程池,任务执行时间会影响整个任务的调度的。但是,某个任务出现异常,并不会导致后续任务无法执行。因此为了减少这种影响,我们只需要通过创建ScheduledExecutorService或者TaskScheduler来自定义线程池执行即可。
加油哦 💡