调度任务漏执行补偿:服务宕机重启后,自动补跑错过的定时任务!
在分布式系统中,定时任务是保障业务稳定运行的重要组成部分。然而,服务宕机、网络抖动、部署升级等情况都可能导致定时任务漏执行,从而引发数据不一致、业务异常等问题。
- 服务宕机重启后,发现日终任务没执行
- 凌晨的批处理任务,因为网络抖动漏跑了
- 升级部署时,正好错过了重要任务
- 手工补数据麻烦,容易出错
今天,我们来探讨如何构建一个调度任务漏执行补偿机制,实现服务宕机重启后自动补跑错过的定时任务。
问题背景
定时任务漏执行的常见场景
┌─────────────────────────────────────────────────────────────┐
│ 定时任务漏执行的常见场景: │
│ │
│ 1. 服务宕机:服务突然崩溃,任务正在执行一半就中断 │
│ 2. 网络抖动:网络短暂断开,调度中心无法正常通信 │
│ 3. 部署升级:滚动发布时,部分实例任务被中断 │
│ 4. JVM Full GC:GC 停顿过长,错过任务触发时间 │
│ 5. 时钟回拨:服务器时钟回拨,导致任务触发时间判断错误 │
│ │
│ 后果: │
│ - 数据统计不完整 │
│ - 对账差异 │
│ - 业务状态不一致 │
│ - 人工补数据工作量大 │
└─────────────────────────────────────────────────────────────┘
传统解决方案的局限性
1. 手工补数
// 运维人员手动执行补偿脚本
public class ManualCompensation {
public void compensateDailySettlement() {
// 手动查询漏跑的任务
List<错过任务> missedTasks = queryMissedTasks();
// 手工执行
for (任务 task : missedTasks) {
executeTask(task);
}
}
}
问题:
- 人工操作容易出错
- 发现滞后,可能已经造成业务损失
- 工作量大,尤其是深夜故障时
2. 基于数据库状态的补偿
public class DbStateCompensation {
public void compensate() {
// 查询未完成的任务
List<任务> unfinished = queryUnfinishedTasks();
// 判断是否超时
for (任务 task : unfinished) {
if (isTimeout(task)) {
// 回滚重做
rollbackAndRetry(task);
}
}
}
}
问题:
- 无法处理服务宕机导致的"不知道任务执行到哪里"
- 需要额外的状态字段记录
整体架构设计
核心思想
┌─────────────────────────────────────────────────────────────┐
│ 调度任务漏执行补偿机制: │
│ │
│ 1. 任务触发时:记录任务执行时间点(fire_time) │
│ 2. 任务执行前:检查是否有漏跑的任务 │
│ 3. 任务执行中:记录执行进度(progress) │
│ 4. 服务重启时:扫描漏跑任务并补偿执行 │
│ │
│ 关键设计: │
│ - 任务执行记录表(schedule_task_record) │
│ - 补偿扫描器(MissedTaskScanner) │
│ - 乐观锁防重复执行 │
│ - 幂等性保证 │
└─────────────────────────────────────────────────────────────┘
架构流程图
服务启动
↓
加载补偿扫描器
↓
扫描任务执行记录表
↓
查找漏跑任务(状态=PENDING 且 fire_time < 当前时间 - 容忍窗口)
↓
执行补偿任务
↓
更新任务状态(COMPLETED/FAILED)
核心代码实现
1. 任务执行记录实体
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "schedule_task_record")
public class ScheduleTaskRecord {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String taskName;
private String taskGroup;
private LocalDateTime fireTime;
private LocalDateTime startTime;
private LocalDateTime endTime;
private String status;
private Integer progress;
private String result;
private Integer retryCount;
private String triggerType;
private String executorIp;
private LocalDateTime createTime;
}
public enum TaskStatus {
PENDING("待执行"),
RUNNING("执行中"),
COMPLETED("已完成"),
FAILED("执行失败"),
COMPENSATED("已补偿");
}
2. 任务执行记录服务
@Service
@Slf4j
public class ScheduleTaskRecordService {
@Autowired
private ScheduleTaskRecordRepository recordRepository;
public void recordTaskFired(ScheduleTaskDO task) {
ScheduleTaskRecord record = ScheduleTaskRecord.builder()
.taskName(task.getTaskName())
.taskGroup(task.getTaskGroup())
.fireTime(LocalDateTime.now())
.status(TaskStatus.PENDING.name())
.triggerType("SCHEDULE")
.createTime(LocalDateTime.now())
.build();
recordRepository.save(record);
log.info("记录任务触发: taskName={}, fireTime={}",
task.getTaskName(), record.getFireTime());
}
public void recordTaskStart(Long recordId) {
recordRepository.findById(recordId).ifPresent(record -> {
record.setStatus(TaskStatus.RUNNING.name());
record.setStartTime(LocalDateTime.now());
recordRepository.save(record);
log.info("任务开始执行: recordId={}", recordId);
});
}
public void recordTaskComplete(Long recordId, String result) {
recordRepository.findById(recordId).ifPresent(record -> {
record.setStatus(TaskStatus.COMPLETED.name());
record.setEndTime(LocalDateTime.now());
record.setResult(result);
recordRepository.save(record);
log.info("任务执行完成: recordId={}, result={}", recordId, result);
});
}
public void recordTaskFailed(Long recordId, String error) {
recordRepository.findById(recordId).ifPresent(record -> {
record.setStatus(TaskStatus.FAILED.name());
record.setEndTime(LocalDateTime.now());
record.setResult(error);
recordRepository.save(record);
log.info("任务执行失败: recordId={}, error={}", recordId, error);
});
}
public void recordProgress(Long recordId, Integer progress) {
recordRepository.findById(recordId).ifPresent(record -> {
record.setProgress(progress);
recordRepository.save(record);
});
}
}
3. 漏跑任务扫描器
@Component
@Slf4j
public class MissedTaskScanner {
@Autowired
private ScheduleTaskRecordRepository recordRepository;
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private ScheduleTaskRecordService recordService;
@Value("${scheduler.compensation.enabled:true}")
private boolean compensationEnabled;
@Value("${scheduler.compensation.tolerance-minutes:5}")
private int toleranceMinutes;
@PostConstruct
public void init() {
if (compensationEnabled) {
log.info("启动漏跑任务扫描补偿...");
scanAndCompensate();
}
}
@Scheduled(fixedDelayString = "${scheduler.compensation.scan-interval-ms:60000}")
public void scanAndCompensate() {
try {
List<ScheduleTaskRecord> missedTasks = findMissedTasks();
for (ScheduleTaskRecord task : missedTasks) {
try {
compensateTask(task);
} catch (Exception e) {
log.error("补偿任务失败: taskId={}", task.getId(), e);
}
}
if (!missedTasks.isEmpty()) {
log.info("本次扫描到 {} 个漏跑任务,已全部处理", missedTasks.size());
}
} catch (Exception e) {
log.error("漏跑任务扫描异常", e);
}
}
private List<ScheduleTaskRecord> findMissedTasks() {
LocalDateTime threshold = LocalDateTime.now()
.minusMinutes(toleranceMinutes);
return recordRepository.findMissedTasks(
TaskStatus.PENDING.name(),
TaskStatus.RUNNING.name(),
threshold
);
}
@Transactional
public void compensateTask(ScheduleTaskRecord task) {
log.info("开始补偿任务: id={}, taskName={}, fireTime={}",
task.getId(), task.getTaskName(), task.getFireTime());
task.setStatus(TaskStatus.RUNNING.name());
task.setStartTime(LocalDateTime.now());
recordRepository.save(task);
try {
TaskContext context = TaskContext.builder()
.recordId(task.getId())
.taskName(task.getTaskName())
.taskGroup(task.getTaskGroup())
.fireTime(task.getFireTime())
.isCompensation(true)
.build();
taskExecutor.execute(context);
task.setStatus(TaskStatus.COMPENSATED.name());
task.setEndTime(LocalDateTime.now());
task.setResult("补偿执行成功");
recordRepository.save(task);
log.info("任务补偿成功: id={}", task.getId());
} catch (Exception e) {
log.error("任务补偿执行异常: id={}", task.getId(), e);
task.setStatus(TaskStatus.FAILED.name());
task.setEndTime(LocalDateTime.now());
task.setResult("补偿执行失败: " + e.getMessage());
recordRepository.save(task);
throw e;
}
}
}
4. 任务执行器
@Component
@Slf4j
public class TaskExecutor {
@Autowired
private ScheduleTaskRecordService recordService;
@Autowired
private Map<String, TaskHandler> taskHandlerMap;
public void execute(TaskContext context) {
String taskKey = context.getTaskGroup() + ":" + context.getTaskName();
TaskHandler handler = taskHandlerMap.get(taskKey);
if (handler == null) {
throw new IllegalArgumentException("未找到任务处理器: " + taskKey);
}
recordService.recordTaskStart(context.getRecordId());
try {
Object result = handler.execute(context);
recordService.recordTaskComplete(context.getRecordId(),
JSON.toJSONString(result));
} catch (Exception e) {
recordService.recordTaskFailed(context.getRecordId(),
e.getMessage());
throw e;
}
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TaskContext {
private Long recordId;
private String taskName;
private String taskGroup;
private LocalDateTime fireTime;
private boolean isCompensation;
private Map<String, Object> params;
}
5. 任务执行记录 Repository
@Repository
public interface ScheduleTaskRecordRepository extends JpaRepository<ScheduleTaskRecord, Long> {
@Query("""
SELECT r FROM ScheduleTaskRecord r
WHERE r.status IN :statuses
AND r.fireTime < :threshold
AND r.fireTime > :retentionDate
ORDER BY r.fireTime ASC
""")
List<ScheduleTaskRecord> findMissedTasks(
@Param("statuses") List<String> statuses,
@Param("threshold") LocalDateTime threshold,
@Param("retentionDate") LocalDateTime retentionDate);
@Query("""
SELECT r FROM ScheduleTaskRecord r
WHERE r.taskName = :taskName
AND r.taskGroup = :taskGroup
AND r.fireTime = :fireTime
""")
List<ScheduleTaskRecord> findByTaskAndFireTime(
@Param("taskName") String taskName,
@Param("taskGroup") String taskGroup,
@Param("fireTime") LocalDateTime fireTime);
List<ScheduleTaskRecord> findByStatusAndFireTimeBetween(
String status, LocalDateTime start, LocalDateTime end);
}
6. 定时任务调度器
@Component
@Slf4j
public class SchedulerManager {
@Autowired
private ScheduleTaskRecordService recordService;
@Autowired
private TaskExecutor taskExecutor;
private final Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
public void scheduleTask(ScheduleTaskDO task) {
ScheduledFuture<?> future = task.getScheduler().scheduleAtFixedRate(
() -> executeTask(task),
task.getInitialDelay(),
task.getPeriod(),
task.getUnit()
);
scheduledTasks.put(task.getTaskKey(), future);
log.info("定时任务已注册: taskKey={}, period={}",
task.getTaskKey(), task.getPeriod());
}
private void executeTask(ScheduleTaskDO task) {
log.info("触发定时任务: taskName={}", task.getTaskName());
recordService.recordTaskFired(task);
TaskContext context = TaskContext.builder()
.taskName(task.getTaskName())
.taskGroup(task.getTaskGroup())
.fireTime(LocalDateTime.now())
.isCompensation(false)
.build();
try {
taskExecutor.execute(context);
} catch (Exception e) {
log.error("定时任务执行异常: taskName={}", task.getTaskName(), e);
}
}
public void cancelTask(String taskKey) {
ScheduledFuture<?> future = scheduledTasks.get(taskKey);
if (future != null) {
future.cancel(false);
scheduledTasks.remove(taskKey);
log.info("定时任务已取消: taskKey={}", taskKey);
}
}
}
补偿策略详解
1. 时间窗口容忍
@Configuration
public class CompensationConfig {
@Value("${scheduler.compensation.tolerance-minutes:5}")
private int toleranceMinutes;
@Value("${scheduler.compensation.max-retention-days:7}")
private int maxRetentionDays;
@Bean
public CompensationPolicy tolerancePolicy() {
return new ToleranceWindowPolicy(toleranceMinutes);
}
}
public interface CompensationPolicy {
boolean shouldCompensate(ScheduleTaskRecord task);
int getPriority();
}
public class ToleranceWindowPolicy implements CompensationPolicy {
private final int toleranceMinutes;
public ToleranceWindowPolicy(int toleranceMinutes) {
this.toleranceMinutes = toleranceMinutes;
}
@Override
public boolean shouldCompensate(ScheduleTaskRecord task) {
LocalDateTime deadline = task.getFireTime()
.plusMinutes(toleranceMinutes);
return LocalDateTime.now().isAfter(deadline);
}
}
2. 幂等性保证
@Component
public class IdempotencyGuard {
@Autowired
private ScheduleTaskRecordRepository recordRepository;
@Autowired
private RedissonClient redissonClient;
public boolean tryAcquire(ScheduleTaskRecord task) {
String lockKey = "task:lock:" + task.getTaskGroup() + ":"
+ task.getTaskName() + ":" + task.getFireTime();
RLock lock = redissonClient.getLock(lockKey);
return lock.tryLock(0, 30, TimeUnit.MINUTES);
}
public void release(ScheduleTaskRecord task) {
String lockKey = "task:lock:" + task.getTaskGroup() + ":"
+ task.getTaskName() + ":" + task.getFireTime();
RLock lock = redissonClient.getLock(lockKey);
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
3. 进度记录与断点续传
@Component
public class ProgressRecorder {
@Autowired
private ScheduleTaskRecordService recordService;
public void recordProgress(TaskContext context, int progress) {
recordService.recordProgress(context.getRecordId(), progress);
}
public ProgressState getProgressState(TaskContext context) {
ScheduleTaskRecord record = recordService.getRecord(context.getRecordId());
if (record == null || record.getProgress() == null) {
return new ProgressState(0, null);
}
return new ProgressState(record.getProgress(), record.getResult());
}
}
@Data
@AllArgsConstructor
public class ProgressState {
private int progress;
private String checkpoint;
}
监控与告警
1. 补偿任务监控
@Component
@Slf4j
public class CompensationMonitor {
@Autowired
private ScheduleTaskRecordRepository recordRepository;
@Scheduled(fixedRate = 300000)
public void checkCompensationStatus() {
LocalDateTime oneHourAgo = LocalDateTime.now().minusHours(1);
List<ScheduleTaskRecord> compensatedTasks = recordRepository
.findByStatusAndFireTimeBetween(
TaskStatus.COMPENSATED.name(),
oneHourAgo,
LocalDateTime.now()
);
if (!compensatedTasks.isEmpty()) {
log.warn("过去1小时有 {} 个任务被补偿执行", compensatedTasks.size());
for (ScheduleTaskRecord task : compensatedTasks) {
log.warn("补偿任务详情: taskName={}, fireTime={}, actualExecuteTime={}",
task.getTaskName(), task.getFireTime(), task.getStartTime());
}
}
}
@Scheduled(fixedRate = 300000)
public void checkLongRunningTasks() {
LocalDateTime threshold = LocalDateTime.now().minusMinutes(30);
List<ScheduleTaskRecord> longRunningTasks = recordRepository
.findByStatusAndFireTimeBetween(
TaskStatus.RUNNING.name(),
LocalDateTime.MIN,
threshold
);
if (!longRunningTasks.isEmpty()) {
log.error("发现 {} 个长时间运行的任务,可能已挂死", longRunningTasks.size());
for (ScheduleTaskRecord task : longRunningTasks) {
alertService.sendAlert("LONG_RUNNING_TASK", task);
}
}
}
}
2. 告警服务
@Component
@Slf4j
public class AlertService {
public void sendAlert(String alertType, ScheduleTaskRecord task) {
String message = buildAlertMessage(alertType, task);
log.error("发送告警: type={}, message={}", alertType, message);
// 实际发送告警(邮件、钉钉、企业微信等)
// sendEmail(message);
// sendDingTalk(message);
}
private String buildAlertMessage(String alertType, ScheduleTaskRecord task) {
return String.format(
"【%s告警】\n" +
"任务名称: %s\n" +
"任务组: %s\n" +
"触发时间: %s\n" +
"当前状态: %s\n" +
"执行进度: %d%%",
alertType,
task.getTaskName(),
task.getTaskGroup(),
task.getFireTime(),
task.getStatus(),
task.getProgress() != null ? task.getProgress() : 0
);
}
}
配置说明
server:
port: 8080
spring:
datasource:
url: jdbc:mysql://localhost:3306/scheduler_demo?useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: root
scheduler:
compensation:
enabled: true
tolerance-minutes: 5
scan-interval-ms: 60000
max-retention-days: 7
logging:
level:
com.example.scheduler: DEBUG
| 配置项 | 说明 | 默认值 |
|---|---|---|
| scheduler.compensation.enabled | 是否启用补偿机制 | true |
| scheduler.compensation.tolerance-minutes | 容忍时间(分钟) | 5 |
| scheduler.compensation.scan-interval-ms | 扫描间隔(毫秒) | 60000 |
| scheduler.compensation.max-retention-days | 记录保留天数 | 7 |
性能对比
补偿效果对比
| 场景 | 无补偿 | 有补偿 | 改进 |
|---|---|---|---|
| 服务宕机重启 | 任务漏执行,数据不一致 | 自动补跑,数据完整 | ✓ |
| 凌晨批处理漏跑 | 手工补数,耗时 2h | 自动补偿,2min | ✓ |
| 网络抖动 | 任务丢失,需人工介入 | 自动重试,无需干预 | ✓ |
| JVM Full GC | 任务错过,业务异常 | 延迟补偿,自动恢复 | ✓ |
补偿时间分布
容忍时间: 5分钟
任务漏跑 → 服务重启 → 扫描漏跑任务 → 补偿执行
↓ ↓ ↓ ↓
12:00 12:03 12:04 12:05
最大延迟: tolerance-minutes + scan-interval + execute-time
常见问题
Q: 如何避免补偿任务与正常任务重复执行?
A: 通过以下手段保证幂等性:
- 分布式锁:
taskName + taskGroup + fireTime作为锁 key - 状态机:任务状态只能 PENDING → RUNNING → COMPLETED/FAILED
- 唯一索引:在数据库建立
(taskName, taskGroup, fireTime)唯一索引
Q: 补偿任务执行失败怎么办?
A: 采用指数退避重试策略:
- 首次失败:立即重试
- 第二次失败:等待 1 分钟
- 第三次失败:等待 5 分钟
- 超过最大重试次数:标记为 FAILED,发送告警
Q: 如何控制补偿任务的并发度?
A: 使用信号量限制:
@Value("${scheduler.compensation.max-concurrency:3}")
private int maxConcurrency;
private Semaphore semaphore = new Semaphore(maxConcurrency);
public void compensateTask(ScheduleTaskRecord task) {
semaphore.acquire();
try {
// 执行补偿
} finally {
semaphore.release();
}
}
Q: 补偿任务会影响正常任务的执行吗?
A: 不会。补偿任务和正常任务使用不同的线程池:
- 正常任务:
scheduled-tasks-pool - 补偿任务:
compensation-pool
总结
通过本文的优化方案,我们可以实现:
- 自动检测漏跑任务:服务重启后自动扫描并补偿
- 零人工干预:无需运维人员半夜起床补数据
- 数据完整性保证:所有应该执行的任务都会被执行
- 可追溯可监控:完整的执行记录和告警机制
关键设计:
- 任务执行记录表:记录任务触发时间、状态、进度
- 漏跑任务扫描器:定时扫描并补偿漏跑任务
- 时间窗口容忍:避免网络抖动等瞬时故障误补偿
- 分布式锁:保证补偿任务的幂等性
- 进度记录:支持断点续传
在实际生产环境中,建议根据业务特点调整容忍时间和扫描间隔,确保补偿机制的及时性和准确性。
源码获取
文章已同步至小程序博客栏目,需要源码的请关注小程序博客。
公众号:服务端技术精选
小程序码:
标题:调度任务漏执行补偿:服务宕机重启后,自动补跑错过的定时任务!
作者:jiangyi
地址:http://jiangyi.space/articles/2026/05/13/1778385477325.html
公众号:服务端技术精选
- 问题背景
- 定时任务漏执行的常见场景
- 传统解决方案的局限性
- 1. 手工补数
- 2. 基于数据库状态的补偿
- 整体架构设计
- 核心思想
- 架构流程图
- 核心代码实现
- 1. 任务执行记录实体
- 2. 任务执行记录服务
- 3. 漏跑任务扫描器
- 4. 任务执行器
- 5. 任务执行记录 Repository
- 6. 定时任务调度器
- 补偿策略详解
- 1. 时间窗口容忍
- 2. 幂等性保证
- 3. 进度记录与断点续传
- 监控与告警
- 1. 补偿任务监控
- 2. 告警服务
- 配置说明
- 性能对比
- 补偿效果对比
- 补偿时间分布
- 常见问题
- Q: 如何避免补偿任务与正常任务重复执行?
- Q: 补偿任务执行失败怎么办?
- Q: 如何控制补偿任务的并发度?
- Q: 补偿任务会影响正常任务的执行吗?
- 总结
- 源码获取
评论
0 评论