SpringBoot + 最终一致性 + 补偿任务看板:失败事务可视化,支持人工介入重试
背景:分布式事务的挑战
在微服务架构中,分布式事务是一个常见的挑战。传统的2PC(两阶段提交)方案虽然能保证强一致性,但性能开销大,不适合高并发场景。而最终一致性方案虽然性能更好,但如何确保事务最终达成一致,以及如何处理失败的事务,成为了新的挑战。
想象一下这些场景:
- 订单创建成功,但库存扣减失败
- 支付成功,但订单状态更新失败
- 消息发送成功,但消费者处理失败
- 跨服务调用时网络中断,部分操作成功部分失败
这些问题如果不及时处理,会导致系统数据不一致,影响业务正常运行。
核心概念:最终一致性 + 补偿任务看板
本文将介绍一种基于 SpringBoot 的最终一致性解决方案,通过以下核心机制确保分布式事务的最终一致性:
- 事务日志:记录每笔分布式事务的执行状态
- 补偿机制:自动或手动处理失败的事务
- 任务看板:可视化展示失败事务,支持人工介入
- 重试策略:智能的重试机制,避免无效重试
架构设计
系统架构
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 微服务 A │ │ 微服务 B │ │ 微服务 C │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────┐
│ 事务协调中心 │
└─────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌────────────────┐ ┌─────────────────┐
│ 事务日志 │ │ 补偿任务队列 │ │ 补偿任务看板 │
└───────────────┘ └────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ 人工介入界面 │
└─────────────────┘
关键组件
- 事务协调中心:协调分布式事务的执行,记录事务状态
- 事务日志:存储所有分布式事务的执行历史
- 补偿任务队列:存储需要补偿的失败事务
- 补偿任务看板:可视化展示失败事务,支持人工介入
- 人工介入界面:提供手动重试、跳过等操作
技术实现
1. 事务日志设计
事务日志表结构
CREATE TABLE `distributed_transaction` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`tx_id` VARCHAR(64) NOT NULL COMMENT '事务ID',
`business_type` VARCHAR(32) NOT NULL COMMENT '业务类型',
`status` VARCHAR(32) NOT NULL COMMENT '事务状态:INIT/PROCESSING/SUCCESS/FAILED',
`create_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`remark` VARCHAR(255) COMMENT '备注',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tx_id` (`tx_id`),
INDEX `idx_status` (`status`),
INDEX `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='分布式事务表';
CREATE TABLE `transaction_step` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`tx_id` VARCHAR(64) NOT NULL COMMENT '事务ID',
`step_name` VARCHAR(64) NOT NULL COMMENT '步骤名称',
`status` VARCHAR(32) NOT NULL COMMENT '步骤状态:PENDING/SUCCESS/FAILED',
`error_message` VARCHAR(500) COMMENT '错误信息',
`retry_count` INT NOT NULL DEFAULT 0 COMMENT '重试次数',
`create_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
INDEX `idx_tx_id` (`tx_id`),
INDEX `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='事务步骤表';
事务管理服务
@Service
@Slf4j
public class TransactionManager {
@Autowired
private DistributedTransactionRepository transactionRepository;
@Autowired
private TransactionStepRepository stepRepository;
@Autowired
private CompensateTaskService compensateTaskService;
public String beginTransaction(String businessType) {
String txId = UUID.randomUUID().toString();
DistributedTransaction transaction = new DistributedTransaction();
transaction.setTxId(txId);
transaction.setBusinessType(businessType);
transaction.setStatus("INIT");
transactionRepository.save(transaction);
log.info("Begin transaction: {}", txId);
return txId;
}
public void addStep(String txId, String stepName) {
TransactionStep step = new TransactionStep();
step.setTxId(txId);
step.setStepName(stepName);
step.setStatus("PENDING");
step.setRetryCount(0);
stepRepository.save(step);
log.info("Add step {} to transaction: {}", stepName, txId);
}
public void updateStepStatus(String txId, String stepName, String status, String errorMessage) {
TransactionStep step = stepRepository.findByTxIdAndStepName(txId, stepName);
if (step != null) {
step.setStatus(status);
step.setErrorMessage(errorMessage);
if ("FAILED".equals(status)) {
step.setRetryCount(step.getRetryCount() + 1);
}
stepRepository.save(step);
// 更新事务状态
updateTransactionStatus(txId);
}
}
private void updateTransactionStatus(String txId) {
List<TransactionStep> steps = stepRepository.findByTxId(txId);
boolean allSuccess = steps.stream().allMatch(step -> "SUCCESS".equals(step.getStatus()));
boolean hasFailed = steps.stream().anyMatch(step -> "FAILED".equals(step.getStatus()));
DistributedTransaction transaction = transactionRepository.findByTxId(txId);
if (allSuccess) {
transaction.setStatus("SUCCESS");
} else if (hasFailed) {
transaction.setStatus("FAILED");
// 创建补偿任务
compensateTaskService.createCompensateTask(txId);
} else {
transaction.setStatus("PROCESSING");
}
transactionRepository.save(transaction);
}
public DistributedTransaction getTransaction(String txId) {
return transactionRepository.findByTxId(txId);
}
public List<TransactionStep> getTransactionSteps(String txId) {
return stepRepository.findByTxId(txId);
}
}
2. 补偿任务设计
补偿任务表结构
CREATE TABLE `compensate_task` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`task_id` VARCHAR(64) NOT NULL COMMENT '任务ID',
`tx_id` VARCHAR(64) NOT NULL COMMENT '事务ID',
`status` VARCHAR(32) NOT NULL DEFAULT 'PENDING' COMMENT '任务状态:PENDING/IN_PROGRESS/SUCCESS/FAILED',
`retry_count` INT NOT NULL DEFAULT 0 COMMENT '重试次数',
`max_retry` INT NOT NULL DEFAULT 5 COMMENT '最大重试次数',
`next_retry_time` TIMESTAMP COMMENT '下次重试时间',
`last_error` VARCHAR(500) COMMENT '最后错误信息',
`create_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_task_id` (`task_id`),
INDEX `idx_tx_id` (`tx_id`),
INDEX `idx_status` (`status`),
INDEX `idx_next_retry_time` (`next_retry_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='补偿任务表';
补偿任务服务
@Service
@Slf4j
public class CompensateTaskService {
@Autowired
private CompensateTaskRepository taskRepository;
@Autowired
private TransactionManager transactionManager;
@Autowired
private RetryStrategy retryStrategy;
public void createCompensateTask(String txId) {
String taskId = UUID.randomUUID().toString();
CompensateTask task = new CompensateTask();
task.setTaskId(taskId);
task.setTxId(txId);
task.setStatus("PENDING");
task.setRetryCount(0);
task.setMaxRetry(5);
task.setNextRetryTime(new Date()); // 立即重试
taskRepository.save(task);
log.info("Created compensate task for transaction: {}", txId);
}
@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void processCompensateTasks() {
List<CompensateTask> tasks = taskRepository.findByStatusAndNextRetryTimeBefore("PENDING", new Date());
for (CompensateTask task : tasks) {
try {
task.setStatus("IN_PROGRESS");
taskRepository.save(task);
boolean success = executeCompensateTask(task);
if (success) {
task.setStatus("SUCCESS");
log.info("Compensate task succeeded: {}", task.getTaskId());
} else {
handleCompensateFailure(task);
}
} catch (Exception e) {
handleCompensateFailure(task);
log.error("Error processing compensate task: {}", task.getTaskId(), e);
} finally {
taskRepository.save(task);
}
}
}
private boolean executeCompensateTask(CompensateTask task) {
String txId = task.getTxId();
DistributedTransaction transaction = transactionManager.getTransaction(txId);
List<TransactionStep> steps = transactionManager.getTransactionSteps(txId);
// 找出失败的步骤
List<TransactionStep> failedSteps = steps.stream()
.filter(step -> "FAILED".equals(step.getStatus()))
.collect(Collectors.toList());
// 重试失败的步骤
for (TransactionStep step : failedSteps) {
boolean stepSuccess = retryStep(step);
if (!stepSuccess) {
return false;
}
}
return true;
}
private boolean retryStep(TransactionStep step) {
// 根据步骤名称执行相应的补偿逻辑
// 这里需要根据具体业务实现
log.info("Retrying step {} for transaction: {}", step.getStepName(), step.getTxId());
// 模拟重试逻辑
return Math.random() > 0.3; // 70% 成功率
}
private void handleCompensateFailure(CompensateTask task) {
task.setRetryCount(task.getRetryCount() + 1);
if (task.getRetryCount() >= task.getMaxRetry()) {
task.setStatus("FAILED");
log.warn("Compensate task failed after max retries: {}", task.getTaskId());
} else {
task.setStatus("PENDING");
task.setNextRetryTime(retryStrategy.calculateNextRetryTime(task.getRetryCount()));
log.info("Compensate task will retry at: {}", task.getNextRetryTime());
}
}
public void manualRetry(String taskId) {
CompensateTask task = taskRepository.findByTaskId(taskId);
if (task != null) {
task.setStatus("PENDING");
task.setNextRetryTime(new Date());
taskRepository.save(task);
log.info("Manual retry triggered for compensate task: {}", taskId);
}
}
public void skipTask(String taskId) {
CompensateTask task = taskRepository.findByTaskId(taskId);
if (task != null) {
task.setStatus("SUCCESS"); // 标记为成功,跳过补偿
taskRepository.save(task);
log.info("Compensate task skipped: {}", taskId);
}
}
}
3. 重试策略
@Service
public class RetryStrategy {
// 指数退避策略
public Date calculateNextRetryTime(int retryCount) {
long delayMillis = (long) (Math.pow(2, retryCount) * 1000 * 60); // 指数退避,单位分钟
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MILLISECOND, (int) delayMillis);
return calendar.getTime();
}
// 固定间隔策略
public Date calculateFixedDelayTime(int retryCount, long fixedDelayMinutes) {
long delayMillis = fixedDelayMinutes * 1000 * 60;
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MILLISECOND, (int) delayMillis);
return calendar.getTime();
}
// 随机退避策略
public Date calculateRandomDelayTime(int retryCount, long minDelayMinutes, long maxDelayMinutes) {
long delayMillis = (long) (minDelayMinutes + Math.random() * (maxDelayMinutes - minDelayMinutes)) * 1000 * 60;
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MILLISECOND, (int) delayMillis);
return calendar.getTime();
}
}
4. 补偿任务看板
看板服务
@Service
@Slf4j
public class DashboardService {
@Autowired
private CompensateTaskRepository taskRepository;
@Autowired
private DistributedTransactionRepository transactionRepository;
@Autowired
private TransactionStepRepository stepRepository;
public DashboardData getDashboardData() {
DashboardData data = new DashboardData();
// 统计数据
long totalTasks = taskRepository.count();
long pendingTasks = taskRepository.countByStatus("PENDING");
long inProgressTasks = taskRepository.countByStatus("IN_PROGRESS");
long successTasks = taskRepository.countByStatus("SUCCESS");
long failedTasks = taskRepository.countByStatus("FAILED");
data.setTotalTasks(totalTasks);
data.setPendingTasks(pendingTasks);
data.setInProgressTasks(inProgressTasks);
data.setSuccessTasks(successTasks);
data.setFailedTasks(failedTasks);
// 最近失败的任务
List<CompensateTask> recentFailedTasks = taskRepository.findTop10ByStatusOrderByCreateTimeDesc("FAILED");
data.setRecentFailedTasks(recentFailedTasks);
// 待处理任务
List<CompensateTask> pendingTaskList = taskRepository.findTop20ByStatusOrderByCreateTimeAsc("PENDING");
data.setPendingTasksList(pendingTaskList);
return data;
}
public CompensateTaskDetail getTaskDetail(String taskId) {
CompensateTask task = taskRepository.findByTaskId(taskId);
if (task == null) {
return null;
}
CompensateTaskDetail detail = new CompensateTaskDetail();
detail.setTask(task);
// 获取事务信息
DistributedTransaction transaction = transactionRepository.findByTxId(task.getTxId());
detail.setTransaction(transaction);
// 获取事务步骤
List<TransactionStep> steps = stepRepository.findByTxId(task.getTxId());
detail.setSteps(steps);
return detail;
}
public List<CompensateTask> getTasksByStatus(String status, int page, int size) {
return taskRepository.findByStatusOrderByCreateTimeDesc(status, PageRequest.of(page, size));
}
public List<CompensateTask> searchTasks(String keyword, int page, int size) {
return taskRepository.findByTxIdContainingOrLastErrorContaining(keyword, keyword, PageRequest.of(page, size));
}
}
5. 控制器
@RestController
@RequestMapping("/api/compensate")
@Slf4j
public class CompensateController {
@Autowired
private DashboardService dashboardService;
@Autowired
private CompensateTaskService compensateTaskService;
@GetMapping("/dashboard")
public Result<DashboardData> getDashboard() {
DashboardData data = dashboardService.getDashboard();
return Result.success(data);
}
@GetMapping("/task/{taskId}")
public Result<CompensateTaskDetail> getTaskDetail(@PathVariable String taskId) {
CompensateTaskDetail detail = dashboardService.getTaskDetail(taskId);
return Result.success(detail);
}
@GetMapping("/tasks")
public Result<List<CompensateTask>> getTasks(@RequestParam String status,
@RequestParam(defaultValue = "1") int page,
@RequestParam(defaultValue = "20") int size) {
List<CompensateTask> tasks = dashboardService.getTasksByStatus(status, page - 1, size);
return Result.success(tasks);
}
@GetMapping("/search")
public Result<List<CompensateTask>> searchTasks(@RequestParam String keyword,
@RequestParam(defaultValue = "1") int page,
@RequestParam(defaultValue = "20") int size) {
List<CompensateTask> tasks = dashboardService.searchTasks(keyword, page - 1, size);
return Result.success(tasks);
}
@PostMapping("/task/{taskId}/retry")
public Result<String> retryTask(@PathVariable String taskId) {
compensateTaskService.manualRetry(taskId);
return Result.success("重试任务已触发");
}
@PostMapping("/task/{taskId}/skip")
public Result<String> skipTask(@PathVariable String taskId) {
compensateTaskService.skipTask(taskId);
return Result.success("任务已跳过");
}
}
核心流程
1. 事务开始
当需要执行分布式事务时,通过 TransactionManager 开始一个新的事务,生成唯一的事务ID。
2. 步骤添加
在事务执行过程中,添加各个步骤,如库存扣减、支付处理、订单状态更新等。
3. 步骤执行
执行各个步骤,并更新步骤状态。如果步骤执行失败,记录错误信息。
4. 事务状态更新
根据步骤执行结果,更新事务状态。如果有步骤失败,事务状态变为 FAILED。
5. 补偿任务创建
当事务失败时,自动创建补偿任务。
6. 补偿任务处理
补偿任务服务定期处理待执行的补偿任务,根据重试策略进行重试。
7. 人工介入
对于重试失败的任务,通过补偿任务看板进行人工介入,支持手动重试或跳过。
8. 监控告警
对长时间失败的任务进行监控和告警。
技术要点
1. 最终一致性保障
- 异步补偿:采用异步补偿机制,不阻塞主流程
- 重试策略:智能的重试策略,避免无效重试
- 人工介入:对于复杂问题,支持人工介入处理
- 状态追踪:详细的状态追踪,便于问题定位
2. 性能优化
- 批量处理:批量处理补偿任务,提高处理效率
- 异步执行:补偿任务异步执行,不影响主业务
- 索引优化:为关键查询字段创建索引
- 缓存策略:合理使用缓存减少数据库查询
3. 可靠性设计
- 幂等性设计:确保补偿操作可重复执行
- 分布式锁:防止并发补偿导致的数据冲突
- 事务管理:保证补偿操作的原子性
- 数据备份:定期备份事务日志和补偿任务数据
4. 可扩展性
- 模块化设计:各组件职责明确,易于扩展
- 插件化:支持自定义补偿逻辑和重试策略
- 配置化:通过配置文件调整重试策略和告警规则
- 监控指标:提供丰富的监控指标和日志
最佳实践
1. 事务设计
- 细粒度事务:将大事务拆分为小的、独立的步骤
- 幂等设计:确保每个步骤都是幂等的
- 超时处理:为每个步骤设置合理的超时时间
- 错误处理:详细记录错误信息,便于问题定位
2. 补偿策略
- 自动补偿:对于简单的失败,自动进行补偿
- 人工介入:对于复杂的失败,及时通知人工介入
- 重试策略:根据业务特点选择合适的重试策略
- 补偿顺序:按照依赖关系顺序执行补偿
3. 监控告警
- 多渠道告警:支持邮件、短信、微信等多种告警方式
- 分级告警:根据任务重要程度进行分级告警
- 告警聚合:对同类告警进行聚合,避免告警风暴
- 告警抑制:在特定情况下暂时抑制告警
4. 运维管理
- 定期清理:定期清理过期的补偿任务和事务日志
- 数据归档:对历史数据进行归档存储
- 备份恢复:定期备份数据,确保可恢复性
- 性能监控:监控补偿任务处理性能
常见问题与解决方案
1. 补偿任务执行失败
问题:补偿任务执行失败,无法自动修复
原因:
- 业务逻辑错误
- 外部系统异常
- 数据约束冲突
解决方案:
- 检查错误信息,定位问题原因
- 修复业务逻辑或外部系统问题
- 手动重试或跳过任务
2. 补偿任务堆积
问题:补偿任务大量堆积,处理不过来
原因:
- 系统故障导致大量任务失败
- 补偿处理速度慢
- 重试策略不合理
解决方案:
- 增加补偿任务处理线程
- 优化补偿逻辑,提高处理速度
- 调整重试策略,避免无效重试
- 对任务进行优先级排序
3. 数据一致性问题
问题:补偿后数据仍然不一致
原因:
- 补偿逻辑错误
- 并发操作导致数据冲突
- 外部系统状态不一致
解决方案:
- 检查补偿逻辑,确保正确性
- 使用分布式锁防止并发冲突
- 与外部系统进行对账,确保状态一致
4. 系统性能下降
问题:补偿任务处理导致系统性能下降
原因:
- 补偿任务并发度过高
- 数据库查询效率低
- 外部系统调用超时
解决方案:
- 控制补偿任务的并发度
- 优化数据库查询,添加索引
- 设置合理的外部系统调用超时时间
- 采用批量处理减少数据库交互
互动话题
- 你在实际项目中遇到过哪些分布式事务的挑战?
- 你认为最终一致性方案还有哪些可以改进的地方?
- 对于本文介绍的补偿任务看板,你有什么功能建议?
欢迎在评论区分享你的经验和想法!
作者:服务端技术精选
公众号:服务端技术精选
标题:SpringBoot + 最终一致性 + 补偿任务看板:失败事务可视化,支持人工介入重试
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/13/1772981078829.html
公众号:服务端技术精选
- 背景:分布式事务的挑战
- 核心概念:最终一致性 + 补偿任务看板
- 架构设计
- 系统架构
- 关键组件
- 技术实现
- 1. 事务日志设计
- 事务日志表结构
- 事务管理服务
- 2. 补偿任务设计
- 补偿任务表结构
- 补偿任务服务
- 3. 重试策略
- 4. 补偿任务看板
- 看板服务
- 5. 控制器
- 核心流程
- 1. 事务开始
- 2. 步骤添加
- 3. 步骤执行
- 4. 事务状态更新
- 5. 补偿任务创建
- 6. 补偿任务处理
- 7. 人工介入
- 8. 监控告警
- 技术要点
- 1. 最终一致性保障
- 2. 性能优化
- 3. 可靠性设计
- 4. 可扩展性
- 最佳实践
- 1. 事务设计
- 2. 补偿策略
- 3. 监控告警
- 4. 运维管理
- 常见问题与解决方案
- 1. 补偿任务执行失败
- 2. 补偿任务堆积
- 3. 数据一致性问题
- 4. 系统性能下降
- 互动话题
评论
0 评论