SpringBoot + Saga 模式 + 事件驱动:长流程业务的柔性事务编排实战
长流程业务的挑战
在我们的日常开发工作中,经常会遇到这样的场景:
- 保险理赔流程:报案登记→查勘定损→理算核赔→支付结案,涉及多个服务
- 电商订单流程:创建订单→扣减库存→支付处理→物流配送→确认收货
- 银行转账流程:扣款→转账→入账→手续费扣除→短信通知
这些业务流程的特点是:步骤多、耗时长、涉及多个服务,传统的分布式事务(如2PC)往往不适合。今天我们就以保险理赔为例,聊聊如何用Saga模式解决这个问题。
为什么选择Saga模式
相比传统的分布式事务,Saga模式有以下优势:
- 适合长流程:每个步骤都是独立的本地事务
- 性能更好:避免长时间锁定资源
- 容错性强:每个步骤都有对应的补偿操作
- 可恢复性:支持失败后的恢复和重试
保险理赔业务分析
让我们以保险理赔为例,分析其业务流程:
- 报案登记:记录理赔申请信息
- 查勘定损:现场查勘,确定损失金额
- 理算核赔:计算赔付金额,审核理赔
- 支付结案:支付理赔款,完成理赔
如果在支付环节失败,需要反向执行补偿操作:撤销理算核赔→撤销查勘定损→撤销报案登记。
解决方案思路
今天我们要解决的,就是如何用SpringBoot + Saga模式 + 事件驱动构建一个可靠的长流程事务编排系统。
核心思路是:
- 事件驱动:每个步骤通过事件触发
- 状态机管理:跟踪整个流程的状态
- 补偿机制:失败时自动执行补偿操作
- 异步处理:支持长时间运行的业务流程
Saga模式实现
1. 事件定义
// 通用事件基类
@Data
@AllArgsConstructor
public abstract class SagaEvent {
private String eventId;
private String sagaId;
private LocalDateTime timestamp;
private String eventType;
}
// 报案登记事件
@Data
@EqualsAndHashCode(callSuper = true)
public class ClaimRegisteredEvent extends SagaEvent {
private String claimId;
private String policyId;
private String incidentDescription;
public ClaimRegisteredEvent(String sagaId, String claimId, String policyId, String incidentDescription) {
super(UUID.randomUUID().toString(), sagaId, LocalDateTime.now(), "CLAIM_REGISTERED");
this.claimId = claimId;
this.policyId = policyId;
this.incidentDescription = incidentDescription;
}
}
// 查勘定损事件
@Data
@EqualsAndHashCode(callSuper = true)
public class InvestigationCompletedEvent extends SagaEvent {
private String claimId;
private BigDecimal damageAmount;
public InvestigationCompletedEvent(String sagaId, String claimId, BigDecimal damageAmount) {
super(UUID.randomUUID().toString(), sagaId, LocalDateTime.now(), "INVESTIGATION_COMPLETED");
this.claimId = claimId;
this.damageAmount = damageAmount;
}
}
2. Saga状态管理
@Entity
@Table(name = "saga_instance")
@Data
public class SagaInstance {
@Id
private String sagaId;
@Enumerated(EnumType.STRING)
private SagaStatus status;
@ElementCollection
@CollectionTable(name = "saga_steps", joinColumns = @JoinColumn(name = "saga_id"))
@OrderColumn(name = "step_order")
private List<SagaStep> steps = new ArrayList<>();
private LocalDateTime createTime;
private LocalDateTime updateTime;
}
@Data
@Entity
@Table(name = "saga_step")
public class SagaStep {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String stepName;
private String stepData;
private String compensationData;
@Enumerated(EnumType.STRING)
private StepStatus status;
private LocalDateTime executeTime;
private LocalDateTime rollbackTime;
}
3. 事件处理器
@Component
public class ClaimSagaEventHandler {
@Autowired
private ClaimService claimService;
@Autowired
private CompensationService compensationService;
@EventListener
public void handleClaimRegistered(ClaimRegisteredEvent event) {
try {
// 执行查勘定损
InvestigationResult result = claimService.investigate(event.getClaimId());
// 发布查勘完成事件
InvestigationCompletedEvent investigationEvent =
new InvestigationCompletedEvent(event.getSagaId(), event.getClaimId(), result.getDamageAmount());
ApplicationEventPublisher publisher =
SpringContextHolder.getBean(ApplicationEventPublisher.class);
publisher.publishEvent(investigationEvent);
} catch (Exception e) {
// 发布补偿事件
CompensationEvent compensationEvent = new CompensationEvent(
event.getSagaId(), "CLAIM_REGISTERED", event.getClaimId(), e.getMessage());
publisher.publishEvent(compensationEvent);
}
}
@EventListener
public void handleInvestigationCompleted(InvestigationCompletedEvent event) {
try {
// 执行理算核赔
CalculationResult calculation = claimService.calculate(event.getClaimId());
// 发布理算完成事件
CalculationCompletedEvent calcEvent =
new CalculationCompletedEvent(event.getSagaId(), event.getClaimId(), calculation.getPayAmount());
ApplicationEventPublisher publisher =
SpringContextHolder.getBean(ApplicationEventPublisher.class);
publisher.publishEvent(calcEvent);
} catch (Exception e) {
// 发布补偿事件
CompensationEvent compensationEvent = new CompensationEvent(
event.getSagaId(), "INVESTIGATION_COMPLETED", event.getClaimId(), e.getMessage());
publisher.publishEvent(compensationEvent);
}
}
}
4. 补偿机制
@Component
public class CompensationService {
@EventListener
public void handleCompensation(CompensationEvent event) {
switch (event.getFailedStep()) {
case "CLAIM_REGISTERED":
rollbackClaimRegistration(event.getEntityId());
break;
case "INVESTIGATION_COMPLETED":
rollbackInvestigation(event.getEntityId());
break;
case "CALCULATION_COMPLETED":
rollbackCalculation(event.getEntityId());
break;
case "PAYMENT_COMPLETED":
rollbackPayment(event.getEntityId());
break;
}
// 记录补偿结果
recordCompensationResult(event);
}
private void rollbackClaimRegistration(String claimId) {
// 撤销报案登记
claimService.cancelClaim(claimId);
}
private void rollbackInvestigation(String claimId) {
// 撤销查勘定损
claimService.cancelInvestigation(claimId);
}
private void rollbackCalculation(String claimId) {
// 撤销理算核赔
claimService.cancelCalculation(claimId);
}
private void rollbackPayment(String claimId) {
// 撤销支付(可能需要发起退款流程)
claimService.processRefund(claimId);
}
}
状态机管理
1. 状态机定义
@Component
public class ClaimSagaStateMachine {
public enum SagaState {
START,
CLAIM_REGISTERED,
INVESTIGATION_COMPLETED,
CALCULATION_COMPLETED,
PAYMENT_COMPLETED,
COMPLETED,
COMPENSATING,
COMPENSATED,
FAILED
}
public SagaState getNextState(SagaState currentState, String event) {
switch (currentState) {
case START:
if ("CLAIM_REGISTERED".equals(event)) return SagaState.CLAIM_REGISTERED;
break;
case CLAIM_REGISTERED:
if ("INVESTIGATION_COMPLETED".equals(event)) return SagaState.INVESTIGATION_COMPLETED;
break;
case INVESTIGATION_COMPLETED:
if ("CALCULATION_COMPLETED".equals(event)) return SagaState.CALCULATION_COMPLETED;
break;
case CALCULATION_COMPLETED:
if ("PAYMENT_COMPLETED".equals(event)) return SagaState.PAYMENT_COMPLETED;
break;
case PAYMENT_COMPLETED:
if ("PROCESS_COMPLETED".equals(event)) return SagaState.COMPLETED;
break;
case COMPENSATING:
if ("COMPENSATION_COMPLETED".equals(event)) return SagaState.COMPENSATED;
break;
}
return SagaState.FAILED;
}
}
2. 状态持久化
@Service
public class SagaStateService {
@Autowired
private SagaRepository sagaRepository;
public void updateSagaState(String sagaId, SagaState newState, String eventData) {
SagaInstance saga = sagaRepository.findById(sagaId).orElseThrow();
// 更新当前步骤状态
SagaStep currentStep = getCurrentStep(saga);
currentStep.setStatus(StepStatus.COMPLETED);
currentStep.setExecuteTime(LocalDateTime.now());
// 添加下一步骤
SagaStep nextStep = new SagaStep();
nextStep.setStepName(newState.name());
nextStep.setData(eventData);
nextStep.setStatus(StepStatus.PENDING);
saga.getSteps().add(nextStep);
saga.setStatus(mapToSagaStatus(newState));
sagaRepository.save(saga);
}
private SagaStatus mapToSagaStatus(SagaState state) {
switch (state) {
case COMPLETED: return SagaStatus.SUCCESS;
case COMPENSATED: return SagaStatus.COMPENSATED;
case FAILED: return SagaStatus.FAILED;
default: return SagaStatus.PROCESSING;
}
}
}
异常处理与重试
1. 重试机制
@Component
public class RetryableSagaProcessor {
@Retryable(
value = {Exception.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void processSagaStep(SagaEvent event) {
// 处理业务逻辑
handleSagaStep(event);
}
@Recover
public void recover(Exception ex, SagaEvent event) {
// 发布补偿事件
CompensationEvent compensationEvent = new CompensationEvent(
event.getSagaId(), getCurrentStep(event), getEntityId(event), ex.getMessage());
ApplicationEventPublisher publisher =
SpringContextHolder.getBean(ApplicationEventPublisher.class);
publisher.publishEvent(compensationEvent);
}
}
2. 死信队列
@Component
public class DeadLetterQueueHandler {
@RabbitListener(queues = "saga.dead.letter.queue")
public void handleDeadLetter(Message message) {
// 处理死信消息,可能需要人工介入
String payload = new String(message.getBody());
log.error("Saga死信消息处理: {}", payload);
// 记录到数据库,等待人工处理
recordDeadLetterMessage(payload);
}
}
实际应用效果
通过SpringBoot + Saga模式的实现,我们可以获得:
- 高可用性:即使某个步骤失败,也有完整的补偿机制
- 可追溯性:完整的流程状态记录,便于问题排查
- 可恢复性:支持中断后的流程恢复
- 松耦合:各服务之间通过事件解耦
注意事项
在使用Saga模式时,需要注意以下几点:
- 补偿操作幂等性:确保补偿操作可以重复执行而不产生副作用
- 数据一致性:合理设计业务逻辑,避免补偿后数据不一致
- 监控告警:对长时间未完成的Saga实例进行监控
- 人工干预:对于无法自动补偿的情况,需要人工介入
总结
通过SpringBoot + Saga模式 + 事件驱动的架构,我们可以很好地处理保险理赔这类长流程业务。这种方案不仅保证了数据一致性,还提供了良好的容错和恢复能力,是处理复杂分布式事务的有效方案。
希望这篇文章对你有所帮助!如果你觉得有用,欢迎关注【服务端技术精选】公众号,获取更多后端技术干货。
原文首发于 www.jiangyi.space
转载请注明出处
标题:SpringBoot + Saga 模式 + 事件驱动:长流程业务的柔性事务编排实战
作者:jiangyi
地址:http://jiangyi.space/articles/2026/01/19/1768799889570.html
0 评论