Smart Retries for Async Tasks: Exponential Backoff + Dead-Letter Archiving, So Transient Failures Recover Without Losing Data
In distributed systems, transient faults are the norm: network jitter, database deadlocks, services that are briefly unavailable. If every such fault becomes an outright failure, user experience suffers, data gets lost, and business flows break.
- A network blip fails an API call, and the code just throws?
- A database deadlock aborts an insert, and the whole flow terminates?
- A third-party service is down for a moment, and the task is simply dropped?
- A task fails after its retries run out, and nothing is left to trace?
In this article we build a smart retry mechanism for asynchronous tasks, combining exponential backoff with dead-letter archiving, so transient faults recover automatically and no data is lost.
Background
Limitations of Naive Retry
// Traditional naive retry: fixed interval, no failure classification
public void processWithSimpleRetry() throws Exception {
    int maxRetries = 3;
    for (int i = 0; i < maxRetries; i++) {
        try {
            doProcess();
            break;
        } catch (Exception e) {
            if (i == maxRetries - 1) {
                throw e; // retries exhausted: propagate the last failure
            }
            Thread.sleep(1000); // fixed 1-second wait before every retry
        }
    }
}
Problem analysis:
┌─────────────────────────────────────────────────────────────┐
│ Problems with naive retry:                                  │
│                                                             │
│ 1. Fixed interval: every failure waits exactly 1 second     │
│ 2. No differentiation: all exceptions retried the same way  │
│ 3. No persisted state: retry progress is lost on restart    │
│ 4. No dead letters: exhausted tasks are simply dropped      │
│ 5. No alerting: nobody hears about permanent failures       │
│                                                             │
│ Typical failures:                                           │
│ - Network jitter: recovers instantly, yet intervals stay    │
│   fixed                                                     │
│ - Database deadlock: locks take a while to be released      │
│ - Third-party service: may need minutes to recover          │
└─────────────────────────────────────────────────────────────┘
Business Scenario Analysis
┌─────────────────────────────────────────────────────────────┐
│ Retry scenario classes:                                     │
│                                                             │
│ 1. Transient faults (network jitter, timeouts):             │
│    - Fast retry, 2-3 attempts                               │
│    - Intervals: 100ms, 200ms, 400ms                         │
│                                                             │
│ 2. Temporary faults (DB deadlock, service restart):         │
│    - Moderate retry, 3-5 attempts                           │
│    - Intervals: 1s, 2s, 4s, 8s, 16s                         │
│                                                             │
│ 3. Persistent faults (business logic errors, bad data):     │
│    - No retry, or a single attempt at most                  │
│    - Sent straight to the dead letter queue (see the        │
│      strategy sketch below)                                 │
└─────────────────────────────────────────────────────────────┘
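To make the classification concrete, here is a minimal sketch of how the three classes could be expressed as strategy presets, using the RetryStrategy configuration type defined in section 2 below. The values are illustrative, not prescriptive:

// Illustrative presets only; RetryStrategy is defined in section 2 below.
RetryProperties.RetryStrategy transientFault = new RetryProperties.RetryStrategy();
transientFault.setMaxAttempts(3);
transientFault.setInitialIntervalMs(100);  // yields 100ms, 200ms, 400ms
transientFault.setMultiplier(2.0);
transientFault.setMaxIntervalMs(1000);

RetryProperties.RetryStrategy temporaryFault = new RetryProperties.RetryStrategy();
temporaryFault.setMaxAttempts(5);
temporaryFault.setInitialIntervalMs(1000); // yields 1s, 2s, 4s, 8s, 16s
temporaryFault.setMultiplier(2.0);
temporaryFault.setMaxIntervalMs(16000);

// Persistent faults get at most one attempt; the exception categorizer in
// section 4 routes them to the dead letter queue instead of retrying.
RetryProperties.RetryStrategy persistentFault = new RetryProperties.RetryStrategy();
persistentFault.setMaxAttempts(1);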
Overall Architecture
Core Components
┌─────────────────────────────────────────────────────────────┐
│ Smart retry mechanism architecture:                         │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ RetryTask │───▶│ Exponential │───▶│ RetryQueue │ │
│ │ Submit │ │ Backoff │ │ (Delayed) │ │
│ └─────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ RetryPolicy │───▶│ FailureClass │───▶│ DeadLetter │ │
│ │ Matcher │ │ Categorizer │ │ Queue │ │
│ └─────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
Retry Flow
Task submitted
↓
Check whether the task can be retried
↓
┌─────────────────────────────────────────┐
│ Pick a strategy by exception type:      │
│ - Network errors: fast, short intervals │
│ - DB deadlocks: long exponential backoff│
│ - Business errors: fail with no retry   │
└─────────────────────────────────────────┘
↓
Execute the task
↓
┌─────────────────┐
│ Success: done   │
│ Failure: retry  │
└─────────────────┘
↓
Attempt count below the limit?
↓
┌─────────────────────────────────────────┐
│ Yes: compute the next retry time and    │
│      push onto the delayed queue        │
│ No: move to the dead letter queue and   │
│     send an alert                       │
└─────────────────────────────────────────┘
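Condensed into code, the failure branch of this flow looks roughly as follows. This is only a sketch to read alongside the diagram; moveToDeadLetter is a hypothetical shorthand for what RetryTaskService implements later:

// Sketch of the failure branch; not part of the implementation that follows.
void onFailure(RetryTaskRecord record, Throwable error) {
    String category = categorizer.categorize(error);          // classify the failure
    if (!categorizer.isRetryable(category)
            || record.getAttemptCount() >= record.getMaxAttempts()) {
        moveToDeadLetter(record, category);                   // archive + alert
        return;
    }
    LocalDateTime next = backoff.calculateNextRetryTime(
            record.getTaskType(), record.getAttemptCount());  // exponential backoff
    delayedQueue.schedule(record.getId(), next);              // re-enqueue for later
}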
Core Code Implementation
1. Retry Task Entity
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "retry_task_record")
public class RetryTaskRecord {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String taskId;
    private String taskType;
    private String taskBody;
    private Integer attemptCount;
    private Integer maxAttempts;
    private LocalDateTime nextRetryTime;
    private LocalDateTime createTime;
    private LocalDateTime updateTime;
    private String status;
    private String lastError;
    private String lastErrorClass;
    private String traceId;
    private String deadLetterReason;
}
public enum RetryStatus {
    PENDING("pending"),
    RUNNING("running"),
    SUCCESS("succeeded"),
    EXHAUSTED("retries exhausted"),
    DEAD_LETTER("dead letter");

    private final String description;

    RetryStatus(String description) {
        this.description = description;
    }
}
2. Retry Strategy Configuration
@Component
@ConfigurationProperties(prefix = "retry")
@Data
public class RetryProperties {

    private Map<String, RetryStrategy> strategies = new HashMap<>();

    @Data
    public static class RetryStrategy {
        private int maxAttempts = 3;
        private long initialIntervalMs = 1000;
        private double multiplier = 2.0;
        private long maxIntervalMs = 60000;
        private List<String> retryableExceptions = new ArrayList<>();
        private List<String> nonRetryableExceptions = new ArrayList<>();
    }
}
public enum RetryableExceptionType {
    NETWORK_ERROR("network error"),
    TIMEOUT("timeout"),
    DATABASE_DEADLOCK("database deadlock"),
    SERVICE_UNAVAILABLE("service unavailable"),
    RESOURCE_BUSY("resource busy"),
    UNKNOWN("unclassified; treated as non-retryable");

    private final String description;

    RetryableExceptionType(String description) {
        this.description = description;
    }
}
3. Exponential Backoff Calculator
@Component
@Slf4j
public class ExponentialBackoffCalculator {

    @Autowired
    private RetryProperties retryProperties;

    public long calculateNextDelay(String taskType, int currentAttempt) {
        RetryProperties.RetryStrategy strategy = getStrategy(taskType);
        long delay = (long) (strategy.getInitialIntervalMs()
                * Math.pow(strategy.getMultiplier(), currentAttempt - 1));
        delay = Math.min(delay, strategy.getMaxIntervalMs());
        // add up to 10% random jitter so concurrent tasks don't retry in lockstep
        long jitter = (long) (delay * 0.1 * Math.random());
        delay = delay + jitter;
        log.debug("computed retry delay: taskType={}, attempt={}, delay={}ms",
                taskType, currentAttempt, delay);
        return delay;
    }

    public LocalDateTime calculateNextRetryTime(String taskType, int currentAttempt) {
        long delayMs = calculateNextDelay(taskType, currentAttempt);
        return LocalDateTime.now().plusNanos(delayMs * 1_000_000);
    }

    private RetryProperties.RetryStrategy getStrategy(String taskType) {
        return retryProperties.getStrategies()
                .getOrDefault(taskType, getDefaultStrategy());
    }

    private RetryProperties.RetryStrategy getDefaultStrategy() {
        RetryProperties.RetryStrategy strategy = new RetryProperties.RetryStrategy();
        strategy.setMaxAttempts(3);
        strategy.setInitialIntervalMs(1000);
        strategy.setMultiplier(2.0);
        strategy.setMaxIntervalMs(60000);
        return strategy;
    }
}
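With the default strategy (1s initial interval, multiplier 2, 60s cap) the base delay doubles per attempt and jitter adds at most 10%. A quick sanity check, assuming the calculator is injected and "order-sync" is a hypothetical task type with no explicit strategy:

// Falls back to the default strategy: 1s initial, x2 multiplier, 60s cap.
long d1 = backoffCalculator.calculateNextDelay("order-sync", 1); // ~1000-1100 ms
long d2 = backoffCalculator.calculateNextDelay("order-sync", 2); // ~2000-2200 ms
long d3 = backoffCalculator.calculateNextDelay("order-sync", 3); // ~4000-4400 ms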
4. Exception Categorizer
@Component
@Slf4j
public class ExceptionCategorizer {

    private static final Map<Class<?>, String> EXCEPTION_CATEGORY_MAP = new ConcurrentHashMap<>();

    static {
        EXCEPTION_CATEGORY_MAP.put(ConnectException.class, RetryableExceptionType.NETWORK_ERROR.name());
        EXCEPTION_CATEGORY_MAP.put(SocketTimeoutException.class, RetryableExceptionType.TIMEOUT.name());
        EXCEPTION_CATEGORY_MAP.put(SocketException.class, RetryableExceptionType.NETWORK_ERROR.name());
        EXCEPTION_CATEGORY_MAP.put(SQLTransientConnectionException.class, RetryableExceptionType.DATABASE_DEADLOCK.name());
        EXCEPTION_CATEGORY_MAP.put(ServiceUnavailableException.class, RetryableExceptionType.SERVICE_UNAVAILABLE.name());
        // Deliberately unmapped: DataIntegrityViolationException signals a data
        // problem, which (per the FAQ below) should not be retried, so it falls
        // through to the non-retryable UNKNOWN bucket.
    }

    public String categorize(Throwable throwable) {
        if (throwable == null) {
            return RetryableExceptionType.UNKNOWN.name();
        }
        Class<?> clazz = throwable.getClass();
        if (EXCEPTION_CATEGORY_MAP.containsKey(clazz)) {
            return EXCEPTION_CATEGORY_MAP.get(clazz);
        }
        // walk the cause chain: frameworks frequently wrap the root failure
        if (throwable.getCause() != null) {
            return categorize(throwable.getCause());
        }
        return classifyByMessage(throwable);
    }

    private String classifyByMessage(Throwable throwable) {
        String message = throwable.getMessage();
        if (message == null) {
            return RetryableExceptionType.UNKNOWN.name();
        }
        if (message.contains("timeout") || message.contains("Timeout")) {
            return RetryableExceptionType.TIMEOUT.name();
        }
        if (message.contains("deadlock") || message.contains("Deadlock")) {
            return RetryableExceptionType.DATABASE_DEADLOCK.name();
        }
        if (message.contains("connection") || message.contains("Connection")) {
            return RetryableExceptionType.NETWORK_ERROR.name();
        }
        return RetryableExceptionType.UNKNOWN.name();
    }

    public boolean isRetryable(String category) {
        // every positively classified category is transient and worth retrying;
        // UNKNOWN is the fallback bucket and is treated as non-retryable
        return !RetryableExceptionType.UNKNOWN.name().equals(category);
    }
}
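The recursive walk down getCause() matters in practice, because frameworks usually wrap the root failure. A small illustration, assuming the categorizer is injected:

// A timeout wrapped by a framework exception: categorize() walks the cause
// chain until it finds a mapped class.
Exception wrapped = new RuntimeException("remote call failed",
        new SocketTimeoutException("read timed out"));
String category = categorizer.categorize(wrapped);     // "TIMEOUT"
boolean retryable = categorizer.isRetryable(category); // true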
5. Retry Task Service
@Service
@Slf4j
public class RetryTaskService {

    @Autowired
    private RetryTaskRecordRepository repository;
    @Autowired
    private ExponentialBackoffCalculator backoffCalculator;
    @Autowired
    private ExceptionCategorizer exceptionCategorizer;
    @Autowired
    private AlertManager alertManager;

    public RetryTaskRecord submitTask(String taskType, String taskBody, String traceId) {
        RetryTaskRecord record = RetryTaskRecord.builder()
                .taskId(UUID.randomUUID().toString())
                .taskType(taskType)
                .taskBody(taskBody)
                .attemptCount(0)
                .maxAttempts(3)
                .createTime(LocalDateTime.now())
                .updateTime(LocalDateTime.now())
                .status(RetryStatus.PENDING.name())
                .traceId(traceId)
                .build();
        record = repository.save(record);
        log.info("submitted retry task: taskId={}, taskType={}", record.getTaskId(), taskType);
        return record;
    }

    @Transactional
    public void recordAttempt(Long recordId, Throwable error) {
        Optional<RetryTaskRecord> opt = repository.findById(recordId);
        if (opt.isEmpty()) {
            log.warn("retry record not found: recordId={}", recordId);
            return;
        }
        RetryTaskRecord record = opt.get();
        int attemptCount = record.getAttemptCount() + 1;
        record.setAttemptCount(attemptCount);
        record.setLastError(error.getMessage());
        record.setLastErrorClass(error.getClass().getName());
        record.setUpdateTime(LocalDateTime.now());
        String category = exceptionCategorizer.categorize(error);
        record.setStatus(RetryStatus.RUNNING.name());
        // non-retryable categories skip straight to the dead letter queue,
        // matching the flow diagram: persistent faults are not retried
        if (!exceptionCategorizer.isRetryable(category)
                || attemptCount >= record.getMaxAttempts()) {
            handleRetryExhausted(record, error, category);
        } else {
            handleRetryNeeded(record, category);
        }
        repository.save(record);
    }

    @Transactional
    public void recordSuccess(Long recordId) {
        Optional<RetryTaskRecord> opt = repository.findById(recordId);
        if (opt.isPresent()) {
            RetryTaskRecord record = opt.get();
            record.setStatus(RetryStatus.SUCCESS.name());
            record.setUpdateTime(LocalDateTime.now());
            repository.save(record);
            log.info("retry task succeeded: taskId={}, attemptCount={}",
                    record.getTaskId(), record.getAttemptCount());
        }
    }

    private void handleRetryNeeded(RetryTaskRecord record, String category) {
        LocalDateTime nextRetryTime = backoffCalculator.calculateNextRetryTime(
                record.getTaskType(), record.getAttemptCount());
        record.setNextRetryTime(nextRetryTime);
        log.info("task scheduled for retry: taskId={}, nextRetryTime={}, category={}",
                record.getTaskId(), nextRetryTime, category);
    }

    private void handleRetryExhausted(RetryTaskRecord record, Throwable error, String category) {
        record.setStatus(RetryStatus.DEAD_LETTER.name());
        record.setDeadLetterReason(String.format("retries exhausted: %s, last error: %s",
                category, error.getMessage()));
        alertManager.sendAlert("RETRY_EXHAUSTED",
                String.format("retry task permanently failed: taskId=%s, taskType=%s, attempts=%d",
                        record.getTaskId(), record.getTaskType(), record.getAttemptCount()));
        log.error("task moved to dead letter: taskId={}, reason={}",
                record.getTaskId(), record.getDeadLetterReason());
    }
}
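The RetryTaskRecordRepository used above is not shown in the original; a minimal Spring Data JPA sketch consistent with the derived queries called in sections 5, 6, and 8 would be:

public interface RetryTaskRecordRepository extends JpaRepository<RetryTaskRecord, Long> {

    // used by the executor to poll for tasks whose backoff delay has elapsed
    List<RetryTaskRecord> findByStatusAndNextRetryTimeBefore(String status, LocalDateTime time);

    // used by the dead letter processor for cleanup and statistics
    List<RetryTaskRecord> findByStatusAndUpdateTimeBefore(String status, LocalDateTime time);
    List<RetryTaskRecord> findByStatus(String status);
}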
6. Retry Task Executor
@Component
@Slf4j
public class RetryTaskExecutor {

    @Autowired
    private RetryTaskRecordRepository repository;
    @Autowired
    private RetryTaskService retryTaskService;
    @Autowired
    private DelayedQueueManager delayedQueueManager;
    @Autowired
    private Map<String, TaskHandler> taskHandlerMap;

    private final AtomicBoolean running = new AtomicBoolean(true);

    @PostConstruct
    public void init() {
        startRetryScheduler();
    }

    @PreDestroy
    public void shutdown() {
        running.set(false);
    }

    private void startRetryScheduler() {
        Thread retryThread = new Thread(this::processRetryTasks, "retry-scheduler");
        retryThread.setDaemon(true);
        retryThread.start();
    }

    private void processRetryTasks() {
        while (running.get()) {
            try {
                // picks up tasks whose backoff delay has elapsed; the first attempt
                // of a freshly submitted (PENDING) task is triggered by the caller
                List<RetryTaskRecord> tasks = repository.findByStatusAndNextRetryTimeBefore(
                        RetryStatus.RUNNING.name(), LocalDateTime.now());
                for (RetryTaskRecord task : tasks) {
                    if (!running.get()) {
                        break;
                    }
                    executeRetryTask(task);
                }
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("retry scheduler loop failed", e);
            }
        }
    }

    public void executeRetryTask(RetryTaskRecord record) {
        log.info("executing retry task: taskId={}, attempt={}/{}",
                record.getTaskId(), record.getAttemptCount(), record.getMaxAttempts());
        TaskHandler handler = taskHandlerMap.get(record.getTaskType());
        if (handler == null) {
            log.error("no task handler registered: taskType={}", record.getTaskType());
            record.setStatus(RetryStatus.DEAD_LETTER.name());
            record.setDeadLetterReason("no task handler registered: " + record.getTaskType());
            repository.save(record);
            return;
        }
        try {
            handler.execute(record.getTaskBody());
            retryTaskService.recordSuccess(record.getId());
            log.info("retry task succeeded: taskId={}", record.getTaskId());
        } catch (Exception e) {
            log.error("retry task failed: taskId={}, error={}",
                    record.getTaskId(), e.getMessage());
            retryTaskService.recordAttempt(record.getId(), e);
            scheduleNextRetry(record.getId());
        }
    }

    private void scheduleNextRetry(Long recordId) {
        // reload: recordAttempt just updated status and nextRetryTime in its own
        // transaction, so the record instance we executed with is stale
        RetryTaskRecord record = repository.findById(recordId).orElse(null);
        if (record == null || RetryStatus.DEAD_LETTER.name().equals(record.getStatus())) {
            log.warn("task exhausted its retries and was dead-lettered: recordId={}", recordId);
            return;
        }
        delayedQueueManager.schedule(record.getId(), record.getNextRetryTime());
    }
}
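The TaskHandler contract injected above is not shown in the original either. Since Spring populates Map<String, TaskHandler> keyed by bean name, each handler bean must be named after the task type it serves; a plausible minimal sketch:

// Assumed contract, not from the original article. Register implementations
// under the task type name, e.g. @Component("payment"), so the executor's
// taskHandlerMap.get(record.getTaskType()) lookup finds them.
public interface TaskHandler {
    Object execute(String taskBody) throws Exception;
}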
7. Delayed Queue Manager
@Component
@Slf4j
public class DelayedQueueManager {

    private final DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
    private final ConcurrentHashMap<Long, LocalDateTime> scheduledTasks = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @PostConstruct
    public void init() {
        Thread consumerThread = new Thread(this::consumeTasks, "delayed-queue-consumer");
        consumerThread.setDaemon(true);
        consumerThread.start();
    }

    @PreDestroy
    public void shutdown() {
        executor.shutdownNow();
    }

    public void schedule(Long taskId, LocalDateTime executeTime) {
        scheduledTasks.put(taskId, executeTime);
        delayQueue.offer(new DelayedTask(taskId, executeTime));
        log.debug("task enqueued for delayed retry: taskId={}, executeTime={}", taskId, executeTime);
    }

    public void cancel(Long taskId) {
        scheduledTasks.remove(taskId);
        log.debug("task removed from the delayed queue: taskId={}", taskId);
    }

    public boolean isScheduled(Long taskId) {
        return scheduledTasks.containsKey(taskId);
    }

    private void consumeTasks() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                DelayedTask task = delayQueue.take();
                Long taskId = task.getTaskId();
                // remove() doubles as the cancellation check: cancelled tasks
                // remain in the DelayQueue but are gone from scheduledTasks
                if (scheduledTasks.remove(taskId) != null) {
                    notifyTaskReady(taskId);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    private void notifyTaskReady(Long taskId) {
        executor.execute(() -> {
            log.debug("delayed task is due: taskId={}", taskId);
        });
    }

    private static class DelayedTask implements Delayed {
        private final Long taskId;
        private final LocalDateTime executeTime;

        DelayedTask(Long taskId, LocalDateTime executeTime) {
            this.taskId = taskId;
            this.executeTime = executeTime;
        }

        public Long getTaskId() {
            return taskId;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long delayMs = Duration.between(LocalDateTime.now(), executeTime).toMillis();
            // convert the remaining milliseconds into whatever unit the queue asks for
            return unit.convert(Math.max(0, delayMs), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            if (o == this) {
                return 0;
            }
            if (o instanceof DelayedTask) {
                return this.executeTime.compareTo(((DelayedTask) o).executeTime);
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }
}
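notifyTaskReady above is left as a stub. One way to close the loop without a circular bean dependency is a Spring application event that a small listener turns into an executor call. This is a sketch under that assumption; RetryDueEvent and RetryDueListener are hypothetical, not from the original:

// Hypothetical wiring: DelayedQueueManager would publish this event from
// notifyTaskReady via an injected ApplicationEventPublisher:
//   publisher.publishEvent(new RetryDueEvent(taskId));
public class RetryDueEvent {
    private final Long taskId;

    public RetryDueEvent(Long taskId) {
        this.taskId = taskId;
    }

    public Long getTaskId() {
        return taskId;
    }
}

@Component
class RetryDueListener {

    @Autowired
    private RetryTaskRecordRepository repository;
    @Autowired
    private RetryTaskExecutor executor;

    @EventListener
    public void onRetryDue(RetryDueEvent event) {
        // load the due record and hand it to the executor
        repository.findById(event.getTaskId())
                .ifPresent(executor::executeRetryTask);
    }
}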
8. Dead Letter Queue Processor
@Component
@Slf4j
public class DeadLetterQueueProcessor {

    @Autowired
    private RetryTaskRecordRepository repository;
    @Autowired
    private AlertManager alertManager;

    @Value("${retry.deadletter.retention-days:30}")
    private int retentionDays;

    @Scheduled(cron = "${retry.deadletter.cleanup-cron:0 0 3 * * ?}")
    public void cleanupDeadLetter() {
        LocalDateTime cutoff = LocalDateTime.now().minusDays(retentionDays);
        List<RetryTaskRecord> deadLetters = repository.findByStatusAndUpdateTimeBefore(
                RetryStatus.DEAD_LETTER.name(), cutoff);
        log.info("purging expired dead letters: count={}, cutoffDate={}", deadLetters.size(), cutoff);
        for (RetryTaskRecord record : deadLetters) {
            logDeadLetter(record);
            repository.delete(record);
        }
    }

    @Scheduled(fixedRate = 300000)
    public void reportDeadLetterStats() {
        List<RetryTaskRecord> deadLetters = repository.findByStatus(RetryStatus.DEAD_LETTER.name());
        if (!deadLetters.isEmpty()) {
            log.warn("dead letter queue stats: total={}", deadLetters.size());
            Map<String, Long> byType = deadLetters.stream()
                    .collect(Collectors.groupingBy(RetryTaskRecord::getTaskType, Collectors.counting()));
            byType.forEach((type, count) ->
                    log.warn("dead letter distribution: type={}, count={}", type, count));
            alertManager.sendAlert("DEAD_LETTER_QUEUE_WARNING",
                    String.format("dead letter backlog: total=%d, types=%s", deadLetters.size(), byType));
        }
    }

    private void logDeadLetter(RetryTaskRecord record) {
        log.error("dead letter detail: taskId={}, taskType={}, taskBody={}, " +
                        "attemptCount={}, lastError={}, deadLetterReason={}",
                record.getTaskId(),
                record.getTaskType(),
                record.getTaskBody(),
                record.getAttemptCount(),
                record.getLastError(),
                record.getDeadLetterReason());
    }
}
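The processor only archives and purges; in practice teams usually also want to requeue a dead letter once the root cause is fixed. A sketch of such a method (requeueDeadLetter is an addition, not in the original):

// Sketch: reset a dead-letter record so the scheduler picks it up again.
@Transactional
public void requeueDeadLetter(Long recordId) {
    repository.findById(recordId).ifPresent(record -> {
        record.setStatus(RetryStatus.RUNNING.name()); // the scheduler polls RUNNING
        record.setAttemptCount(0);                    // fresh attempt budget
        record.setNextRetryTime(LocalDateTime.now());
        record.setDeadLetterReason(null);
        record.setUpdateTime(LocalDateTime.now());
        repository.save(record);
    });
}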
9. Alert Manager
@Component
@Slf4j
public class AlertManager {

    private final Map<String, Long> alertHistory = new ConcurrentHashMap<>();
    private static final long ALERT_COOLING_MS = 5 * 60 * 1000;

    public void sendAlert(String alertType, String message) {
        long now = System.currentTimeMillis();
        String alertKey = alertType;
        Long lastAlertTime = alertHistory.get(alertKey);
        // cooldown window: suppress duplicate alerts of the same type
        if (lastAlertTime != null && (now - lastAlertTime) < ALERT_COOLING_MS) {
            log.debug("alert still in cooldown, skipped: {}", alertType);
            return;
        }
        alertHistory.put(alertKey, now);
        log.error("[{}] {}", alertType, message);
        notify(alertType, message);
    }

    private void notify(String alertType, String message) {
        switch (alertType) {
            case "RETRY_EXHAUSTED":
                sendEmailAlert(message);
                break;
            case "DEAD_LETTER_QUEUE_WARNING":
                sendDingTalkAlert(message);
                break;
            default:
                log.info("default alert channel: {}", message);
        }
    }

    private void sendEmailAlert(String message) {
        // placeholder: wire in a real mail sender here
        log.warn("sending email alert: {}", message);
    }

    private void sendDingTalkAlert(String message) {
        // placeholder: wire in a DingTalk webhook here
        log.info("sending DingTalk alert: {}", message);
    }
}
Configuration
server:
  port: 8080

spring:
  application:
    name: retry-mechanism-demo

retry:
  enabled: true
  strategies:
    payment:
      max-attempts: 5
      initial-interval-ms: 1000
      multiplier: 2.0
      max-interval-ms: 30000
      retryable-exceptions:
        - java.net.ConnectException
        - java.sql.SQLTransientConnectionException
      non-retryable-exceptions:
        - java.lang.IllegalArgumentException
    notification:
      max-attempts: 3
      initial-interval-ms: 500
      multiplier: 1.5
      max-interval-ms: 10000
      retryable-exceptions:
        - java.net.SocketTimeoutException
    data-sync:
      max-attempts: 3
      initial-interval-ms: 2000
      multiplier: 2.0
      max-interval-ms: 60000
      retryable-exceptions:
        - java.sql.SQLTransientException
  deadletter:
    retention-days: 30
    cleanup-cron: "0 0 3 * * ?"

logging:
  level:
    com.example.retry: DEBUG
| Option | Description | Default |
|---|---|---|
| retry.enabled | enable/disable the retry mechanism | true |
| retry.strategies.*.max-attempts | maximum retry attempts | 3 |
| retry.strategies.*.initial-interval-ms | initial retry interval (ms) | 1000 |
| retry.strategies.*.multiplier | backoff multiplier | 2.0 |
| retry.strategies.*.max-interval-ms | maximum retry interval (ms) | 60000 |
| retry.deadletter.retention-days | dead letter retention (days) | 30 |
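As a quick sanity check that the YAML binds the way the table describes (a sketch; assumes RetryProperties is injected):

// "payment" matches the retry.strategies.payment block above; Spring's relaxed
// binding maps max-attempts -> maxAttempts, and so on.
RetryProperties.RetryStrategy payment =
        retryProperties.getStrategies().get("payment");
// expected: maxAttempts=5, initialIntervalMs=1000, multiplier=2.0, maxIntervalMs=30000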
Performance Comparison
Retry Effectiveness
Scenario: calling a third-party payment API
Without retry:
- 3 transient network blips → 3 hard failures
- Success rate of the affected calls: 0%
- User experience: terrible
With smart retry:
- Network jitter absorbed by fast retries at 100ms, 200ms, 400ms
- Success rate: 95%
- User experience: good
Exponential Backoff Effect
Task type: payment order
Max attempts: 5
Attempt | Fixed interval | Exponential backoff (with jitter)
--------|----------------|----------------------------------
1st     | 1s             | 1.0s - 1.1s
2nd     | 1s             | 2.0s - 2.2s
3rd     | 1s             | 4.0s - 4.4s
4th     | 1s             | 8.0s - 8.8s
5th     | 1s             | 16.0s - 17.6s
Total wait: 5s vs 31s (base delays, before jitter)
FAQ
Q: How do you avoid a retry storm?
A: Three safeguards work together:
- Jitter: add up to 10% random jitter to every delay
- Cap: the backoff interval never exceeds 60 seconds
- Cooldown: the same alert type is not re-sent within the cooldown window
Q: Which exceptions should not be retried?
A: Retrying these only wastes attempts:
- Business logic errors: IllegalArgumentException
- Data problems: DataIntegrityViolationException
- Authentication failures: AuthenticationException
- Missing resources: ResourceNotFoundException
Q: How do you keep retries idempotent?
A: Three complementary measures (see the sketch below):
- Unique task ID: every task carries a unique identifier
- State-machine control: only tasks in the PENDING state get executed
- Database unique index: blocks duplicate execution at the storage layer
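One concrete way to enforce the state-machine rule is an atomic conditional update that only one worker can win; a sketch using a Spring Data modifying query (markRunningIfPending is hypothetical, not part of the repository shown earlier):

// Hypothetical addition to RetryTaskRecordRepository: an atomic PENDING -> RUNNING
// transition; exactly one caller gets a return value of 1 and may run the task.
@Modifying
@Query("update RetryTaskRecord r set r.status = 'RUNNING' "
        + "where r.id = :id and r.status = 'PENDING'")
int markRunningIfPending(@Param("id") Long id);

// Caller side: skip execution unless we won the claim.
// if (repository.markRunningIfPending(record.getId()) == 0) return;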
Q: What if the dead letter queue keeps growing?
A: The dead letter queue is the last line of defense; in practice:
- Set a retention period and purge expired entries automatically
- Review dead letters regularly and analyze the root causes
- Alert on backlog so someone can step in promptly
Summary
With the approach in this article we get:
- Smart retry: the retry strategy is chosen by exception category
- Exponential backoff: avoids retry storms and protects downstream services
- Dead letter archiving: exhausted tasks land in a dead letter queue for later inspection
- Alerting: permanent failures trigger timely notifications
- Persisted state: retry progress survives service restarts
Key components:
- ExponentialBackoffCalculator: computes backoff delays with jitter
- ExceptionCategorizer: classifies failures and decides retryability
- RetryTaskService: manages task state across attempts
- DelayedQueueManager: a delay queue that drives scheduled retries
- DeadLetterQueueProcessor: archives and cleans up dead letters
In production, configure different strategies per business scenario so transient faults heal themselves and persistent faults surface quickly for human attention.