java / spring · 24 5 月, 2024 1

spring schedule线程池配置实现原理

最近在做项目的时候,需要用到spring的定时任务模块做任务的调度。之前看网上的文章说,spring在默认的配置中都是使用的单线程来跑任务,如果某一个任务执行时间比较长,那么将会影响后续的任务执行,因此,大致看了下对应的源码,记录下来。

@EnableScheduling

该注解是用于开启spring定时任务的,具体的使用代码如下:

@EnableScheduling
@Configuration
public class ScheduledConfiguration {

    @Bean(destroyMethod = "destroy")
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10);
        scheduler.setThreadNamePrefix("schedule-job-trigger-");
        scheduler.initialize();
        return scheduler;
    }
}

这段代码主要是自定义了执行的线程池信息,包括了核心线程数,线程的名称已经初始化的操作。

那接下就看下这个注解的源码信息吧,

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}

从这个注解中可以看出,该注解引入了ShedulingConfiguration配置类。

ShedulingConfiguration

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }

}

这个配置类也是比较简单的,就是创建了SheduledAnnotationPostProcessor对象,通过名称可以看出来,这个类其实就是spring的BeanPostProcessor的实现,主要用于业务的扩展。

SheduledAnnotationPostProcessor

该类可以看下对应的类型的集成关系:

从继承关系来看,主要还是基础的一些bean的初始化销毁的逻辑、时间监听等。

因为我们关注点主要在于线程池的使用,因此,我们就直接看初始化的代码,查看是怎么匹配对应的线程池的逻辑的。

从上面代码中,可以看出,在容器启动完成或者刷新之后或者bean在初始化完成后,都会调用finishRegistration()方法。上面两个方法其实完成的事情是一样的,唯一的区别在于是否使用spring进行启动和对bean的管理。

finishRegistration()

private void finishRegistration() {
        // 如果显式设置了scheduler, 这直接设置
        if (this.scheduler != null) {
            this.registrar.setScheduler(this.scheduler);
        }

        // 查找SchedulingConfigurer配置类,并调用
        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, SchedulingConfigurer> beans =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
            List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
            AnnotationAwareOrderComparator.sort(configurers);
            for (SchedulingConfigurer configurer : configurers) {
                configurer.configureTasks(this.registrar);
            }
        }

        // 如果程序中配置的有任务,并且scheduler为空,则查找TaskScheduler的实例
        if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
            try {
                // Search for TaskScheduler bean...
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " +
                            ex.getMessage());
                }
                try {
                    // 如果TaskScheudler实例不止一个,则按照名称查找
                    this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one TaskScheduler bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " +
                            ex.getMessage());
                }
                // Search for ScheduledExecutorService bean next...
                try {
                    // 如果TaskScheudler实例没有配置,则查找ScheduledExecutorService的bean
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
                }
                catch (NoUniqueBeanDefinitionException ex2) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " +
                                ex2.getMessage());
                    }
                    try {
                        // 如果ScheduledExecutorService的bean有多个,则按照名称查找
                        this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                    }
                    catch (NoSuchBeanDefinitionException ex3) {
                        if (logger.isInfoEnabled()) {
                            logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                    "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                    "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                    "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                    ex2.getBeanNamesFound());
                        }
                    }
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " +
                                ex2.getMessage());
                    }
                    // Giving up -> falling back to default scheduler within the registrar...
                    logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
                }
            }
        }

        // 初始化
        this.registrar.afterPropertiesSet();
    }

这上面的逻辑中,实际上就是从spring容器中查找ScheduledExecutorService和TaskScheduler两个实现类,如果在spring中能够找到则设置执行线程池即可。

接下来最关键的一步,就是当我们没有配置时,spring会为我们自动的创建一个线程池,主要是通过registrar.afterPropertiesSet()方法来实现的,具体代码如下:

protected void scheduleTasks() {
        if (this.taskScheduler == null) {
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        if (this.triggerTasks != null) {
            for (TriggerTask task : this.triggerTasks) {
                addScheduledTask(scheduleTriggerTask(task));
            }
        }
        if (this.cronTasks != null) {
            for (CronTask task : this.cronTasks) {
                addScheduledTask(scheduleCronTask(task));
            }
        }
        if (this.fixedRateTasks != null) {
            for (IntervalTask task : this.fixedRateTasks) {
                addScheduledTask(scheduleFixedRateTask(task));
            }
        }
        if (this.fixedDelayTasks != null) {
            for (IntervalTask task : this.fixedDelayTasks) {
                addScheduledTask(scheduleFixedDelayTask(task));
            }
        }
    }

嗯,从上面可以看出,当我们没有设置自定义的线程池时,是通过Executors.newSingleThreadScheduledExecutor()方法来创建线程池,这个线程池只有一个线程。因此当某一个任务执行时间太长的时候,就会导致其他任务被推迟执行。

因此,在默认情况下Executors.newSingleThreadScheduledExecutor()创建的是一个单线程的线程池,任务执行时间会影响整个任务的调度的。但是,某个任务出现异常,并不会导致后续任务无法执行。因此为了减少这种影响,我们只需要通过创建ScheduledExecutorService或者TaskScheduler来自定义线程池执行即可。