SpringBoot + Saga 模式 + 事件驱动:长流程业务的柔性事务编排实战

长流程业务的挑战

在我们的日常开发工作中,经常会遇到这样的场景:

  • 保险理赔流程:报案登记→查勘定损→理算核赔→支付结案,涉及多个服务
  • 电商订单流程:创建订单→扣减库存→支付处理→物流配送→确认收货
  • 银行转账流程:扣款→转账→入账→手续费扣除→短信通知

这些业务流程的特点是:步骤多、耗时长、涉及多个服务,传统的分布式事务(如2PC)往往不适合。今天我们就以保险理赔为例,聊聊如何用Saga模式解决这个问题。

为什么选择Saga模式

相比传统的分布式事务,Saga模式有以下优势:

  • 适合长流程:每个步骤都是独立的本地事务
  • 性能更好:避免长时间锁定资源
  • 容错性强:每个步骤都有对应的补偿操作
  • 可恢复性:支持失败后的恢复和重试

保险理赔业务分析

让我们以保险理赔为例,分析其业务流程:

  1. 报案登记:记录理赔申请信息
  2. 查勘定损:现场查勘,确定损失金额
  3. 理算核赔:计算赔付金额,审核理赔
  4. 支付结案:支付理赔款,完成理赔

如果在支付环节失败,需要反向执行补偿操作:撤销理算核赔→撤销查勘定损→撤销报案登记。

解决方案思路

今天我们要解决的,就是如何用SpringBoot + Saga模式 + 事件驱动构建一个可靠的长流程事务编排系统。

核心思路是:

  1. 事件驱动:每个步骤通过事件触发
  2. 状态机管理:跟踪整个流程的状态
  3. 补偿机制:失败时自动执行补偿操作
  4. 异步处理:支持长时间运行的业务流程

Saga模式实现

1. 事件定义

// 通用事件基类
@Data
@AllArgsConstructor
public abstract class SagaEvent {
    private String eventId;
    private String sagaId;
    private LocalDateTime timestamp;
    private String eventType;
}

// 报案登记事件
@Data
@EqualsAndHashCode(callSuper = true)
public class ClaimRegisteredEvent extends SagaEvent {
    private String claimId;
    private String policyId;
    private String incidentDescription;
    
    public ClaimRegisteredEvent(String sagaId, String claimId, String policyId, String incidentDescription) {
        super(UUID.randomUUID().toString(), sagaId, LocalDateTime.now(), "CLAIM_REGISTERED");
        this.claimId = claimId;
        this.policyId = policyId;
        this.incidentDescription = incidentDescription;
    }
}

// 查勘定损事件
@Data
@EqualsAndHashCode(callSuper = true)
public class InvestigationCompletedEvent extends SagaEvent {
    private String claimId;
    private BigDecimal damageAmount;
    
    public InvestigationCompletedEvent(String sagaId, String claimId, BigDecimal damageAmount) {
        super(UUID.randomUUID().toString(), sagaId, LocalDateTime.now(), "INVESTIGATION_COMPLETED");
        this.claimId = claimId;
        this.damageAmount = damageAmount;
    }
}

2. Saga状态管理

@Entity
@Table(name = "saga_instance")
@Data
public class SagaInstance {
    @Id
    private String sagaId;
    
    @Enumerated(EnumType.STRING)
    private SagaStatus status;
    
    @ElementCollection
    @CollectionTable(name = "saga_steps", joinColumns = @JoinColumn(name = "saga_id"))
    @OrderColumn(name = "step_order")
    private List<SagaStep> steps = new ArrayList<>();
    
    private LocalDateTime createTime;
    private LocalDateTime updateTime;
}

@Data
@Entity
@Table(name = "saga_step")
public class SagaStep {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    private String stepName;
    private String stepData;
    private String compensationData;
    
    @Enumerated(EnumType.STRING)
    private StepStatus status;
    
    private LocalDateTime executeTime;
    private LocalDateTime rollbackTime;
}

3. 事件处理器

@Component
public class ClaimSagaEventHandler {
    
    @Autowired
    private ClaimService claimService;
    
    @Autowired
    private CompensationService compensationService;
    
    @EventListener
    public void handleClaimRegistered(ClaimRegisteredEvent event) {
        try {
            // 执行查勘定损
            InvestigationResult result = claimService.investigate(event.getClaimId());
            
            // 发布查勘完成事件
            InvestigationCompletedEvent investigationEvent = 
                new InvestigationCompletedEvent(event.getSagaId(), event.getClaimId(), result.getDamageAmount());
            
            ApplicationEventPublisher publisher = 
                SpringContextHolder.getBean(ApplicationEventPublisher.class);
            publisher.publishEvent(investigationEvent);
            
        } catch (Exception e) {
            // 发布补偿事件
            CompensationEvent compensationEvent = new CompensationEvent(
                event.getSagaId(), "CLAIM_REGISTERED", event.getClaimId(), e.getMessage());
            publisher.publishEvent(compensationEvent);
        }
    }
    
    @EventListener
    public void handleInvestigationCompleted(InvestigationCompletedEvent event) {
        try {
            // 执行理算核赔
            CalculationResult calculation = claimService.calculate(event.getClaimId());
            
            // 发布理算完成事件
            CalculationCompletedEvent calcEvent = 
                new CalculationCompletedEvent(event.getSagaId(), event.getClaimId(), calculation.getPayAmount());
            
            ApplicationEventPublisher publisher = 
                SpringContextHolder.getBean(ApplicationEventPublisher.class);
            publisher.publishEvent(calcEvent);
            
        } catch (Exception e) {
            // 发布补偿事件
            CompensationEvent compensationEvent = new CompensationEvent(
                event.getSagaId(), "INVESTIGATION_COMPLETED", event.getClaimId(), e.getMessage());
            publisher.publishEvent(compensationEvent);
        }
    }
}

4. 补偿机制

@Component
public class CompensationService {
    
    @EventListener
    public void handleCompensation(CompensationEvent event) {
        switch (event.getFailedStep()) {
            case "CLAIM_REGISTERED":
                rollbackClaimRegistration(event.getEntityId());
                break;
            case "INVESTIGATION_COMPLETED":
                rollbackInvestigation(event.getEntityId());
                break;
            case "CALCULATION_COMPLETED":
                rollbackCalculation(event.getEntityId());
                break;
            case "PAYMENT_COMPLETED":
                rollbackPayment(event.getEntityId());
                break;
        }
        
        // 记录补偿结果
        recordCompensationResult(event);
    }
    
    private void rollbackClaimRegistration(String claimId) {
        // 撤销报案登记
        claimService.cancelClaim(claimId);
    }
    
    private void rollbackInvestigation(String claimId) {
        // 撤销查勘定损
        claimService.cancelInvestigation(claimId);
    }
    
    private void rollbackCalculation(String claimId) {
        // 撤销理算核赔
        claimService.cancelCalculation(claimId);
    }
    
    private void rollbackPayment(String claimId) {
        // 撤销支付(可能需要发起退款流程)
        claimService.processRefund(claimId);
    }
}

状态机管理

1. 状态机定义

@Component
public class ClaimSagaStateMachine {
    
    public enum SagaState {
        START,
        CLAIM_REGISTERED,
        INVESTIGATION_COMPLETED,
        CALCULATION_COMPLETED,
        PAYMENT_COMPLETED,
        COMPLETED,
        COMPENSATING,
        COMPENSATED,
        FAILED
    }
    
    public SagaState getNextState(SagaState currentState, String event) {
        switch (currentState) {
            case START:
                if ("CLAIM_REGISTERED".equals(event)) return SagaState.CLAIM_REGISTERED;
                break;
            case CLAIM_REGISTERED:
                if ("INVESTIGATION_COMPLETED".equals(event)) return SagaState.INVESTIGATION_COMPLETED;
                break;
            case INVESTIGATION_COMPLETED:
                if ("CALCULATION_COMPLETED".equals(event)) return SagaState.CALCULATION_COMPLETED;
                break;
            case CALCULATION_COMPLETED:
                if ("PAYMENT_COMPLETED".equals(event)) return SagaState.PAYMENT_COMPLETED;
                break;
            case PAYMENT_COMPLETED:
                if ("PROCESS_COMPLETED".equals(event)) return SagaState.COMPLETED;
                break;
            case COMPENSATING:
                if ("COMPENSATION_COMPLETED".equals(event)) return SagaState.COMPENSATED;
                break;
        }
        return SagaState.FAILED;
    }
}

2. 状态持久化

@Service
public class SagaStateService {
    
    @Autowired
    private SagaRepository sagaRepository;
    
    public void updateSagaState(String sagaId, SagaState newState, String eventData) {
        SagaInstance saga = sagaRepository.findById(sagaId).orElseThrow();
        
        // 更新当前步骤状态
        SagaStep currentStep = getCurrentStep(saga);
        currentStep.setStatus(StepStatus.COMPLETED);
        currentStep.setExecuteTime(LocalDateTime.now());
        
        // 添加下一步骤
        SagaStep nextStep = new SagaStep();
        nextStep.setStepName(newState.name());
        nextStep.setData(eventData);
        nextStep.setStatus(StepStatus.PENDING);
        
        saga.getSteps().add(nextStep);
        saga.setStatus(mapToSagaStatus(newState));
        
        sagaRepository.save(saga);
    }
    
    private SagaStatus mapToSagaStatus(SagaState state) {
        switch (state) {
            case COMPLETED: return SagaStatus.SUCCESS;
            case COMPENSATED: return SagaStatus.COMPENSATED;
            case FAILED: return SagaStatus.FAILED;
            default: return SagaStatus.PROCESSING;
        }
    }
}

异常处理与重试

1. 重试机制

@Component
public class RetryableSagaProcessor {
    
    @Retryable(
        value = {Exception.class},
        maxAttempts = 3,
        backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    public void processSagaStep(SagaEvent event) {
        // 处理业务逻辑
        handleSagaStep(event);
    }
    
    @Recover
    public void recover(Exception ex, SagaEvent event) {
        // 发布补偿事件
        CompensationEvent compensationEvent = new CompensationEvent(
            event.getSagaId(), getCurrentStep(event), getEntityId(event), ex.getMessage());
        
        ApplicationEventPublisher publisher = 
            SpringContextHolder.getBean(ApplicationEventPublisher.class);
        publisher.publishEvent(compensationEvent);
    }
}

2. 死信队列

@Component
public class DeadLetterQueueHandler {
    
    @RabbitListener(queues = "saga.dead.letter.queue")
    public void handleDeadLetter(Message message) {
        // 处理死信消息,可能需要人工介入
        String payload = new String(message.getBody());
        log.error("Saga死信消息处理: {}", payload);
        
        // 记录到数据库,等待人工处理
        recordDeadLetterMessage(payload);
    }
}

实际应用效果

通过SpringBoot + Saga模式的实现,我们可以获得:

  • 高可用性:即使某个步骤失败,也有完整的补偿机制
  • 可追溯性:完整的流程状态记录,便于问题排查
  • 可恢复性:支持中断后的流程恢复
  • 松耦合:各服务之间通过事件解耦

注意事项

在使用Saga模式时,需要注意以下几点:

  1. 补偿操作幂等性:确保补偿操作可以重复执行而不产生副作用
  2. 数据一致性:合理设计业务逻辑,避免补偿后数据不一致
  3. 监控告警:对长时间未完成的Saga实例进行监控
  4. 人工干预:对于无法自动补偿的情况,需要人工介入

总结

通过SpringBoot + Saga模式 + 事件驱动的架构,我们可以很好地处理保险理赔这类长流程业务。这种方案不仅保证了数据一致性,还提供了良好的容错和恢复能力,是处理复杂分布式事务的有效方案。

希望这篇文章对你有所帮助!如果你觉得有用,欢迎关注【服务端技术精选】公众号,获取更多后端技术干货。


原文首发于 www.jiangyi.space

转载请注明出处


标题:SpringBoot + Saga 模式 + 事件驱动:长流程业务的柔性事务编排实战
作者:jiangyi
地址:http://jiangyi.space/articles/2026/01/19/1768799889570.html

    0 评论
avatar