SpringBoot + WebSocket 消息 QoS(服务质量):在线推优先,离线存库,确保不丢关键通知

引言:消息丢失的噩梦

公司的订单系统因为网络抖动导致大量关键通知丢失。用户下单后没有收到支付提醒,商家也没有收到新订单通知,最终导致订单超时取消,用户投诉,GMV 损失惨重。

实时消息推送是现代 Web 应用的核心功能,但面临着诸多挑战:

  • 网络不稳定:用户网络波动导致连接断开
  • 客户端离线:用户关闭浏览器或 APP 离线
  • 消息堆积:高峰期消息量过大导致推送延迟
  • 消息丢失:关键通知丢失导致业务异常

QoS(Quality of Service,服务质量) 是解决这些问题的关键。本文将带你深入理解 WebSocket 消息 QoS 机制,并使用 Spring Boot 实现一套完整的消息推送方案。


一、WebSocket 消息 QoS:概念与重要性

1.1 什么是消息 QoS?

QoS(Quality of Service) 是指消息传输的服务质量等级,用于保证消息的可靠传输。

类比生活中的例子

  • QoS 0(最多一次):普通信件,可能丢失
  • QoS 1(至少一次):挂号信,保证送达但可能重复
  • QoS 2(恰好一次):快递签收,保证送达且不重复

1.2 MQTT 协议中的 QoS 等级

QoS 等级名称描述适用场景
0At most once最多一次,可能丢失日志收集、心跳包
1At least once至少一次,可能重复普通通知、状态更新
2Exactly once恰好一次,不丢不重支付通知、订单状态

1.3 为什么 WebSocket 需要 QoS?

┌─────────────────────────────────────────────────────────────┐
│                  WebSocket 消息传输的挑战                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 网络不稳定                                             │
│     └─> 用户网络波动,WebSocket 连接频繁断开                 │
│                                                             │
│  2. 客户端离线                                             │
│     └─> 用户关闭浏览器或 APP,消息无法推送                   │
│                                                             │
│  3. 消息堆积                                               │
│     └─> 高峰期消息量过大,推送延迟或失败                     │
│                                                             │
│  4. 消息丢失                                               │
│     └─> 关键通知丢失,导致业务异常                           │
│                                                             │
│  5. 消息重复                                               │
│     └─> 重连后重复推送,用户体验差                           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

二、WebSocket 消息 QoS 架构设计

2.1 整体架构

┌─────────────────────────────────────────────────────────────┐
│              WebSocket 消息 QoS 架构                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐            │
│  │  前端   │    │  后端   │    │  MySQL  │            │
│  └────┬────┘    └────┬────┘    └─────────┘            │
│       │              │                                    │
│       │ 1.建立连接     │                                    │
│       ├─────────────>│                                    │
│       │              │                                    │
│       │ 2.发送消息     │                                    │
│       │<─────────────┤                                    │
│       │              │                                    │
│       │ 3.消息确认     │                                    │
│       ├─────────────>│                                    │
│       │              │                                    │
│       │ 4.连接断开     │                                    │
│       │              │                                    │
│       │              │ 5.消息存储                           │
│       │              ├─────────────────────────────────────>│
│       │              │                                    │
│       │ 6.重新连接     │                                    │
│       ├─────────────>│                                    │
│       │              │                                    │
│       │ 7.离线消息推送  │                                    │
│       │<─────────────┤                                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2.2 核心组件

┌─────────────────────────────────────────────────────────────┐
│              核心组件设计                                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              WebSocket 连接管理器                     │   │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐              │   │
│  │  │ Conn-1  │ │ Conn-2  │ │ Conn-3  │              │   │
│  │  │ User-1  │ │ User-2  │ │ User-3  │              │   │
│  │  └─────────┘ └─────────┘ └─────────┘              │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              消息存储服务                             │   │
│  │  ┌─────────────────────────────────────────────┐   │   │
│  │  │ 消息ID | 用户ID | 内容 | 状态 | 时间戳 | QoS  │   │   │
│  │  └─────────────────────────────────────────────┘   │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              QoS 消息服务                            │   │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐              │   │
│  │  │ 在线推送 │ │ 离线存储 │ │ 消息确认 │              │   │
│  │  └─────────┘ └─────────┘ └─────────┘              │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

三、Spring Boot + WebSocket 实现消息 QoS

3.1 项目依赖

<dependencies>
    <!-- Spring Boot WebSocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    
    <!-- Spring Boot Data JPA -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    
    <!-- MySQL Driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

3.2 WebSocket 配置

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // 启用简单的内存消息代理
        config.enableSimpleBroker("/topic", "/queue");
        // 设置应用前缀
        config.setApplicationDestinationPrefixes("/app");
        // 设置用户前缀
        config.setUserDestinationPrefix("/user");
    }
    
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注册 WebSocket 端点
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }
}

3.3 消息实体设计

@Entity
@Table(name = "websocket_message")
@Data
public class WebSocketMessage {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(name = "message_id", nullable = false, unique = true, length = 64)
    private String messageId;  // 消息唯一ID
    
    @Column(name = "user_id", nullable = false, length = 64)
    private String userId;     // 用户ID
    
    @Column(name = "content", nullable = false, columnDefinition = "text")
    private String content;    // 消息内容
    
    @Column(name = "message_type", length = 32)
    private String messageType; // 消息类型
    
    @Column(name = "qos", nullable = false)
    private Integer qos;       // QoS 等级 (0, 1, 2)
    
    @Column(name = "status", nullable = false)
    private Integer status;    // 状态 (0-待发送, 1-已发送, 2-已确认)
    
    @Column(name = "retry_count", nullable = false)
    private Integer retryCount; // 重试次数
    
    @Column(name = "create_time", nullable = false)
    private LocalDateTime createTime;
    
    @Column(name = "send_time")
    private LocalDateTime sendTime;
    
    @Column(name = "ack_time")
    private LocalDateTime ackTime;
    
    @PrePersist
    protected void onCreate() {
        this.createTime = LocalDateTime.now();
        this.retryCount = 0;
        this.status = 0;
    }
}

3.4 WebSocket 连接管理器

@Component
public class WebSocketConnectionManager {
    
    // 用户ID -> Session 映射
    private final ConcurrentHashMap<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();
    
    // Session ID -> 用户ID 映射
    private final ConcurrentHashMap<String, String> sessionUsers = new ConcurrentHashMap<>();
    
    /**
     * 添加连接
     */
    public void addConnection(String userId, WebSocketSession session) {
        userSessions.put(userId, session);
        sessionUsers.put(session.getId(), userId);
    }
    
    /**
     * 移除连接
     */
    public void removeConnection(String sessionId) {
        String userId = sessionUsers.remove(sessionId);
        if (userId != null) {
            userSessions.remove(userId);
        }
    }
    
    /**
     * 获取用户 Session
     */
    public WebSocketSession getSession(String userId) {
        return userSessions.get(userId);
    }
    
    /**
     * 检查用户是否在线
     */
    public boolean isOnline(String userId) {
        return userSessions.containsKey(userId);
    }
    
    /**
     * 获取在线用户列表
     */
    public Set<String> getOnlineUsers() {
        return new HashSet<>(userSessions.keySet());
    }
}

3.5 QoS 消息服务

@Service
@Slf4j
public class QosMessageService {
    
    @Autowired
    private WebSocketConnectionManager connectionManager;
    
    @Autowired
    private WebSocketMessageRepository messageRepository;
    
    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    
    /**
     * 发送消息(支持 QoS)
     */
    public void sendMessage(String userId, String content, Integer qos) {
        String messageId = generateMessageId();
        
        // 创建消息记录
        WebSocketMessage message = new WebSocketMessage();
        message.setMessageId(messageId);
        message.setUserId(userId);
        message.setContent(content);
        message.setQos(qos);
        message.setMessageType("NOTIFICATION");
        
        // QoS 0:直接发送,不存储
        if (qos == 0) {
            sendToUser(userId, message);
            return;
        }
        
        // QoS 1/2:先存储,再发送
        messageRepository.save(message);
        
        if (connectionManager.isOnline(userId)) {
            // 用户在线,直接推送
            sendToUser(userId, message);
            message.setStatus(1);
            message.setSendTime(LocalDateTime.now());
            messageRepository.save(message);
        } else {
            // 用户离线,等待重连后推送
            log.info("用户 {} 离线,消息 {} 已存储", userId, messageId);
        }
    }
    
    /**
     * 处理消息确认
     */
    @Transactional
    public void handleAck(String userId, String messageId) {
        WebSocketMessage message = messageRepository.findByMessageId(messageId);
        
        if (message == null) {
            log.warn("消息不存在:{}", messageId);
            return;
        }
        
        if (!message.getUserId().equals(userId)) {
            log.warn("消息用户不匹配:{}", messageId);
            return;
        }
        
        // 更新消息状态为已确认
        message.setStatus(2);
        message.setAckTime(LocalDateTime.now());
        messageRepository.save(message);
        
        log.info("消息 {} 已确认", messageId);
    }
    
    /**
     * 用户重连后推送离线消息
     */
    public void sendOfflineMessages(String userId) {
        List<WebSocketMessage> offlineMessages = messageRepository
                .findByUserIdAndStatus(userId, 0);
        
        for (WebSocketMessage message : offlineMessages) {
            sendToUser(userId, message);
            message.setStatus(1);
            message.setSendTime(LocalDateTime.now());
            messageRepository.save(message);
            
            log.info("推送离线消息 {} 给用户 {}", message.getMessageId(), userId);
        }
    }
    
    /**
     * 重试未确认的消息(定时任务)
     */
    @Scheduled(fixedRate = 60000) // 每分钟执行一次
    public void retryUnacknowledgedMessages() {
        // 查询已发送但未确认的消息
        List<WebSocketMessage> unackMessages = messageRepository
                .findByStatusAndSendTimeBefore(1, LocalDateTime.now().minusMinutes(1));
        
        for (WebSocketMessage message : unackMessages) {
            if (message.getRetryCount() >= 3) {
                // 超过最大重试次数,标记为失败
                message.setStatus(3);
                messageRepository.save(message);
                log.warn("消息 {} 重试次数超过限制", message.getMessageId());
                continue;
            }
            
            // 重试发送
            if (connectionManager.isOnline(message.getUserId())) {
                sendToUser(message.getUserId(), message);
                message.setRetryCount(message.getRetryCount() + 1);
                messageRepository.save(message);
                
                log.info("重试发送消息 {},第 {} 次", message.getMessageId(), message.getRetryCount());
            }
        }
    }
    
    /**
     * 发送消息给用户
     */
    private void sendToUser(String userId, WebSocketMessage message) {
        try {
            messagingTemplate.convertAndSendToUser(
                    userId,
                    "/queue/messages",
                    message
            );
            log.info("消息 {} 已发送给用户 {}", message.getMessageId(), userId);
        } catch (Exception e) {
            log.error("发送消息 {} 失败", message.getMessageId(), e);
        }
    }
    
    /**
     * 生成消息ID
     */
    private String generateMessageId() {
        return UUID.randomUUID().toString().replace("-", "");
    }
}

3.6 WebSocket 处理器

@Component
@Slf4j
public class WebSocketHandler extends TextWebSocketHandler {
    
    @Autowired
    private WebSocketConnectionManager connectionManager;
    
    @Autowired
    private QosMessageService qosMessageService;
    
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String userId = getUserIdFromSession(session);
        
        if (userId != null) {
            connectionManager.addConnection(userId, session);
            log.info("用户 {} 已连接,Session ID: {}", userId, session.getId());
            
            // 推送离线消息
            qosMessageService.sendOfflineMessages(userId);
        }
    }
    
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String payload = message.getPayload();
        log.info("收到消息:{}", payload);
        
        try {
            WebSocketMessage wsMessage = objectMapper.readValue(payload, WebSocketMessage.class);
            
            // 处理消息确认
            if ("ACK".equals(wsMessage.getMessageType())) {
                String userId = connectionManager.getUserId(session.getId());
                qosMessageService.handleAck(userId, wsMessage.getMessageId());
            }
        } catch (Exception e) {
            log.error("处理消息失败", e);
        }
    }
    
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        connectionManager.removeConnection(session.getId());
        log.info("连接已关闭,Session ID: {}", session.getId());
    }
    
    private String getUserIdFromSession(WebSocketSession session) {
        // 从 Session 属性中获取用户ID
        // 实际项目中可以从 JWT Token 或 Session 中解析
        return (String) session.getAttributes().get("userId");
    }
}

3.7 消息控制器

@RestController
@RequestMapping("/api/message")
@Slf4j
public class MessageController {
    
    @Autowired
    private QosMessageService qosMessageService;
    
    /**
     * 发送消息(支持 QoS)
     */
    @PostMapping("/send")
    public Result sendMessage(
            @RequestParam String userId,
            @RequestParam String content,
            @RequestParam(defaultValue = "1") Integer qos) {
        
        log.info("发送消息给用户 {},QoS: {}", userId, qos);
        qosMessageService.sendMessage(userId, content, qos);
        return Result.success("消息已发送");
    }
    
    /**
     * 批量发送消息
     */
    @PostMapping("/send/batch")
    public Result sendBatchMessage(
            @RequestBody BatchMessageRequest request) {
        
        for (String userId : request.getUserIds()) {
            qosMessageService.sendMessage(userId, request.getContent(), request.getQos());
        }
        
        return Result.success("批量消息已发送");
    }
    
    /**
     * 广播消息
     */
    @PostMapping("/broadcast")
    public Result broadcastMessage(
            @RequestParam String content,
            @RequestParam(defaultValue = "0") Integer qos) {
        
        // 获取所有在线用户
        Set<String> onlineUsers = connectionManager.getOnlineUsers();
        
        for (String userId : onlineUsers) {
            qosMessageService.sendMessage(userId, content, qos);
        }
        
        return Result.success("广播消息已发送");
    }
}

四、前端实现

4.1 WebSocket 客户端

class WebSocketClient {
    constructor(userId) {
        this.userId = userId;
        this.socket = null;
        this.reconnectInterval = 5000;
        this.maxReconnectAttempts = 10;
        this.reconnectAttempts = 0;
        this.pendingAcks = new Map(); // 待确认的消息
        this.messageHandlers = new Map();
    }
    
    connect() {
        const wsUrl = `ws://localhost:8080/ws?userId=${this.userId}`;
        this.socket = new WebSocket(wsUrl);
        
        this.socket.onopen = () => {
            console.log('WebSocket 连接已建立');
            this.reconnectAttempts = 0;
            
            // 重新发送未确认的消息
            this.resendPendingMessages();
        };
        
        this.socket.onmessage = (event) => {
            const message = JSON.parse(event.data);
            this.handleMessage(message);
        };
        
        this.socket.onclose = () => {
            console.log('WebSocket 连接已关闭');
            this.attemptReconnect();
        };
        
        this.socket.onerror = (error) => {
            console.error('WebSocket 错误:', error);
        };
    }
    
    handleMessage(message) {
        console.log('收到消息:', message);
        
        // 发送确认(QoS 1/2)
        if (message.qos > 0) {
            this.sendAck(message.messageId);
        }
        
        // 调用消息处理器
        const handler = this.messageHandlers.get(message.messageType);
        if (handler) {
            handler(message);
        }
    }
    
    sendAck(messageId) {
        const ack = {
            messageType: 'ACK',
            messageId: messageId
        };
        this.socket.send(JSON.stringify(ack));
    }
    
    attemptReconnect() {
        if (this.reconnectAttempts < this.maxReconnectAttempts) {
            this.reconnectAttempts++;
            console.log(`尝试重连... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
            
            setTimeout(() => {
                this.connect();
            }, this.reconnectInterval);
        }
    }
    
    onMessage(messageType, handler) {
        this.messageHandlers.set(messageType, handler);
    }
    
    close() {
        if (this.socket) {
            this.socket.close();
        }
    }
}

// 使用示例
const client = new WebSocketClient('user-001');

client.onMessage('NOTIFICATION', (message) => {
    console.log('收到通知:', message.content);
    // 显示通知
    showNotification(message.content);
});

client.connect();

4.2 消息确认机制

// 消息确认管理器
class MessageAckManager {
    constructor() {
        this.pendingMessages = new Map();
        this.ackTimeout = 30000; // 30秒超时
        this.maxRetries = 3;
    }
    
    addPendingMessage(messageId, message) {
        this.pendingMessages.set(messageId, {
            message: message,
            retries: 0,
            timestamp: Date.now()
        });
        
        // 设置超时检查
        setTimeout(() => {
            this.checkAckTimeout(messageId);
        }, this.ackTimeout);
    }
    
    confirmMessage(messageId) {
        this.pendingMessages.delete(messageId);
        console.log('消息已确认:', messageId);
    }
    
    checkAckTimeout(messageId) {
        const pending = this.pendingMessages.get(messageId);
        if (!pending) return; // 已确认
        
        if (pending.retries < this.maxRetries) {
            // 重试发送
            pending.retries++;
            console.log(`消息 ${messageId} 超时,重试 ${pending.retries}/${this.maxRetries}`);
            this.resendMessage(pending.message);
            
            // 重新设置超时检查
            setTimeout(() => {
                this.checkAckTimeout(messageId);
            }, this.ackTimeout);
        } else {
            // 超过最大重试次数
            console.error(`消息 ${messageId} 发送失败,超过最大重试次数`);
            this.pendingMessages.delete(messageId);
            this.handleSendFailure(pending.message);
        }
    }
    
    resendMessage(message) {
        // 重新发送消息
        if (window.webSocketClient && window.webSocketClient.socket) {
            window.webSocketClient.socket.send(JSON.stringify(message));
        }
    }
    
    handleSendFailure(message) {
        // 处理发送失败
        console.error('消息发送失败:', message);
        // 可以在这里进行降级处理,如使用 HTTP 接口
    }
}

五、最佳实践

5.1 QoS 等级选择

场景QoS 等级说明
心跳包0不需要确认,丢失不影响业务
普通通知1保证送达,允许重复
支付通知2保证送达且不重复
订单状态2关键业务,不能丢失
聊天消息1保证送达,顺序重要

5.2 消息存储策略

@Service
public class MessageStorageService {
    
    /**
     * 存储消息
     */
    public void storeMessage(WebSocketMessage message) {
        // QoS 0:不存储
        if (message.getQos() == 0) {
            return;
        }
        
        // QoS 1/2:存储到数据库
        messageRepository.save(message);
        
        // 异步写入消息队列(可选)
        if (message.getQos() == 2) {
            kafkaTemplate.send("websocket-messages", message);
        }
    }
    
    /**
     * 清理已确认的消息(定时任务)
     */
    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
    public void cleanupAcknowledgedMessages() {
        // 删除已确认且超过7天的消息
        LocalDateTime before = LocalDateTime.now().minusDays(7);
        messageRepository.deleteByStatusAndAckTimeBefore(2, before);
    }
}

5.3 性能优化

@Service
public class WebSocketPerformanceService {
    
    // 批量发送消息
    public void sendBatchMessages(List<WebSocketMessage> messages) {
        // 按用户分组
        Map<String, List<WebSocketMessage>> userMessages = messages.stream()
                .collect(Collectors.groupingBy(WebSocketMessage::getUserId));
        
        // 批量发送
        for (Map.Entry<String, List<WebSocketMessage>> entry : userMessages.entrySet()) {
            String userId = entry.getKey();
            List<WebSocketMessage> userMsgs = entry.getValue();
            
            if (connectionManager.isOnline(userId)) {
                // 用户在线,批量推送
                messagingTemplate.convertAndSendToUser(
                        userId,
                        "/queue/messages",
                        userMsgs
                );
                
                // 批量更新状态
                userMsgs.forEach(msg -> {
                    msg.setStatus(1);
                    msg.setSendTime(LocalDateTime.now());
                });
                messageRepository.saveAll(userMsgs);
            }
        }
    }
    
    // 使用缓存加速
    @Cacheable(value = "onlineUsers", key = "#userId")
    public boolean isUserOnline(String userId) {
        return connectionManager.isOnline(userId);
    }
}

5.4 监控告警

@Component
public class WebSocketMonitor {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    /**
     * 监控在线连接数
     */
    @Scheduled(fixedRate = 60000)
    public void monitorOnlineConnections() {
        int onlineCount = connectionManager.getOnlineUserCount();
        meterRegistry.gauge("websocket.online.connections", onlineCount);
        
        if (onlineCount > 10000) {
            // 发送告警
            alertService.sendAlert("WebSocket 在线连接数过高: " + onlineCount);
        }
    }
    
    /**
     * 监控消息积压
     */
    @Scheduled(fixedRate = 60000)
    public void monitorMessageBacklog() {
        long pendingCount = messageRepository.countByStatus(0);
        meterRegistry.gauge("websocket.pending.messages", pendingCount);
        
        if (pendingCount > 100000) {
            // 发送告警
            alertService.sendAlert("WebSocket 消息积压严重: " + pendingCount);
        }
    }
    
    /**
     * 监控消息确认率
     */
    @Scheduled(fixedRate = 300000) // 每5分钟
    public void monitorAckRate() {
        long total = messageRepository.countByCreateTimeAfter(LocalDateTime.now().minusMinutes(5));
        long acknowledged = messageRepository.countByStatusAndCreateTimeAfter(2, LocalDateTime.now().minusMinutes(5));
        
        double ackRate = total > 0 ? (double) acknowledged / total : 1.0;
        meterRegistry.gauge("websocket.ack.rate", ackRate);
        
        if (ackRate < 0.95) {
            // 发送告警
            alertService.sendAlert("WebSocket 消息确认率过低: " + ackRate);
        }
    }
}

六、常见问题与解决方案

6.1 消息丢失

原因

  • 客户端离线期间消息未存储
  • 消息存储失败
  • 推送失败未重试

解决方案

  • 所有 QoS 1/2 消息先存储再推送
  • 使用数据库事务保证存储成功
  • 实现消息重试机制

6.2 消息重复

原因

  • 消息重试导致重复推送
  • 客户端重连后重复接收

解决方案

  • 客户端维护已接收消息ID集合
  • 服务端记录已确认消息
  • 幂等性处理业务逻辑

6.3 连接数过多

原因

  • 用户频繁刷新页面
  • 连接未正确关闭

解决方案

  • 限制单个用户的连接数
  • 实现连接心跳检测
  • 及时清理无效连接

6.4 消息积压

原因

  • 高峰期消息量过大
  • 消费速度跟不上生产速度

解决方案

  • 批量发送消息
  • 使用消息队列削峰
  • 水平扩展 WebSocket 服务

七、总结

WebSocket 消息 QoS 是保证实时消息可靠传输的关键技术。本文介绍了 QoS 的概念、架构设计和 Spring Boot 实现方案。

核心要点:

  1. 根据业务场景选择合适的 QoS 等级
  2. 在线用户直接推送,离线用户存储待推送
  3. 实现消息确认机制,保证消息可靠传输
  4. 做好监控告警,及时发现和处理问题

适用场景:

  • 实时通知推送
  • 在线聊天系统
  • 订单状态同步
  • 股票行情推送
  • 游戏实时对战

如果本文对你有帮助,欢迎关注「服务端技术精选」公众号,获取更多后端技术干货。


互动题:

  1. 在你的项目中,如何处理 WebSocket 消息丢失问题?
  2. QoS 1 和 QoS 2 的区别是什么?什么场景下必须使用 QoS 2?
  3. 如何设计一个支持百万级并发的 WebSocket 消息推送系统?

欢迎在评论区分享你的想法和经验,我们一起交流学习!


标题:SpringBoot + WebSocket 消息 QoS(服务质量):在线推优先,离线存库,确保不丢关键通知
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/08/1772943099681.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消