SpringBoot + 本地事务表 + 定时扫描补偿:轻量级方案实现最终一致性,无中间件依赖
前言
在分布式系统中,数据一致性是一个永恒的话题。传统的分布式事务解决方案如 Seata、XA 等往往需要引入重量级中间件,增加了系统复杂度和运维成本。
本文将介绍一种轻量级的最终一致性方案——本地事务表 + 定时扫描补偿,该方案:
- 零中间件依赖:不需要 MQ、Seata 等外部组件
- 实现简单:基于数据库表和定时任务
- 可靠性高:通过本地事务保证数据一致性
- 易于理解:符合直觉的设计模式
一、分布式事务问题分析
1. 典型业务场景
┌─────────────────────────────────────────────────────────────┐
│ 订单支付业务流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 用户下单 ──▶ 创建订单 ──▶ 扣减库存 ──▶ 扣减余额 ──▶ 发送通知 │
│ │
│ 问题: │
│ 1. 订单创建成功,库存扣减失败怎么办? │
│ 2. 库存扣减成功,余额扣减失败怎么办? │
│ 3. 余额扣减成功,通知发送失败怎么办? │
│ 4. 网络超时导致状态不一致怎么办? │
│ │
└─────────────────────────────────────────────────────────────┘
2. 传统解决方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 2PC/XA | 强一致性 | 性能差、锁资源 | 传统单体应用 |
| Seata AT | 无侵入 | 依赖中间件、运维复杂 | 大型微服务 |
| Seata TCC | 高性能 | 代码侵入性强 | 高并发场景 |
| Saga | 长事务支持 | 实现复杂 | 复杂业务流程 |
| 本地事务表 | 轻量简单 | 最终一致性 | 中小型系统 |
3. 本地事务表方案原理
┌─────────────────────────────────────────────────────────────┐
│ 本地事务表方案核心思想 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 核心原理: │
│ 1. 业务操作和事务记录在同一本地事务中完成 │
│ 2. 定时任务扫描待处理的事务记录 │
│ 3. 执行补偿操作或重试 │
│ 4. 保证最终一致性 │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 本地事务(ACID) │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 业务数据表 │ │ 事务记录表 │ │ │
│ │ │ INSERT/UPDATE│ │ INSERT │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ │ │ │ │ │
│ │ └────────┬─────────┘ │ │
│ │ ▼ │ │
│ │ 同时提交或回滚 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
二、方案设计
1. 整体架构
┌─────────────────────────────────────────────────────────────┐
│ 本地事务表方案架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 业务服务层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 订单服务 │ │ 库存服务 │ │ 支付服务 │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ │ │ │ │ │
│ │ └────────────┼────────────┘ │ │
│ │ ▼ │ │
│ │ ┌──────────────┐ │ │
│ │ │ 事务管理器 │ │ │
│ │ └──────┬───────┘ │ │
│ └───────────────────┼───────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 数据存储层 │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ 业务数据表 │ │ 事务记录表 │ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ ▲ │
│ ┌───────────────────┴───────────────────────────────┐ │
│ │ 定时任务层 │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ 事务扫描器 │ │ 补偿执行器 │ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
2. 数据库设计
2.1 事务记录表
CREATE TABLE transaction_record (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
transaction_id VARCHAR(64) NOT NULL UNIQUE COMMENT '事务ID',
transaction_type VARCHAR(32) NOT NULL COMMENT '事务类型',
business_id VARCHAR(64) NOT NULL COMMENT '业务ID',
business_data TEXT COMMENT '业务数据(JSON)',
status VARCHAR(16) NOT NULL DEFAULT 'PENDING' COMMENT '状态',
retry_count INT NOT NULL DEFAULT 0 COMMENT '重试次数',
max_retry INT NOT NULL DEFAULT 5 COMMENT '最大重试次数',
next_retry_time DATETIME COMMENT '下次重试时间',
error_message TEXT COMMENT '错误信息',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_status (status),
INDEX idx_next_retry (next_retry_time),
INDEX idx_business (business_id)
) COMMENT '事务记录表';
2.2 状态流转
┌─────────────────────────────────────────────────────────────┐
│ 事务状态流转图 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────┐ │
│ │ PENDING │ │
│ │ 待处理 │ │
│ └─────┬─────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ PROCESSING│ │ SUCCESS │ │ FAILED │ │
│ │ 处理中 │ │ 成功 │ │ 失败 │ │
│ └─────┬─────┘ └───────────┘ └───────────┘ │
│ │ │
│ │ │
│ ▼ │
│ ┌───────────┐ │
│ │ SUCCESS │ │
│ │ 成功 │ │
│ └───────────┘ │
│ │
│ 状态说明: │
│ - PENDING: 待处理,等待定时任务扫描 │
│ - PROCESSING: 处理中,正在执行补偿操作 │
│ - SUCCESS: 成功,补偿操作执行成功 │
│ - FAILED: 失败,达到最大重试次数仍失败 │
│ │
└─────────────────────────────────────────────────────────────┘
3. 核心流程
3.1 事务提交流程
┌─────────────────────────────────────────────────────────────┐
│ 事务提交流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 业务操作开始 │
│ ┌─────────────────────────────────────────────┐ │
│ │ @Transactional │ │
│ │ public void createOrder(Order order) { │ │
│ │ // 开始事务 │ │
│ │ } │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ │
│ 2. 执行业务操作 │
│ ┌─────────────────────────────────────────────┐ │
│ │ // 1. 创建订单 │ │
│ │ orderRepository.save(order); │ │
│ │ │ │
│ │ // 2. 扣减库存(远程调用) │ │
│ │ inventoryService.deductStock(...); │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ │
│ 3. 记录事务 │
│ ┌─────────────────────────────────────────────┐ │
│ │ // 记录需要补偿的事务 │ │
│ │ TransactionRecord record = new Record(); │ │
│ │ record.setTransactionId(uuid); │ │
│ │ record.setTransactionType("DEDUCT_STOCK"); │ │
│ │ record.setBusinessId(order.getId()); │ │
│ │ record.setStatus("PENDING"); │ │
│ │ transactionRepository.save(record); │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ │
│ 4. 提交事务 │
│ ┌─────────────────────────────────────────────┐ │
│ │ // 业务数据和事务记录同时提交 │ │
│ │ // 要么全部成功,要么全部失败 │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
3.2 定时扫描补偿流程
┌─────────────────────────────────────────────────────────────┐
│ 定时扫描补偿流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 定时任务触发 │
│ ┌─────────────────────────────────────────────┐ │
│ │ @Scheduled(fixedRate = 60000) │ │
│ │ public void scanPendingTransactions() { │ │
│ │ // 每分钟扫描一次 │ │
│ │ } │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ │
│ 2. 查询待处理事务 │
│ ┌─────────────────────────────────────────────┐ │
│ │ SELECT * FROM transaction_record │ │
│ │ WHERE status = 'PENDING' │ │
│ │ AND next_retry_time <= NOW() │ │
│ │ AND retry_count < max_retry │ │
│ │ LIMIT 100 │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ │
│ 3. 执行补偿操作 │
│ ┌─────────────────────────────────────────────┐ │
│ │ for (TransactionRecord record : records) { │ │
│ │ try { │ │
│ │ // 执行补偿逻辑 │ │
│ │ compensate(record); │ │
│ │ record.setStatus("SUCCESS"); │ │
│ │ } catch (Exception e) { │ │
│ │ record.setRetryCount(count + 1); │ │
│ │ record.setErrorMessage(e.getMessage);│ │
│ │ } │ │
│ │ } │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ │
│ 4. 更新事务状态 │
│ ┌─────────────────────────────────────────────┐ │
│ │ transactionRepository.saveAll(records); │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
三、代码实现
1. 项目结构
SpringBoot-LocalTransaction-Demo/
├── src/
│ └── main/
│ ├── java/
│ │ └── com/
│ │ └── example/
│ │ └── transaction/
│ │ ├── LocalTransactionApplication.java
│ │ ├── entity/
│ │ │ ├── TransactionRecord.java
│ │ │ └── Order.java
│ │ ├── repository/
│ │ │ ├── TransactionRecordRepository.java
│ │ │ └── OrderRepository.java
│ │ ├── service/
│ │ │ ├── TransactionService.java
│ │ │ ├── CompensationService.java
│ │ │ ├── OrderService.java
│ │ │ └── InventoryService.java
│ │ ├── task/
│ │ │ └── TransactionScanTask.java
│ │ ├── controller/
│ │ │ └── OrderController.java
│ │ ├── dto/
│ │ │ └── ApiResponse.java
│ │ └── enums/
│ │ └── TransactionStatus.java
│ └── resources/
│ └── application.yml
├── pom.xml
└── README.md
2. 核心代码实现
2.1 事务记录实体
@Entity
@Table(name = "transaction_record")
@Data
public class TransactionRecord {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(unique = true, nullable = false, length = 64)
private String transactionId;
@Column(nullable = false, length = 32)
private String transactionType;
@Column(nullable = false, length = 64)
private String businessId;
@Column(columnDefinition = "TEXT")
private String businessData;
@Column(nullable = false, length = 16)
@Enumerated(EnumType.STRING)
private TransactionStatus status = TransactionStatus.PENDING;
@Column(nullable = false)
private Integer retryCount = 0;
@Column(nullable = false)
private Integer maxRetry = 5;
private LocalDateTime nextRetryTime;
@Column(columnDefinition = "TEXT")
private String errorMessage;
@Column(nullable = false)
private LocalDateTime createTime;
@Column(nullable = false)
private LocalDateTime updateTime;
@PrePersist
public void prePersist() {
this.createTime = LocalDateTime.now();
this.updateTime = LocalDateTime.now();
if (this.nextRetryTime == null) {
this.nextRetryTime = LocalDateTime.now().plusMinutes(1);
}
}
@PreUpdate
public void preUpdate() {
this.updateTime = LocalDateTime.now();
}
public boolean canRetry() {
return retryCount < maxRetry && status == TransactionStatus.PENDING;
}
public void incrementRetry() {
this.retryCount++;
this.nextRetryTime = LocalDateTime.now().plusMinutes(
Math.min(retryCount, 10)
);
}
}
2.2 事务状态枚举
public enum TransactionStatus {
PENDING("待处理"),
PROCESSING("处理中"),
SUCCESS("成功"),
FAILED("失败");
private final String description;
TransactionStatus(String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}
2.3 事务服务
@Service
@Slf4j
public class TransactionService {
@Autowired
private TransactionRecordRepository transactionRepository;
public TransactionRecord createTransaction(String type, String businessId,
Object businessData) {
TransactionRecord record = new TransactionRecord();
record.setTransactionId(UUID.randomUUID().toString());
record.setTransactionType(type);
record.setBusinessId(businessId);
record.setBusinessData(toJson(businessData));
record.setStatus(TransactionStatus.PENDING);
return transactionRepository.save(record);
}
public void markSuccess(String transactionId) {
transactionRepository.findByTransactionId(transactionId)
.ifPresent(record -> {
record.setStatus(TransactionStatus.SUCCESS);
transactionRepository.save(record);
log.info("事务执行成功: transactionId={}", transactionId);
});
}
public void markFailed(String transactionId, String errorMessage) {
transactionRepository.findByTransactionId(transactionId)
.ifPresent(record -> {
record.setStatus(TransactionStatus.FAILED);
record.setErrorMessage(errorMessage);
transactionRepository.save(record);
log.error("事务执行失败: transactionId={}, error={}",
transactionId, errorMessage);
});
}
public void incrementRetry(String transactionId, String errorMessage) {
transactionRepository.findByTransactionId(transactionId)
.ifPresent(record -> {
record.incrementRetry();
record.setErrorMessage(errorMessage);
if (!record.canRetry()) {
record.setStatus(TransactionStatus.FAILED);
log.error("事务达到最大重试次数: transactionId={}", transactionId);
}
transactionRepository.save(record);
});
}
public List<TransactionRecord> findPendingTransactions(int limit) {
return transactionRepository.findPendingTransactions(
TransactionStatus.PENDING,
LocalDateTime.now(),
PageRequest.of(0, limit)
);
}
private String toJson(Object obj) {
try {
return new ObjectMapper().writeValueAsString(obj);
} catch (Exception e) {
return "{}";
}
}
}
2.4 补偿服务
@Service
@Slf4j
public class CompensationService {
@Autowired
private TransactionService transactionService;
@Autowired
private InventoryService inventoryService;
@Autowired
private NotificationService notificationService;
public void compensate(TransactionRecord record) {
log.info("开始执行补偿: transactionId={}, type={}",
record.getTransactionId(), record.getTransactionType());
try {
switch (record.getTransactionType()) {
case "DEDUCT_STOCK":
compensateDeductStock(record);
break;
case "SEND_NOTIFICATION":
compensateSendNotification(record);
break;
case "REFUND":
compensateRefund(record);
break;
default:
log.warn("未知的事务类型: {}", record.getTransactionType());
}
} catch (Exception e) {
log.error("补偿执行失败: transactionId={}",
record.getTransactionId(), e);
throw e;
}
}
private void compensateDeductStock(TransactionRecord record) {
// 解析业务数据
Map<String, Object> data = parseBusinessData(record.getBusinessData());
String productId = (String) data.get("productId");
Integer quantity = (Integer) data.get("quantity");
// 执行库存扣减
inventoryService.deductStock(productId, quantity);
// 标记成功
transactionService.markSuccess(record.getTransactionId());
}
private void compensateSendNotification(TransactionRecord record) {
Map<String, Object> data = parseBusinessData(record.getBusinessData());
Long userId = Long.valueOf(data.get("userId").toString());
String message = (String) data.get("message");
notificationService.sendNotification(userId, message);
transactionService.markSuccess(record.getTransactionId());
}
private void compensateRefund(TransactionRecord record) {
Map<String, Object> data = parseBusinessData(record.getBusinessData());
String orderId = (String) data.get("orderId");
BigDecimal amount = new BigDecimal(data.get("amount").toString());
// 执行退款逻辑
log.info("执行退款: orderId={}, amount={}", orderId, amount);
transactionService.markSuccess(record.getTransactionId());
}
@SuppressWarnings("unchecked")
private Map<String, Object> parseBusinessData(String json) {
try {
return new ObjectMapper().readValue(json, Map.class);
} catch (Exception e) {
return new HashMap<>();
}
}
}
2.5 定时扫描任务
@Component
@Slf4j
public class TransactionScanTask {
@Autowired
private TransactionService transactionService;
@Autowired
private CompensationService compensationService;
@Autowired
private TransactionRecordRepository transactionRepository;
private static final int BATCH_SIZE = 100;
@Scheduled(fixedRate = 60000)
public void scanAndCompensate() {
log.info("开始扫描待处理事务...");
List<TransactionRecord> pendingTransactions =
transactionService.findPendingTransactions(BATCH_SIZE);
if (pendingTransactions.isEmpty()) {
log.info("没有待处理的事务");
return;
}
log.info("发现 {} 个待处理事务", pendingTransactions.size());
for (TransactionRecord record : pendingTransactions) {
processTransaction(record);
}
log.info("事务扫描处理完成");
}
private void processTransaction(TransactionRecord record) {
try {
// 更新状态为处理中
record.setStatus(TransactionStatus.PROCESSING);
transactionRepository.save(record);
// 执行补偿
compensationService.compensate(record);
} catch (Exception e) {
log.error("事务处理失败: transactionId={}",
record.getTransactionId(), e);
// 增加重试次数
transactionService.incrementRetry(
record.getTransactionId(),
e.getMessage()
);
}
}
@Scheduled(cron = "0 0 2 * * ?")
public void cleanupOldRecords() {
log.info("开始清理历史事务记录...");
LocalDateTime threshold = LocalDateTime.now().minusDays(30);
int deleted = transactionRepository.deleteByCreateTimeBefore(threshold);
log.info("清理完成,删除 {} 条记录", deleted);
}
}
3. 业务服务示例
@Service
@Slf4j
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private TransactionService transactionService;
@Autowired
private InventoryService inventoryService;
@Transactional
public Order createOrder(OrderRequest request) {
log.info("创建订单: userId={}, productId={}",
request.getUserId(), request.getProductId());
// 1. 创建订单
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
order.setAmount(request.getAmount());
order.setStatus("CREATED");
orderRepository.save(order);
// 2. 记录库存扣减事务(在同一事务中)
Map<String, Object> stockData = new HashMap<>();
stockData.put("productId", request.getProductId());
stockData.put("quantity", request.getQuantity());
stockData.put("orderId", order.getId());
transactionService.createTransaction(
"DEDUCT_STOCK",
order.getId().toString(),
stockData
);
// 3. 记录通知发送事务
Map<String, Object> notifyData = new HashMap<>();
notifyData.put("userId", request.getUserId());
notifyData.put("message", "订单创建成功");
transactionService.createTransaction(
"SEND_NOTIFICATION",
order.getId().toString(),
notifyData
);
log.info("订单创建成功: orderId={}", order.getId());
return order;
}
@Transactional
public void cancelOrder(Long orderId) {
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new RuntimeException("订单不存在"));
order.setStatus("CANCELLED");
orderRepository.save(order);
// 记录退款事务
Map<String, Object> refundData = new HashMap<>();
refundData.put("orderId", orderId.toString());
refundData.put("amount", order.getAmount());
transactionService.createTransaction(
"REFUND",
orderId.toString(),
refundData
);
log.info("订单取消成功: orderId={}", orderId);
}
}
四、高级特性
1. 重试策略
public class RetryStrategy {
public static LocalDateTime calculateNextRetryTime(int retryCount) {
// 指数退避策略
int delayMinutes = (int) Math.min(Math.pow(2, retryCount), 60);
return LocalDateTime.now().plusMinutes(delayMinutes);
}
public static int calculateMaxRetry(String transactionType) {
switch (transactionType) {
case "DEDUCT_STOCK":
return 10;
case "SEND_NOTIFICATION":
return 3;
case "REFUND":
return 5;
default:
return 5;
}
}
}
2. 幂等性保证
@Service
public class DynamicRetryConfig {
@Value("${retry.max.default:5}")
private int defaultMaxRetry;
@Value("${retry.max.payment:10}")
private int paymentMaxRetry;
@Value("${retry.max.notification:3}")
private int notificationMaxRetry;
public int getMaxRetry(String transactionType) {
switch (transactionType) {
case "REFUND":
case "DEDUCT_STOCK":
return paymentMaxRetry;
case "SEND_NOTIFICATION":
return notificationMaxRetry;
default:
return defaultMaxRetry;
}
}
}
2. 幂等性保证
@Service
public class IdempotentCompensationService {
@Autowired
private ProcessedRecordRepository processedRepository;
public void compensateWithIdempotent(TransactionRecord record) {
String idempotentKey = record.getTransactionId();
// 检查是否已处理
if (processedRepository.existsByIdempotentKey(idempotentKey)) {
log.info("事务已处理,跳过: transactionId={}", idempotentKey);
return;
}
try {
// 执行补偿
doCompensate(record);
// 记录已处理
ProcessedRecord processed = new ProcessedRecord();
processed.setIdempotentKey(idempotentKey);
processed.setProcessTime(LocalDateTime.now());
processedRepository.save(processed);
} catch (Exception e) {
throw e;
}
}
}
3. 分布式锁
@Component
public class DistributedLockManager {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public boolean tryLock(String key, Duration timeout) {
return Boolean.TRUE.equals(
redisTemplate.opsForValue()
.setIfAbsent(key, "locked", timeout)
);
}
public void unlock(String key) {
redisTemplate.delete(key);
}
}
@Service
public class SafeCompensationService {
@Autowired
private DistributedLockManager lockManager;
public void safeCompensate(TransactionRecord record) {
String lockKey = "lock:transaction:" + record.getTransactionId();
if (lockManager.tryLock(lockKey, Duration.ofMinutes(5))) {
try {
compensate(record);
} finally {
lockManager.unlock(lockKey);
}
} else {
log.warn("获取锁失败,跳过处理: transactionId={}",
record.getTransactionId());
}
}
}
五、最佳实践
1. 事务类型设计
| 事务类型 | 说明 | 最大重试 | 重试间隔 |
|---|---|---|---|
| DEDUCT_STOCK | 库存扣减 | 10 | 指数退避 |
| SEND_NOTIFICATION | 发送通知 | 3 | 固定 1 分钟 |
| REFUND | 退款 | 10 | 指数退避 |
| SYNC_DATA | 数据同步 | 5 | 固定 5 分钟 |
2. 监控告警
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| 待处理数量 | PENDING 状态的事务数 | > 100 |
| 失败数量 | FAILED 状态的事务数 | > 10 |
| 平均重试次数 | 平均重试次数 | > 3 |
| 最长等待时间 | 最早 PENDING 事务的等待时间 | > 1 小时 |
3. 性能优化
- 批量处理:每次扫描处理多条记录
- 异步执行:补偿操作异步执行
- 分表分库:事务记录表按时间分表
- 索引优化:为查询字段添加索引
六、常见问题
Q1: 如何保证事务记录和业务数据的一致性?
A: 通过本地事务保证:
- 业务数据和事务记录在同一事务中
- 要么全部成功,要么全部失败
- 利用数据库的 ACID 特性
Q2: 定时任务扫描会不会漏掉事务?
A: 不会:
- 扫描频率合理设置(如每分钟)
- 事务记录持久化存储
- 支持手动触发补偿
Q3: 如何处理补偿失败的情况?
A: 多重保障:
- 重试机制:自动重试直到成功
- 告警通知:失败时发送告警
- 人工介入:提供手动处理接口
Q4: 多实例部署时如何避免重复处理?
A: 使用分布式锁:
- 处理前获取锁
- 处理完成后释放锁
- 锁超时自动释放
七、方案对比
| 特性 | 本地事务表 | 消息队列 | Seata |
|---|---|---|---|
| 中间件依赖 | 无 | MQ | Seata Server |
| 实现复杂度 | 低 | 中 | 高 |
| 一致性保证 | 最终一致 | 最终一致 | 强一致/最终一致 |
| 性能影响 | 低 | 中 | 高 |
| 运维成本 | 低 | 中 | 高 |
| 适用场景 | 中小型系统 | 大型系统 | 大型系统 |
八、总结
本地事务表 + 定时扫描补偿方案是一种轻量级的分布式事务解决方案:
优势:
- 零依赖:不需要引入 MQ、Seata 等中间件
- 实现简单:基于数据库表和定时任务
- 可靠性高:通过本地事务保证一致性
- 易于维护:代码简单,问题排查容易
适用场景:
- 中小型微服务系统
- 对实时性要求不高的业务
- 不想引入复杂中间件的场景
- 快速迭代的创业项目
注意事项:
- 只能保证最终一致性
- 需要合理设计补偿逻辑
- 需要考虑幂等性
- 需要监控和告警机制
更多技术文章,欢迎关注公众号"服务端技术精选",及时获取最新动态。
标题:SpringBoot + 本地事务表 + 定时扫描补偿:轻量级方案实现最终一致性,无中间件依赖
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/27/1774259755227.html
公众号:服务端技术精选
- 前言
- 一、分布式事务问题分析
- 1. 典型业务场景
- 2. 传统解决方案对比
- 3. 本地事务表方案原理
- 二、方案设计
- 1. 整体架构
- 2. 数据库设计
- 2.1 事务记录表
- 2.2 状态流转
- 3. 核心流程
- 3.1 事务提交流程
- 3.2 定时扫描补偿流程
- 三、代码实现
- 1. 项目结构
- 2. 核心代码实现
- 2.1 事务记录实体
- 2.2 事务状态枚举
- 2.3 事务服务
- 2.4 补偿服务
- 2.5 定时扫描任务
- 3. 业务服务示例
- 四、高级特性
- 1. 重试策略
- 2. 幂等性保证
- 2. 幂等性保证
- 3. 分布式锁
- 五、最佳实践
- 1. 事务类型设计
- 2. 监控告警
- 3. 性能优化
- 六、常见问题
- Q1: 如何保证事务记录和业务数据的一致性?
- Q2: 定时任务扫描会不会漏掉事务?
- Q3: 如何处理补偿失败的情况?
- Q4: 多实例部署时如何避免重复处理?
- 七、方案对比
- 八、总结
评论
0 评论