基于Redis的4种延时队列实现方式及实战
在日常开发中,我们经常会遇到需要延迟执行任务的场景,比如订单超时取消、优惠券到期提醒、邮件定时发送、消息重试等。传统的做法可能是使用定时任务扫描数据库,但这种方式效率低下,特别是在高并发场景下。
Redis作为一个高性能的内存数据库,为我们提供了多种实现延时队列的方式。今天我就来详细介绍4种基于Redis的延时队列实现方式,并分析它们各自的优缺点和适用场景。
什么是延时队列?
延时队列顾名思义,是指元素进入队列后,可以延时一定时间再被消费者取出执行。这与普通队列的区别在于,普通队列中的元素一旦入队就可以被立即消费,而延时队列中的元素需要等到指定时间后才能被消费。
为什么要使用Redis实现延时队列?
使用Redis实现延时队列有几个显著优势:
- 高性能:Redis基于内存操作,读写速度极快
- 丰富的数据结构:支持String、Hash、List、Set、ZSet、Stream等多种数据结构
- 原子性操作:Redis的命令都是原子性的,保证了数据一致性
- 持久化支持:可以配置RDB和AOF持久化,防止数据丢失
4种实现方式详解
1. 基于Sorted Set的延时队列
这是最经典也是最常用的实现方式。Sorted Set(简称ZSet)是Redis提供的一个有序集合数据结构,每个元素都关联一个double类型的分数(score),Redis会根据score值对集合中的元素进行排序。
实现原理是将任务的执行时间戳作为score,任务内容作为member。消费者定时查询score小于等于当前时间戳的元素,即为到期任务。
实现代码:
@Component
public class SortedSetDelayQueue {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String DELAY_QUEUE_KEY = "delay_queue:sorted_set";
/**
* 添加延时任务
*/
public void addTask(String taskId, Object taskData, long delaySeconds) {
long executeTime = Instant.now().plusSeconds(delaySeconds).toEpochMilli();
String taskStr = JSON.toJSONString(taskData);
// 将任务添加到ZSet,执行时间戳作为score
redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY, taskStr, executeTime);
log.info("添加延时任务到ZSet队列,taskId: {}, 执行时间: {}, 任务数据: {}", taskId, executeTime, taskStr);
}
/**
* 消费延时任务
*/
public void consumeTasks() {
long currentTime = System.currentTimeMillis();
// 查询所有到期的任务(score <= 当前时间戳)
Set<ZSetOperations.TypedTuple<String>> expiredTasks =
redisTemplate.opsForZSet().rangeByScoreWithScores(DELAY_QUEUE_KEY, 0, currentTime);
if (expiredTasks != null && !expiredTasks.isEmpty()) {
for (ZSetOperations.TypedTuple<String> taskTuple : expiredTasks) {
String taskData = taskTuple.getValue();
Double score = taskTuple.getScore();
// 从队列中移除任务
Boolean removed = redisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY, taskData);
if (Boolean.TRUE.equals(removed)) {
log.info("处理到期任务,执行时间: {}, 任务数据: {}", (long)score.doubleValue(), taskData);
// 执行实际的任务处理逻辑
processTask(taskData);
}
}
}
}
// ... 其他辅助方法
}
优点:
- 实现简单,利用ZSet的score排序功能
- 性能较好,添加和查询操作都是O(log N)
- 支持批量处理到期任务
- Redis原生命令支持,稳定可靠
缺点:
- 需要定时轮询检查到期任务
- 实时性依赖轮询频率,可能存在延迟
- 大量任务时轮询开销较大
2. 基于List的延时队列
这种方式是将任务和执行时间封装后存入List,通过定时轮询检查是否有到期任务。
实现代码:
@Component
public class ListDelayQueue {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String DELAY_QUEUE_KEY = "delay_queue:list";
public static class DelayTask {
private String taskId;
private Object taskData;
private long executeTime; // 执行时间戳
public DelayTask(String taskId, Object taskData, long executeTime) {
this.taskId = taskId;
this.taskData = taskData;
this.executeTime = executeTime;
}
// ... getter/setter
}
/**
* 添加延时任务
*/
public void addTask(String taskId, Object taskData, long delaySeconds) {
long executeTime = Instant.now().plusSeconds(delaySeconds).toEpochMilli();
DelayTask delayTask = new DelayTask(taskId, taskData, executeTime);
String taskStr = JSON.toJSONString(delayTask);
// 将任务添加到List
redisTemplate.opsForList().leftPush(DELAY_QUEUE_KEY, taskStr);
log.info("添加延时任务到List队列,taskId: {}, 执行时间: {}, 任务数据: {}", taskId, executeTime, taskStr);
}
/**
* 消费延时任务
*/
public void consumeTasks() {
long currentTime = System.currentTimeMillis();
long queueSize = redisTemplate.opsForList().size(DELAY_QUEUE_KEY);
if (queueSize == null || queueSize == 0) {
return;
}
// 获取所有任务
List<String> allTasks = redisTemplate.opsForList().range(DELAY_QUEUE_KEY, 0, -1);
List<String> tasksToRemove = new ArrayList<>();
if (allTasks != null) {
for (String taskStr : allTasks) {
try {
DelayTask task = JSON.parseObject(taskStr, DelayTask.class);
if (task != null && task.getExecuteTime() <= currentTime) {
// 任务到期,执行处理逻辑
processTask(task.getTaskData());
// 记录需要删除的任务
tasksToRemove.add(taskStr);
}
} catch (Exception e) {
log.error("解析延时任务失败: {}", taskStr, e);
}
}
// 从队列中移除已处理的任务
for (String taskStr : tasksToRemove) {
redisTemplate.opsForList().remove(DELAY_QUEUE_KEY, 1, taskStr);
}
}
}
}
优点:
- 实现直观,易于理解
- 添加任务速度快O(1)
缺点:
- 查询到期任务需要遍历整个列表,性能差O(N)
- 随着任务数量增加,性能急剧下降
- 不适合大量任务的场景
3. 基于Pub/Sub的延时队列
这种方式结合Timer/Thread Pool和Redis Pub/Sub,到达执行时间时发布消息。创建定时任务,在指定时间后向Redis频道发布消息,消费者通过订阅该频道来接收消息。
实现代码:
@Component
public class PubSubDelayQueue {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedisMessageListenerContainer listenerContainer;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
private static final String DELAY_CHANNEL_PREFIX = "delay_channel:";
/**
* 添加延时任务
*/
public void addTask(String taskId, Object taskData, long delaySeconds) {
String channelName = DELAY_CHANNEL_PREFIX + taskId;
String taskStr = JSON.toJSONString(taskData);
// 创建定时任务,在指定时间后发布消息
java.util.concurrent.ScheduledFuture<?> scheduledFuture = scheduler.schedule(() -> {
log.info("定时任务触发,发布延时消息,taskId: {}, channel: {}, 任务数据: {}", taskId, channelName, taskStr);
redisTemplate.convertAndSend(channelName, taskStr);
}, delaySeconds, TimeUnit.SECONDS);
log.info("添加延时任务到Pub/Sub队列,taskId: {}, 延迟时间: {}秒, 任务数据: {}", taskId, delaySeconds, taskStr);
}
/**
* 订阅指定任务通道
*/
public void subscribeToTask(String taskId, TaskProcessor taskProcessor) {
String channelName = DELAY_CHANNEL_PREFIX + taskId;
ChannelTopic topic = new ChannelTopic(channelName);
listenerContainer.addMessageListener((message, pattern) -> {
String taskData = message.getBody().toString();
log.info("收到延时消息,taskId: {}, channel: {}, 任务数据: {}", taskId, channelName, taskData);
// 执行任务处理逻辑
taskProcessor.process(taskData);
}, topic);
}
@FunctionalInterface
public interface TaskProcessor {
void process(String taskData);
}
}
优点:
- 实时性最好,到达时间立即触发
- 无需轮询,节省资源
- 支持广播模式,可通知多个消费者
缺点:
- 需要在应用层维护定时任务,重启后可能丢失
- 无法持久化,进程重启后未执行的任务会丢失
- 实现相对复杂
4. 基于Stream的延时队列
Redis 5.0新增的Stream数据结构提供了更强大的消息队列功能,支持消费者组、消息确认机制等。
实现代码:
@Component
public class StreamDelayQueue {
@Autowired
private StringRedisTemplate redisTemplate;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
private static final String DELAY_STREAM_KEY = "delay_stream:queue";
private static final String CONSUMER_GROUP = "delay_consumers";
private static final String CONSUMER_NAME = "consumer_1";
/**
* 添加延时任务
*/
public void addTask(String taskId, Object taskData, long delaySeconds) {
String taskStr = JSON.toJSONString(taskData);
// 构建任务信息
Map<String, String> taskInfo = Map.of(
"taskId", taskId,
"taskData", taskStr,
"executeTime", String.valueOf(Instant.now().plusSeconds(delaySeconds).toEpochMilli())
);
// 创建定时任务,到时间后将任务添加到Stream
scheduler.schedule(() -> {
try {
// 将任务添加到Stream
String messageId = redisTemplate.opsForStream().add(DELAY_STREAM_KEY, taskInfo);
log.info("延时任务已添加到Stream,messageId: {}, taskId: {}, taskInfo: {}", messageId, taskId, taskInfo);
} catch (Exception e) {
log.error("添加任务到Stream失败,taskId: {}", taskId, e);
}
}, delaySeconds, TimeUnit.SECONDS);
log.info("添加延时任务到Stream队列,taskId: {}, 延迟时间: {}秒, 任务数据: {}", taskId, delaySeconds, taskStr);
}
/**
* 消费延时任务
*/
public void consumeTasks() {
try {
// 从Stream中读取消息
Map<String, Object> streamMessages = redisTemplate.opsForStream().read(
Consumer.from(CONSUMER_GROUP, CONSUMER_NAME),
StreamReadOptions.empty().count(10).block(1000),
StreamOffset.create(DELAY_STREAM_KEY, ReadOffset.lastConsumed())
);
if (streamMessages != null && !streamMessages.isEmpty()) {
for (Map.Entry<String, Object> entry : streamMessages.entrySet()) {
String messageId = entry.getKey();
Map<String, String> taskInfo = (Map<String, String>) entry.getValue();
log.info("处理Stream中的任务,messageId: {}, taskInfo: {}", messageId, taskInfo);
// 处理任务
processTask(taskInfo.get("taskData"));
// 确认消息已被处理
redisTemplate.opsForStream().acknowledge(DELAY_STREAM_KEY, CONSUMER_GROUP, messageId);
}
}
} catch (Exception e) {
log.error("消费Stream消息时出错", e);
}
}
}
优点:
- 功能最强大,支持消费者组、消息确认机制
- 消息持久化,不会丢失
- 支持多消费者,可扩展性好
- 提供丰富的消息处理功能
缺点:
- Redis版本要求较高(5.0+)
- 实现复杂度最高
- 学习成本较高
方案对比总结
| 方案 | 性能 | 实时性 | 可靠性 | 实现复杂度 | 适用场景 |
|---|---|---|---|---|---|
| Sorted Set | 较好 | 依赖轮询 | 高 | 简单 | 通用场景 |
| List | 差 | 依赖轮询 | 高 | 简单 | 小规模场景 |
| Pub/Sub | 好 | 高 | 低 | 中等 | 实时性要求高 |
| Stream | 好 | 中等 | 最高 | 复杂 | 大规模系统 |
实际应用建议
- 一般业务场景:推荐使用Sorted Set方案,它在性能、可靠性和实现复杂度之间达到了很好的平衡
- 实时性要求极高:可以选择Pub/Sub方案,但要注意数据持久化问题
- 大规模分布式系统:建议使用Stream方案,功能最强大,适合复杂场景
- 小规模临时任务:可以考虑List方案,实现简单
最佳实践
- 合理设置轮询间隔:对于需要轮询的方案,要平衡实时性和系统开销
- 异常处理:确保任务处理异常时有重试机制
- 监控告警:对队列长度、处理延迟等指标进行监控
- 优雅关闭:在应用关闭时正确处理未完成的任务
总结
Redis为我们提供了多种实现延时队列的方式,每种方式都有其适用的场景。在实际项目中,我们应该根据业务需求、性能要求和系统架构来选择合适的方案。
总的来说,Sorted Set方案因其良好的性能表现和简单实现,是大多数场景下的首选。但对于特殊需求,如高实时性或大规模分布式系统,其他方案也有其独特优势。
希望这篇文章能帮助你更好地理解和应用Redis延时队列。如果你有任何问题或想法,欢迎在评论区交流讨论!
关注「服务端技术精选」,获取更多后端技术干货!
我的个人技术博客:www.jiangyi.space