SpringBoot + 消息优先级队列 + 紧急通道:核心业务消息插队处理,保障关键链路

背景:消息队列的优先级挑战

在现代分布式系统中,消息队列被广泛应用于异步处理、解耦和削峰填谷等场景。然而,随着业务的发展,不同类型的消息之间的优先级差异越来越明显:

  • 核心业务消息:如支付、订单等关键业务消息,需要优先处理
  • 非核心业务消息:如日志、统计等辅助性消息,可以延迟处理
  • 紧急消息:如系统告警、异常通知等,需要立即处理

传统的消息队列通常采用先进先出(FIFO)的方式处理消息,无法满足不同优先级消息的处理需求。当系统负载较高时,核心业务消息可能会被非核心消息阻塞,导致关键业务链路出现延迟,影响用户体验和业务连续性。

本文将介绍如何使用 SpringBoot 实现消息优先级队列和紧急通道,让核心业务消息能够插队处理,保障关键链路的顺畅运行。

核心概念

1. 消息优先级

消息优先级是指消息的重要程度,通常分为以下几个级别:

优先级级别描述处理策略
HIGH0紧急消息立即处理,优先于所有其他消息
MEDIUM1核心业务消息优先于低优先级消息
LOW2非核心业务消息正常处理
LOWEST3辅助性消息最后处理

2. 优先级队列

优先级队列是一种特殊的队列,它根据消息的优先级决定消息的处理顺序。高优先级的消息会被优先处理,即使它们是后加入队列的。

3. 紧急通道

紧急通道是专门为处理紧急消息而设计的通道,它具有最高的处理优先级,确保紧急消息能够立即被处理,不受其他消息的影响。

4. 消息处理策略

策略描述适用场景
严格优先级高优先级消息完全优先于低优先级消息紧急情况,如系统告警
加权优先级高优先级消息获得更多的处理资源核心业务与非核心业务并存
抢占式优先级高优先级消息可以抢占正在处理的低优先级消息关键业务场景

技术实现

1. 核心实体类

// 消息实体
@Data
@Entity
@Table(name = "message")
public class Message {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(name = "message_id", unique = true, nullable = false, length = 64)
    private String messageId; // 消息ID
    
    @Column(name = "topic", nullable = false, length = 128)
    private String topic; // 消息主题
    
    @Column(name = "priority", nullable = false)
    private Integer priority; // 优先级:0-高,1-中,2-低,3-最低
    
    @Column(name = "is_urgent", nullable = false)
    private Boolean isUrgent = false; // 是否为紧急消息
    
    @Column(name = "content", columnDefinition = "TEXT", nullable = false)
    private String content; // 消息内容
    
    @Column(name = "status", nullable = false, length = 32)
    private String status = "PENDING"; // 状态:PENDING/PROCESSING/SUCCESS/FAILED
    
    @Column(name = "retry_count", nullable = false)
    private Integer retryCount = 0; // 重试次数
    
    @Column(name = "max_retry", nullable = false)
    private Integer maxRetry = 3; // 最大重试次数
    
    @Column(name = "error_message", length = 1024)
    private String errorMessage; // 错误信息
    
    @CreationTimestamp
    @Column(name = "create_time", nullable = false)
    private Date createTime;
    
    @UpdateTimestamp
    @Column(name = "update_time", nullable = false)
    private Date updateTime;
    
    @Column(name = "process_time")
    private Date processTime; // 处理时间
    
    @Column(name = "process_duration")
    private Long processDuration; // 处理耗时(毫秒)
}

// 消息消费记录
@Data
@Entity
@Table(name = "message_consumer_record")
public class MessageConsumerRecord {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(name = "message_id", nullable = false, length = 64)
    private String messageId; // 消息ID
    
    @Column(name = "consumer_id", nullable = false, length = 64)
    private String consumerId; // 消费者ID
    
    @Column(name = "status", nullable = false, length = 32)
    private String status; // 状态:SUCCESS/FAILED
    
    @Column(name = "error_message", length = 1024)
    private String errorMessage; // 错误信息
    
    @Column(name = "process_time")
    private Date processTime; // 处理时间
    
    @Column(name = "process_duration")
    private Long processDuration; // 处理耗时(毫秒)
    
    @CreationTimestamp
    @Column(name = "create_time", nullable = false)
    private Date createTime;
}

2. 消息优先级队列服务

@Service
@Slf4j
public class PriorityMessageQueueService {
    
    @Autowired
    private MessageRepository messageRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Value("${message.queue.priority.high}")
    private String highPriorityQueue;
    
    @Value("${message.queue.priority.medium}")
    private String mediumPriorityQueue;
    
    @Value("${message.queue.priority.low}")
    private String lowPriorityQueue;
    
    @Value("${message.queue.priority.lowest}")
    private String lowestPriorityQueue;
    
    @Value("${message.queue.urgent}")
    private String urgentQueue;
    
    /**
     * 发送消息
     */
    public void sendMessage(Message message) {
        // 1. 保存消息到数据库
        messageRepository.save(message);
        
        // 2. 根据优先级和是否为紧急消息,发送到不同的队列
        if (message.getIsUrgent()) {
            // 紧急消息发送到紧急通道
            sendToUrgentQueue(message);
        } else {
            // 根据优先级发送到对应队列
            sendToPriorityQueue(message);
        }
        
        log.info("Message sent: {}, priority: {}, urgent: {}", 
                message.getMessageId(), message.getPriority(), message.getIsUrgent());
    }
    
    /**
     * 发送到紧急通道
     */
    private void sendToUrgentQueue(Message message) {
        String messageJson = JSON.toJSONString(message);
        redisTemplate.opsForList().leftPush(urgentQueue, messageJson);
    }
    
    /**
     * 发送到优先级队列
     */
    private void sendToPriorityQueue(Message message) {
        String queueName = getQueueNameByPriority(message.getPriority());
        String messageJson = JSON.toJSONString(message);
        redisTemplate.opsForList().leftPush(queueName, messageJson);
    }
    
    /**
     * 根据优先级获取队列名称
     */
    private String getQueueNameByPriority(Integer priority) {
        switch (priority) {
            case 0:
                return highPriorityQueue;
            case 1:
                return mediumPriorityQueue;
            case 2:
                return lowPriorityQueue;
            case 3:
                return lowestPriorityQueue;
            default:
                return lowPriorityQueue;
        }
    }
    
    /**
     * 接收消息(按优先级顺序)
     */
    public Message receiveMessage() {
        // 1. 首先检查紧急通道
        Message urgentMessage = receiveFromUrgentQueue();
        if (urgentMessage != null) {
            return urgentMessage;
        }
        
        // 2. 检查高优先级队列
        Message highPriorityMessage = receiveFromQueue(highPriorityQueue);
        if (highPriorityMessage != null) {
            return highPriorityMessage;
        }
        
        // 3. 检查中优先级队列
        Message mediumPriorityMessage = receiveFromQueue(mediumPriorityQueue);
        if (mediumPriorityMessage != null) {
            return mediumPriorityMessage;
        }
        
        // 4. 检查低优先级队列
        Message lowPriorityMessage = receiveFromQueue(lowPriorityQueue);
        if (lowPriorityMessage != null) {
            return lowPriorityMessage;
        }
        
        // 5. 检查最低优先级队列
        return receiveFromQueue(lowestPriorityQueue);
    }
    
    /**
     * 从紧急通道接收消息
     */
    private Message receiveFromUrgentQueue() {
        return receiveFromQueue(urgentQueue);
    }
    
    /**
     * 从指定队列接收消息
     */
    private Message receiveFromQueue(String queueName) {
        Object messageJson = redisTemplate.opsForList().rightPop(queueName);
        if (messageJson != null) {
            try {
                return JSON.parseObject(messageJson.toString(), Message.class);
            } catch (Exception e) {
                log.error("Failed to parse message: {}", messageJson, e);
            }
        }
        return null;
    }
    
    /**
     * 批量发送消息
     */
    public void sendMessages(List<Message> messages) {
        for (Message message : messages) {
            sendMessage(message);
        }
    }
    
    /**
     * 获取队列长度
     */
    public Map<String, Long> getQueueLengths() {
        Map<String, Long> lengths = new HashMap<>();
        lengths.put("urgent", redisTemplate.opsForList().size(urgentQueue));
        lengths.put("high", redisTemplate.opsForList().size(highPriorityQueue));
        lengths.put("medium", redisTemplate.opsForList().size(mediumPriorityQueue));
        lengths.put("low", redisTemplate.opsForList().size(lowPriorityQueue));
        lengths.put("lowest", redisTemplate.opsForList().size(lowestPriorityQueue));
        return lengths;
    }
    
    /**
     * 清空队列
     */
    public void clearQueues() {
        redisTemplate.delete(urgentQueue);
        redisTemplate.delete(highPriorityQueue);
        redisTemplate.delete(mediumPriorityQueue);
        redisTemplate.delete(lowPriorityQueue);
        redisTemplate.delete(lowestPriorityQueue);
        log.info("All queues cleared");
    }
}

3. 紧急通道服务

@Service
@Slf4j
public class UrgentMessageService {
    
    @Autowired
    private PriorityMessageQueueService messageQueueService;
    
    @Autowired
    private MessageRepository messageRepository;
    
    /**
     * 发送紧急消息
     */
    public void sendUrgentMessage(String topic, String content) {
        Message message = createUrgentMessage(topic, content);
        messageQueueService.sendMessage(message);
        log.info("Urgent message sent: {}", message.getMessageId());
    }
    
    /**
     * 创建紧急消息
     */
    private Message createUrgentMessage(String topic, String content) {
        Message message = new Message();
        message.setMessageId(UUID.randomUUID().toString());
        message.setTopic(topic);
        message.setPriority(0); // 最高优先级
        message.setIsUrgent(true); // 标记为紧急消息
        message.setContent(content);
        message.setStatus("PENDING");
        message.setRetryCount(0);
        message.setMaxRetry(3);
        return message;
    }
    
    /**
     * 批量发送紧急消息
     */
    public void sendUrgentMessages(List<UrgentMessageRequest> requests) {
        List<Message> messages = new ArrayList<>();
        for (UrgentMessageRequest request : requests) {
            Message message = createUrgentMessage(request.getTopic(), request.getContent());
            messages.add(message);
        }
        messageQueueService.sendMessages(messages);
        log.info("Sent {} urgent messages", messages.size());
    }
    
    /**
     * 将普通消息升级为紧急消息
     */
    public void upgradeToUrgentMessage(String messageId) {
        Optional<Message> messageOptional = messageRepository.findByMessageId(messageId);
        if (messageOptional.isPresent()) {
            Message message = messageOptional.get();
            if (!message.getIsUrgent()) {
                message.setIsUrgent(true);
                message.setPriority(0);
                messageRepository.save(message);
                
                // 重新发送到紧急通道
                messageQueueService.sendMessage(message);
                log.info("Message upgraded to urgent: {}", messageId);
            }
        }
    }
    
    /**
     * 获取紧急消息列表
     */
    public List<Message> getUrgentMessages() {
        return messageRepository.findByIsUrgentTrueAndStatusNotIn(Arrays.asList("SUCCESS", "FAILED"));
    }
    
    /**
     * 紧急消息请求
     */
    @Data
    public static class UrgentMessageRequest {
        private String topic;
        private String content;
    }
}

4. 消息消费者服务

@Service
@Slf4j
public class MessageConsumerService {
    
    @Autowired
    private PriorityMessageQueueService messageQueueService;
    
    @Autowired
    private MessageRepository messageRepository;
    
    @Autowired
    private MessageConsumerRecordRepository recordRepository;
    
    @Value("${message.consumer.id}")
    private String consumerId;
    
    @Value("${message.consumer.batch-size:10}")
    private Integer batchSize;
    
    @Value("${message.consumer.threads:5}")
    private Integer consumerThreads;
    
    private final ExecutorService executorService;
    
    public MessageConsumerService() {
        this.executorService = Executors.newFixedThreadPool(5);
    }
    
    /**
     * 启动消费者
     */
    @PostConstruct
    public void startConsumers() {
        for (int i = 0; i < consumerThreads; i++) {
            executorService.submit(() -> {
                while (true) {
                    try {
                        consumeMessage();
                    } catch (Exception e) {
                        log.error("Consumer error", e);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
        }
        log.info("Started {} consumer threads", consumerThreads);
    }
    
    /**
     * 消费消息
     */
    private void consumeMessage() {
        // 1. 接收消息
        Message message = messageQueueService.receiveMessage();
        if (message == null) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return;
        }
        
        // 2. 更新消息状态为处理中
        updateMessageStatus(message, "PROCESSING");
        
        // 3. 处理消息
        long startTime = System.currentTimeMillis();
        boolean success = processMessage(message);
        long processDuration = System.currentTimeMillis() - startTime;
        
        // 4. 记录消费记录
        recordMessageConsumption(message, success, processDuration);
        
        // 5. 更新消息状态
        if (success) {
            updateMessageStatus(message, "SUCCESS", processDuration);
        } else {
            handleMessageFailure(message);
        }
    }
    
    /**
     * 处理消息
     */
    private boolean processMessage(Message message) {
        try {
            // 这里实现具体的消息处理逻辑
            // 示例:根据消息主题调用不同的处理方法
            switch (message.getTopic()) {
                case "payment":
                    processPaymentMessage(message);
                    break;
                case "order":
                    processOrderMessage(message);
                    break;
                case "notification":
                    processNotificationMessage(message);
                    break;
                case "alert":
                    processAlertMessage(message);
                    break;
                default:
                    processDefaultMessage(message);
                    break;
            }
            return true;
        } catch (Exception e) {
            log.error("Failed to process message: {}", message.getMessageId(), e);
            return false;
        }
    }
    
    /**
     * 处理支付消息
     */
    private void processPaymentMessage(Message message) {
        // 处理支付相关逻辑
        log.info("Processing payment message: {}", message.getMessageId());
        // 模拟处理时间
        simulateProcessingTime(100);
    }
    
    /**
     * 处理订单消息
     */
    private void processOrderMessage(Message message) {
        // 处理订单相关逻辑
        log.info("Processing order message: {}", message.getMessageId());
        // 模拟处理时间
        simulateProcessingTime(200);
    }
    
    /**
     * 处理通知消息
     */
    private void processNotificationMessage(Message message) {
        // 处理通知相关逻辑
        log.info("Processing notification message: {}", message.getMessageId());
        // 模拟处理时间
        simulateProcessingTime(50);
    }
    
    /**
     * 处理告警消息
     */
    private void processAlertMessage(Message message) {
        // 处理告警相关逻辑
        log.info("Processing alert message: {}", message.getMessageId());
        // 模拟处理时间
        simulateProcessingTime(50);
    }
    
    /**
     * 处理默认消息
     */
    private void processDefaultMessage(Message message) {
        // 处理默认逻辑
        log.info("Processing default message: {}", message.getMessageId());
        // 模拟处理时间
        simulateProcessingTime(100);
    }
    
    /**
     * 模拟处理时间
     */
    private void simulateProcessingTime(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    /**
     * 更新消息状态
     */
    private void updateMessageStatus(Message message, String status) {
        updateMessageStatus(message, status, null);
    }
    
    /**
     * 更新消息状态
     */
    private void updateMessageStatus(Message message, String status, Long processDuration) {
        message.setStatus(status);
        if ("PROCESSING".equals(status)) {
            message.setProcessTime(new Date());
        } else if ("SUCCESS".equals(status) && processDuration != null) {
            message.setProcessDuration(processDuration);
        }
        messageRepository.save(message);
    }
    
    /**
     * 处理消息失败
     */
    private void handleMessageFailure(Message message) {
        message.setRetryCount(message.getRetryCount() + 1);
        
        if (message.getRetryCount() < message.getMaxRetry()) {
            // 重新发送到队列
            message.setStatus("PENDING");
            messageRepository.save(message);
            messageQueueService.sendMessage(message);
            log.info("Message retried: {}, retry count: {}", message.getMessageId(), message.getRetryCount());
        } else {
            // 达到最大重试次数,标记为失败
            message.setStatus("FAILED");
            message.setErrorMessage("Max retry count reached");
            messageRepository.save(message);
            log.warn("Message failed after max retries: {}", message.getMessageId());
        }
    }
    
    /**
     * 记录消息消费
     */
    private void recordMessageConsumption(Message message, boolean success, long processDuration) {
        MessageConsumerRecord record = new MessageConsumerRecord();
        record.setMessageId(message.getMessageId());
        record.setConsumerId(consumerId);
        record.setStatus(success ? "SUCCESS" : "FAILED");
        record.setProcessTime(new Date());
        record.setProcessDuration(processDuration);
        recordRepository.save(record);
    }
    
    /**
     * 停止消费者
     */
    @PreDestroy
    public void stopConsumers() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
        log.info("Stopped consumer threads");
    }
    
    /**
     * 手动消费消息
     */
    public void consumeMessageManually() {
        consumeMessage();
    }
    
    /**
     * 批量消费消息
     */
    public void consumeMessagesBatch() {
        for (int i = 0; i < batchSize; i++) {
            consumeMessage();
        }
    }
}

5. 消息管理控制器

@RestController
@RequestMapping("/api/message")
@Slf4j
public class MessageController {
    
    @Autowired
    private PriorityMessageQueueService messageQueueService;
    
    @Autowired
    private UrgentMessageService urgentMessageService;
    
    @Autowired
    private MessageConsumerService consumerService;
    
    @Autowired
    private MessageRepository messageRepository;
    
    /**
     * 发送消息
     */
    @PostMapping("/send")
    public Result<String> sendMessage(@RequestBody MessageRequest request) {
        Message message = createMessage(request);
        messageQueueService.sendMessage(message);
        return Result.success(message.getMessageId());
    }
    
    /**
     * 发送紧急消息
     */
    @PostMapping("/send/urgent")
    public Result<String> sendUrgentMessage(@RequestBody UrgentMessageRequest request) {
        urgentMessageService.sendUrgentMessage(request.getTopic(), request.getContent());
        return Result.success("紧急消息发送成功");
    }
    
    /**
     * 批量发送消息
     */
    @PostMapping("/send/batch")
    public Result<String> sendMessagesBatch(@RequestBody List<MessageRequest> requests) {
        List<Message> messages = new ArrayList<>();
        for (MessageRequest request : requests) {
            Message message = createMessage(request);
            messages.add(message);
        }
        messageQueueService.sendMessages(messages);
        return Result.success("批量消息发送成功");
    }
    
    /**
     * 批量发送紧急消息
     */
    @PostMapping("/send/urgent/batch")
    public Result<String> sendUrgentMessagesBatch(@RequestBody List<UrgentMessageRequest> requests) {
        urgentMessageService.sendUrgentMessages(requests);
        return Result.success("批量紧急消息发送成功");
    }
    
    /**
     * 将消息升级为紧急消息
     */
    @PutMapping("/upgrade/{messageId}")
    public Result<String> upgradeToUrgentMessage(@PathVariable String messageId) {
        urgentMessageService.upgradeToUrgentMessage(messageId);
        return Result.success("消息已升级为紧急消息");
    }
    
    /**
     * 获取队列状态
     */
    @GetMapping("/queue/status")
    public Result<Map<String, Long>> getQueueStatus() {
        Map<String, Long> queueLengths = messageQueueService.getQueueLengths();
        return Result.success(queueLengths);
    }
    
    /**
     * 获取消息列表
     */
    @GetMapping("/list")
    public Result<List<Message>> getMessages() {
        List<Message> messages = messageRepository.findAll();
        return Result.success(messages);
    }
    
    /**
     * 获取紧急消息列表
     */
    @GetMapping("/urgent/list")
    public Result<List<Message>> getUrgentMessages() {
        List<Message> messages = urgentMessageService.getUrgentMessages();
        return Result.success(messages);
    }
    
    /**
     * 获取消息详情
     */
    @GetMapping("/{messageId}")
    public Result<Message> getMessage(@PathVariable String messageId) {
        Optional<Message> messageOptional = messageRepository.findByMessageId(messageId);
        if (!messageOptional.isPresent()) {
            return Result.error("消息不存在");
        }
        return Result.success(messageOptional.get());
    }
    
    /**
     * 手动消费消息
     */
    @PostMapping("/consume")
    public Result<String> consumeMessage() {
        consumerService.consumeMessageManually();
        return Result.success("消息消费成功");
    }
    
    /**
     * 批量消费消息
     */
    @PostMapping("/consume/batch")
    public Result<String> consumeMessagesBatch() {
        consumerService.consumeMessagesBatch();
        return Result.success("批量消息消费成功");
    }
    
    /**
     * 清空队列
     */
    @PostMapping("/queue/clear")
    public Result<String> clearQueues() {
        messageQueueService.clearQueues();
        return Result.success("队列已清空");
    }
    
    /**
     * 创建消息
     */
    private Message createMessage(MessageRequest request) {
        Message message = new Message();
        message.setMessageId(UUID.randomUUID().toString());
        message.setTopic(request.getTopic());
        message.setPriority(request.getPriority() != null ? request.getPriority() : 2); // 默认低优先级
        message.setIsUrgent(false);
        message.setContent(request.getContent());
        message.setStatus("PENDING");
        message.setRetryCount(0);
        message.setMaxRetry(3);
        return message;
    }
    
    /**
     * 消息请求
     */
    @Data
    public static class MessageRequest {
        private String topic;
        private Integer priority;
        private String content;
    }
    
    /**
     * 紧急消息请求
     */
    @Data
    public static class UrgentMessageRequest {
        private String topic;
        private String content;
    }
}

核心流程

1. 消息发送流程

  1. 接收发送请求:客户端发送消息请求
  2. 创建消息:根据请求创建消息对象
  3. 优先级判断:判断消息是否为紧急消息或设置优先级
  4. 保存消息:将消息保存到数据库
  5. 发送到队列:根据优先级和是否为紧急消息,发送到对应的队列
  6. 返回结果:返回消息发送结果

2. 消息消费流程

  1. 按优先级接收:按紧急通道 > 高优先级 > 中优先级 > 低优先级 > 最低优先级的顺序接收消息
  2. 更新状态:将消息状态更新为处理中
  3. 处理消息:根据消息主题调用对应的处理方法
  4. 记录消费:记录消息消费记录
  5. 更新状态:根据处理结果更新消息状态
  6. 失败重试:处理失败时进行重试,达到最大重试次数后标记为失败

3. 紧急消息处理流程

  1. 接收紧急消息:接收紧急消息请求
  2. 创建紧急消息:创建标记为紧急的消息
  3. 发送到紧急通道:将消息发送到紧急通道
  4. 优先处理:紧急通道的消息会被优先处理
  5. 实时响应:确保紧急消息能够立即得到处理

技术要点

1. 优先级队列实现

  • 多级队列:使用不同的 Redis 列表作为不同优先级的队列
  • 优先级顺序:紧急通道 > 高优先级 > 中优先级 > 低优先级 > 最低优先级
  • 原子操作:使用 Redis 的原子操作确保消息的可靠发送和接收
  • 批量处理:支持批量发送和消费消息,提高处理效率

2. 紧急通道设计

  • 独立通道:紧急消息有专门的通道,不受其他消息影响
  • 最高优先级:紧急通道的消息优先级最高,确保立即处理
  • 消息升级:支持将普通消息升级为紧急消息
  • 实时监控:提供紧急消息的监控和管理

3. 消息处理机制

  • 线程池:使用线程池并行处理消息,提高处理能力
  • 错误处理:完善的错误处理和重试机制
  • 消费记录:详细的消费记录,便于问题排查
  • 状态管理:完整的消息状态管理,确保消息处理的可追踪性

4. 性能优化

  • 批量操作:批量发送和消费消息,减少网络开销
  • 异步处理:消息发送和消费都是异步的,不阻塞主线程
  • 缓存优化:使用 Redis 作为消息队列,提高消息处理速度
  • 并发控制:使用线程池和并发集合,提高并发处理能力

最佳实践

1. 消息优先级设置

  • 核心业务:如支付、订单等设置为高优先级
  • 普通业务:如通知、日志等设置为低优先级
  • 紧急情况:如系统告警、异常通知等使用紧急通道

2. 队列管理

  • 监控队列长度:定期监控各队列的长度,及时发现异常
  • 合理设置线程数:根据系统资源和消息量设置合适的消费者线程数
  • 队列清理:定期清理过期消息,避免队列积压

3. 错误处理

  • 重试机制:设置合理的重试次数和间隔
  • 死信队列:对于多次重试失败的消息,放入死信队列进行人工处理
  • 告警机制:当消息处理失败率超过阈值时,发送告警

4. 监控告警

  • 消息处理延迟:监控消息从发送到处理的延迟时间
  • 队列积压:监控队列积压情况,及时扩容
  • 处理成功率:监控消息处理的成功率
  • 系统资源:监控消费者的 CPU、内存使用情况

常见问题

1. 消息丢失

问题:消息在发送或消费过程中丢失

解决方案

  • 使用 Redis 的持久化机制
  • 消息发送后立即保存到数据库
  • 实现消息确认机制
  • 定期对账和消息重发

2. 队列积压

问题:消息队列出现积压,处理速度跟不上发送速度

解决方案

  • 增加消费者线程数
  • 优化消息处理逻辑,提高处理速度
  • 临时扩容,增加消费者实例
  • 对非核心消息进行限流

3. 优先级反转

问题:低优先级消息长时间无法得到处理

解决方案

  • 实现加权优先级,给低优先级消息一定的处理机会
  • 设置队列长度上限,避免高优先级消息完全阻塞低优先级消息
  • 定期处理低优先级队列,确保消息不会永远被阻塞

4. 系统负载过高

问题:消息处理导致系统负载过高

解决方案

  • 合理设置消费者线程数
  • 对消息处理进行限流
  • 实现熔断机制,当系统负载过高时暂停消费
  • 优化消息处理逻辑,减少资源消耗

代码优化建议

1. 优先级队列优化

/**
 * 使用加权轮询算法优化优先级队列
 */
public Message receiveMessage() {
    // 1. 首先检查紧急通道
    Message urgentMessage = receiveFromUrgentQueue();
    if (urgentMessage != null) {
        return urgentMessage;
    }
    
    // 2. 使用加权轮询处理其他优先级队列
    long currentTime = System.currentTimeMillis();
    int[] weights = {5, 3, 1, 1}; // 高:中:低:最低 = 5:3:1:1
    String[] queues = {highPriorityQueue, mediumPriorityQueue, lowPriorityQueue, lowestPriorityQueue};
    
    for (int i = 0; i < queues.length; i++) {
        if (shouldProcessQueue(queues[i], weights[i], currentTime)) {
            Message message = receiveFromQueue(queues[i]);
            if (message != null) {
                return message;
            }
        }
    }
    
    return null;
}

/**
 * 判断是否应该处理队列
 */
private boolean shouldProcessQueue(String queue, int weight, long currentTime) {
    // 实现加权轮询逻辑
    // 示例:基于时间和权重计算
    return true;
}

2. 消息处理优化

/**
 * 使用 CompletableFuture 优化消息处理
 */
private CompletableFuture<Boolean> processMessageAsync(Message message) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            // 处理消息
            processMessage(message);
            return true;
        } catch (Exception e) {
            log.error("Failed to process message: {}", message.getMessageId(), e);
            return false;
        }
    });
}

/**
 * 批量处理消息
 */
private void processMessagesBatch(List<Message> messages) {
    List<CompletableFuture<Boolean>> futures = new ArrayList<>();
    
    for (Message message : messages) {
        CompletableFuture<Boolean> future = processMessageAsync(message);
        futures.add(future);
    }
    
    // 等待所有处理完成
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    
    // 处理结果
    for (int i = 0; i < messages.size(); i++) {
        Message message = messages.get(i);
        boolean success = futures.get(i).join();
        // 更新消息状态
    }
}

3. 监控优化

/**
 * 实现消息处理监控
 */
@Scheduled(fixedRate = 60000) // 每分钟监控一次
public void monitorMessageProcessing() {
    // 1. 监控队列长度
    Map<String, Long> queueLengths = messageQueueService.getQueueLengths();
    log.info("Queue lengths: {}", queueLengths);
    
    // 2. 监控消息处理延迟
    List<Message> pendingMessages = messageRepository.findByStatus("PENDING");
    for (Message message : pendingMessages) {
        long delay = System.currentTimeMillis() - message.getCreateTime().getTime();
        if (delay > 5 * 60 * 1000) { // 超过5分钟未处理
            log.warn("Message pending for too long: {}, delay: {}ms", message.getMessageId(), delay);
        }
    }
    
    // 3. 监控处理成功率
    List<Message> failedMessages = messageRepository.findByStatus("FAILED");
    int totalMessages = messageRepository.findAll().size();
    double failureRate = totalMessages > 0 ? failedMessages.size() * 100.0 / totalMessages : 0;
    if (failureRate > 5) { // 失败率超过5%
        log.warn("Message failure rate too high: {}%", failureRate);
    }
}

互动话题

  1. 你在实际项目中遇到过哪些消息队列的优先级问题?是如何解决的?
  2. 对于紧急消息,你有什么更好的处理策略?
  3. 在分布式环境下,如何确保消息优先级的一致性?
  4. 你认为消息队列的优先级设计还有哪些可以改进的地方?

欢迎在评论区交流讨论!


公众号:服务端技术精选,关注最新技术动态,分享实用技巧,提升开发效率。


标题:SpringBoot + 消息优先级队列 + 紧急通道:核心业务消息插队处理,保障关键链路
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/22/1774081175970.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消