SpringBoot + 消息优先级队列 + 紧急通道:核心业务消息插队处理,保障关键链路
背景:消息队列的优先级挑战
在现代分布式系统中,消息队列被广泛应用于异步处理、解耦和削峰填谷等场景。然而,随着业务的发展,不同类型的消息之间的优先级差异越来越明显:
- 核心业务消息:如支付、订单等关键业务消息,需要优先处理
- 非核心业务消息:如日志、统计等辅助性消息,可以延迟处理
- 紧急消息:如系统告警、异常通知等,需要立即处理
传统的消息队列通常采用先进先出(FIFO)的方式处理消息,无法满足不同优先级消息的处理需求。当系统负载较高时,核心业务消息可能会被非核心消息阻塞,导致关键业务链路出现延迟,影响用户体验和业务连续性。
本文将介绍如何使用 SpringBoot 实现消息优先级队列和紧急通道,让核心业务消息能够插队处理,保障关键链路的顺畅运行。
核心概念
1. 消息优先级
消息优先级是指消息的重要程度,通常分为以下几个级别:
| 优先级 | 级别 | 描述 | 处理策略 |
|---|---|---|---|
| HIGH | 0 | 紧急消息 | 立即处理,优先于所有其他消息 |
| MEDIUM | 1 | 核心业务消息 | 优先于低优先级消息 |
| LOW | 2 | 非核心业务消息 | 正常处理 |
| LOWEST | 3 | 辅助性消息 | 最后处理 |
2. 优先级队列
优先级队列是一种特殊的队列,它根据消息的优先级决定消息的处理顺序。高优先级的消息会被优先处理,即使它们是后加入队列的。
3. 紧急通道
紧急通道是专门为处理紧急消息而设计的通道,它具有最高的处理优先级,确保紧急消息能够立即被处理,不受其他消息的影响。
4. 消息处理策略
| 策略 | 描述 | 适用场景 |
|---|---|---|
| 严格优先级 | 高优先级消息完全优先于低优先级消息 | 紧急情况,如系统告警 |
| 加权优先级 | 高优先级消息获得更多的处理资源 | 核心业务与非核心业务并存 |
| 抢占式优先级 | 高优先级消息可以抢占正在处理的低优先级消息 | 关键业务场景 |
技术实现
1. 核心实体类
// 消息实体
@Data
@Entity
@Table(name = "message")
public class Message {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "message_id", unique = true, nullable = false, length = 64)
private String messageId; // 消息ID
@Column(name = "topic", nullable = false, length = 128)
private String topic; // 消息主题
@Column(name = "priority", nullable = false)
private Integer priority; // 优先级:0-高,1-中,2-低,3-最低
@Column(name = "is_urgent", nullable = false)
private Boolean isUrgent = false; // 是否为紧急消息
@Column(name = "content", columnDefinition = "TEXT", nullable = false)
private String content; // 消息内容
@Column(name = "status", nullable = false, length = 32)
private String status = "PENDING"; // 状态:PENDING/PROCESSING/SUCCESS/FAILED
@Column(name = "retry_count", nullable = false)
private Integer retryCount = 0; // 重试次数
@Column(name = "max_retry", nullable = false)
private Integer maxRetry = 3; // 最大重试次数
@Column(name = "error_message", length = 1024)
private String errorMessage; // 错误信息
@CreationTimestamp
@Column(name = "create_time", nullable = false)
private Date createTime;
@UpdateTimestamp
@Column(name = "update_time", nullable = false)
private Date updateTime;
@Column(name = "process_time")
private Date processTime; // 处理时间
@Column(name = "process_duration")
private Long processDuration; // 处理耗时(毫秒)
}
// 消息消费记录
@Data
@Entity
@Table(name = "message_consumer_record")
public class MessageConsumerRecord {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "message_id", nullable = false, length = 64)
private String messageId; // 消息ID
@Column(name = "consumer_id", nullable = false, length = 64)
private String consumerId; // 消费者ID
@Column(name = "status", nullable = false, length = 32)
private String status; // 状态:SUCCESS/FAILED
@Column(name = "error_message", length = 1024)
private String errorMessage; // 错误信息
@Column(name = "process_time")
private Date processTime; // 处理时间
@Column(name = "process_duration")
private Long processDuration; // 处理耗时(毫秒)
@CreationTimestamp
@Column(name = "create_time", nullable = false)
private Date createTime;
}
2. 消息优先级队列服务
@Service
@Slf4j
public class PriorityMessageQueueService {
@Autowired
private MessageRepository messageRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Value("${message.queue.priority.high}")
private String highPriorityQueue;
@Value("${message.queue.priority.medium}")
private String mediumPriorityQueue;
@Value("${message.queue.priority.low}")
private String lowPriorityQueue;
@Value("${message.queue.priority.lowest}")
private String lowestPriorityQueue;
@Value("${message.queue.urgent}")
private String urgentQueue;
/**
* 发送消息
*/
public void sendMessage(Message message) {
// 1. 保存消息到数据库
messageRepository.save(message);
// 2. 根据优先级和是否为紧急消息,发送到不同的队列
if (message.getIsUrgent()) {
// 紧急消息发送到紧急通道
sendToUrgentQueue(message);
} else {
// 根据优先级发送到对应队列
sendToPriorityQueue(message);
}
log.info("Message sent: {}, priority: {}, urgent: {}",
message.getMessageId(), message.getPriority(), message.getIsUrgent());
}
/**
* 发送到紧急通道
*/
private void sendToUrgentQueue(Message message) {
String messageJson = JSON.toJSONString(message);
redisTemplate.opsForList().leftPush(urgentQueue, messageJson);
}
/**
* 发送到优先级队列
*/
private void sendToPriorityQueue(Message message) {
String queueName = getQueueNameByPriority(message.getPriority());
String messageJson = JSON.toJSONString(message);
redisTemplate.opsForList().leftPush(queueName, messageJson);
}
/**
* 根据优先级获取队列名称
*/
private String getQueueNameByPriority(Integer priority) {
switch (priority) {
case 0:
return highPriorityQueue;
case 1:
return mediumPriorityQueue;
case 2:
return lowPriorityQueue;
case 3:
return lowestPriorityQueue;
default:
return lowPriorityQueue;
}
}
/**
* 接收消息(按优先级顺序)
*/
public Message receiveMessage() {
// 1. 首先检查紧急通道
Message urgentMessage = receiveFromUrgentQueue();
if (urgentMessage != null) {
return urgentMessage;
}
// 2. 检查高优先级队列
Message highPriorityMessage = receiveFromQueue(highPriorityQueue);
if (highPriorityMessage != null) {
return highPriorityMessage;
}
// 3. 检查中优先级队列
Message mediumPriorityMessage = receiveFromQueue(mediumPriorityQueue);
if (mediumPriorityMessage != null) {
return mediumPriorityMessage;
}
// 4. 检查低优先级队列
Message lowPriorityMessage = receiveFromQueue(lowPriorityQueue);
if (lowPriorityMessage != null) {
return lowPriorityMessage;
}
// 5. 检查最低优先级队列
return receiveFromQueue(lowestPriorityQueue);
}
/**
* 从紧急通道接收消息
*/
private Message receiveFromUrgentQueue() {
return receiveFromQueue(urgentQueue);
}
/**
* 从指定队列接收消息
*/
private Message receiveFromQueue(String queueName) {
Object messageJson = redisTemplate.opsForList().rightPop(queueName);
if (messageJson != null) {
try {
return JSON.parseObject(messageJson.toString(), Message.class);
} catch (Exception e) {
log.error("Failed to parse message: {}", messageJson, e);
}
}
return null;
}
/**
* 批量发送消息
*/
public void sendMessages(List<Message> messages) {
for (Message message : messages) {
sendMessage(message);
}
}
/**
* 获取队列长度
*/
public Map<String, Long> getQueueLengths() {
Map<String, Long> lengths = new HashMap<>();
lengths.put("urgent", redisTemplate.opsForList().size(urgentQueue));
lengths.put("high", redisTemplate.opsForList().size(highPriorityQueue));
lengths.put("medium", redisTemplate.opsForList().size(mediumPriorityQueue));
lengths.put("low", redisTemplate.opsForList().size(lowPriorityQueue));
lengths.put("lowest", redisTemplate.opsForList().size(lowestPriorityQueue));
return lengths;
}
/**
* 清空队列
*/
public void clearQueues() {
redisTemplate.delete(urgentQueue);
redisTemplate.delete(highPriorityQueue);
redisTemplate.delete(mediumPriorityQueue);
redisTemplate.delete(lowPriorityQueue);
redisTemplate.delete(lowestPriorityQueue);
log.info("All queues cleared");
}
}
3. 紧急通道服务
@Service
@Slf4j
public class UrgentMessageService {
@Autowired
private PriorityMessageQueueService messageQueueService;
@Autowired
private MessageRepository messageRepository;
/**
* 发送紧急消息
*/
public void sendUrgentMessage(String topic, String content) {
Message message = createUrgentMessage(topic, content);
messageQueueService.sendMessage(message);
log.info("Urgent message sent: {}", message.getMessageId());
}
/**
* 创建紧急消息
*/
private Message createUrgentMessage(String topic, String content) {
Message message = new Message();
message.setMessageId(UUID.randomUUID().toString());
message.setTopic(topic);
message.setPriority(0); // 最高优先级
message.setIsUrgent(true); // 标记为紧急消息
message.setContent(content);
message.setStatus("PENDING");
message.setRetryCount(0);
message.setMaxRetry(3);
return message;
}
/**
* 批量发送紧急消息
*/
public void sendUrgentMessages(List<UrgentMessageRequest> requests) {
List<Message> messages = new ArrayList<>();
for (UrgentMessageRequest request : requests) {
Message message = createUrgentMessage(request.getTopic(), request.getContent());
messages.add(message);
}
messageQueueService.sendMessages(messages);
log.info("Sent {} urgent messages", messages.size());
}
/**
* 将普通消息升级为紧急消息
*/
public void upgradeToUrgentMessage(String messageId) {
Optional<Message> messageOptional = messageRepository.findByMessageId(messageId);
if (messageOptional.isPresent()) {
Message message = messageOptional.get();
if (!message.getIsUrgent()) {
message.setIsUrgent(true);
message.setPriority(0);
messageRepository.save(message);
// 重新发送到紧急通道
messageQueueService.sendMessage(message);
log.info("Message upgraded to urgent: {}", messageId);
}
}
}
/**
* 获取紧急消息列表
*/
public List<Message> getUrgentMessages() {
return messageRepository.findByIsUrgentTrueAndStatusNotIn(Arrays.asList("SUCCESS", "FAILED"));
}
/**
* 紧急消息请求
*/
@Data
public static class UrgentMessageRequest {
private String topic;
private String content;
}
}
4. 消息消费者服务
@Service
@Slf4j
public class MessageConsumerService {
@Autowired
private PriorityMessageQueueService messageQueueService;
@Autowired
private MessageRepository messageRepository;
@Autowired
private MessageConsumerRecordRepository recordRepository;
@Value("${message.consumer.id}")
private String consumerId;
@Value("${message.consumer.batch-size:10}")
private Integer batchSize;
@Value("${message.consumer.threads:5}")
private Integer consumerThreads;
private final ExecutorService executorService;
public MessageConsumerService() {
this.executorService = Executors.newFixedThreadPool(5);
}
/**
* 启动消费者
*/
@PostConstruct
public void startConsumers() {
for (int i = 0; i < consumerThreads; i++) {
executorService.submit(() -> {
while (true) {
try {
consumeMessage();
} catch (Exception e) {
log.error("Consumer error", e);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
});
}
log.info("Started {} consumer threads", consumerThreads);
}
/**
* 消费消息
*/
private void consumeMessage() {
// 1. 接收消息
Message message = messageQueueService.receiveMessage();
if (message == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return;
}
// 2. 更新消息状态为处理中
updateMessageStatus(message, "PROCESSING");
// 3. 处理消息
long startTime = System.currentTimeMillis();
boolean success = processMessage(message);
long processDuration = System.currentTimeMillis() - startTime;
// 4. 记录消费记录
recordMessageConsumption(message, success, processDuration);
// 5. 更新消息状态
if (success) {
updateMessageStatus(message, "SUCCESS", processDuration);
} else {
handleMessageFailure(message);
}
}
/**
* 处理消息
*/
private boolean processMessage(Message message) {
try {
// 这里实现具体的消息处理逻辑
// 示例:根据消息主题调用不同的处理方法
switch (message.getTopic()) {
case "payment":
processPaymentMessage(message);
break;
case "order":
processOrderMessage(message);
break;
case "notification":
processNotificationMessage(message);
break;
case "alert":
processAlertMessage(message);
break;
default:
processDefaultMessage(message);
break;
}
return true;
} catch (Exception e) {
log.error("Failed to process message: {}", message.getMessageId(), e);
return false;
}
}
/**
* 处理支付消息
*/
private void processPaymentMessage(Message message) {
// 处理支付相关逻辑
log.info("Processing payment message: {}", message.getMessageId());
// 模拟处理时间
simulateProcessingTime(100);
}
/**
* 处理订单消息
*/
private void processOrderMessage(Message message) {
// 处理订单相关逻辑
log.info("Processing order message: {}", message.getMessageId());
// 模拟处理时间
simulateProcessingTime(200);
}
/**
* 处理通知消息
*/
private void processNotificationMessage(Message message) {
// 处理通知相关逻辑
log.info("Processing notification message: {}", message.getMessageId());
// 模拟处理时间
simulateProcessingTime(50);
}
/**
* 处理告警消息
*/
private void processAlertMessage(Message message) {
// 处理告警相关逻辑
log.info("Processing alert message: {}", message.getMessageId());
// 模拟处理时间
simulateProcessingTime(50);
}
/**
* 处理默认消息
*/
private void processDefaultMessage(Message message) {
// 处理默认逻辑
log.info("Processing default message: {}", message.getMessageId());
// 模拟处理时间
simulateProcessingTime(100);
}
/**
* 模拟处理时间
*/
private void simulateProcessingTime(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* 更新消息状态
*/
private void updateMessageStatus(Message message, String status) {
updateMessageStatus(message, status, null);
}
/**
* 更新消息状态
*/
private void updateMessageStatus(Message message, String status, Long processDuration) {
message.setStatus(status);
if ("PROCESSING".equals(status)) {
message.setProcessTime(new Date());
} else if ("SUCCESS".equals(status) && processDuration != null) {
message.setProcessDuration(processDuration);
}
messageRepository.save(message);
}
/**
* 处理消息失败
*/
private void handleMessageFailure(Message message) {
message.setRetryCount(message.getRetryCount() + 1);
if (message.getRetryCount() < message.getMaxRetry()) {
// 重新发送到队列
message.setStatus("PENDING");
messageRepository.save(message);
messageQueueService.sendMessage(message);
log.info("Message retried: {}, retry count: {}", message.getMessageId(), message.getRetryCount());
} else {
// 达到最大重试次数,标记为失败
message.setStatus("FAILED");
message.setErrorMessage("Max retry count reached");
messageRepository.save(message);
log.warn("Message failed after max retries: {}", message.getMessageId());
}
}
/**
* 记录消息消费
*/
private void recordMessageConsumption(Message message, boolean success, long processDuration) {
MessageConsumerRecord record = new MessageConsumerRecord();
record.setMessageId(message.getMessageId());
record.setConsumerId(consumerId);
record.setStatus(success ? "SUCCESS" : "FAILED");
record.setProcessTime(new Date());
record.setProcessDuration(processDuration);
recordRepository.save(record);
}
/**
* 停止消费者
*/
@PreDestroy
public void stopConsumers() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
log.info("Stopped consumer threads");
}
/**
* 手动消费消息
*/
public void consumeMessageManually() {
consumeMessage();
}
/**
* 批量消费消息
*/
public void consumeMessagesBatch() {
for (int i = 0; i < batchSize; i++) {
consumeMessage();
}
}
}
5. 消息管理控制器
@RestController
@RequestMapping("/api/message")
@Slf4j
public class MessageController {
@Autowired
private PriorityMessageQueueService messageQueueService;
@Autowired
private UrgentMessageService urgentMessageService;
@Autowired
private MessageConsumerService consumerService;
@Autowired
private MessageRepository messageRepository;
/**
* 发送消息
*/
@PostMapping("/send")
public Result<String> sendMessage(@RequestBody MessageRequest request) {
Message message = createMessage(request);
messageQueueService.sendMessage(message);
return Result.success(message.getMessageId());
}
/**
* 发送紧急消息
*/
@PostMapping("/send/urgent")
public Result<String> sendUrgentMessage(@RequestBody UrgentMessageRequest request) {
urgentMessageService.sendUrgentMessage(request.getTopic(), request.getContent());
return Result.success("紧急消息发送成功");
}
/**
* 批量发送消息
*/
@PostMapping("/send/batch")
public Result<String> sendMessagesBatch(@RequestBody List<MessageRequest> requests) {
List<Message> messages = new ArrayList<>();
for (MessageRequest request : requests) {
Message message = createMessage(request);
messages.add(message);
}
messageQueueService.sendMessages(messages);
return Result.success("批量消息发送成功");
}
/**
* 批量发送紧急消息
*/
@PostMapping("/send/urgent/batch")
public Result<String> sendUrgentMessagesBatch(@RequestBody List<UrgentMessageRequest> requests) {
urgentMessageService.sendUrgentMessages(requests);
return Result.success("批量紧急消息发送成功");
}
/**
* 将消息升级为紧急消息
*/
@PutMapping("/upgrade/{messageId}")
public Result<String> upgradeToUrgentMessage(@PathVariable String messageId) {
urgentMessageService.upgradeToUrgentMessage(messageId);
return Result.success("消息已升级为紧急消息");
}
/**
* 获取队列状态
*/
@GetMapping("/queue/status")
public Result<Map<String, Long>> getQueueStatus() {
Map<String, Long> queueLengths = messageQueueService.getQueueLengths();
return Result.success(queueLengths);
}
/**
* 获取消息列表
*/
@GetMapping("/list")
public Result<List<Message>> getMessages() {
List<Message> messages = messageRepository.findAll();
return Result.success(messages);
}
/**
* 获取紧急消息列表
*/
@GetMapping("/urgent/list")
public Result<List<Message>> getUrgentMessages() {
List<Message> messages = urgentMessageService.getUrgentMessages();
return Result.success(messages);
}
/**
* 获取消息详情
*/
@GetMapping("/{messageId}")
public Result<Message> getMessage(@PathVariable String messageId) {
Optional<Message> messageOptional = messageRepository.findByMessageId(messageId);
if (!messageOptional.isPresent()) {
return Result.error("消息不存在");
}
return Result.success(messageOptional.get());
}
/**
* 手动消费消息
*/
@PostMapping("/consume")
public Result<String> consumeMessage() {
consumerService.consumeMessageManually();
return Result.success("消息消费成功");
}
/**
* 批量消费消息
*/
@PostMapping("/consume/batch")
public Result<String> consumeMessagesBatch() {
consumerService.consumeMessagesBatch();
return Result.success("批量消息消费成功");
}
/**
* 清空队列
*/
@PostMapping("/queue/clear")
public Result<String> clearQueues() {
messageQueueService.clearQueues();
return Result.success("队列已清空");
}
/**
* 创建消息
*/
private Message createMessage(MessageRequest request) {
Message message = new Message();
message.setMessageId(UUID.randomUUID().toString());
message.setTopic(request.getTopic());
message.setPriority(request.getPriority() != null ? request.getPriority() : 2); // 默认低优先级
message.setIsUrgent(false);
message.setContent(request.getContent());
message.setStatus("PENDING");
message.setRetryCount(0);
message.setMaxRetry(3);
return message;
}
/**
* 消息请求
*/
@Data
public static class MessageRequest {
private String topic;
private Integer priority;
private String content;
}
/**
* 紧急消息请求
*/
@Data
public static class UrgentMessageRequest {
private String topic;
private String content;
}
}
核心流程
1. 消息发送流程
- 接收发送请求:客户端发送消息请求
- 创建消息:根据请求创建消息对象
- 优先级判断:判断消息是否为紧急消息或设置优先级
- 保存消息:将消息保存到数据库
- 发送到队列:根据优先级和是否为紧急消息,发送到对应的队列
- 返回结果:返回消息发送结果
2. 消息消费流程
- 按优先级接收:按紧急通道 > 高优先级 > 中优先级 > 低优先级 > 最低优先级的顺序接收消息
- 更新状态:将消息状态更新为处理中
- 处理消息:根据消息主题调用对应的处理方法
- 记录消费:记录消息消费记录
- 更新状态:根据处理结果更新消息状态
- 失败重试:处理失败时进行重试,达到最大重试次数后标记为失败
3. 紧急消息处理流程
- 接收紧急消息:接收紧急消息请求
- 创建紧急消息:创建标记为紧急的消息
- 发送到紧急通道:将消息发送到紧急通道
- 优先处理:紧急通道的消息会被优先处理
- 实时响应:确保紧急消息能够立即得到处理
技术要点
1. 优先级队列实现
- 多级队列:使用不同的 Redis 列表作为不同优先级的队列
- 优先级顺序:紧急通道 > 高优先级 > 中优先级 > 低优先级 > 最低优先级
- 原子操作:使用 Redis 的原子操作确保消息的可靠发送和接收
- 批量处理:支持批量发送和消费消息,提高处理效率
2. 紧急通道设计
- 独立通道:紧急消息有专门的通道,不受其他消息影响
- 最高优先级:紧急通道的消息优先级最高,确保立即处理
- 消息升级:支持将普通消息升级为紧急消息
- 实时监控:提供紧急消息的监控和管理
3. 消息处理机制
- 线程池:使用线程池并行处理消息,提高处理能力
- 错误处理:完善的错误处理和重试机制
- 消费记录:详细的消费记录,便于问题排查
- 状态管理:完整的消息状态管理,确保消息处理的可追踪性
4. 性能优化
- 批量操作:批量发送和消费消息,减少网络开销
- 异步处理:消息发送和消费都是异步的,不阻塞主线程
- 缓存优化:使用 Redis 作为消息队列,提高消息处理速度
- 并发控制:使用线程池和并发集合,提高并发处理能力
最佳实践
1. 消息优先级设置
- 核心业务:如支付、订单等设置为高优先级
- 普通业务:如通知、日志等设置为低优先级
- 紧急情况:如系统告警、异常通知等使用紧急通道
2. 队列管理
- 监控队列长度:定期监控各队列的长度,及时发现异常
- 合理设置线程数:根据系统资源和消息量设置合适的消费者线程数
- 队列清理:定期清理过期消息,避免队列积压
3. 错误处理
- 重试机制:设置合理的重试次数和间隔
- 死信队列:对于多次重试失败的消息,放入死信队列进行人工处理
- 告警机制:当消息处理失败率超过阈值时,发送告警
4. 监控告警
- 消息处理延迟:监控消息从发送到处理的延迟时间
- 队列积压:监控队列积压情况,及时扩容
- 处理成功率:监控消息处理的成功率
- 系统资源:监控消费者的 CPU、内存使用情况
常见问题
1. 消息丢失
问题:消息在发送或消费过程中丢失
解决方案:
- 使用 Redis 的持久化机制
- 消息发送后立即保存到数据库
- 实现消息确认机制
- 定期对账和消息重发
2. 队列积压
问题:消息队列出现积压,处理速度跟不上发送速度
解决方案:
- 增加消费者线程数
- 优化消息处理逻辑,提高处理速度
- 临时扩容,增加消费者实例
- 对非核心消息进行限流
3. 优先级反转
问题:低优先级消息长时间无法得到处理
解决方案:
- 实现加权优先级,给低优先级消息一定的处理机会
- 设置队列长度上限,避免高优先级消息完全阻塞低优先级消息
- 定期处理低优先级队列,确保消息不会永远被阻塞
4. 系统负载过高
问题:消息处理导致系统负载过高
解决方案:
- 合理设置消费者线程数
- 对消息处理进行限流
- 实现熔断机制,当系统负载过高时暂停消费
- 优化消息处理逻辑,减少资源消耗
代码优化建议
1. 优先级队列优化
/**
* 使用加权轮询算法优化优先级队列
*/
public Message receiveMessage() {
// 1. 首先检查紧急通道
Message urgentMessage = receiveFromUrgentQueue();
if (urgentMessage != null) {
return urgentMessage;
}
// 2. 使用加权轮询处理其他优先级队列
long currentTime = System.currentTimeMillis();
int[] weights = {5, 3, 1, 1}; // 高:中:低:最低 = 5:3:1:1
String[] queues = {highPriorityQueue, mediumPriorityQueue, lowPriorityQueue, lowestPriorityQueue};
for (int i = 0; i < queues.length; i++) {
if (shouldProcessQueue(queues[i], weights[i], currentTime)) {
Message message = receiveFromQueue(queues[i]);
if (message != null) {
return message;
}
}
}
return null;
}
/**
* 判断是否应该处理队列
*/
private boolean shouldProcessQueue(String queue, int weight, long currentTime) {
// 实现加权轮询逻辑
// 示例:基于时间和权重计算
return true;
}
2. 消息处理优化
/**
* 使用 CompletableFuture 优化消息处理
*/
private CompletableFuture<Boolean> processMessageAsync(Message message) {
return CompletableFuture.supplyAsync(() -> {
try {
// 处理消息
processMessage(message);
return true;
} catch (Exception e) {
log.error("Failed to process message: {}", message.getMessageId(), e);
return false;
}
});
}
/**
* 批量处理消息
*/
private void processMessagesBatch(List<Message> messages) {
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
for (Message message : messages) {
CompletableFuture<Boolean> future = processMessageAsync(message);
futures.add(future);
}
// 等待所有处理完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 处理结果
for (int i = 0; i < messages.size(); i++) {
Message message = messages.get(i);
boolean success = futures.get(i).join();
// 更新消息状态
}
}
3. 监控优化
/**
* 实现消息处理监控
*/
@Scheduled(fixedRate = 60000) // 每分钟监控一次
public void monitorMessageProcessing() {
// 1. 监控队列长度
Map<String, Long> queueLengths = messageQueueService.getQueueLengths();
log.info("Queue lengths: {}", queueLengths);
// 2. 监控消息处理延迟
List<Message> pendingMessages = messageRepository.findByStatus("PENDING");
for (Message message : pendingMessages) {
long delay = System.currentTimeMillis() - message.getCreateTime().getTime();
if (delay > 5 * 60 * 1000) { // 超过5分钟未处理
log.warn("Message pending for too long: {}, delay: {}ms", message.getMessageId(), delay);
}
}
// 3. 监控处理成功率
List<Message> failedMessages = messageRepository.findByStatus("FAILED");
int totalMessages = messageRepository.findAll().size();
double failureRate = totalMessages > 0 ? failedMessages.size() * 100.0 / totalMessages : 0;
if (failureRate > 5) { // 失败率超过5%
log.warn("Message failure rate too high: {}%", failureRate);
}
}
互动话题
- 你在实际项目中遇到过哪些消息队列的优先级问题?是如何解决的?
- 对于紧急消息,你有什么更好的处理策略?
- 在分布式环境下,如何确保消息优先级的一致性?
- 你认为消息队列的优先级设计还有哪些可以改进的地方?
欢迎在评论区交流讨论!
公众号:服务端技术精选,关注最新技术动态,分享实用技巧,提升开发效率。
标题:SpringBoot + 消息优先级队列 + 紧急通道:核心业务消息插队处理,保障关键链路
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/22/1774081175970.html
公众号:服务端技术精选
- 背景:消息队列的优先级挑战
- 核心概念
- 1. 消息优先级
- 2. 优先级队列
- 3. 紧急通道
- 4. 消息处理策略
- 技术实现
- 1. 核心实体类
- 2. 消息优先级队列服务
- 3. 紧急通道服务
- 4. 消息消费者服务
- 5. 消息管理控制器
- 核心流程
- 1. 消息发送流程
- 2. 消息消费流程
- 3. 紧急消息处理流程
- 技术要点
- 1. 优先级队列实现
- 2. 紧急通道设计
- 3. 消息处理机制
- 4. 性能优化
- 最佳实践
- 1. 消息优先级设置
- 2. 队列管理
- 3. 错误处理
- 4. 监控告警
- 常见问题
- 1. 消息丢失
- 2. 队列积压
- 3. 优先级反转
- 4. 系统负载过高
- 代码优化建议
- 1. 优先级队列优化
- 2. 消息处理优化
- 3. 监控优化
- 互动话题
评论
0 评论