Compensating for Missed Scheduled Tasks: Automatically Re-run Missed Jobs After a Service Crash and Restart!

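In a distributed system, scheduled tasks play an important part in keeping the business running smoothly. However, service crashes, network jitter, and deployment upgrades can all cause a scheduled task to be skipped, which leads to inconsistent data and business errors.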

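  • After a service crash and restart, you discover the end-of-day task never ran
  • An early-morning batch job was skipped because of network jitter
  • A deployment upgrade happened exactly when an important task was due
  • Backfilling data by hand is tedious and error-prone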

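In this article, we look at how to build a compensation mechanism for missed scheduled tasks, so that after a crashed service restarts, the runs it missed are re-executed automatically.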

Background

Common Scenarios for Missed Scheduled Tasks

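┌────────────────────────────────────────────────────────────────────┐
│  Common scenarios for missed scheduled tasks:                      │
│                                                                    │
│  1. Service crash: the service dies while a task is half done      │
│  2. Network jitter: a brief outage cuts off the scheduling center  │
│  3. Rolling deployment: tasks on some instances get interrupted    │
│  4. JVM Full GC: a long GC pause misses the trigger time           │
│  5. Clock rollback: the clock jumps back, so trigger times are     │
│     judged incorrectly                                             │
│                                                                    │
│  Consequences:                                                     │
│  - Incomplete statistics                                           │
│  - Reconciliation mismatches                                       │
│  - Inconsistent business state                                     │
│  - Heavy manual backfill workload                                  │
└────────────────────────────────────────────────────────────────────┘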

Limitations of Traditional Approaches

1. Manual backfill

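import java.util.List;

// Operations staff run a compensation script by hand
public abstract class ManualCompensation {

    public void compensateDailySettlement() {
        // Manually look up which runs were missed
        List<String> missedTaskIds = queryMissedTasks();

        // Re-run each one by hand
        for (String taskId : missedTaskIds) {
            executeTask(taskId);
        }
    }

    protected abstract List<String> queryMissedTasks();

    protected abstract void executeTask(String taskId);
}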

Drawbacks

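  • Manual operations are error-prone
  • Problems are discovered late, possibly after business losses have already occurred
  • The workload is heavy, especially when a failure happens in the middle of the night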

2. Compensation based on database state

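import java.util.List;

public abstract class DbStateCompensation {

    public void compensate() {
        // Query tasks that have not finished
        List<String> unfinishedTaskIds = queryUnfinishedTasks();

        // Check whether each one has timed out
        for (String taskId : unfinishedTaskIds) {
            if (isTimeout(taskId)) {
                // Roll back and redo
                rollbackAndRetry(taskId);
            }
        }
    }

    protected abstract List<String> queryUnfinishedTasks();

    protected abstract boolean isTimeout(String taskId);

    protected abstract void rollbackAndRetry(String taskId);
}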

Drawbacks

  • Cannot handle the "we don't know how far the task got" situation left behind by a crash
  • Requires extra status fields to be recorded

Overall Architecture

Core Idea

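┌────────────────────────────────────────────────────────────────────┐
│  Compensation mechanism for missed scheduled tasks:                │
│                                                                    │
│  1. On trigger: record the fire time (fire_time)                   │
│  2. Before execution: check whether any runs were missed           │
│  3. During execution: record progress (progress)                   │
│  4. On restart: scan for missed runs and compensate them           │
│                                                                    │
│  Key design points:                                                │
│  - Task execution record table (schedule_task_record)              │
│  - Missed-task scanner (MissedTaskScanner)                         │
│  - Optimistic locking to prevent duplicate execution               │
│  - Idempotency guarantees                                          │
└────────────────────────────────────────────────────────────────────┘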

Architecture Flow

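Service starts
    ↓
Load the compensation scanner
    ↓
Scan the task execution record table
    ↓
Find missed runs (status is PENDING or RUNNING, and fire_time < now - tolerance window)
    ↓
Run the compensation
    ↓
Update the task status (COMPENSATED/FAILED)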
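The selection rule in this flow can be written as a plain predicate. Below is a minimal, framework-free sketch that mirrors the repository query shown later (`status IN (...)`, `fireTime < threshold`, `fireTime > retentionDate`) with the two statuses `MissedTaskScanner` passes in. The `Run` record is a hypothetical, trimmed-down stand-in for `ScheduleTaskRecord`, and the 7-day retention in the demo simply matches the `max-retention-days` default from the configuration section.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class MissedRunSelector {

    // Hypothetical, trimmed-down stand-in for ScheduleTaskRecord
    record Run(String taskName, String status, LocalDateTime fireTime) {}

    // Runs in these states never reached COMPLETED, FAILED or COMPENSATED
    private static final Set<String> OPEN_STATUSES = Set.of("PENDING", "RUNNING");

    /** True if the run is unfinished, past its tolerance window, and still within retention. */
    static boolean isMissed(Run run, LocalDateTime now, Duration tolerance, Duration retention) {
        LocalDateTime threshold = now.minus(tolerance);      // fire_time < now - tolerance
        LocalDateTime retentionDate = now.minus(retention);  // fire_time > now - retention
        return OPEN_STATUSES.contains(run.status())
                && run.fireTime().isBefore(threshold)
                && run.fireTime().isAfter(retentionDate);
    }

    static List<Run> selectMissed(List<Run> runs, LocalDateTime now,
                                  Duration tolerance, Duration retention) {
        return runs.stream()
                .filter(run -> isMissed(run, now, tolerance, retention))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 1, 1, 12, 10);
        List<Run> runs = List.of(
                new Run("dailySettlement", "PENDING", now.minusMinutes(30)),  // missed
                new Run("dailySettlement", "RUNNING", now.minusMinutes(2)),   // still inside the window
                new Run("hourlyReport", "COMPLETED", now.minusMinutes(60)),   // finished normally
                new Run("hourlyReport", "PENDING", now.minusDays(30)));       // older than retention
        System.out.println(selectMissed(runs, now, Duration.ofMinutes(5), Duration.ofDays(7)));
    }
}
```

Only the first run in the demo list is selected; the other three are excluded by the window, the status, and the retention limit respectively.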

Core Code Implementation

1. Task execution record entity

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "schedule_task_record")
public class ScheduleTaskRecord {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String taskName;

    private String taskGroup;

    private LocalDateTime fireTime;

    private LocalDateTime startTime;

    private LocalDateTime endTime;

    private String status;

    private Integer progress;

    private String result;

    private Integer retryCount;

    private String triggerType;

    private String executorIp;

    private LocalDateTime createTime;
}

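@Getter
@AllArgsConstructor
public enum TaskStatus {
    PENDING("Pending"),
    RUNNING("Running"),
    COMPLETED("Completed"),
    FAILED("Failed"),
    COMPENSATED("Compensated");

    // Human-readable label; without this field and a constructor the enum would not compile
    private final String description;
}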

2. Task execution record service

@Service
@Slf4j
public class ScheduleTaskRecordService {

    @Autowired
    private ScheduleTaskRecordRepository recordRepository;

    public void recordTaskFired(ScheduleTaskDO task) {
        ScheduleTaskRecord record = ScheduleTaskRecord.builder()
                .taskName(task.getTaskName())
                .taskGroup(task.getTaskGroup())
                .fireTime(LocalDateTime.now())
                .status(TaskStatus.PENDING.name())
                .triggerType("SCHEDULE")
                .createTime(LocalDateTime.now())
                .build();

        recordRepository.save(record);
        log.info("Task trigger recorded: taskName={}, fireTime={}",
                task.getTaskName(), record.getFireTime());
    }

    public void recordTaskStart(Long recordId) {
        recordRepository.findById(recordId).ifPresent(record -> {
            record.setStatus(TaskStatus.RUNNING.name());
            record.setStartTime(LocalDateTime.now());
            recordRepository.save(record);
            log.info("Task started: recordId={}", recordId);
        });
    }

    public void recordTaskComplete(Long recordId, String result) {
        recordRepository.findById(recordId).ifPresent(record -> {
            record.setStatus(TaskStatus.COMPLETED.name());
            record.setEndTime(LocalDateTime.now());
            record.setResult(result);
            recordRepository.save(record);
            log.info("Task completed: recordId={}, result={}", recordId, result);
        });
    }

    public void recordTaskFailed(Long recordId, String error) {
        recordRepository.findById(recordId).ifPresent(record -> {
            record.setStatus(TaskStatus.FAILED.name());
            record.setEndTime(LocalDateTime.now());
            record.setResult(error);
            recordRepository.save(record);
            log.warn("Task failed: recordId={}, error={}", recordId, error);
        });
    }

    public void recordProgress(Long recordId, Integer progress) {
        recordRepository.findById(recordId).ifPresent(record -> {
            record.setProgress(progress);
            recordRepository.save(record);
        });
    }
}

3. Missed-task scanner

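@Component
@Slf4j
public class MissedTaskScanner {

    @Autowired
    private ScheduleTaskRecordRepository recordRepository;

    @Autowired
    private TaskExecutor taskExecutor;

    @Value("${scheduler.compensation.enabled:true}")
    private boolean compensationEnabled;

    @Value("${scheduler.compensation.tolerance-minutes:5}")
    private int toleranceMinutes;

    @Value("${scheduler.compensation.max-retention-days:7}")
    private int maxRetentionDays;

    @PostConstruct
    public void init() {
        if (compensationEnabled) {
            log.info("Starting missed-task scan and compensation...");
            scanAndCompensate();
        }
    }

    @Scheduled(fixedDelayString = "${scheduler.compensation.scan-interval-ms:60000}")
    public void scanAndCompensate() {
        // Honor the switch on every periodic scan, not only at startup
        if (!compensationEnabled) {
            return;
        }
        try {
            List<ScheduleTaskRecord> missedTasks = findMissedTasks();

            for (ScheduleTaskRecord task : missedTasks) {
                try {
                    compensateTask(task);
                } catch (Exception e) {
                    log.error("Compensation failed: taskId={}", task.getId(), e);
                }
            }

            if (!missedTasks.isEmpty()) {
                log.info("Found {} missed tasks in this scan; all have been processed", missedTasks.size());
            }
        } catch (Exception e) {
            log.error("Missed-task scan failed", e);
        }
    }

    private List<ScheduleTaskRecord> findMissedTasks() {
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime threshold = now.minusMinutes(toleranceMinutes);
        // Records older than the retention period are ignored
        LocalDateTime retentionDate = now.minusDays(maxRetentionDays);

        // Arguments match the repository signature (List<String>, LocalDateTime, LocalDateTime).
        // RUNNING records are included to catch runs cut off by a crash; a run that is
        // legitimately still going after the tolerance window is returned too, so the
        // window must be longer than the longest normal run.
        return recordRepository.findMissedTasks(
                List.of(TaskStatus.PENDING.name(), TaskStatus.RUNNING.name()),
                threshold,
                retentionDate
        );
    }

    // No @Transactional: it would be ignored anyway, because scanAndCompensate calls this
    // method on 'this' and bypasses the Spring proxy. Each repository save commits on its own.
    public void compensateTask(ScheduleTaskRecord task) {
        log.info("Starting compensation: id={}, taskName={}, fireTime={}",
                task.getId(), task.getTaskName(), task.getFireTime());

        TaskContext context = TaskContext.builder()
                .recordId(task.getId())
                .taskName(task.getTaskName())
                .taskGroup(task.getTaskGroup())
                .fireTime(task.getFireTime())
                .isCompensation(true)
                .build();

        try {
            // TaskExecutor marks this record RUNNING, then COMPLETED or FAILED
            taskExecutor.execute(context);
        } catch (RuntimeException e) {
            // Also covers failures before TaskExecutor touches the record (e.g. no handler),
            // which would otherwise leave it PENDING and retried on every scan
            markStatus(task.getId(), TaskStatus.FAILED, "Compensation failed: " + e.getMessage());
            throw e;
        }

        markStatus(task.getId(), TaskStatus.COMPENSATED, null);
        log.info("Task compensated successfully: id={}", task.getId());
    }

    // Reload before updating: saving the stale 'task' instance would overwrite the
    // result and endTime that TaskExecutor has just written
    private void markStatus(Long recordId, TaskStatus status, String result) {
        recordRepository.findById(recordId).ifPresent(record -> {
            record.setStatus(status.name());
            if (record.getEndTime() == null) {
                record.setEndTime(LocalDateTime.now());
            }
            if (result != null) {
                record.setResult(result);
            }
            recordRepository.save(record);
        });
    }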
}
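The core-idea list mentions optimistic locking against duplicate execution, but the scanner does not show where that check happens: if two instances scan the same record at the same moment, both would run it. A common remedy is to claim the record with a conditional update before running it, for example `UPDATE schedule_task_record SET status = 'RUNNING', version = version + 1 WHERE id = ? AND version = ?`, and to skip the record when zero rows are updated. That assumes a version column, which `ScheduleTaskRecord` does not have yet. The sketch below imitates the idea in memory with `ConcurrentHashMap.replace(key, expected, newValue)`; `OptimisticClaim` and its `State` record are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OptimisticClaim {

    // Hypothetical snapshot of a record: status plus a version counter (like a JPA @Version column)
    record State(String status, long version) {}

    // recordId -> current state; stands in for rows of schedule_task_record
    private final Map<Long, State> states = new ConcurrentHashMap<>();

    void put(long recordId, State state) {
        states.put(recordId, state);
    }

    State get(long recordId) {
        return states.get(recordId);
    }

    /**
     * Claims the record only if it still equals the snapshot this instance read.
     * SQL analogue: UPDATE ... SET status = 'RUNNING', version = version + 1
     *               WHERE id = ? AND version = ?   -- 1 row: claimed, 0 rows: lost the race
     */
    boolean claim(long recordId, State seen) {
        return states.replace(recordId, seen, new State("RUNNING", seen.version() + 1));
    }

    public static void main(String[] args) throws InterruptedException {
        OptimisticClaim table = new OptimisticClaim();
        table.put(42L, new State("RUNNING", 3));  // left behind by a crashed instance

        // Two scanner instances read the same snapshot, then race to claim it
        State seenByA = table.get(42L);
        State seenByB = table.get(42L);
        Thread a = new Thread(() -> System.out.println("A claimed: " + table.claim(42L, seenByA)));
        Thread b = new Thread(() -> System.out.println("B claimed: " + table.claim(42L, seenByB)));
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(table.get(42L));  // exactly one of A and B printed "true"
    }
}
```

Comparing the version, not just the status, matters for records left RUNNING by a crash: a check on `status = 'RUNNING'` alone would let both instances win.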

4. Task executor

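@Component
@Slf4j
public class TaskExecutor {

    @Autowired
    private ScheduleTaskRecordService recordService;

    // Spring injects every TaskHandler bean into this map keyed by bean name, so each
    // handler bean has to be named "<taskGroup>:<taskName>" for the lookup below to work
    @Autowired
    private Map<String, TaskHandler> taskHandlerMap;

    public void execute(TaskContext context) {
        String taskKey = context.getTaskGroup() + ":" + context.getTaskName();
        TaskHandler handler = taskHandlerMap.get(taskKey);

        if (handler == null) {
            throw new IllegalArgumentException("No task handler found: " + taskKey);
        }

        recordService.recordTaskStart(context.getRecordId());

        try {
            Object result = handler.execute(context);
            // JSON.toJSONString is assumed to be fastjson's; the original omits its imports
            recordService.recordTaskComplete(context.getRecordId(),
                    JSON.toJSONString(result));
        } catch (RuntimeException e) {
            recordService.recordTaskFailed(context.getRecordId(),
                    e.getMessage());
            throw e;
        }
    }
}

// Not shown in the original; this signature is an assumption. Handlers can check
// context.isCompensation() to tell a normal run from a compensating re-run.
public interface TaskHandler {
    Object execute(TaskContext context);
}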

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TaskContext {
    private Long recordId;
    private String taskName;
    private String taskGroup;
    private LocalDateTime fireTime;
    private boolean isCompensation;
    private Map<String, Object> params;
}
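Spring fills an injected `Map<String, TaskHandler>` keyed by bean name, so the `group:name` lookup in `TaskExecutor` depends on a bean-naming convention. An alternative, sketched here without Spring, is to let each handler declare its own group and name and to build the registry once at startup. `KeyedTaskHandler`, `EchoHandler`, and `HandlerRegistry` are hypothetical names used only for illustration; in a Spring application the constructor could receive a `List<KeyedTaskHandler>` of all handler beans.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical handler contract that carries its own routing key
interface KeyedTaskHandler {
    String taskGroup();
    String taskName();
    Object execute(Map<String, Object> params);
}

// Example handler for the demo; the record's accessors implement taskGroup() and taskName()
record EchoHandler(String taskGroup, String taskName) implements KeyedTaskHandler {
    @Override
    public Object execute(Map<String, Object> params) {
        return taskGroup + ":" + taskName + " ran with " + params;
    }
}

class HandlerRegistry {

    private final Map<String, KeyedTaskHandler> handlers;

    HandlerRegistry(List<KeyedTaskHandler> all) {
        // toMap throws IllegalStateException on a duplicate key, so two handlers
        // claiming the same task fail fast at startup instead of shadowing each other
        this.handlers = all.stream()
                .collect(Collectors.toMap(h -> key(h.taskGroup(), h.taskName()), Function.identity()));
    }

    static String key(String taskGroup, String taskName) {
        return taskGroup + ":" + taskName;
    }

    KeyedTaskHandler find(String taskGroup, String taskName) {
        KeyedTaskHandler handler = handlers.get(key(taskGroup, taskName));
        if (handler == null) {
            throw new IllegalArgumentException("No task handler found: " + key(taskGroup, taskName));
        }
        return handler;
    }

    public static void main(String[] args) {
        HandlerRegistry registry = new HandlerRegistry(List.of(
                new EchoHandler("finance", "dailySettlement"),
                new EchoHandler("report", "hourlyStats")));
        System.out.println(registry.find("finance", "dailySettlement").execute(Map.of("date", "2024-01-01")));
    }
}
```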

5. Task execution record repository

@Repository
public interface ScheduleTaskRecordRepository extends JpaRepository<ScheduleTaskRecord, Long> {

    @Query("""
        SELECT r FROM ScheduleTaskRecord r
        WHERE r.status IN :statuses
        AND r.fireTime < :threshold
        AND r.fireTime > :retentionDate
        ORDER BY r.fireTime ASC
        """)
    List<ScheduleTaskRecord> findMissedTasks(
            @Param("statuses") List<String> statuses,
            @Param("threshold") LocalDateTime threshold,
            @Param("retentionDate") LocalDateTime retentionDate);

    @Query("""
        SELECT r FROM ScheduleTaskRecord r
        WHERE r.taskName = :taskName
        AND r.taskGroup = :taskGroup
        AND r.fireTime = :fireTime
        """)
    List<ScheduleTaskRecord> findByTaskAndFireTime(
            @Param("taskName") String taskName,
            @Param("taskGroup") String taskGroup,
            @Param("fireTime") LocalDateTime fireTime);

    List<ScheduleTaskRecord> findByStatusAndFireTimeBetween(
            String status, LocalDateTime start, LocalDateTime end);
}

6. Scheduled task manager

@Component
@Slf4j
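public class SchedulerManager {

    // The repository is used directly because ScheduleTaskRecordService.recordTaskFired()
    // returns void, leaving no way to pass the new record's id on to TaskExecutor
    @Autowired
    private ScheduleTaskRecordRepository recordRepository;

    @Autowired
    private TaskExecutor taskExecutor;

    private final Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();

    public void scheduleTask(ScheduleTaskDO task) {
        ScheduledFuture<?> future = task.getScheduler().scheduleAtFixedRate(
                () -> executeTask(task),
                task.getInitialDelay(),
                task.getPeriod(),
                task.getUnit()
        );

        scheduledTasks.put(task.getTaskKey(), future);
        log.info("Scheduled task registered: taskKey={}, period={}",
                task.getTaskKey(), task.getPeriod());
    }

    private void executeTask(ScheduleTaskDO task) {
        log.info("Scheduled task fired: taskName={}", task.getTaskName());

        // Persist the PENDING record before running, so a crash mid-run leaves a row for the scanner
        LocalDateTime fireTime = LocalDateTime.now();
        ScheduleTaskRecord record = recordRepository.save(ScheduleTaskRecord.builder()
                .taskName(task.getTaskName())
                .taskGroup(task.getTaskGroup())
                .fireTime(fireTime)
                .status(TaskStatus.PENDING.name())
                .triggerType("SCHEDULE")
                .createTime(fireTime)
                .build());

        TaskContext context = TaskContext.builder()
                .recordId(record.getId())   // without it, TaskExecutor would call findById(null)
                .taskName(task.getTaskName())
                .taskGroup(task.getTaskGroup())
                .fireTime(fireTime)         // same value as the stored record
                .isCompensation(false)
                .build();

        try {
            taskExecutor.execute(context);
        } catch (Exception e) {
            log.error("Scheduled task failed: taskName={}", task.getTaskName(), e);
        }
    }

    public void cancelTask(String taskKey) {
        ScheduledFuture<?> future = scheduledTasks.remove(taskKey);
        if (future != null) {
            future.cancel(false);
            log.info("Scheduled task cancelled: taskKey={}", taskKey);
        }
    }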
}
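`executeTask` writes a record only when a trigger actually fires, so a trigger whose time falls entirely inside the downtime leaves no row for the scanner to find. One way to cover that gap, sketched here for fixed-rate tasks like those registered by `scheduleTask`, is to work out which fire times should have happened since the latest recorded one and insert PENDING records for them. `MissedFireTimes.expectedFireTimes` is a hypothetical helper, and it assumes fire times are aligned to a known anchor and period; a cron-based task would need the cron sequence instead (Spring's `CronExpression.next(...)` can produce it).

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

class MissedFireTimes {

    /**
     * Fire times of a fixed-rate task (anchor, anchor + period, ...) that fall strictly
     * after lastRecorded and no later than now. Requires lastRecorded >= anchor.
     */
    static List<LocalDateTime> expectedFireTimes(LocalDateTime anchor, Duration period,
                                                 LocalDateTime lastRecorded, LocalDateTime now) {
        if (period.toMillis() <= 0) {
            throw new IllegalArgumentException("period must be at least 1 ms");
        }
        if (lastRecorded.isBefore(anchor)) {
            throw new IllegalArgumentException("lastRecorded must not precede anchor");
        }
        // Whole periods between the anchor and the last recorded fire time
        long elapsed = Duration.between(anchor, lastRecorded).toMillis() / period.toMillis();
        LocalDateTime next = anchor.plus(period.multipliedBy(elapsed + 1));

        List<LocalDateTime> missed = new ArrayList<>();
        while (!next.isAfter(now)) {
            missed.add(next);
            next = next.plus(period);
        }
        return missed;
    }

    public static void main(String[] args) {
        LocalDateTime anchor = LocalDateTime.of(2024, 1, 1, 0, 0);  // hourly task starting at midnight
        List<LocalDateTime> missed = expectedFireTimes(anchor, Duration.ofHours(1),
                LocalDateTime.of(2024, 1, 1, 9, 0),     // latest fireTime found in schedule_task_record
                LocalDateTime.of(2024, 1, 1, 12, 30));  // service is back up
        System.out.println(missed); // [2024-01-01T10:00, 2024-01-01T11:00, 2024-01-01T12:00]
    }
}
```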

Compensation Strategies in Detail

1. Tolerance time window

@Configuration
public class CompensationConfig {

    @Value("${scheduler.compensation.tolerance-minutes:5}")
    private int toleranceMinutes;

    @Value("${scheduler.compensation.max-retention-days:7}")
    private int maxRetentionDays;

    @Bean
    public CompensationPolicy tolerancePolicy() {
        return new ToleranceWindowPolicy(toleranceMinutes);
    }
}

public interface CompensationPolicy {
    boolean shouldCompensate(ScheduleTaskRecord task);
    int getPriority();
}

public class ToleranceWindowPolicy implements CompensationPolicy {

    private final int toleranceMinutes;

    public ToleranceWindowPolicy(int toleranceMinutes) {
        this.toleranceMinutes = toleranceMinutes;
    }

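    @Override
    public boolean shouldCompensate(ScheduleTaskRecord task) {
        LocalDateTime deadline = task.getFireTime()
                .plusMinutes(toleranceMinutes);
        return LocalDateTime.now().isAfter(deadline);
    }

    // Required by CompensationPolicy; the class does not compile without it.
    // The value is illustrative: the article does not define how priorities are used.
    @Override
    public int getPriority() {
        return 0;
    }
}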

2. Idempotency guarantee

@Component
public class IdempotencyGuard {

    @Autowired
    private ScheduleTaskRecordRepository recordRepository;

    @Autowired
    private RedissonClient redissonClient;

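    public boolean tryAcquire(ScheduleTaskRecord task) {
        String lockKey = "task:lock:" + task.getTaskGroup() + ":"
                + task.getTaskName() + ":" + task.getFireTime();

        RLock lock = redissonClient.getLock(lockKey);
        try {
            // waitTime 0: fail fast if another node holds the lock.
            // leaseTime 30 min: the lock expires on its own if the holder dies, but a
            // task running longer than 30 minutes would lose the lock mid-run.
            return lock.tryLock(0, 30, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // tryLock(waitTime, leaseTime, unit) declares InterruptedException
            Thread.currentThread().interrupt();
            return false;
        }
    }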

    public void release(ScheduleTaskRecord task) {
        String lockKey = "task:lock:" + task.getTaskGroup() + ":"
                + task.getTaskName() + ":" + task.getFireTime();

        RLock lock = redissonClient.getLock(lockKey);
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
}

3. Progress recording and resuming from a checkpoint

@Component
public class ProgressRecorder {

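    @Autowired
    private ScheduleTaskRecordService recordService;

    // Reads go to the repository: ScheduleTaskRecordService has no getRecord method
    @Autowired
    private ScheduleTaskRecordRepository recordRepository;

    public void recordProgress(TaskContext context, int progress) {
        recordService.recordProgress(context.getRecordId(), progress);
    }

    public ProgressState getProgressState(TaskContext context) {
        ScheduleTaskRecord record = recordRepository.findById(context.getRecordId()).orElse(null);
        if (record == null || record.getProgress() == null) {
            return new ProgressState(0, null);
        }
        // The result column doubles as the checkpoint; note that recordTaskFailed
        // overwrites it with the error message
        return new ProgressState(record.getProgress(), record.getResult());
    }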
}

@Data
@AllArgsConstructor
public class ProgressState {
    private int progress;
    private String checkpoint;
}
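Progress and checkpoint fields only help if the handler reads them back on a re-run. Below is a framework-free sketch of that resume pattern: items are processed in fixed-size batches, the position is saved after each batch, and a compensation run starts from the saved position. `ResumableBatchJob` and `ProgressStore` are hypothetical stand-ins for a handler and `ProgressRecorder`, and storing the next item index in `checkpoint` is an assumption. Because a crash can land between finishing a batch and saving its checkpoint, the per-item work still has to be idempotent.

```java
import java.util.ArrayList;
import java.util.List;

class ResumableBatchJob {

    // Hypothetical stand-in for what ProgressRecorder stores on schedule_task_record
    static final class ProgressStore {
        int progress;       // percent done, the unit the alert message assumes
        String checkpoint;  // index of the first unprocessed item
    }

    static final int BATCH_SIZE = 2;

    /**
     * Processes items from the saved checkpoint on, saving progress after every batch.
     * crashBeforeBatch simulates a crash before the given batch (-1 = never crash).
     */
    static List<String> run(List<String> items, ProgressStore store, int crashBeforeBatch) {
        int start = store.checkpoint == null ? 0 : Integer.parseInt(store.checkpoint);
        List<String> processed = new ArrayList<>();
        int batch = 0;
        for (int i = start; i < items.size(); i += BATCH_SIZE) {
            if (batch == crashBeforeBatch) {
                throw new IllegalStateException("simulated crash");
            }
            int end = Math.min(i + BATCH_SIZE, items.size());
            processed.addAll(items.subList(i, end));  // real work goes here and must be idempotent
            store.checkpoint = String.valueOf(end);   // a crash before this line repeats the batch
            store.progress = end * 100 / items.size();
            batch++;
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> orders = List.of("o1", "o2", "o3", "o4", "o5");
        ProgressStore store = new ProgressStore();
        try {
            run(orders, store, 2);  // the normal run dies before its third batch
        } catch (IllegalStateException e) {
            System.out.println("crashed at " + store.progress + "%, checkpoint " + store.checkpoint);
        }
        // The compensation run resumes from the checkpoint instead of starting over
        System.out.println("resumed and processed " + run(orders, store, -1));
    }
}
```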

Monitoring and Alerting

1. Compensation monitoring

@Component
@Slf4j
public class CompensationMonitor {

    @Autowired
    private ScheduleTaskRecordRepository recordRepository;

    // Used in checkLongRunningTasks but never injected in the original
    @Autowired
    private AlertService alertService;

    @Value("${scheduler.compensation.max-retention-days:7}")
    private int maxRetentionDays;

    @Scheduled(fixedRate = 300000)
    public void checkCompensationStatus() {
        LocalDateTime oneHourAgo = LocalDateTime.now().minusHours(1);

        List<ScheduleTaskRecord> compensatedTasks = recordRepository
                .findByStatusAndFireTimeBetween(
                        TaskStatus.COMPENSATED.name(),
                        oneHourAgo,
                        LocalDateTime.now()
                );

        if (!compensatedTasks.isEmpty()) {
            log.warn("{} tasks fired in the past hour were compensated", compensatedTasks.size());

            for (ScheduleTaskRecord task : compensatedTasks) {
                log.warn("Compensated task: taskName={}, fireTime={}, actualExecuteTime={}",
                        task.getTaskName(), task.getFireTime(), task.getStartTime());
            }
        }
    }

    @Scheduled(fixedRate = 300000)
    public void checkLongRunningTasks() {
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime threshold = now.minusMinutes(30);

        // Bounded lower limit instead of LocalDateTime.MIN, which lies far outside
        // the range a MySQL DATETIME column can hold
        List<ScheduleTaskRecord> longRunningTasks = recordRepository
                .findByStatusAndFireTimeBetween(
                        TaskStatus.RUNNING.name(),
                        now.minusDays(maxRetentionDays),
                        threshold
                );

        if (!longRunningTasks.isEmpty()) {
            log.error("Found {} long-running tasks; they may be hung", longRunningTasks.size());

            for (ScheduleTaskRecord task : longRunningTasks) {
                alertService.sendAlert("LONG_RUNNING_TASK", task);
            }
        }
    }
}

2. Alert service

@Component
@Slf4j
public class AlertService {

    public void sendAlert(String alertType, ScheduleTaskRecord task) {
        String message = buildAlertMessage(alertType, task);
        log.error("Sending alert: type={}, message={}", alertType, message);

        // Deliver the alert through a real channel (email, DingTalk, WeCom, etc.)
        // sendEmail(message);
        // sendDingTalk(message);
    }

    private String buildAlertMessage(String alertType, ScheduleTaskRecord task) {
        return String.format(
                "[%s alert]\n" +
                "Task name: %s\n" +
                "Task group: %s\n" +
                "Fire time: %s\n" +
                "Current status: %s\n" +
                "Progress: %d%%",
                alertType,
                task.getTaskName(),
                task.getTaskGroup(),
                task.getFireTime(),
                task.getStatus(),
                task.getProgress() != null ? task.getProgress() : 0
        );
    }
}

Configuration

server:
  port: 8080

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/scheduler_demo?useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: root

scheduler:
  compensation:
    enabled: true
    tolerance-minutes: 5
    scan-interval-ms: 60000
    max-retention-days: 7

logging:
  level:
    com.example.scheduler: DEBUG
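
| Setting | Description | Default |
|---|---|---|
| scheduler.compensation.enabled | Whether compensation is enabled | true |
| scheduler.compensation.tolerance-minutes | Tolerance window (minutes) | 5 |
| scheduler.compensation.scan-interval-ms | Scan interval (milliseconds) | 60000 |
| scheduler.compensation.max-retention-days | Record retention period (days) | 7 |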

Comparison

Effect of compensation

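| Scenario | Without compensation | With compensation | Improvement |
|---|---|---|---|
| Service crash and restart | Task skipped, data inconsistent | Re-run automatically | Data complete |
| Early-morning batch job skipped | Manual backfill, takes 2h | Compensated automatically | 2min |
| Network jitter | Task lost, manual intervention needed | Retried automatically | No intervention needed |
| JVM Full GC | Task missed, business errors | Compensated after a delay | Recovers automatically |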

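Compensation timing

Tolerance window: 5 minutes; scan interval: 1 minute

Run missed → Service restarts → Missed run found → Compensation runs
    ↓               ↓                  ↓                   ↓
  12:00           12:03              12:06               12:06

The 12:00 run is first picked up by the 12:06 scan: the query requires fire_time < now - 5 minutes, which is not yet true at 12:05.

Worst-case delay (if the service is back before the tolerance window closes): tolerance-minutes + scan-interval + execution time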
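When the first compensation happens follows from the strict comparison `fire_time < now - tolerance` in the scan query. The sketch below computes the first qualifying scan for a given fire time and restart time, assuming one scan right at restart and then one per scan interval (roughly what `@PostConstruct` plus `fixedDelay` does) and ignoring how long each scan takes; `firstPickup` is a hypothetical helper. With a 12:00 fire time, a 12:03 restart, a 5-minute window, and a 1-minute interval it returns 12:06 rather than 12:05, because at 12:05 the threshold equals the fire time.

```java
import java.time.Duration;
import java.time.LocalDateTime;

class CompensationTimeline {

    /**
     * Time of the first scan that picks up a record fired at fireTime, assuming one scan
     * right at restart and then one every scanInterval, with the same strict check as
     * the repository query: fireTime < scanTime - tolerance.
     */
    static LocalDateTime firstPickup(LocalDateTime fireTime, LocalDateTime restartTime,
                                     Duration tolerance, Duration scanInterval) {
        if (scanInterval.isZero() || scanInterval.isNegative()) {
            throw new IllegalArgumentException("scanInterval must be positive");
        }
        LocalDateTime scan = restartTime;
        while (!fireTime.isBefore(scan.minus(tolerance))) {
            scan = scan.plus(scanInterval);
        }
        return scan;
    }

    public static void main(String[] args) {
        LocalDateTime fire = LocalDateTime.of(2024, 1, 1, 12, 0);
        // Scans at 12:03, 12:04 and 12:05 do not qualify; 12:06 is the first that does
        System.out.println(firstPickup(fire, LocalDateTime.of(2024, 1, 1, 12, 3),
                Duration.ofMinutes(5), Duration.ofMinutes(1)));
        // Down longer than the window: the startup scan finds the record immediately
        System.out.println(firstPickup(fire, LocalDateTime.of(2024, 1, 1, 12, 20),
                Duration.ofMinutes(5), Duration.ofMinutes(1)));
    }
}
```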

FAQ

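Q: How do we keep a compensation run and a normal run from executing the same task twice?

A: Idempotency is ensured by three measures:

  1. Distributed lock: taskName + taskGroup + fireTime serves as the lock key
  2. State machine: a task's status only moves forward, PENDING → RUNNING → COMPLETED/FAILED (COMPENSATED for compensation runs)
  3. Unique index: a unique index on (taskName, taskGroup, fireTime) in the database, which only deduplicates reliably if fireTime holds the scheduled trigger time rather than LocalDateTime.now() at execution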
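The state-machine rule can be enforced in code by checking every status change against an allowed-transition table before it is written. A hypothetical sketch: `TaskState` mirrors the `TaskStatus` values, and the table itself (including RUNNING → RUNNING, so a compensation run can re-claim a record left RUNNING by a crash) is an assumption rather than something the article specifies. In the database, the same rule would appear as the expected status in the `WHERE` clause of the update.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Mirrors the article's TaskStatus values; the transition table is an assumption
enum TaskState {
    PENDING, RUNNING, COMPLETED, FAILED, COMPENSATED;

    private static final Map<TaskState, Set<TaskState>> NEXT = Map.of(
            PENDING, EnumSet.of(RUNNING, FAILED),
            // RUNNING -> RUNNING lets a compensation run re-claim a record left by a crash
            RUNNING, EnumSet.of(RUNNING, COMPLETED, FAILED, COMPENSATED),
            COMPLETED, EnumSet.noneOf(TaskState.class),    // terminal
            FAILED, EnumSet.noneOf(TaskState.class),       // terminal
            COMPENSATED, EnumSet.noneOf(TaskState.class)); // terminal

    boolean canMoveTo(TaskState target) {
        return NEXT.get(this).contains(target);
    }
}

class TaskStateMachine {

    /** Returns the new state, or throws if the rule forbids the move. */
    static TaskState transition(TaskState from, TaskState to) {
        if (!from.canMoveTo(to)) {
            throw new IllegalStateException("Illegal transition " + from + " -> " + to);
        }
        return to;
    }

    public static void main(String[] args) {
        TaskState state = TaskState.PENDING;
        state = transition(state, TaskState.RUNNING);
        state = transition(state, TaskState.COMPLETED);
        System.out.println("final state: " + state);
        try {
            transition(state, TaskState.RUNNING);  // re-running a finished record is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```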

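Q: What happens if a compensation run fails?

A: Retry with a backoff strategy whose delays grow with each failure:

  1. First failure: retry immediately
  2. Second failure: wait 1 minute
  3. Third failure: wait 5 minutes
  4. Beyond the maximum number of retries: mark the task FAILED and send an alert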
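This retry schedule could be driven by the `retryCount` field that `ScheduleTaskRecord` already has, although none of the code above increments it. A minimal sketch, assuming exactly the three delays listed (immediately, 1 minute, 5 minutes) and therefore a maximum of three retries; `RetryBackoff` and `nextDelay` are hypothetical names. Strictly speaking, this is a stepped delay table rather than a formula-based exponential backoff.

```java
import java.time.Duration;
import java.util.List;
import java.util.Optional;

class RetryBackoff {

    // Delay before the next retry after the 1st, 2nd and 3rd failure
    private static final List<Duration> DELAYS =
            List.of(Duration.ZERO, Duration.ofMinutes(1), Duration.ofMinutes(5));

    /**
     * @param failures how many attempts have failed so far (1 after the first failure)
     * @return delay before the next retry, or empty once retries are exhausted,
     *         at which point the caller marks the record FAILED and raises an alert
     */
    static Optional<Duration> nextDelay(int failures) {
        if (failures < 1) {
            throw new IllegalArgumentException("failures must be >= 1");
        }
        return failures <= DELAYS.size()
                ? Optional.of(DELAYS.get(failures - 1))
                : Optional.empty();
    }

    public static void main(String[] args) {
        for (int failures = 1; failures <= 4; failures++) {
            String plan = nextDelay(failures)
                    .map(delay -> "retry after " + delay)
                    .orElse("give up: mark FAILED and alert");
            System.out.println("failure #" + failures + " -> " + plan);
        }
    }
}
```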

Q: How can the number of concurrent compensation runs be limited?

A: Use a semaphore:

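@Value("${scheduler.compensation.max-concurrency:3}")
private int maxConcurrency;

// Created after injection: a field initializer would run before @Value is applied,
// giving new Semaphore(0), and every acquire() would then block forever
private Semaphore semaphore;

@PostConstruct
public void initSemaphore() {
    semaphore = new Semaphore(maxConcurrency);
}

public void compensateTask(ScheduleTaskRecord task) throws InterruptedException {
    semaphore.acquire();
    try {
        // Run the compensation
    } finally {
        semaphore.release();
    }
}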

Q: Do compensation runs affect normal task execution?

A: Not as long as the two run on separate thread pools:

  • Normal tasks: scheduled-tasks-pool
  • Compensation tasks: compensation-pool
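The code earlier in the article does not configure these pools itself: the scanner runs on Spring's `@Scheduled` thread, and `SchedulerManager` uses whatever `ScheduleTaskDO.getScheduler()` returns. A minimal sketch of the separation with plain `java.util.concurrent`, reusing the two pool names above; the pool sizes (4, and 3 to match the `max-concurrency` default) and the `named` helper are assumptions for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class SeparatePools {

    /** Thread factory that names threads "<prefix>-1", "<prefix>-2", ... */
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true);  // do not keep the JVM alive in this demo
            return thread;
        };
    }

    // Normal triggers and compensation runs never share threads, so a burst of
    // compensation work after a restart cannot occupy the threads on-time tasks need
    static final ScheduledExecutorService SCHEDULED_POOL =
            Executors.newScheduledThreadPool(4, named("scheduled-tasks-pool"));
    static final ExecutorService COMPENSATION_POOL =
            Executors.newFixedThreadPool(3, named("compensation-pool"));

    /** Runs a trivial task on the pool and returns the name of the thread that ran it. */
    static String threadNameOf(ExecutorService pool) {
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(threadNameOf(SCHEDULED_POOL));     // scheduled-tasks-pool-1
        System.out.println(threadNameOf(COMPENSATION_POOL));  // compensation-pool-1
    }
}
```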

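Summary

With the approach in this article, we can achieve:

  1. Automatic detection of missed runs: after a restart, missed runs are found and compensated automatically
  2. Zero manual intervention: no more getting operators out of bed at night to backfill data
  3. Data integrity: runs that were triggered but interrupted are re-executed instead of being silently lost
  4. Traceability and monitoring: a complete execution history plus alerting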

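Key Design Points

  • Task execution record table: records each run's fire time, status, and progress
  • Missed-task scanner: periodically scans for missed runs and compensates them
  • Tolerance window: keeps transient faults such as network jitter from triggering premature compensation
  • Distributed lock: prevents the same run from being executed twice concurrently
  • Progress recording: makes it possible to resume from a checkpoint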

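In production, tune the tolerance window and scan interval to your workload so that compensation is both timely and accurate. Because RUNNING records are scanned as well, the tolerance window in particular should be longer than the longest normal run of any task; otherwise a task that is still running may be started a second time.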


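Source Code

This article is also published in the blog section of the mini program; follow the mini program blog if you need the source code.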

WeChat Official Account: 服务端技术精选

Author: jiangyi
URL: http://jiangyi.space/articles/2026/05/13/1778385477325.html