Automatic Archiving of Task Execution Logs: History Bloat Slowing Your Queries? Scheduled Cleanup + Hot/Cold Data Separation

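1. Background: The "Entropy Problem" of Log Data

Have you run into this? After a system has been running for a while, the task execution log table grows to billions of rows, and:

  1. Queries slow down: a simple log lookup takes seconds or even minutes
  2. Storage costs soar: the SSD bill keeps growing
  3. Backups get painful: a full backup takes far too long
  4. DDL becomes disruptive: adding an index or changing the table structure runs for a long time and can lock the table

This is classic historical-data bloat. Task execution logs are written far more often than they are read. Typically over 90% of log rows are rarely accessed again after being written, yet they still occupy expensive storage and drag down query performance.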


2. Core Concept: How Hot/Cold Data Separation Works

2.1 Data Lifecycle Model

┌──────────────────────────────────────────────────────────────────┐
│                          Data lifecycle                          │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Hot ──────► Warm ──────► Cold ──────► Archived ──────► Deleted  │
│  (7 d)       (30 d)       (90 d)       (365 d)                   │
│                                                                  │
│   │           │            │            │                        │
│   ▼           ▼            ▼            ▼                        │
│  SSD         SSD          HDD/OSS      HDD/OSS                   │
│  real-time   fast         batch        archive                   │
│  queries     queries      queries      storage                   │
└──────────────────────────────────────────────────────────────────┘

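2.2 Hot/Cold Data Definitions

| Tier | Definition | Access frequency | Storage | Retention (max age) |
|---|---|---|---|---|
| Hot | Logs from the last 7 days | High (frequent queries) | SSD | 7 days |
| Warm | Logs 7 to 30 days old | Medium (occasional queries) | SSD | 30 days |
| Cold | Logs 30 to 90 days old | Low (rare queries) | HDD/OSS | 90 days |
| Archived | Logs older than 90 days | Very low (audit/compliance) | Archive storage | As required |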
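The age boundaries in this table can be captured in one small function. The sketch below is hypothetical: TierClassifier and DataTier are illustrative names, not classes from the code later in this article. It applies the same rule as the archive queries. A row leaves a tier once its created_at is earlier than now minus that tier's retention, using the defaults of 7, 30 and 90 days.

```java
import java.time.LocalDateTime;

/** Hypothetical: storage tier a log row belongs to by age. */
enum DataTier { HOT, WARM, COLD, BEYOND_COLD }

/**
 * Hypothetical helper mapping created_at to a tier. A row stays in a tier while
 * created_at >= now - retention, mirroring "created_at < cutoff" in the archive queries.
 */
final class TierClassifier {
    private final int hotDays;
    private final int warmDays;
    private final int coldDays;

    TierClassifier(int hotDays, int warmDays, int coldDays) {
        this.hotDays = hotDays;
        this.warmDays = warmDays;
        this.coldDays = coldDays;
    }

    DataTier classify(LocalDateTime createdAt, LocalDateTime now) {
        if (!createdAt.isBefore(now.minusDays(hotDays))) {
            return DataTier.HOT;
        }
        if (!createdAt.isBefore(now.minusDays(warmDays))) {
            return DataTier.WARM;
        }
        if (!createdAt.isBefore(now.minusDays(coldDays))) {
            return DataTier.COLD;
        }
        // The code in section 3 deletes these rows; the table above lists them as archived
        return DataTier.BEYOND_COLD;
    }
}
```

With the defaults, a row created exactly 7 days ago is still HOT, and one created 10 days ago is WARM.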

3. Implementation: Scheduled Cleanup + Hot/Cold Separation

3.1 Architecture

┌──────────────────────────────────────────────────────────────────────────┐
│                         Log archive architecture                         │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐                │
│  │  Hot table   │───►│  Warm table  │───►│  Cold table  │                │
│  │ task_log     │    │ task_log_warm│    │ task_log_cold│                │
│  └──────────────┘    └──────────────┘    └──────────────┘                │
│         │                   │                   │                        │
│         ▼                   ▼                   ▼                        │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐                │
│  │ Query service│    │ Query service│    │ Query service│                │
│  └──────────────┘    └──────────────┘    └──────────────┘                │
│                                                                          │
│  ┌────────────────────────────────────────────────────────────────────┐  │
│  │                       Scheduled archive job                        │  │
│  │  ┌──────────┐  ┌───────────┐  ┌──────────────┐  ┌───────────────┐  │  │
│  │  │ Hot→Warm │  │ Warm→Cold │  │ Cold→Archive │  │ Purge expired │  │  │
│  │  └──────────┘  └───────────┘  └──────────────┘  └───────────────┘  │  │
│  └────────────────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────────────────┘
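The diagram includes a Cold→Archive step, but the code in 3.4 and 3.5 does not export anything. It moves hot→warm and warm→cold, and then cleanupExpiredColdData deletes cold rows that are past the retention period. If you need the archive tier, one option is to export each batch of expiring cold rows to a compressed file before deleting them. Below is a minimal JDK-only sketch. It assumes a CSV layout, and ColdLogRow and ColdArchiveExporter are hypothetical names covering a subset of the columns.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical subset of a task_log_cold row that gets exported. */
record ColdLogRow(long id, String taskId, String status, LocalDateTime createdAt) {}

/**
 * Sketch of a "cold → archive" export: one batch of cold rows as gzip-compressed CSV.
 * Uploading the bytes and deleting the exported rows are left out; they depend on
 * the storage client in use.
 */
final class ColdArchiveExporter {

    static byte[] exportBatch(List<ColdLogRow> rows) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (Writer out = new OutputStreamWriter(new GZIPOutputStream(bytes), StandardCharsets.UTF_8)) {
            out.write("id,task_id,status,created_at\n");
            for (ColdLogRow row : rows) {
                out.write(row.id() + "," + quote(row.taskId()) + "," + quote(row.status())
                    + "," + row.createdAt() + "\n");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Closing the writer finished the gzip stream, so the bytes are complete here
        return bytes.toByteArray();
    }

    /** Counts data lines, e.g. to check an export before deleting the source rows. */
    static long countRows(byte[] gzipCsv) {
        try (BufferedReader in = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipCsv)), StandardCharsets.UTF_8))) {
            return in.lines().count() - 1; // minus the header line
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** CSV-quotes a field so commas and quotes inside it survive. */
    private static String quote(String value) {
        return "\"" + value.replace("\"", "\"\"") + "\"";
    }
}
```

Before deleting the rows, check that countRows on the export matches the batch size. That way a failed upload or a truncated file never results in lost data.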

3.2 Table Design

Hot table (task_log)

CREATE TABLE `task_log` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
  `task_id` VARCHAR(64) NOT NULL,
  `task_name` VARCHAR(255) NOT NULL,
  `status` ENUM('PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'TIMEOUT') NOT NULL,
  `start_time` DATETIME NOT NULL,
  `end_time` DATETIME,
  `duration_ms` BIGINT,
  `error_message` TEXT,
  `created_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
  INDEX `idx_task_id` (`task_id`),
  INDEX `idx_status` (`status`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Warm table (task_log_warm)

CREATE TABLE `task_log_warm` (
  `id` BIGINT PRIMARY KEY,
  `task_id` VARCHAR(64) NOT NULL,
  `task_name` VARCHAR(255) NOT NULL,
  `status` ENUM('PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'TIMEOUT') NOT NULL,
  `start_time` DATETIME NOT NULL,
  `end_time` DATETIME,
  `duration_ms` BIGINT,
  `error_message` TEXT,
  `created_at` DATETIME,
  INDEX `idx_task_id` (`task_id`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Cold table (task_log_cold)

CREATE TABLE `task_log_cold` (
  `id` BIGINT PRIMARY KEY,
  `task_id` VARCHAR(64) NOT NULL,
  `task_name` VARCHAR(255) NOT NULL,
  `status` ENUM('PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'TIMEOUT') NOT NULL,
  `start_time` DATETIME NOT NULL,
  `end_time` DATETIME,
  `duration_ms` BIGINT,
  `error_message` TEXT,
  `created_at` DATETIME,
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPRESSED;

3.3 Archive Configuration Class

@ConfigurationProperties(prefix = "log.archive")
@Data
@Configuration
public class ArchiveProperties {
    
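    /** Whether archiving is enabled */
    private boolean enabled = true;
    
    /** Days a row stays in the hot table (task_log) before moving to warm */
    private int hotRetentionDays = 7;
    
    /** Age in days (from created_at) at which a row moves from warm to cold */
    private int warmRetentionDays = 30;
    
    /** Age in days at which a cold row is deleted */
    private int coldRetentionDays = 90;
    
    /** Rows moved per batch */
    private int batchSize = 1000;
    
    /** Archive schedule: daily at 02:00 (the @Scheduled annotation reads log.archive.cron) */
    private String cron = "0 0 2 * * ?";
    
    /** Whether to compress cold data (not read by the code in this article; compression comes from ROW_FORMAT=COMPRESSED on task_log_cold) */
    private boolean compressColdData = true;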
}
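ArchiveProperties accepts any values, but the pipeline assumes hotRetentionDays < warmRetentionDays < coldRetentionDays. If coldRetentionDays were set below warmRetentionDays, cleanupExpiredColdData would delete rows in the same run that had just moved them into the cold table. Here is a minimal startup check, written as a hypothetical static helper (RetentionValidator is not part of the original code):

```java
/**
 * Hypothetical startup check for the values bound into ArchiveProperties.
 * Throws IllegalArgumentException describing the first invalid setting.
 */
final class RetentionValidator {

    static void validate(int hotDays, int warmDays, int coldDays, int batchSize) {
        if (hotDays <= 0) {
            throw new IllegalArgumentException("hotRetentionDays must be > 0, got " + hotDays);
        }
        if (warmDays <= hotDays) {
            throw new IllegalArgumentException("warmRetentionDays (" + warmDays
                + ") must be greater than hotRetentionDays (" + hotDays + ")");
        }
        if (coldDays <= warmDays) {
            // Otherwise the cleanup deletes rows in the same run that moved them to cold
            throw new IllegalArgumentException("coldRetentionDays (" + coldDays
                + ") must be greater than warmRetentionDays (" + warmDays + ")");
        }
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0, got " + batchSize);
        }
    }
}
```

In a Spring application this could be called from an @PostConstruct method on ArchiveProperties. A bad configuration then fails at startup rather than during the 02:00 run.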

3.4 Scheduled Archive Job

@Component
@Slf4j
public class LogArchiveTask {
    
    @Autowired
    private LogArchiveService archiveService;
    
    @Autowired
    private ArchiveProperties properties;
    
    // Fires only if scheduling is enabled (@EnableScheduling on a @Configuration class)
    @Scheduled(cron = "${log.archive.cron:0 0 2 * * ?}")
    public void executeArchive() {
        if (!properties.isEnabled()) {
            log.info("Log archive is disabled");
            return;
        }
        
        log.info("Starting log archive task");
        
        try {
            // Hot → warm
            int hotToWarmCount = archiveService.archiveHotToWarm();
            log.info("Archived {} records from hot to warm", hotToWarmCount);
            
            // Warm → cold
            int warmToColdCount = archiveService.archiveWarmToCold();
            log.info("Archived {} records from warm to cold", warmToColdCount);
            
            // Delete expired cold data
            int deletedCount = archiveService.cleanupExpiredColdData();
            log.info("Deleted {} expired cold records", deletedCount);
            
            log.info("Log archive task completed successfully");
        } catch (Exception e) {
            log.error("Log archive task failed", e);
        }
    }
}
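In executeArchive() the three stages share one try block. An exception in hot→warm therefore also skips warm→cold and the cleanup, and the ArchiveMetrics class from section 5.1 is never called. The hypothetical ArchiveStageRunner below is one alternative. It times each stage and catches failures per stage, and its maps stand in for the recordArchive, recordArchiveDuration and recordArchiveFailure calls.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntSupplier;

/**
 * Hypothetical variant of executeArchive(): each stage is timed and caught on its own,
 * so one failing stage does not skip the others. The maps stand in for ArchiveMetrics.
 */
final class ArchiveStageRunner {

    final Map<String, Integer> rowCounts = new LinkedHashMap<>();
    final Map<String, Long> durationsMs = new LinkedHashMap<>();
    final Map<String, RuntimeException> failures = new LinkedHashMap<>();

    /** Runs one stage; a RuntimeException is recorded instead of propagated. */
    void runStage(String operation, IntSupplier stage) {
        long start = System.nanoTime();
        try {
            rowCounts.put(operation, stage.getAsInt());
        } catch (RuntimeException e) {
            failures.put(operation, e);
        } finally {
            durationsMs.put(operation, (System.nanoTime() - start) / 1_000_000);
        }
    }
}
```

A possible executeArchive() body would call runStage("hot_to_warm", archiveService::archiveHotToWarm), and then do the same for the other two stages. The stages read different tables, so running warm→cold after a failed hot→warm is safe. It simply leaves the unmoved hot rows for the next run.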

3.5 Archive Service

@Service
@Slf4j
public class LogArchiveService {
    
    @Autowired
    private TaskLogRepository hotLogRepository;
    
    @Autowired
    private TaskLogWarmRepository warmLogRepository;
    
    @Autowired
    private TaskLogColdRepository coldLogRepository;
    
    @Autowired
    private ArchiveProperties properties;
    
    /** Auto-configured by Spring Boot; runs each batch in its own transaction */
    @Autowired
    private TransactionTemplate transactionTemplate;
    
    /**
     * Hot → warm. Each batch (copy to warm + delete from hot) commits on its own,
     * so locks are held for one batch instead of for the whole run.
     */
    public int archiveHotToWarm() {
        LocalDateTime cutoffTime = LocalDateTime.now().minusDays(properties.getHotRetentionDays());
        // Always the first page: rows moved by the previous batch are gone from the source table
        Pageable batch = PageRequest.of(0, properties.getBatchSize());
        int totalArchived = 0;
        
        while (true) {
            Integer moved = transactionTemplate.execute(status -> {
                List<TaskLog> hotLogs = hotLogRepository.findOldLogs(cutoffTime, batch);
                if (hotLogs.isEmpty()) {
                    return 0;
                }
                // Ids are pre-assigned, so save() goes through merge() (one SELECT per row)
                warmLogRepository.saveAll(hotLogs.stream()
                    .map(this::convertToWarm)
                    .collect(Collectors.toList()));
                // One DELETE ... WHERE id IN (...) per batch (Spring Data JPA 2.5+)
                hotLogRepository.deleteAllByIdInBatch(hotLogs.stream()
                    .map(TaskLog::getId)
                    .collect(Collectors.toList()));
                return hotLogs.size();
            });
            
            if (moved == null || moved == 0) {
                break;
            }
            totalArchived += moved;
            log.debug("Archived {} hot records to warm", moved);
        }
        
        return totalArchived;
    }
    
    /**
     * Warm → cold, batched the same way as archiveHotToWarm().
     */
    public int archiveWarmToCold() {
        LocalDateTime cutoffTime = LocalDateTime.now().minusDays(properties.getWarmRetentionDays());
        Pageable batch = PageRequest.of(0, properties.getBatchSize());
        int totalArchived = 0;
        
        while (true) {
            Integer moved = transactionTemplate.execute(status -> {
                List<TaskLogWarm> warmLogs = warmLogRepository.findOldLogs(cutoffTime, batch);
                if (warmLogs.isEmpty()) {
                    return 0;
                }
                coldLogRepository.saveAll(warmLogs.stream()
                    .map(this::convertToCold)
                    .collect(Collectors.toList()));
                warmLogRepository.deleteAllByIdInBatch(warmLogs.stream()
                    .map(TaskLogWarm::getId)
                    .collect(Collectors.toList()));
                return warmLogs.size();
            });
            
            if (moved == null || moved == 0) {
                break;
            }
            totalArchived += moved;
            log.debug("Archived {} warm records to cold", moved);
        }
        
        return totalArchived;
    }
    
    /**
     * Purge cold rows that are past the cold retention period.
     */
    @Transactional
    public int cleanupExpiredColdData() {
        LocalDateTime cutoffTime = LocalDateTime.now().minusDays(properties.getColdRetentionDays());
        return coldLogRepository.deleteByCreatedAtBefore(cutoffTime);
    }
    
    // Parameter named "hot" rather than "log" so it does not shadow the @Slf4j logger
    private TaskLogWarm convertToWarm(TaskLog hot) {
        TaskLogWarm warm = new TaskLogWarm();
        warm.setId(hot.getId());
        warm.setTaskId(hot.getTaskId());
        warm.setTaskName(hot.getTaskName());
        warm.setStatus(hot.getStatus());
        warm.setStartTime(hot.getStartTime());
        warm.setEndTime(hot.getEndTime());
        warm.setDurationMs(hot.getDurationMs());
        warm.setErrorMessage(hot.getErrorMessage());
        warm.setCreatedAt(hot.getCreatedAt());
        return warm;
    }
    
    private TaskLogCold convertToCold(TaskLogWarm warm) {
        TaskLogCold cold = new TaskLogCold();
        cold.setId(warm.getId());
        cold.setTaskId(warm.getTaskId());
        cold.setTaskName(warm.getTaskName());
        cold.setStatus(warm.getStatus());
        cold.setStartTime(warm.getStartTime());
        cold.setEndTime(warm.getEndTime());
        cold.setDurationMs(warm.getDurationMs());
        cold.setErrorMessage(warm.getErrorMessage());
        cold.setCreatedAt(warm.getCreatedAt());
        return cold;
    }
}
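The batch loop in archiveHotToWarm() and archiveWarmToCold() can be exercised without a database. This hypothetical in-memory model uses BatchMover and its Row record as stand-ins for the tables and entities. It replays the same steps: select the oldest rows older than the cutoff, up to the batch size; copy them to the target; remove them from the source; and repeat until a query comes back empty.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical in-memory model of the batch move loop in LogArchiveService. */
final class BatchMover {

    /** Stand-in for a log entity: only the fields the loop looks at. */
    record Row(long id, LocalDateTime createdAt) {}

    /**
     * Moves rows created before cutoff from source to target in batches of batchSize,
     * oldest first, and returns the size of each batch.
     */
    static List<Integer> moveOlderThan(List<Row> source, List<Row> target,
                                       LocalDateTime cutoff, int batchSize) {
        List<Integer> batchSizes = new ArrayList<>();
        while (true) {
            // SELECT ... WHERE created_at < :cutoff ORDER BY created_at LIMIT :batchSize
            List<Row> batch = source.stream()
                .filter(r -> r.createdAt().isBefore(cutoff))
                .sorted(Comparator.comparing(Row::createdAt))
                .limit(batchSize)
                .collect(Collectors.toList());
            if (batch.isEmpty()) {
                return batchSizes;
            }
            // Copy, then delete; in the service both happen in one transaction
            target.addAll(batch);
            source.removeAll(batch);
            batchSizes.add(batch.size());
        }
    }
}
```

With 2,500 expired rows and a batch size of 1,000, the loop runs three batches of 1,000, 1,000 and 500. The final query finds nothing and ends the loop.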

3.6 Unified Query Service

@Service
@Slf4j
public class LogQueryService {
    
    @Autowired
    private TaskLogRepository hotLogRepository;
    
    @Autowired
    private TaskLogWarmRepository warmLogRepository;
    
    @Autowired
    private TaskLogColdRepository coldLogRepository;
    
    @Autowired
    private ArchiveProperties properties;
    
    /**
     * Slack for rows the daily archive job has not moved yet: a row can stay in its
     * old table for up to one run interval past its cutoff (assumes the job is not
     * further behind than that).
     */
    private static final long ARCHIVE_LAG_DAYS = 1;
    
    /**
     * Query logs created in [startTime, endTime], reading only the tables that can hold such rows.
     * A row lives in exactly one table, so each table that is read gets the full range;
     * clipping the range at the tier cutoffs would miss rows not yet archived.
     */
    public List<TaskLogDTO> queryLogsByTimeRange(LocalDateTime startTime, LocalDateTime endTime) {
        LocalDateTime now = LocalDateTime.now();
        // Oldest rows the hot / warm tables can still contain
        LocalDateTime hotOldest = now.minusDays(properties.getHotRetentionDays() + ARCHIVE_LAG_DAYS);
        LocalDateTime warmOldest = now.minusDays(properties.getWarmRetentionDays() + ARCHIVE_LAG_DAYS);
        // Newest rows the warm / cold tables can already contain
        LocalDateTime warmNewest = now.minusDays(properties.getHotRetentionDays());
        LocalDateTime coldNewest = now.minusDays(properties.getWarmRetentionDays());
        
        List<TaskLogDTO> results = new ArrayList<>();
        
        // Hot table
        if (endTime.isAfter(hotOldest)) {
            List<TaskLog> hotLogs = hotLogRepository.findByCreatedAtBetween(startTime, endTime);
            results.addAll(hotLogs.stream().map(this::convertToDTO).collect(Collectors.toList()));
        }
        
        // Warm table
        if (startTime.isBefore(warmNewest) && endTime.isAfter(warmOldest)) {
            List<TaskLogWarm> warmLogs = warmLogRepository.findByCreatedAtBetween(startTime, endTime);
            results.addAll(warmLogs.stream().map(this::convertToDTO).collect(Collectors.toList()));
        }
        
        // Cold table
        if (startTime.isBefore(coldNewest)) {
            List<TaskLogCold> coldLogs = coldLogRepository.findByCreatedAtBetween(startTime, endTime);
            results.addAll(coldLogs.stream().map(this::convertToDTO).collect(Collectors.toList()));
        }
        
        // Merge the tiers in creation order
        results.sort(Comparator.comparing(TaskLogDTO::getCreatedAt));
        
        return results;
    }
    
    // Parameter named "hot" rather than "log" so it does not shadow the @Slf4j logger
    private TaskLogDTO convertToDTO(TaskLog hot) {
        TaskLogDTO dto = new TaskLogDTO();
        dto.setId(hot.getId());
        dto.setTaskId(hot.getTaskId());
        dto.setTaskName(hot.getTaskName());
        dto.setStatus(hot.getStatus());
        dto.setStartTime(hot.getStartTime());
        dto.setEndTime(hot.getEndTime());
        dto.setDurationMs(hot.getDurationMs());
        dto.setErrorMessage(hot.getErrorMessage());
        dto.setCreatedAt(hot.getCreatedAt());
        dto.setDataTier("HOT");
        return dto;
    }
    
    private TaskLogDTO convertToDTO(TaskLogWarm warm) {
        TaskLogDTO dto = new TaskLogDTO();
        dto.setId(warm.getId());
        dto.setTaskId(warm.getTaskId());
        dto.setTaskName(warm.getTaskName());
        dto.setStatus(warm.getStatus());
        dto.setStartTime(warm.getStartTime());
        dto.setEndTime(warm.getEndTime());
        dto.setDurationMs(warm.getDurationMs());
        dto.setErrorMessage(warm.getErrorMessage());
        dto.setCreatedAt(warm.getCreatedAt());
        dto.setDataTier("WARM");
        return dto;
    }
    
    private TaskLogDTO convertToDTO(TaskLogCold cold) {
        TaskLogDTO dto = new TaskLogDTO();
        dto.setId(cold.getId());
        dto.setTaskId(cold.getTaskId());
        dto.setTaskName(cold.getTaskName());
        dto.setStatus(cold.getStatus());
        dto.setStartTime(cold.getStartTime());
        dto.setEndTime(cold.getEndTime());
        dto.setDurationMs(cold.getDurationMs());
        dto.setErrorMessage(cold.getErrorMessage());
        dto.setCreatedAt(cold.getCreatedAt());
        dto.setDataTier("COLD");
        return dto;
    }
}
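The rule for which tables a time range touches is easiest to check as a pure function. The nightly job runs once a day, so a row can sit in its old table for up to a day past its cutoff. The hot and warm checks below are therefore widened by lagDays. Each selected table should also be queried with the full [start, end] range. Clipping the range at the cutoffs would miss rows that are past a cutoff but not yet moved. TierPruning is a hypothetical name for this sketch.

```java
import java.time.LocalDateTime;
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical pure-function version of the table-selection rule for time-range queries. */
final class TierPruning {

    enum Tier { HOT, WARM, COLD }

    /**
     * Returns the tables that may hold rows created in [start, end].
     * lagDays covers rows the daily archive job has not moved yet (1 for a daily run);
     * the rule assumes the job has not fallen further behind than that.
     */
    static Set<Tier> tablesToQuery(LocalDateTime start, LocalDateTime end, LocalDateTime now,
                                   int hotDays, int warmDays, int lagDays) {
        Set<Tier> tiers = EnumSet.noneOf(Tier.class);
        // Hot may still contain rows down to now - (hotDays + lagDays)
        if (end.isAfter(now.minusDays(hotDays + lagDays))) {
            tiers.add(Tier.HOT);
        }
        // Warm contains rows older than now - hotDays, down to now - (warmDays + lagDays)
        if (start.isBefore(now.minusDays(hotDays))
                && end.isAfter(now.minusDays(warmDays + lagDays))) {
            tiers.add(Tier.WARM);
        }
        // Cold contains rows older than now - warmDays
        if (start.isBefore(now.minusDays(warmDays))) {
            tiers.add(Tier.COLD);
        }
        return tiers;
    }
}
```

Consider a range from 7.5 to about 7.2 days ago. It selects both HOT and WARM, because rows of that age may not have been moved yet when the query runs.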

3.7 DTO and Repositories

@Data
public class TaskLogDTO {
    private Long id;
    private String taskId;
    private String taskName;
    private String status;
    private LocalDateTime startTime;
    private LocalDateTime endTime;
    private Long durationMs;
    private String errorMessage;
    private LocalDateTime createdAt;
    private String dataTier;
}

@Repository
public interface TaskLogRepository extends JpaRepository<TaskLog, Long> {
    List<TaskLog> findByCreatedAtBetween(LocalDateTime start, LocalDateTime end);
    @Query("SELECT t FROM TaskLog t WHERE t.createdAt < :cutoffTime ORDER BY t.createdAt ASC")
    List<TaskLog> findOldLogs(@Param("cutoffTime") LocalDateTime cutoffTime, Pageable pageable);
}

@Repository
public interface TaskLogWarmRepository extends JpaRepository<TaskLogWarm, Long> {
    List<TaskLogWarm> findByCreatedAtBetween(LocalDateTime start, LocalDateTime end);
    @Query("SELECT t FROM TaskLogWarm t WHERE t.createdAt < :cutoffTime ORDER BY t.createdAt ASC")
    List<TaskLogWarm> findOldLogs(@Param("cutoffTime") LocalDateTime cutoffTime, Pageable pageable);
}

@Repository
public interface TaskLogColdRepository extends JpaRepository<TaskLogCold, Long> {
    List<TaskLogCold> findByCreatedAtBetween(LocalDateTime start, LocalDateTime end);
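    // Bulk JPQL delete: one DELETE statement instead of loading and removing each entity
    @Modifying
    @Query("DELETE FROM TaskLogCold t WHERE t.createdAt < :cutoffTime")
    int deleteByCreatedAtBefore(@Param("cutoffTime") LocalDateTime cutoffTime);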
}

4. Configuration Example

server:
  port: 8080

spring:
  application:
    name: log-archive-demo
  datasource:
    url: jdbc:mysql://localhost:3306/example_db?useSSL=false&serverTimezone=UTC
    username: admin
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: false

# Log archive settings
log:
  archive:
    enabled: true
    hot-retention-days: 7
    warm-retention-days: 30
    cold-retention-days: 90
    batch-size: 1000
    cron: "0 0 2 * * ?"
    compress-cold-data: true

# Monitoring settings
management:
  endpoints:
    web:
      exposure:
        include: health, info, prometheus, metrics
  metrics:
    tags:
      application: ${spring.application.name}

logging:
  level:
    com.example.archive: DEBUG

5. Monitoring and Alerting

5.1 Archive Metrics

@Component
public class ArchiveMetrics {
    
    private final MeterRegistry meterRegistry;
    
    public ArchiveMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    public void recordArchive(String sourceTable, String targetTable, int count) {
        Counter.builder("log.archive.count")
            .tag("source", sourceTable)
            .tag("target", targetTable)
            .register(meterRegistry)
            .increment(count);
    }
    
    public void recordArchiveDuration(String operation, long durationMs) {
        Timer.builder("log.archive.duration")
            .tag("operation", operation)
            .register(meterRegistry)
            .record(durationMs, TimeUnit.MILLISECONDS);
    }
    
    public void recordCleanup(int count) {
        Counter.builder("log.archive.cleanup.count")
            .register(meterRegistry)
            .increment(count);
    }
    
    public void recordArchiveFailure(String operation) {
        Counter.builder("log.archive.failure.count")
            .tag("operation", operation)
            .register(meterRegistry)
            .increment();
    }
}

5.2 Prometheus Alert Rules

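groups:
- name: log_archive_alerts
  rules:
  # Micrometer exports Counters with a _total suffix. A counter never drops back
  # to 0, so alert on its increase over a window, not on the raw value
  - alert: LogArchiveFailure
    expr: increase(log_archive_failure_count_total[1h]) > 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Log archive failed"
      description: "Archive operation {{ $labels.operation }} failed within the last hour"
  
  # ArchiveMetrics only counts moved rows, so it cannot tell how large task_log is.
  # This rule assumes an extra gauge, log.archive.table.rows{table="task_log"},
  # reporting the current row count (not defined in ArchiveMetrics above)
  - alert: HotLogTableLarge
    expr: log_archive_table_rows{table="task_log"} > 1000000
    for: 1h
    labels:
      severity: warning
    annotations:
      summary: "Hot table too large"
      description: "task_log holds more than 1,000,000 rows"
  
  # A Micrometer Timer is exported as _count, _sum and _max; _max is the
  # longest single run recorded in the recent window
  - alert: ArchiveDurationHigh
    expr: log_archive_duration_seconds_max > 300
    for: 1m
    labels:
      severity: warning
    annotations:
      summary: "Archive run too slow"
      description: "Archive operation {{ $labels.operation }} took longer than 5 minutes"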

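6. Best Practices

6.1 Tiering Strategy

| Tier | Retention | Suggested storage | Indexing |
|---|---|---|---|
| Hot | 7 days | SSD | Full indexes |
| Warm | 30 days | SSD | Essential indexes only |
| Cold | 90 days | HDD / compressed | Minimal indexes |
| Archived | As required | Object storage | No indexes |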

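6.2 Archiving Caveats

  1. Run off-peak: schedule the archive job between 02:00 and 04:00, or whenever traffic is lowest
  2. Work in batches: moving too many rows at once holds locks for a long time and blocks writers
  3. Transaction control: make each batch's copy-and-delete atomic, so a row is never lost or left in both tables; commit per batch rather than holding one transaction for the whole run
  4. Monitoring and alerting: surface archive failures promptly
  5. Data verification: after archiving, check that every moved row arrived in the target table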
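The data-verification item can be made concrete for a single batch. After the move commits, every id from the batch should exist in the target table and nowhere in the source. The hypothetical helper below (ArchiveVerifier) compares the id sets. In a real job, the two sets would come from SELECT id ... WHERE id IN (:ids) against each table.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical post-archive check for one batch of moved ids. */
final class ArchiveVerifier {

    /** Ids missing from the target, and ids still present in the source. */
    record Result(Set<Long> missingInTarget, Set<Long> leftInSource) {
        boolean ok() {
            return missingInTarget.isEmpty() && leftInSource.isEmpty();
        }
    }

    static Result verify(List<Long> batchIds, Set<Long> idsFoundInTarget, Set<Long> idsFoundInSource) {
        Set<Long> missing = new HashSet<>(batchIds);
        missing.removeAll(idsFoundInTarget);
        Set<Long> left = new HashSet<>(batchIds);
        left.retainAll(idsFoundInSource);
        return new Result(missing, left);
    }
}
```

A non-empty missingInTarget means rows were lost. A non-empty leftInSource means rows now exist in both tables. Either way, it is worth raising the failure metric and stopping the job rather than continuing.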

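6.3 Performance Tips

  1. Batch operations: saveAll() only sends JDBC batches when hibernate.jdbc.batch_size is configured, and deleteAll(Iterable) issues one DELETE per entity; deleteAllByIdInBatch() (Spring Data JPA 2.5+) removes a whole batch with one statement
  2. Disable the second-level cache: turn off the Hibernate second-level cache while archiving
  3. Size the connection pool: make sure there are enough database connections for the archive job alongside normal traffic
  4. Read/write splitting: the archive job writes (INSERT into the target table, DELETE from the source), so it must run against the primary; send read-only queries on the warm and cold tables to replicas instead
  5. Partitioned tables: partition the cold table (for example by month on created_at) so expired data can be dropped one partition at a time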
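For the partitioning tip, one common layout is monthly RANGE partitions on created_at. Expiry then becomes DROP PARTITION instead of a large DELETE. Below is a hypothetical generator for the partition clauses. It assumes MySQL and partitioning by TO_DAYS(created_at). MySQL requires every unique key, including the primary key, to contain the partitioning column, so task_log_cold from 3.2 would need PRIMARY KEY (id, created_at) instead of the id-only key.

```java
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper generating monthly MySQL RANGE partitions for task_log_cold. */
final class MonthlyPartitions {

    private static final DateTimeFormatter NAME = DateTimeFormatter.ofPattern("'p'yyyyMM");

    /** One clause per month from first to last (inclusive); each holds rows created in that month. */
    static List<String> clauses(YearMonth first, YearMonth last) {
        List<String> result = new ArrayList<>();
        for (YearMonth m = first; !m.isAfter(last); m = m.plusMonths(1)) {
            // Upper bound is the first day of the next month, exclusive
            result.add("PARTITION " + m.format(NAME)
                + " VALUES LESS THAN (TO_DAYS('" + m.plusMonths(1).atDay(1) + "'))");
        }
        return result;
    }

    /** Removes a whole expired month at once instead of a row-by-row DELETE. */
    static String dropStatement(YearMonth month) {
        return "ALTER TABLE task_log_cold DROP PARTITION " + month.format(NAME);
    }
}
```

A month can only be dropped once its newest row is past the 90-day retention, so expired rows can linger for up to one extra month compared with cleanupExpiredColdData.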

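Discussion

How do you manage historical log data in production? Have you run into performance problems caused by a bloated log table? Share your experience in the comments!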


Author: jiangyi
URL: http://jiangyi.space/articles/2026/06/17/1781423877372.html
WeChat official account: 服务端技术精选