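Automatic Archiving of Task Execution Logs: Is Historical Data Bloat Slowing Your Queries? Scheduled Cleanup + Hot/Cold Data Separation
1. Background: the "entropy" problem of log data
Have you run into this situation? After the system has been running for a while, the task execution log table grows to billions of rows, which leads to:
- Slow queries: a simple log lookup takes seconds or even minutes
- Soaring storage costs: SSD storage bills keep growing
- Difficult backups: full backups take too long
- Blocked DDL: adding an index or altering the table structure locks the table for a long time
This is the classic historical data bloat problem. Task execution logs are typically write-heavy and read-light: more than 90% of log rows are rarely accessed once written, yet they keep consuming valuable storage and dragging down query performance.
2. Core concept: how hot/cold data separation works
2.1 Data lifecycle model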
┌──────────────────────────────────────────────────────────────────┐
│                          Data lifecycle                          │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Hot ──────► Warm ─────► Cold ─────► Archived ──► Deleted        │
│  (7 days)    (30 days)   (90 days)   (365 days)                  │
│     │           │           │           │                        │
│     ▼           ▼           ▼           ▼                        │
│    SSD         SSD       HDD/OSS     HDD/OSS                     │
│  Real-time   Fast        Batch       Archive                     │
│  queries     queries     queries     storage                     │
└──────────────────────────────────────────────────────────────────┘
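2.2 Defining hot and cold data
| Tier | Definition | Access frequency | Storage medium | Retained until (age) |
|---|---|---|---|---|
| Hot | Logs from the last 7 days | High (queried frequently) | SSD | 7 days |
| Warm | Logs 7-30 days old | Medium (queried occasionally) | SSD | 30 days |
| Cold | Logs 30-90 days old | Low (rarely queried) | HDD/OSS | 90 days |
| Archived | Logs older than 90 days | Very low (audit/compliance) | Archive storage | As needed |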
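The tier boundaries in the table can be read as a simple function of a record's age. The sketch below is only an illustration of that mapping: the `DataTier` enum and its method names are hypothetical, and the 7/30/90-day thresholds are taken from the table.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical helper: maps a log record's age to the tier named in the table.
enum DataTier {
    HOT, WARM, COLD, ARCHIVED;

    // Thresholds mirror the table: 7, 30 and 90 days (whole days, truncated).
    static DataTier forAge(Duration age) {
        long days = age.toDays();
        if (days < 7) return HOT;
        if (days < 30) return WARM;
        if (days < 90) return COLD;
        return ARCHIVED;
    }

    // Convenience overload: the age is measured from createdAt to now.
    static DataTier of(LocalDateTime createdAt, LocalDateTime now) {
        return forAge(Duration.between(createdAt, now));
    }
}

class DataTierDemo {
    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 1, 31, 12, 0);
        System.out.println(DataTier.of(now.minusDays(3), now));   // HOT
        System.out.println(DataTier.of(now.minusDays(7), now));   // WARM
        System.out.println(DataTier.of(now.minusDays(45), now));  // COLD
        System.out.println(DataTier.of(now.minusDays(90), now));  // ARCHIVED
    }
}
```

Passing `now` explicitly, rather than calling `LocalDateTime.now()` inside the helper, keeps the mapping deterministic and easy to test.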
3. Implementation: scheduled cleanup + hot/cold separation
3.1 Architecture
┌──────────────────────────────────────────────────────────────────┐
│                Log archiving system architecture                 │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐        │
│  │  Hot table   │───►│  Warm table  │───►│  Cold table  │        │
│  │ task_log     │    │ task_log_warm│    │ task_log_cold│        │
│  └──────────────┘    └──────────────┘    └──────────────┘        │
│         │                   │                   │                │
│         ▼                   ▼                   ▼                │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐        │
│  │ Query service│    │ Query service│    │ Query service│        │
│  └──────────────┘    └──────────────┘    └──────────────┘        │
│                                                                  │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │                    Scheduled archive job                     │ │
│ │ ┌──────────┐ ┌───────────┐ ┌──────────────┐ ┌───────────────┐│ │
│ │ │ Hot→Warm │ │ Warm→Cold │ │ Cold→Archive │ │ Purge expired ││ │
│ │ └──────────┘ └───────────┘ └──────────────┘ └───────────────┘│ │
│ └──────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
3.2 Database table design
Hot table (task_log):
CREATE TABLE `task_log` (
`id` BIGINT AUTO_INCREMENT PRIMARY KEY,
`task_id` VARCHAR(64) NOT NULL,
`task_name` VARCHAR(255) NOT NULL,
`status` ENUM('PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'TIMEOUT') NOT NULL,
`start_time` DATETIME NOT NULL,
`end_time` DATETIME,
`duration_ms` BIGINT,
`error_message` TEXT,
`created_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX `idx_task_id` (`task_id`),
INDEX `idx_status` (`status`),
INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Warm table (task_log_warm):
CREATE TABLE `task_log_warm` (
`id` BIGINT PRIMARY KEY,
`task_id` VARCHAR(64) NOT NULL,
`task_name` VARCHAR(255) NOT NULL,
`status` ENUM('PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'TIMEOUT') NOT NULL,
`start_time` DATETIME NOT NULL,
`end_time` DATETIME,
`duration_ms` BIGINT,
`error_message` TEXT,
`created_at` DATETIME,
INDEX `idx_task_id` (`task_id`),
INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Cold table (task_log_cold):
CREATE TABLE `task_log_cold` (
`id` BIGINT PRIMARY KEY,
`task_id` VARCHAR(64) NOT NULL,
`task_name` VARCHAR(255) NOT NULL,
`status` ENUM('PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'TIMEOUT') NOT NULL,
`start_time` DATETIME NOT NULL,
`end_time` DATETIME,
`duration_ms` BIGINT,
`error_message` TEXT,
`created_at` DATETIME,
INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPRESSED;
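3.3 Archive configuration class
@ConfigurationProperties(prefix = "log.archive")
@Data
@Configuration
public class ArchiveProperties {
/** Whether archiving is enabled */
private boolean enabled = true;
/** Days to keep data in the hot table */
private int hotRetentionDays = 7;
/** Days to keep data in the warm table (measured from creation) */
private int warmRetentionDays = 30;
/** Days to keep data in the cold table (measured from creation) */
private int coldRetentionDays = 90;
/** Number of rows moved per batch */
private int batchSize = 1000;
/** Archive schedule as a cron expression (2:00 AM every day) */
private String cron = "0 0 2 * * ?";
/** Whether to compress cold data */
private boolean compressColdData = true;
}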
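The defaults above only make sense when the three retention values increase from hot to cold, but nothing in `ArchiveProperties` enforces that. As a minimal sketch of such a check, the hypothetical `RetentionPolicy` class below (not part of the original code) rejects inconsistent values at construction time. In a Spring application, Bean Validation annotations on the properties class would be an alternative.

```java
// Hypothetical value object mirroring the retention fields of ArchiveProperties.
final class RetentionPolicy {
    final int hotDays;
    final int warmDays;
    final int coldDays;
    final int batchSize;

    RetentionPolicy(int hotDays, int warmDays, int coldDays, int batchSize) {
        // Each tier must retain data longer than the tier before it.
        require(hotDays > 0, "hotDays must be positive");
        require(warmDays > hotDays, "warmDays must exceed hotDays");
        require(coldDays > warmDays, "coldDays must exceed warmDays");
        require(batchSize > 0, "batchSize must be positive");
        this.hotDays = hotDays;
        this.warmDays = warmDays;
        this.coldDays = coldDays;
        this.batchSize = batchSize;
    }

    private static void require(boolean ok, String message) {
        if (!ok) throw new IllegalArgumentException(message);
    }
}

class RetentionPolicyDemo {
    public static void main(String[] args) {
        RetentionPolicy policy = new RetentionPolicy(7, 30, 90, 1000);
        System.out.println("cold retention days: " + policy.coldDays);
        try {
            new RetentionPolicy(30, 7, 90, 1000); // warm shorter than hot
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast at startup is cheaper than discovering a misconfigured cutoff after the nightly job has already moved or deleted rows.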
3.4 Archive task
@Component
@Slf4j
public class LogArchiveTask {
@Autowired
private LogArchiveService archiveService;
@Autowired
private ArchiveProperties properties;
@Scheduled(cron = "${log.archive.cron:0 0 2 * * ?}")
public void executeArchive() {
if (!properties.isEnabled()) {
log.info("Log archive is disabled");
return;
}
log.info("Starting log archive task");
try {
// Hot → warm
int hotToWarmCount = archiveService.archiveHotToWarm();
log.info("Archived {} records from hot to warm", hotToWarmCount);
// Warm → cold
int warmToColdCount = archiveService.archiveWarmToCold();
log.info("Archived {} records from warm to cold", warmToColdCount);
// Delete expired cold data
int deletedCount = archiveService.cleanupExpiredColdData();
log.info("Deleted {} expired cold records", deletedCount);
log.info("Log archive task completed successfully");
} catch (Exception e) {
log.error("Log archive task failed", e);
}
}
}
3.5 Archive service
@Service
@Slf4j
public class LogArchiveService {
@Autowired
private TaskLogRepository hotLogRepository;
@Autowired
private TaskLogWarmRepository warmLogRepository;
@Autowired
private TaskLogColdRepository coldLogRepository;
@Autowired
private ArchiveProperties properties;
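/**
* Archive hot data into the warm table.
* Note: @Transactional wraps the whole loop in a single transaction, so all batches
* commit together at the end; with a large backlog, run each batch in its own transaction.
*/
@Transactional
public int archiveHotToWarm() {
LocalDateTime cutoffTime = LocalDateTime.now().minusDays(properties.getHotRetentionDays());
int totalArchived = 0;
while (true) {
// findOldLogs expects a Pageable (org.springframework.data.domain.PageRequest);
// page 0 is always correct because the rows just archived are deleted below.
List<TaskLog> hotLogs = hotLogRepository.findOldLogs(cutoffTime, PageRequest.of(0, properties.getBatchSize()));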
if (hotLogs.isEmpty()) {
break;
}
List<TaskLogWarm> warmLogs = hotLogs.stream()
.map(this::convertToWarm)
.collect(Collectors.toList());
warmLogRepository.saveAll(warmLogs);
hotLogRepository.deleteAll(hotLogs);
totalArchived += hotLogs.size();
log.debug("Archived {} hot records to warm", hotLogs.size());
}
return totalArchived;
}
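/**
* Archive warm data into the cold table.
* Note: as with archiveHotToWarm, the whole loop runs in one transaction.
*/
@Transactional
public int archiveWarmToCold() {
LocalDateTime cutoffTime = LocalDateTime.now().minusDays(properties.getWarmRetentionDays());
int totalArchived = 0;
while (true) {
// findOldLogs expects a Pageable (org.springframework.data.domain.PageRequest);
// page 0 is always correct because the rows just archived are deleted below.
List<TaskLogWarm> warmLogs = warmLogRepository.findOldLogs(cutoffTime, PageRequest.of(0, properties.getBatchSize()));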
if (warmLogs.isEmpty()) {
break;
}
List<TaskLogCold> coldLogs = warmLogs.stream()
.map(this::convertToCold)
.collect(Collectors.toList());
coldLogRepository.saveAll(coldLogs);
warmLogRepository.deleteAll(warmLogs);
totalArchived += warmLogs.size();
log.debug("Archived {} warm records to cold", warmLogs.size());
}
return totalArchived;
}
/**
* Delete expired cold data.
*/
@Transactional
public int cleanupExpiredColdData() {
LocalDateTime cutoffTime = LocalDateTime.now().minusDays(properties.getColdRetentionDays());
return coldLogRepository.deleteByCreatedAtBefore(cutoffTime);
}
private TaskLogWarm convertToWarm(TaskLog log) {
TaskLogWarm warm = new TaskLogWarm();
warm.setId(log.getId());
warm.setTaskId(log.getTaskId());
warm.setTaskName(log.getTaskName());
warm.setStatus(log.getStatus());
warm.setStartTime(log.getStartTime());
warm.setEndTime(log.getEndTime());
warm.setDurationMs(log.getDurationMs());
warm.setErrorMessage(log.getErrorMessage());
warm.setCreatedAt(log.getCreatedAt());
return warm;
}
private TaskLogCold convertToCold(TaskLogWarm warm) {
TaskLogCold cold = new TaskLogCold();
cold.setId(warm.getId());
cold.setTaskId(warm.getTaskId());
cold.setTaskName(warm.getTaskName());
cold.setStatus(warm.getStatus());
cold.setStartTime(warm.getStartTime());
cold.setEndTime(warm.getEndTime());
cold.setDurationMs(warm.getDurationMs());
cold.setErrorMessage(warm.getErrorMessage());
cold.setCreatedAt(warm.getCreatedAt());
return cold;
}
}
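The service above moves rows in batches, yet `@Transactional` on the outer method makes every batch part of one long transaction. The sketch below illustrates the batch-by-batch idea in isolation: copy first, then delete, with each loop iteration standing in for one short transaction. It uses in-memory maps instead of the real tables, and `BatchArchiver` and its method are hypothetical names. With Spring, each iteration could be wrapped in its own transaction, for example through `TransactionTemplate`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// In-memory stand-in for two tier tables; map keys play the role of the `id` primary key.
final class BatchArchiver {

    /**
     * Moves entries matching `expired` from source to target, batchSize at a time.
     * Each loop iteration corresponds to one short unit of work in a real database.
     */
    static <V> int move(Map<Long, V> source, Map<Long, V> target,
                        Predicate<V> expired, int batchSize) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        int moved = 0;
        while (true) {
            // Select up to batchSize expired ids (the equivalent of findOldLogs).
            List<Long> ids = new ArrayList<>();
            for (Map.Entry<Long, V> entry : source.entrySet()) {
                if (expired.test(entry.getValue())) {
                    ids.add(entry.getKey());
                    if (ids.size() == batchSize) break;
                }
            }
            if (ids.isEmpty()) return moved;
            // Copy first, then delete: writing by id is idempotent, so a retry
            // after a failure between the two steps cannot create duplicates.
            for (Long id : ids) target.put(id, source.get(id));
            for (Long id : ids) source.remove(id);
            moved += ids.size();
        }
    }
}

class BatchArchiverDemo {
    public static void main(String[] args) {
        Map<Long, Integer> hot = new TreeMap<>();   // id -> age in days
        hot.put(1L, 10); hot.put(2L, 3); hot.put(3L, 8); hot.put(4L, 9); hot.put(5L, 1);
        Map<Long, Integer> warm = new TreeMap<>();
        int moved = BatchArchiver.move(hot, warm, age -> age > 7, 2);
        System.out.println(moved + " moved, hot=" + hot.keySet() + ", warm=" + warm.keySet());
    }
}
```

Keeping the same `id` in every tier, as the table definitions do, is what makes the copy step safe to repeat.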
3.6 Unified query service
@Service
@Slf4j
public class LogQueryService {
@Autowired
private TaskLogRepository hotLogRepository;
@Autowired
private TaskLogWarmRepository warmLogRepository;
@Autowired
private TaskLogColdRepository coldLogRepository;
@Autowired
private ArchiveProperties properties;
/**
* Query logs by time range, automatically choosing which tables to read.
* The cutoffs come from ArchiveProperties so they stay in sync with the archive job.
* Note: the job runs once a day, so rows slightly older than a cutoff can still sit in
* the hotter table until the next run; widen the lower bounds if they must not be missed.
*/
public List<TaskLogDTO> queryLogsByTimeRange(LocalDateTime startTime, LocalDateTime endTime) {
LocalDateTime now = LocalDateTime.now();
LocalDateTime hotCutoff = now.minusDays(properties.getHotRetentionDays());
LocalDateTime warmCutoff = now.minusDays(properties.getWarmRetentionDays());
List<TaskLogDTO> results = new ArrayList<>();
// Query the hot table
if (endTime.isAfter(hotCutoff)) {
LocalDateTime queryStart = startTime.isBefore(hotCutoff) ? hotCutoff : startTime;
List<TaskLog> hotLogs = hotLogRepository.findByCreatedAtBetween(queryStart, endTime);
results.addAll(hotLogs.stream().map(this::convertToDTO).collect(Collectors.toList()));
}
// Query the warm table
if (startTime.isBefore(hotCutoff) && endTime.isAfter(warmCutoff)) {
LocalDateTime queryStart = startTime.isBefore(warmCutoff) ? warmCutoff : startTime;
LocalDateTime queryEnd = endTime.isAfter(hotCutoff) ? hotCutoff : endTime;
List<TaskLogWarm> warmLogs = warmLogRepository.findByCreatedAtBetween(queryStart, queryEnd);
results.addAll(warmLogs.stream().map(this::convertToDTO).collect(Collectors.toList()));
}
// Query the cold table
if (startTime.isBefore(warmCutoff)) {
LocalDateTime queryEnd = endTime.isAfter(warmCutoff) ? warmCutoff : endTime;
List<TaskLogCold> coldLogs = coldLogRepository.findByCreatedAtBetween(startTime, queryEnd);
results.addAll(coldLogs.stream().map(this::convertToDTO).collect(Collectors.toList()));
}
// Sort by creation time
results.sort(Comparator.comparing(TaskLogDTO::getCreatedAt));
return results;
}
private TaskLogDTO convertToDTO(TaskLog log) {
TaskLogDTO dto = new TaskLogDTO();
dto.setId(log.getId());
dto.setTaskId(log.getTaskId());
dto.setTaskName(log.getTaskName());
dto.setStatus(log.getStatus());
dto.setStartTime(log.getStartTime());
dto.setEndTime(log.getEndTime());
dto.setDurationMs(log.getDurationMs());
dto.setErrorMessage(log.getErrorMessage());
dto.setCreatedAt(log.getCreatedAt());
dto.setDataTier("HOT");
return dto;
}
private TaskLogDTO convertToDTO(TaskLogWarm warm) {
TaskLogDTO dto = new TaskLogDTO();
dto.setId(warm.getId());
dto.setTaskId(warm.getTaskId());
dto.setTaskName(warm.getTaskName());
dto.setStatus(warm.getStatus());
dto.setStartTime(warm.getStartTime());
dto.setEndTime(warm.getEndTime());
dto.setDurationMs(warm.getDurationMs());
dto.setErrorMessage(warm.getErrorMessage());
dto.setCreatedAt(warm.getCreatedAt());
dto.setDataTier("WARM");
return dto;
}
private TaskLogDTO convertToDTO(TaskLogCold cold) {
TaskLogDTO dto = new TaskLogDTO();
dto.setId(cold.getId());
dto.setTaskId(cold.getTaskId());
dto.setTaskName(cold.getTaskName());
dto.setStatus(cold.getStatus());
dto.setStartTime(cold.getStartTime());
dto.setEndTime(cold.getEndTime());
dto.setDurationMs(cold.getDurationMs());
dto.setErrorMessage(cold.getErrorMessage());
dto.setCreatedAt(cold.getCreatedAt());
dto.setDataTier("COLD");
return dto;
}
}
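Because the archive job runs once a day, a row can stay in the hot table for up to a day after it crosses the 7-day cutoff, and likewise in the warm table. A query that splits the range exactly at `now - 7 days` may therefore look for such a row in the wrong table. The sketch below shows one possible way to plan the per-table windows with a grace margin on the lower bounds. `TierQueryPlanner`, `Window` and the `archiveLag` parameter are hypothetical, and the approach assumes each row lives in exactly one table at a time.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.Map;

final class TierQueryPlanner {

    /** Closed time window [from, to] to query in one table. */
    static final class Window {
        final LocalDateTime from;
        final LocalDateTime to;
        Window(LocalDateTime from, LocalDateTime to) { this.from = from; this.to = to; }
        @Override public String toString() { return "[" + from + ", " + to + "]"; }
    }

    /**
     * Returns the window to query per tier; tiers with an empty window are omitted.
     * archiveLag is the longest expected gap between two archive runs.
     */
    static Map<String, Window> plan(LocalDateTime start, LocalDateTime end, LocalDateTime now,
                                    int hotDays, int warmDays, Duration archiveLag) {
        LocalDateTime hotCutoff = now.minusDays(hotDays);
        LocalDateTime warmCutoff = now.minusDays(warmDays);
        Map<String, Window> plan = new LinkedHashMap<>();
        // Hot table: may still hold rows up to archiveLag older than hotCutoff.
        addIfNonEmpty(plan, "HOT", max(start, hotCutoff.minus(archiveLag)), end);
        // Warm table: never holds rows newer than hotCutoff, but may lag on the old side.
        addIfNonEmpty(plan, "WARM", max(start, warmCutoff.minus(archiveLag)), min(end, hotCutoff));
        // Cold table: everything older than warmCutoff.
        addIfNonEmpty(plan, "COLD", start, min(end, warmCutoff));
        return plan;
    }

    private static void addIfNonEmpty(Map<String, Window> plan, String tier,
                                      LocalDateTime from, LocalDateTime to) {
        if (!from.isAfter(to)) plan.put(tier, new Window(from, to));
    }

    private static LocalDateTime max(LocalDateTime a, LocalDateTime b) { return a.isAfter(b) ? a : b; }
    private static LocalDateTime min(LocalDateTime a, LocalDateTime b) { return a.isBefore(b) ? a : b; }
}

class TierQueryPlannerDemo {
    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 3, 31, 12, 0);
        TierQueryPlanner.plan(now.minusDays(40), now, now, 7, 30, Duration.ofDays(1))
                .forEach((tier, window) -> System.out.println(tier + " " + window));
    }
}
```

The windows overlap slightly by design. If a query can run while the archive job is moving rows, deduplicating the merged result by `id` would be a reasonable extra safeguard.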
3.7 DTO and repositories
@Data
public class TaskLogDTO {
private Long id;
private String taskId;
private String taskName;
private String status;
private LocalDateTime startTime;
private LocalDateTime endTime;
private Long durationMs;
private String errorMessage;
private LocalDateTime createdAt;
private String dataTier;
}
@Repository
public interface TaskLogRepository extends JpaRepository<TaskLog, Long> {
List<TaskLog> findByCreatedAtBetween(LocalDateTime start, LocalDateTime end);
@Query("SELECT t FROM TaskLog t WHERE t.createdAt < :cutoffTime ORDER BY t.createdAt ASC")
List<TaskLog> findOldLogs(@Param("cutoffTime") LocalDateTime cutoffTime, Pageable pageable);
}
@Repository
public interface TaskLogWarmRepository extends JpaRepository<TaskLogWarm, Long> {
List<TaskLogWarm> findByCreatedAtBetween(LocalDateTime start, LocalDateTime end);
@Query("SELECT t FROM TaskLogWarm t WHERE t.createdAt < :cutoffTime ORDER BY t.createdAt ASC")
List<TaskLogWarm> findOldLogs(@Param("cutoffTime") LocalDateTime cutoffTime, Pageable pageable);
}
@Repository
public interface TaskLogColdRepository extends JpaRepository<TaskLogCold, Long> {
List<TaskLogCold> findByCreatedAtBetween(LocalDateTime start, LocalDateTime end);
int deleteByCreatedAtBefore(LocalDateTime cutoffTime);
}
4. Example configuration file
server:
  port: 8080
spring:
  application:
    name: log-archive-demo
  datasource:
    url: jdbc:mysql://localhost:3306/example_db?useSSL=false&serverTimezone=UTC
    username: admin
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: false
# Log archiving settings
log:
  archive:
    enabled: true
    hot-retention-days: 7
    warm-retention-days: 30
    cold-retention-days: 90
    batch-size: 1000
    cron: "0 0 2 * * ?"
    compress-cold-data: true
# Monitoring settings
management:
  endpoints:
    web:
      exposure:
        include: health, info, prometheus, metrics
  metrics:
    tags:
      application: ${spring.application.name}
logging:
  level:
    com.example.archive: DEBUG
5. Monitoring and alerting
5.1 Archive metrics
@Component
public class ArchiveMetrics {
private final MeterRegistry meterRegistry;
public ArchiveMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
public void recordArchive(String sourceTable, String targetTable, int count) {
Counter.builder("log.archive.count")
.tag("source", sourceTable)
.tag("target", targetTable)
.register(meterRegistry)
.increment(count);
}
public void recordArchiveDuration(String operation, long durationMs) {
Timer.builder("log.archive.duration")
.tag("operation", operation)
.register(meterRegistry)
.record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordCleanup(int count) {
Counter.builder("log.archive.cleanup.count")
.register(meterRegistry)
.increment(count);
}
public void recordArchiveFailure(String operation) {
Counter.builder("log.archive.failure.count")
.tag("operation", operation)
.register(meterRegistry)
.increment();
}
}
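5.2 Prometheus alerting rules
# Metric names assume Micrometer's default Prometheus naming (counters get a _total suffix,
# timers export _seconds_count, _seconds_sum and _seconds_max); confirm them on /actuator/prometheus.
groups:
  - name: log_archive_alerts
    rules:
      - alert: LogArchiveFailure
        # The counter only resets on restart, so this keeps firing after the first failure.
        expr: log_archive_failure_count_total > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Log archiving failed"
          description: "Archive operation {{ $labels.operation }} failed"
      - alert: HotLogTableLarge
        # Note: every sample carries both the source and target labels, so both selectors match
        # the same series and the difference is always 0. Alerting on table size needs a separate
        # row-count gauge, which ArchiveMetrics does not define.
        expr: sum(log_archive_count_total{source="task_log"}) - sum(log_archive_count_total{target="task_log_warm"}) > 1000000
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "Hot table too large"
          description: "The hot table holds more than 1,000,000 rows"
      - alert: ArchiveDurationHigh
        expr: log_archive_duration_seconds_max > 300
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Archiving takes too long"
          description: "An archive operation took more than 5 minutes"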
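6. Best practices
6.1 Data tiering strategy
| Tier | Retention | Recommended storage | Indexing strategy |
|---|---|---|---|
| Hot | 7 days | SSD | Full indexes |
| Warm | 30 days | SSD | Essential indexes only |
| Cold | 90 days | HDD / compressed | Minimal indexes |
| Archived | As needed | Object storage | No indexes |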
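6.2 Archiving considerations
- Run during off-peak hours: schedule the archive job between 2 and 4 AM
- Process in batches: avoid moving so much data at once that tables get locked
- Control transactions: make sure each archive step is atomic
- Monitor and alert: detect archive failures promptly
- Verify data: check data integrity after archiving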
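The last item in the list, verifying data after archiving, can be as simple as checking that every id selected from the source table is present in the target table before the source rows are deleted. The sketch below shows that comparison on plain id collections. `ArchiveVerifier` is a hypothetical helper, and how the two id lists are loaded (for example with the repository's `findAllById`) is left out.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

final class ArchiveVerifier {

    /** Returns the source ids that are absent from the target; empty means the batch is complete. */
    static Set<Long> missingInTarget(Collection<Long> sourceIds, Collection<Long> targetIds) {
        Set<Long> missing = new TreeSet<>(sourceIds);
        missing.removeAll(new HashSet<>(targetIds));
        return missing;
    }
}

class ArchiveVerifierDemo {
    public static void main(String[] args) {
        Set<Long> missing = ArchiveVerifier.missingInTarget(
                Arrays.asList(1L, 2L, 3L, 4L),   // ids selected from the hot table
                Arrays.asList(4L, 2L, 1L));      // ids found in the warm table afterwards
        if (missing.isEmpty()) {
            System.out.println("batch verified, safe to delete from source");
        } else {
            System.out.println("do not delete, missing ids: " + missing); // [3]
        }
    }
}
```

Comparing ids only confirms that the rows arrived; comparing a checksum of the row contents would be a stricter variant of the same check.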
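6.3 Performance tips
- Use batch operations: process records with saveAll() and deleteAll(); these only become JDBC batches when hibernate.jdbc.batch_size is configured
- Disable the second-level cache: turn off the Hibernate second-level cache while archiving
- Tune the connection pool size: make sure enough database connections are available
- Read/write splitting: read the rows to be archived from a replica where possible; the inserts and deletes still have to run on the primary
- Partitioned tables: partition the cold data table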
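Discussion
How do you manage historical log data in production? Have you run into performance problems caused by a bloated log table? Feel free to share your experience in the comments!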
Author: jiangyi
URL: http://jiangyi.space/articles/2026/06/17/1781423877372.html
WeChat official account: 服务端技术精选