SpringBoot + WebSocket 消息 QoS(服务质量):在线推优先,离线存库,确保不丢关键通知
引言:消息丢失的噩梦
公司的订单系统因为网络抖动导致大量关键通知丢失。用户下单后没有收到支付提醒,商家也没有收到新订单通知,最终导致订单超时取消,用户投诉,GMV 损失惨重。
实时消息推送是现代 Web 应用的核心功能,但面临着诸多挑战:
- 网络不稳定:用户网络波动导致连接断开
- 客户端离线:用户关闭浏览器或 APP 离线
- 消息堆积:高峰期消息量过大导致推送延迟
- 消息丢失:关键通知丢失导致业务异常
QoS(Quality of Service,服务质量) 是解决这些问题的关键。本文将带你深入理解 WebSocket 消息 QoS 机制,并使用 Spring Boot 实现一套完整的消息推送方案。
一、WebSocket 消息 QoS:概念与重要性
1.1 什么是消息 QoS?
QoS(Quality of Service) 是指消息传输的服务质量等级,用于保证消息的可靠传输。
类比生活中的例子:
- QoS 0(最多一次):普通信件,可能丢失
- QoS 1(至少一次):挂号信,保证送达但可能重复
- QoS 2(恰好一次):快递签收,保证送达且不重复
1.2 MQTT 协议中的 QoS 等级
| QoS 等级 | 名称 | 描述 | 适用场景 |
|---|---|---|---|
| 0 | At most once | 最多一次,可能丢失 | 日志收集、心跳包 |
| 1 | At least once | 至少一次,可能重复 | 普通通知、状态更新 |
| 2 | Exactly once | 恰好一次,不丢不重 | 支付通知、订单状态 |
1.3 为什么 WebSocket 需要 QoS?
┌─────────────────────────────────────────────────────────────┐
│ WebSocket 消息传输的挑战 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 网络不稳定 │
│ └─> 用户网络波动,WebSocket 连接频繁断开 │
│ │
│ 2. 客户端离线 │
│ └─> 用户关闭浏览器或 APP,消息无法推送 │
│ │
│ 3. 消息堆积 │
│ └─> 高峰期消息量过大,推送延迟或失败 │
│ │
│ 4. 消息丢失 │
│ └─> 关键通知丢失,导致业务异常 │
│ │
│ 5. 消息重复 │
│ └─> 重连后重复推送,用户体验差 │
│ │
└─────────────────────────────────────────────────────────────┘
二、WebSocket 消息 QoS 架构设计
2.1 整体架构
┌─────────────────────────────────────────────────────────────┐
│ WebSocket 消息 QoS 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 前端 │ │ 后端 │ │ MySQL │ │
│ └────┬────┘ └────┬────┘ └─────────┘ │
│ │ │ │
│ │ 1.建立连接 │ │
│ ├─────────────>│ │
│ │ │ │
│ │ 2.发送消息 │ │
│ │<─────────────┤ │
│ │ │ │
│ │ 3.消息确认 │ │
│ ├─────────────>│ │
│ │ │ │
│ │ 4.连接断开 │ │
│ │ │ │
│ │ │ 5.消息存储 │
│ │ ├─────────────────────────────────────>│
│ │ │ │
│ │ 6.重新连接 │ │
│ ├─────────────>│ │
│ │ │ │
│ │ 7.离线消息推送 │ │
│ │<─────────────┤ │
│ │
└─────────────────────────────────────────────────────────────┘
2.2 核心组件
┌─────────────────────────────────────────────────────────────┐
│ 核心组件设计 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ WebSocket 连接管理器 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Conn-1 │ │ Conn-2 │ │ Conn-3 │ │ │
│ │ │ User-1 │ │ User-2 │ │ User-3 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 消息存储服务 │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ 消息ID | 用户ID | 内容 | 状态 | 时间戳 | QoS │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ QoS 消息服务 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 在线推送 │ │ 离线存储 │ │ 消息确认 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
三、Spring Boot + WebSocket 实现消息 QoS
3.1 项目依赖
<dependencies>
<!-- Spring Boot WebSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Spring Boot Data JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- MySQL Driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
3.2 WebSocket 配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 启用简单的内存消息代理
config.enableSimpleBroker("/topic", "/queue");
// 设置应用前缀
config.setApplicationDestinationPrefixes("/app");
// 设置用户前缀
config.setUserDestinationPrefix("/user");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册 WebSocket 端点
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS();
}
}
3.3 消息实体设计
@Entity
@Table(name = "websocket_message")
@Data
public class WebSocketMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "message_id", nullable = false, unique = true, length = 64)
private String messageId; // 消息唯一ID
@Column(name = "user_id", nullable = false, length = 64)
private String userId; // 用户ID
@Column(name = "content", nullable = false, columnDefinition = "text")
private String content; // 消息内容
@Column(name = "message_type", length = 32)
private String messageType; // 消息类型
@Column(name = "qos", nullable = false)
private Integer qos; // QoS 等级 (0, 1, 2)
@Column(name = "status", nullable = false)
private Integer status; // 状态 (0-待发送, 1-已发送, 2-已确认)
@Column(name = "retry_count", nullable = false)
private Integer retryCount; // 重试次数
@Column(name = "create_time", nullable = false)
private LocalDateTime createTime;
@Column(name = "send_time")
private LocalDateTime sendTime;
@Column(name = "ack_time")
private LocalDateTime ackTime;
@PrePersist
protected void onCreate() {
this.createTime = LocalDateTime.now();
this.retryCount = 0;
this.status = 0;
}
}
3.4 WebSocket 连接管理器
@Component
public class WebSocketConnectionManager {
// 用户ID -> Session 映射
private final ConcurrentHashMap<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();
// Session ID -> 用户ID 映射
private final ConcurrentHashMap<String, String> sessionUsers = new ConcurrentHashMap<>();
/**
* 添加连接
*/
public void addConnection(String userId, WebSocketSession session) {
userSessions.put(userId, session);
sessionUsers.put(session.getId(), userId);
}
/**
* 移除连接
*/
public void removeConnection(String sessionId) {
String userId = sessionUsers.remove(sessionId);
if (userId != null) {
userSessions.remove(userId);
}
}
/**
* 获取用户 Session
*/
public WebSocketSession getSession(String userId) {
return userSessions.get(userId);
}
/**
* 检查用户是否在线
*/
public boolean isOnline(String userId) {
return userSessions.containsKey(userId);
}
/**
* 获取在线用户列表
*/
public Set<String> getOnlineUsers() {
return new HashSet<>(userSessions.keySet());
}
}
3.5 QoS 消息服务
@Service
@Slf4j
public class QosMessageService {
@Autowired
private WebSocketConnectionManager connectionManager;
@Autowired
private WebSocketMessageRepository messageRepository;
@Autowired
private SimpMessagingTemplate messagingTemplate;
/**
* 发送消息(支持 QoS)
*/
public void sendMessage(String userId, String content, Integer qos) {
String messageId = generateMessageId();
// 创建消息记录
WebSocketMessage message = new WebSocketMessage();
message.setMessageId(messageId);
message.setUserId(userId);
message.setContent(content);
message.setQos(qos);
message.setMessageType("NOTIFICATION");
// QoS 0:直接发送,不存储
if (qos == 0) {
sendToUser(userId, message);
return;
}
// QoS 1/2:先存储,再发送
messageRepository.save(message);
if (connectionManager.isOnline(userId)) {
// 用户在线,直接推送
sendToUser(userId, message);
message.setStatus(1);
message.setSendTime(LocalDateTime.now());
messageRepository.save(message);
} else {
// 用户离线,等待重连后推送
log.info("用户 {} 离线,消息 {} 已存储", userId, messageId);
}
}
/**
* 处理消息确认
*/
@Transactional
public void handleAck(String userId, String messageId) {
WebSocketMessage message = messageRepository.findByMessageId(messageId);
if (message == null) {
log.warn("消息不存在:{}", messageId);
return;
}
if (!message.getUserId().equals(userId)) {
log.warn("消息用户不匹配:{}", messageId);
return;
}
// 更新消息状态为已确认
message.setStatus(2);
message.setAckTime(LocalDateTime.now());
messageRepository.save(message);
log.info("消息 {} 已确认", messageId);
}
/**
* 用户重连后推送离线消息
*/
public void sendOfflineMessages(String userId) {
List<WebSocketMessage> offlineMessages = messageRepository
.findByUserIdAndStatus(userId, 0);
for (WebSocketMessage message : offlineMessages) {
sendToUser(userId, message);
message.setStatus(1);
message.setSendTime(LocalDateTime.now());
messageRepository.save(message);
log.info("推送离线消息 {} 给用户 {}", message.getMessageId(), userId);
}
}
/**
* 重试未确认的消息(定时任务)
*/
@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void retryUnacknowledgedMessages() {
// 查询已发送但未确认的消息
List<WebSocketMessage> unackMessages = messageRepository
.findByStatusAndSendTimeBefore(1, LocalDateTime.now().minusMinutes(1));
for (WebSocketMessage message : unackMessages) {
if (message.getRetryCount() >= 3) {
// 超过最大重试次数,标记为失败
message.setStatus(3);
messageRepository.save(message);
log.warn("消息 {} 重试次数超过限制", message.getMessageId());
continue;
}
// 重试发送
if (connectionManager.isOnline(message.getUserId())) {
sendToUser(message.getUserId(), message);
message.setRetryCount(message.getRetryCount() + 1);
messageRepository.save(message);
log.info("重试发送消息 {},第 {} 次", message.getMessageId(), message.getRetryCount());
}
}
}
/**
* 发送消息给用户
*/
private void sendToUser(String userId, WebSocketMessage message) {
try {
messagingTemplate.convertAndSendToUser(
userId,
"/queue/messages",
message
);
log.info("消息 {} 已发送给用户 {}", message.getMessageId(), userId);
} catch (Exception e) {
log.error("发送消息 {} 失败", message.getMessageId(), e);
}
}
/**
* 生成消息ID
*/
private String generateMessageId() {
return UUID.randomUUID().toString().replace("-", "");
}
}
3.6 WebSocket 处理器
@Component
@Slf4j
public class WebSocketHandler extends TextWebSocketHandler {
@Autowired
private WebSocketConnectionManager connectionManager;
@Autowired
private QosMessageService qosMessageService;
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String userId = getUserIdFromSession(session);
if (userId != null) {
connectionManager.addConnection(userId, session);
log.info("用户 {} 已连接,Session ID: {}", userId, session.getId());
// 推送离线消息
qosMessageService.sendOfflineMessages(userId);
}
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
log.info("收到消息:{}", payload);
try {
WebSocketMessage wsMessage = objectMapper.readValue(payload, WebSocketMessage.class);
// 处理消息确认
if ("ACK".equals(wsMessage.getMessageType())) {
String userId = connectionManager.getUserId(session.getId());
qosMessageService.handleAck(userId, wsMessage.getMessageId());
}
} catch (Exception e) {
log.error("处理消息失败", e);
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
connectionManager.removeConnection(session.getId());
log.info("连接已关闭,Session ID: {}", session.getId());
}
private String getUserIdFromSession(WebSocketSession session) {
// 从 Session 属性中获取用户ID
// 实际项目中可以从 JWT Token 或 Session 中解析
return (String) session.getAttributes().get("userId");
}
}
3.7 消息控制器
@RestController
@RequestMapping("/api/message")
@Slf4j
public class MessageController {
@Autowired
private QosMessageService qosMessageService;
/**
* 发送消息(支持 QoS)
*/
@PostMapping("/send")
public Result sendMessage(
@RequestParam String userId,
@RequestParam String content,
@RequestParam(defaultValue = "1") Integer qos) {
log.info("发送消息给用户 {},QoS: {}", userId, qos);
qosMessageService.sendMessage(userId, content, qos);
return Result.success("消息已发送");
}
/**
* 批量发送消息
*/
@PostMapping("/send/batch")
public Result sendBatchMessage(
@RequestBody BatchMessageRequest request) {
for (String userId : request.getUserIds()) {
qosMessageService.sendMessage(userId, request.getContent(), request.getQos());
}
return Result.success("批量消息已发送");
}
/**
* 广播消息
*/
@PostMapping("/broadcast")
public Result broadcastMessage(
@RequestParam String content,
@RequestParam(defaultValue = "0") Integer qos) {
// 获取所有在线用户
Set<String> onlineUsers = connectionManager.getOnlineUsers();
for (String userId : onlineUsers) {
qosMessageService.sendMessage(userId, content, qos);
}
return Result.success("广播消息已发送");
}
}
四、前端实现
4.1 WebSocket 客户端
class WebSocketClient {
constructor(userId) {
this.userId = userId;
this.socket = null;
this.reconnectInterval = 5000;
this.maxReconnectAttempts = 10;
this.reconnectAttempts = 0;
this.pendingAcks = new Map(); // 待确认的消息
this.messageHandlers = new Map();
}
connect() {
const wsUrl = `ws://localhost:8080/ws?userId=${this.userId}`;
this.socket = new WebSocket(wsUrl);
this.socket.onopen = () => {
console.log('WebSocket 连接已建立');
this.reconnectAttempts = 0;
// 重新发送未确认的消息
this.resendPendingMessages();
};
this.socket.onmessage = (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
};
this.socket.onclose = () => {
console.log('WebSocket 连接已关闭');
this.attemptReconnect();
};
this.socket.onerror = (error) => {
console.error('WebSocket 错误:', error);
};
}
handleMessage(message) {
console.log('收到消息:', message);
// 发送确认(QoS 1/2)
if (message.qos > 0) {
this.sendAck(message.messageId);
}
// 调用消息处理器
const handler = this.messageHandlers.get(message.messageType);
if (handler) {
handler(message);
}
}
sendAck(messageId) {
const ack = {
messageType: 'ACK',
messageId: messageId
};
this.socket.send(JSON.stringify(ack));
}
attemptReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
console.log(`尝试重连... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
setTimeout(() => {
this.connect();
}, this.reconnectInterval);
}
}
onMessage(messageType, handler) {
this.messageHandlers.set(messageType, handler);
}
close() {
if (this.socket) {
this.socket.close();
}
}
}
// 使用示例
const client = new WebSocketClient('user-001');
client.onMessage('NOTIFICATION', (message) => {
console.log('收到通知:', message.content);
// 显示通知
showNotification(message.content);
});
client.connect();
4.2 消息确认机制
// 消息确认管理器
class MessageAckManager {
constructor() {
this.pendingMessages = new Map();
this.ackTimeout = 30000; // 30秒超时
this.maxRetries = 3;
}
addPendingMessage(messageId, message) {
this.pendingMessages.set(messageId, {
message: message,
retries: 0,
timestamp: Date.now()
});
// 设置超时检查
setTimeout(() => {
this.checkAckTimeout(messageId);
}, this.ackTimeout);
}
confirmMessage(messageId) {
this.pendingMessages.delete(messageId);
console.log('消息已确认:', messageId);
}
checkAckTimeout(messageId) {
const pending = this.pendingMessages.get(messageId);
if (!pending) return; // 已确认
if (pending.retries < this.maxRetries) {
// 重试发送
pending.retries++;
console.log(`消息 ${messageId} 超时,重试 ${pending.retries}/${this.maxRetries}`);
this.resendMessage(pending.message);
// 重新设置超时检查
setTimeout(() => {
this.checkAckTimeout(messageId);
}, this.ackTimeout);
} else {
// 超过最大重试次数
console.error(`消息 ${messageId} 发送失败,超过最大重试次数`);
this.pendingMessages.delete(messageId);
this.handleSendFailure(pending.message);
}
}
resendMessage(message) {
// 重新发送消息
if (window.webSocketClient && window.webSocketClient.socket) {
window.webSocketClient.socket.send(JSON.stringify(message));
}
}
handleSendFailure(message) {
// 处理发送失败
console.error('消息发送失败:', message);
// 可以在这里进行降级处理,如使用 HTTP 接口
}
}
五、最佳实践
5.1 QoS 等级选择
| 场景 | QoS 等级 | 说明 |
|---|---|---|
| 心跳包 | 0 | 不需要确认,丢失不影响业务 |
| 普通通知 | 1 | 保证送达,允许重复 |
| 支付通知 | 2 | 保证送达且不重复 |
| 订单状态 | 2 | 关键业务,不能丢失 |
| 聊天消息 | 1 | 保证送达,顺序重要 |
5.2 消息存储策略
@Service
public class MessageStorageService {
/**
* 存储消息
*/
public void storeMessage(WebSocketMessage message) {
// QoS 0:不存储
if (message.getQos() == 0) {
return;
}
// QoS 1/2:存储到数据库
messageRepository.save(message);
// 异步写入消息队列(可选)
if (message.getQos() == 2) {
kafkaTemplate.send("websocket-messages", message);
}
}
/**
* 清理已确认的消息(定时任务)
*/
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
public void cleanupAcknowledgedMessages() {
// 删除已确认且超过7天的消息
LocalDateTime before = LocalDateTime.now().minusDays(7);
messageRepository.deleteByStatusAndAckTimeBefore(2, before);
}
}
5.3 性能优化
@Service
public class WebSocketPerformanceService {
// 批量发送消息
public void sendBatchMessages(List<WebSocketMessage> messages) {
// 按用户分组
Map<String, List<WebSocketMessage>> userMessages = messages.stream()
.collect(Collectors.groupingBy(WebSocketMessage::getUserId));
// 批量发送
for (Map.Entry<String, List<WebSocketMessage>> entry : userMessages.entrySet()) {
String userId = entry.getKey();
List<WebSocketMessage> userMsgs = entry.getValue();
if (connectionManager.isOnline(userId)) {
// 用户在线,批量推送
messagingTemplate.convertAndSendToUser(
userId,
"/queue/messages",
userMsgs
);
// 批量更新状态
userMsgs.forEach(msg -> {
msg.setStatus(1);
msg.setSendTime(LocalDateTime.now());
});
messageRepository.saveAll(userMsgs);
}
}
}
// 使用缓存加速
@Cacheable(value = "onlineUsers", key = "#userId")
public boolean isUserOnline(String userId) {
return connectionManager.isOnline(userId);
}
}
5.4 监控告警
@Component
public class WebSocketMonitor {
@Autowired
private MeterRegistry meterRegistry;
/**
* 监控在线连接数
*/
@Scheduled(fixedRate = 60000)
public void monitorOnlineConnections() {
int onlineCount = connectionManager.getOnlineUserCount();
meterRegistry.gauge("websocket.online.connections", onlineCount);
if (onlineCount > 10000) {
// 发送告警
alertService.sendAlert("WebSocket 在线连接数过高: " + onlineCount);
}
}
/**
* 监控消息积压
*/
@Scheduled(fixedRate = 60000)
public void monitorMessageBacklog() {
long pendingCount = messageRepository.countByStatus(0);
meterRegistry.gauge("websocket.pending.messages", pendingCount);
if (pendingCount > 100000) {
// 发送告警
alertService.sendAlert("WebSocket 消息积压严重: " + pendingCount);
}
}
/**
* 监控消息确认率
*/
@Scheduled(fixedRate = 300000) // 每5分钟
public void monitorAckRate() {
long total = messageRepository.countByCreateTimeAfter(LocalDateTime.now().minusMinutes(5));
long acknowledged = messageRepository.countByStatusAndCreateTimeAfter(2, LocalDateTime.now().minusMinutes(5));
double ackRate = total > 0 ? (double) acknowledged / total : 1.0;
meterRegistry.gauge("websocket.ack.rate", ackRate);
if (ackRate < 0.95) {
// 发送告警
alertService.sendAlert("WebSocket 消息确认率过低: " + ackRate);
}
}
}
六、常见问题与解决方案
6.1 消息丢失
原因:
- 客户端离线期间消息未存储
- 消息存储失败
- 推送失败未重试
解决方案:
- 所有 QoS 1/2 消息先存储再推送
- 使用数据库事务保证存储成功
- 实现消息重试机制
6.2 消息重复
原因:
- 消息重试导致重复推送
- 客户端重连后重复接收
解决方案:
- 客户端维护已接收消息ID集合
- 服务端记录已确认消息
- 幂等性处理业务逻辑
6.3 连接数过多
原因:
- 用户频繁刷新页面
- 连接未正确关闭
解决方案:
- 限制单个用户的连接数
- 实现连接心跳检测
- 及时清理无效连接
6.4 消息积压
原因:
- 高峰期消息量过大
- 消费速度跟不上生产速度
解决方案:
- 批量发送消息
- 使用消息队列削峰
- 水平扩展 WebSocket 服务
七、总结
WebSocket 消息 QoS 是保证实时消息可靠传输的关键技术。本文介绍了 QoS 的概念、架构设计和 Spring Boot 实现方案。
核心要点:
- 根据业务场景选择合适的 QoS 等级
- 在线用户直接推送,离线用户存储待推送
- 实现消息确认机制,保证消息可靠传输
- 做好监控告警,及时发现和处理问题
适用场景:
- 实时通知推送
- 在线聊天系统
- 订单状态同步
- 股票行情推送
- 游戏实时对战
如果本文对你有帮助,欢迎关注「服务端技术精选」公众号,获取更多后端技术干货。
互动题:
- 在你的项目中,如何处理 WebSocket 消息丢失问题?
- QoS 1 和 QoS 2 的区别是什么?什么场景下必须使用 QoS 2?
- 如何设计一个支持百万级并发的 WebSocket 消息推送系统?
欢迎在评论区分享你的想法和经验,我们一起交流学习!
标题:SpringBoot + WebSocket 消息 QoS(服务质量):在线推优先,离线存库,确保不丢关键通知
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/08/1772943099681.html
公众号:服务端技术精选
- 引言:消息丢失的噩梦
- 一、WebSocket 消息 QoS:概念与重要性
- 1.1 什么是消息 QoS?
- 1.2 MQTT 协议中的 QoS 等级
- 1.3 为什么 WebSocket 需要 QoS?
- 二、WebSocket 消息 QoS 架构设计
- 2.1 整体架构
- 2.2 核心组件
- 三、Spring Boot + WebSocket 实现消息 QoS
- 3.1 项目依赖
- 3.2 WebSocket 配置
- 3.3 消息实体设计
- 3.4 WebSocket 连接管理器
- 3.5 QoS 消息服务
- 3.6 WebSocket 处理器
- 3.7 消息控制器
- 四、前端实现
- 4.1 WebSocket 客户端
- 4.2 消息确认机制
- 五、最佳实践
- 5.1 QoS 等级选择
- 5.2 消息存储策略
- 5.3 性能优化
- 5.4 监控告警
- 六、常见问题与解决方案
- 6.1 消息丢失
- 6.2 消息重复
- 6.3 连接数过多
- 6.4 消息积压
- 七、总结
评论
0 评论