java / spring · 10 5 月, 2024 1

spring 使用ForkJoinPool异步事件处理

最近在开发功能的时候,因为涉及到在做一个操作的时候,需要将之前已经有的数据做状态变更,但是由于单批次操作比较复杂,需要操作很多次数据库,因此在接口响应时间上超过了10s的时间,因此,将这部分耗时操作通过异步的方式来处理,这样的话,即可以保证数据的正确性,同时也可以在接口响应上缩短时间。

也许这里就有人会问了,为什么不适用MQ这些中间件去处理,哈哈哈,这是因为这个项目没有这些组件。所以只有考虑其他的方法了。

ApplicationEvent

首先就是在使用spring事件通知的时候,需要我们自定一个ApplicationEvent对象,该对象实际上是用来承接处理业务逻辑所需要的必要参数。因此,我自定义了一个事件对象:

import com.ai.bss.res.console.mclm.model.TfCInventoryCount;
import lombok.ToString;
import org.springframework.context.ApplicationEvent;

import java.util.List;

/**
 * @author xianglj
 * @date 2024/5/9 9:58
 */
@ToString
public class CustomEvent extends ApplicationEvent {

    private String operateLotId;
    private List<TfCInventoryCount> counts;
    private Long jobId;

    public InventoryCountChangeEvent(Object source) {
        super(source);
    }

    public InventoryCountChangeEvent(String operateLotId, List<TfCInventoryCount> counts, Long jobId) {
        super(operateLotId);
        this.operateLotId = operateLotId;
        this.counts = counts;
        this.jobId = jobId;
    }

    public String getOperateLotId() {
        return operateLotId;
    }

    public List<TfCInventoryCount> getCounts() {
        return counts;
    }

    public Long getJobId() {
        return jobId;
    }
}

ApplicationListener

有了事件对象之后,就需要一个类来处理对应的事件,因此,在spring中,需要我们创建一个类,用于实现ApplicationListener的类,并实现onApplicationEvent()方法来处理具体的逻辑,则代码如下:

/**
 * @author xianglj
 * @date 2024/5/9 9:55
 */
@Slf4j
@Service
public class InventoryCountJobServiceImpl 
        implements ApplicationListener<InventoryCountChangeEvent> {

    @Override
    public void onApplicationEvent(InventoryCountChangeEvent event) {
        log.info("onApplicationEvent: 收到消息, {}", event);
        // 此处省略业务逻辑
        log.info("onApplicationEvent: 事件处理完毕: {}, 用时: {}", event, stopWatch.stop().elapse(TimeUnit.MILLISECONDS));
    }
}

到这里,一个类就算完成完成了,但是,在spring调用时间通知类的时候,是同步的调用。因此,为了能够异步的调用事件通知逻辑,我们需要在启动类上加上@EnableAsync注解.

@EnableAsync

该注解用于启动spring中的异步调用的逻辑,该注解需要相当于一个开关,则具体的代码如下:

@EnableAsync
@EnableDiscoveryClient
@EnableTransactionManagement
@SpringBootApplication()
@Slf4j
public class CustomApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext ac = SpringApplication.run(CustomApplication.class, args);
        log.info("当前resBase版本:{}", "202010101403");
    }
}

@Async

当启用了异步调用的注解之后,我们就可以使用@Async注解来实现哪些方法可以异步调用,因此,为了能够在处理消息事件的时候,能够异步处理,我们则可以在对应的方法上加上改注解:

@Async()
@Override
public void onApplicationEvent(InventoryCountChangeEvent event) {
    log.info("onApplicationEvent: 收到消息, {}", event);
    // 省略业务逻辑..
    log.info("onApplicationEvent: 事件处理完毕: {}, 用时: {}", event, stopWatch.stop().elapse(TimeUnit.MILLISECONDS));
}

实现到这里之后,基本上就可以实现异步事件消费了。

自定义线程池

但是,当我们需要处理的事件越来越多的时候,就会发现,事件的处理耗时越来越长,这是因为,我们在@Async的时候,如果没有指定线程池,这个时候就会让异步处理的事件统一使用同一个线程池,这个时候,如果线程池中某个人物处理时间很长,那么就会对其他的事件处理产生影响。因此,为了避免不同业务之间相互影响,我们可以使用线程池做业务隔离,这样就不会有对应的问题。

ForkJoinPoolFactoryBean

该类主要是由spring提供的一个工厂bean, 主要作用是帮助我们创建ForkJoinPool线程池,以下创建代码:

    @Bean
    public ForkJoinPoolFactoryBean changeCollectedForkJoinPool() {
        ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean();
        factoryBean.setCommonPool(false);
        factoryBean.setAsyncMode(false);
        factoryBean.setParallelism(30);
        factoryBean.setAwaitTerminationSeconds(10);

        ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = new CustomForkJoinPoolThreadFactory("changeCollectedForkJoinPool");
        factoryBean.setThreadFactory(threadFactory);
        return factoryBean;
    }

    @Data
    public static class CustomForkJoinPoolThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {

        private String prefixName;
        private AtomicLong counter = new AtomicLong();

        public CustomForkJoinPoolThreadFactory(String prefixName) {
            if (StringUtils.isBlank(prefixName)) {
                prefixName = "customForkJoinPoolThread";
            }
            this.prefixName = prefixName;
        }

        public CustomForkJoinPoolThreadFactory() {
            this(null);
        }

        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            String threadName = this.prefixName + "-" + counter.incrementAndGet();
            return new CustomForkJoinWorkThread(pool, ClassLoader.getSystemClassLoader(), threadName);
        }

        public class CustomForkJoinWorkThread extends ForkJoinWorkerThread {

            /**
             * Creates a ForkJoinWorkerThread operating in the given pool.
             *
             * @param pool the pool this thread works in
             * @throws NullPointerException if pool is null
             */
            public CustomForkJoinWorkThread(ForkJoinPool pool, String threadName) {
                super(pool);
                if (StringUtils.isNotBlank(threadName)) {
                    setName(threadName);
                }
            }

            public CustomForkJoinWorkThread(ForkJoinPool pool, ClassLoader ccl, String threadName) {
                this(pool, threadName);
                setContextClassLoader(ccl);
            }
        }
    }

上面代码主要为了创建ForkJoinPoolFactoryBean对象,其中我对线程池的名称做了一些定制化的改动,进攻参考。

@Async(“changeCollectedForkJoinPool“)

当我们有了对应的bean之后,在上面方法中,只需要加上对应的bean名称即可,对应代码如下:

@Async("changeCollectedForkJoinPool")
    @Override
    public void onApplicationEvent(InventoryCountChangeEvent event) {
        log.info("onApplicationEvent: 收到消息, {}", event);
        // 省略业务逻辑
        log.info("onApplicationEvent: 事件处理完毕: {}, 用时: {}", event, stopWatch.stop().elapse(TimeUnit.MILLISECONDS));
    }

执行程序,将会看到以下的输出:

到此,异步处理事件完成了。

但是在实际开发过程中,因为spring的事件处理都是在内存的,如果服务器发生了宕机或者重启会导致数据的丢失和任务事件的丢失,因此是个时候我们可以考虑将任务信息持久化到数据库或者三方的缓存组件中,然后通过定时任务定时的获取并在此处理事件的方式,这样能够保证业务逻辑的正确性。

以上为spring 异步处理逻辑的实现方式,如果有疑问,欢迎留言交流。