Spring SchedulingConfigurer 实现动态定时任务 | 字数总计: 2.8k | 阅读时长: 12分钟 | 阅读量:
本文提供相关源码,请放心食用,详见网页侧边栏或底部,有疑问请评论或 Issue
一、前言
大家在日常工作中,一定使用过 Spring 的 @Scheduled
注解吧,通过该注解可以非常方便的帮助我们实现任务的定时执行。
但是该注解是不支持运行时动态修改执行间隔的,不知道你在业务中有没有这些需求和痛点:
在服务运行时能够动态修改定时任务的执行频率和执行开关,而无需重启服务和修改代码
能够基于配置,在不同环境/机器上,实现定时任务执行频率的差异化
这些都可以通过 Spring 的 SchedulingConfigurer
注解来实现。
这个注解其实大家并不陌生,如果有使用过 @Scheduled 的话,因为 @Scheduled 默认是单线程执行的,因此如果存在多个任务同时触发,可能触发阻塞。使用 SchedulingConfigurer 可以配置用于执行 @Scheduled 的线程池,来避免这个问题。
1 2 3 4 5 6 7 8 @Configuration public class ScheduleConfig implements SchedulingConfigurer { @Override public void configureTasks (ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10 )); } }
但其实这个接口,还可以实现动态定时任务的功能,下面来演示如何实现。
二、功能实现
后续定义的类开头的 DS
是 Dynamic Schedule
的缩写。
使用到的依赖,除了 Spring 外,还包括:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <dependency > <groupId > org.apache.commons</groupId > <artifactId > commons-lang3</artifactId > </dependency > <dependency > <groupId > org.apache.commons</groupId > <artifactId > commons-collections4</artifactId > <version > 4.4</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <scope > provided</scope > <version > 1.18.18</version > </dependency >
2.1 @EnableScheduling
首先需要开启 @EnableScheduling
注解,直接在启动类添加即可:
1 2 3 4 5 6 7 @EnableScheduling @SpringBootApplication public class DSApplication { public static void main (String[] args) { SpringApplication.run(DSApplication.class, args); } }
2.2 IDSTaskInfo
定义一个任务信息的接口,后续所有用于动态调整的任务信息对象,都需要实现该接口。
id
:该任务信息的唯一 ID,用于唯一标识一个任务
cron
:该任务执行的 cron 表达式。
isValid
:任务开关
isChange
:用于标识任务参数是否发生了改变
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public interface IDSTaskInfo { long getId () ; String getCron () ; boolean isValid () ; boolean isChange (IDSTaskInfo oldTaskInfo) ; }
2.3 DSContainer
顾名思义,是存放 IDSTaskInfo 的容器。
具有以下成员变量:
scheduleMap
:用于暂存 IDSTaskInfo 和实际任务 ScheduledTask 的映射关系。其中:
task_id:作为主键,确保一个 IDSTaskInfo 只会被注册进一次
T:暂存当初注册时的 IDSTaskInfo,用于跟最新的 IDSTaskInfo 比较参数是否发生变化
ScheduledTask:暂存当初注册时生成的任务,如果需要取消任务的话,需要拿到该对象
Semaphore:确保每个任务实际执行时只有一个线程执行,不会产生并发问题
taskRegistrar
:Spring 的任务注册管理器,用于注册任务到 Spring 容器中
name
:调用方提供的类名
具有以下成员方法:
void checkTask(final T taskInfo, final TriggerTask triggerTask)
:检查 IDSTaskInfo,判断是否需要注册/取消任务。具体的逻辑包括:
Semaphore getSemaphore()
:获取信号量属性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.tuple.Pair;import org.springframework.scheduling.config.ScheduledTask;import org.springframework.scheduling.config.ScheduledTaskRegistrar;import org.springframework.scheduling.config.TriggerTask;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.Semaphore;@Slf4j public class DSContainer <T extends IDSTaskInfo > { private final Map<Long, Pair<T, Pair<ScheduledTask, Semaphore>>> scheduleMap = new ConcurrentHashMap<>(); private final ScheduledTaskRegistrar taskRegistrar; private final String name; public DSContainer (ScheduledTaskRegistrar scheduledTaskRegistrar, final String name) { this .taskRegistrar = scheduledTaskRegistrar; this .name = name; } public void checkTask (final T taskInfo, final TriggerTask triggerTask) { final long taskId = taskInfo.getId(); if (scheduleMap.containsKey(taskId)) { if (taskInfo.isValid()) { final T oldTaskInfo = scheduleMap.get(taskId).getLeft(); if (oldTaskInfo.isChange(taskInfo)) { log.info("DSContainer will register {} again because task config change, taskId: {}" , name, taskId); cancelTask(taskId); registerTask(taskInfo, triggerTask); } } else { log.info("DSContainer will cancelTask {} because task not valid, taskId: {}" , name, taskId); cancelTask(taskId); } } else { if (taskInfo.isValid()) { log.info("DSContainer will register {} task, taskId: {}" , name, taskId); registerTask(taskInfo, triggerTask); } } } public Semaphore getSemaphore (final long taskId) { return this .scheduleMap.get(taskId).getRight().getRight(); } private void registerTask (final T taskInfo, final TriggerTask triggerTask) { final ScheduledTask latestTask = taskRegistrar.scheduleTriggerTask(triggerTask); this .scheduleMap.put(taskInfo.getId(), Pair.of(taskInfo, Pair.of(latestTask, new Semaphore(1 )))); } private void cancelTask (final long taskId) { final Pair<T, Pair<ScheduledTask, Semaphore>> pair = this .scheduleMap.remove(taskId); if (pair != null ) { pair.getRight().getLeft().cancel(); } } }
2.4 AbstractDSHandler
下面定义实际的动态线程池处理方法,这里采用抽象类实现,将共用逻辑封装起来,方便扩展。
具有以下抽象方法:
List<T> listTaskInfo()
:获取所有的任务信息。
void doProcess(T taskInfo)
:实现实际执行任务的业务逻辑。
具有以下公共方法:
void configureTasks(ScheduledTaskRegistrar taskRegistrar)
:创建 DSContainer 对象,并创建一个单线程的任务定时执行,调用 scheduleTask() 方法处理实际逻辑。
void scheduleTask()
:首先加载所有任务信息,然后基于 cron 表达式生成 TriggerTask 对象,调用 checkTask() 方法确认是否需要注册/取消任务。当达到执行时间时,调用 execute() 方法,执行任务逻辑。
void execute(final T taskInfo)
:获取信号量,成功后执行任务逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 import lombok.extern.slf4j.Slf4j;import org.apache.commons.collections4.CollectionUtils;import org.springframework.scheduling.annotation.SchedulingConfigurer;import org.springframework.scheduling.config.ScheduledTaskRegistrar;import org.springframework.scheduling.config.TriggerTask;import org.springframework.scheduling.support.CronTrigger;import java.util.List;import java.util.Objects;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;@Slf4j public abstract class AbstractDSHandler <T extends IDSTaskInfo > implements SchedulingConfigurer { private DSContainer<T> dsContainer; private final String CLASS_NAME = getClass().getSimpleName(); protected abstract List<T> listTaskInfo () ; protected abstract void doProcess (T taskInfo) throws Throwable ; @Override public void configureTasks (ScheduledTaskRegistrar taskRegistrar) { dsContainer = new DSContainer<>(taskRegistrar, CLASS_NAME); taskRegistrar.addFixedDelayTask(this ::scheduleTask, 1000 ); } private void scheduleTask () { CollectionUtils.emptyIfNull(listTaskInfo()).forEach(taskInfo -> dsContainer.checkTask(taskInfo, new TriggerTask(() -> this .execute(taskInfo), triggerContext -> new CronTrigger(taskInfo.getCron()).nextExecutionTime(triggerContext) )) ); } private void execute (final T taskInfo) { final long taskId = taskInfo.getId(); try { Semaphore semaphore = dsContainer.getSemaphore(taskId); if (Objects.isNull(semaphore)) { log.error("{} semaphore is null, taskId: {}" , CLASS_NAME, taskId); return ; } if (semaphore.tryAcquire(3 , TimeUnit.SECONDS)) { try { doProcess(taskInfo); } catch (Throwable throwable) { log.error("{} doProcess error, taskId: {}" , CLASS_NAME, taskId, throwable); } finally { semaphore.release(); } } else { log.warn("{} too many executor, taskId: {}" , CLASS_NAME, taskId); } } catch (InterruptedException e) { log.warn("{} interruptedException error, taskId: {}" , CLASS_NAME, taskId); } catch (Exception e) { log.error("{} execute error, taskId: {}" , CLASS_NAME, taskId, e); } } }
三、快速测试
至此就完成了动态任务的框架搭建,下面让我们来快速测试下。为了尽量减少其他技术带来的复杂度,本次测试不涉及数据库和真实的定时任务,完全采用模拟实现。
3.1 模拟定时任务
为了模拟一个定时任务,我定义了一个 foo()
方法,其中只输出一句话。后续我将通过定时调用该方法,来模拟定时任务。
1 2 3 4 5 6 7 8 9 10 import lombok.extern.slf4j.Slf4j;import java.time.LocalTime;@Slf4j public class SchedulerTest { public void foo () { log.info("{} Execute com.github.jitwxs.sample.ds.test.SchedulerTest#foo" , LocalTime.now()); } }
3.2 实现 IDSTaskInfo
首先定义 IDSTaskInfo,我这里想通过反射来实现调用 foo()
方法,因此 reference
表示的是要调用方法的全路径。另外我实现了 isChange()
方法,只要 cron、isValid、reference 发生了变动,就认为该任务的配置发生了改变。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import com.github.jitwxs.sample.ds.config.IDSTaskInfo;import lombok.Builder;import lombok.Data;@Data @Builder public class SchedulerTestTaskInfo implements IDSTaskInfo { private long id; private String cron; private boolean isValid; private String reference; @Override public boolean isChange (IDSTaskInfo oldTaskInfo) { if (oldTaskInfo instanceof SchedulerTestTaskInfo) { final SchedulerTestTaskInfo obj = (SchedulerTestTaskInfo) oldTaskInfo; return !this .cron.equals(obj.cron) || this .isValid != obj.isValid || !this .reference.equals(obj.getReference()); } else { throw new IllegalArgumentException("Not Support SchedulerTestTaskInfo type" ); } } }
3.3 实现 AbstractDSHandler
有几个需要关注的:
(1)listTaskInfo()
返回值我使用了 volatile 变量,便于我修改它,模拟任务信息数据的改变。
(2)doProcess()
方法中,读取到 reference 后,使用反射进行调用,模拟定时任务的执行。
(3)额外实现了 ApplicationListener
接口,当服务启动后,每隔一段时间修改下任务信息,模拟业务中调整配置。
服务启动后,foo() 定时任务将每 10s 执行一次。
10s 后,将 foo() 定时任务执行周期从每 10s 执行调整为 1s 执行。
10s 后,关闭 foo() 定时任务执行。
10s 后,开启 foo() 定时任务执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 import com.github.jitwxs.sample.ds.config.AbstractDSHandler;import org.springframework.context.ApplicationEvent;import org.springframework.context.ApplicationListener;import org.springframework.stereotype.Component;import java.lang.reflect.Method;import java.util.Collections;import java.util.List;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.LockSupport;@Component public class SchedulerTestDSHandler extends AbstractDSHandler <SchedulerTestTaskInfo > implements ApplicationListener { public volatile List<SchedulerTestTaskInfo> taskInfoList = Collections.singletonList( SchedulerTestTaskInfo.builder() .id(1 ) .cron("0/10 * * * * ? " ) .isValid(true ) .reference("com.github.jitwxs.sample.ds.test.SchedulerTest#foo" ) .build() ); @Override protected List<SchedulerTestTaskInfo> listTaskInfo () { return taskInfoList; } @Override protected void doProcess (SchedulerTestTaskInfo taskInfo) throws Throwable { final String reference = taskInfo.getReference(); final String[] split = reference.split("#" ); if (split.length != 2 ) { return ; } try { final Class<?> clazz = Class.forName(split[0 ]); final Method method = clazz.getMethod(split[1 ]); method.invoke(clazz.newInstance()); } catch (Exception e) { e.printStackTrace(); } } @Override public void onApplicationEvent (ApplicationEvent applicationEvent) { Executors.newScheduledThreadPool(1 ).scheduleAtFixedRate(() -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10 )); taskInfoList = Collections.singletonList( SchedulerTestTaskInfo.builder() .id(1 ) .cron("0/1 * * * * ? " ) .isValid(true ) .reference("com.github.jitwxs.sample.ds.test.SchedulerTest#foo" ) .build() ); LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10 )); taskInfoList = Collections.singletonList( SchedulerTestTaskInfo.builder() .id(1 ) .cron("0/1 * * * * ? " ) .isValid(false ) .reference("com.github.jitwxs.sample.ds.test.SchedulerTest#foo" ) .build() ); LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10 )); taskInfoList = Collections.singletonList( SchedulerTestTaskInfo.builder() .id(1 ) .cron("0/1 * * * * ? " ) .isValid(true ) .reference("com.github.jitwxs.sample.ds.test.SchedulerTest#foo" ) .build() ); }, 12 , 86400 , TimeUnit.SECONDS); } }
3.4 运行程序
整个应用包结构如下:
运行程序后,在控制台可以观测到如下输出:
四、后记
以上完成了动态定时任务的介绍,你能够根据本篇文章,实现以下需求吗:
本文基于 cron 表达式实现了频率控制,你能改用 fixedDelay 或 fixedRate 实现吗?
基于数据库/配置文件/配置中心,实现对服务中定时任务的动态频率调整和任务的启停。
开发一个数据表历史数据清理功能,能够动态配置要清理的表、清理的规则、清理的周期。
开发一个数据表异常数据告警功能,能够动态配置要扫描的表、告警的规则、扫描的周期。
Spring SchedulingConfigurer 实现动态定时任务