SpringBoot + WebSocket + STOMP:支持群聊、@提醒、消息回执的企业 IM 系统实战
传统IM系统的挑战
在我们的日常开发工作中,经常会遇到这样的需求:
- 需要实现实时聊天功能,支持一对一和群聊
- 要有@提醒功能,让用户不错过重要消息
- 需要消息回执,确保消息已送达
- 要支持离线消息推送
- 要有良好的性能和扩展性
如果用传统的HTTP轮询方式,不仅服务器压力大,用户体验也不好。今天我们就用WebSocket + STOMP技术来解决这些问题。
解决方案思路
今天我们要解决的,就是如何用SpringBoot + WebSocket + STOMP构建一个功能完整的企业IM系统。
核心思路是:
- WebSocket连接:建立持久化的双向通信通道
- STOMP协议:在WebSocket之上构建消息传递框架
- 消息路由:实现精确的消息推送和路由
- 状态管理:管理用户在线状态和消息状态
技术选型
- SpringBoot:快速搭建应用
- WebSocket:实时双向通信
- STOMP:消息传递协议
- Redis:消息存储和用户状态管理
- Spring Security:连接认证
核心实现思路
1. WebSocket配置
首先配置WebSocket和STOMP:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 配置消息代理
config.enableSimpleBroker("/topic", "/queue"); // 订阅频道前缀
config.setApplicationDestinationPrefixes("/app"); // 应用目标前缀
config.setUserDestinationPrefix("/user"); // 用户私信前缀
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册STOMP端点
registry.addEndpoint("/ws") // WebSocket连接端点
.setAllowedOriginPatterns("*") // 允许跨域
.withSockJS(); // 支持降级到HTTP轮询
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
// 配置WebSocket传输参数
registration.setMessageSizeLimit(8192) // 消息大小限制
.setSendTimeLimit(20000) // 发送时间限制
.setSendBufferSizeLimit(8192); // 发送缓冲区限制
}
}
2. 消息模型定义
定义消息相关的数据模型:
@Data
public class ChatMessage {
private String id;
private String senderId;
private String senderName;
private String receiverId; // 单聊接收者
private String roomId; // 群聊房间ID
private MessageType type; // 消息类型:TEXT, IMAGE, FILE等
private String content;
private Long timestamp;
private MessageStatus status; // 消息状态:SENT, DELIVERED, READ
private List<String> mentionedUsers; // @提醒的用户列表
private String replyTo; // 回复的消息ID
public enum MessageType {
TEXT, IMAGE, FILE, SYSTEM
}
public enum MessageStatus {
SENT, DELIVERED, READ
}
}
@Data
public class ChatRoom {
private String id;
private String name;
private String type; // PRIVATE, GROUP
private Set<String> members;
private String creatorId;
private LocalDateTime createTime;
}
3. 消息处理控制器
实现消息处理逻辑:
@Controller
@Slf4j
public class ChatController {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private ChatService chatService;
/**
* 发送消息
*/
@MessageMapping("/chat.sendMessage")
@SendTo("/topic/public")
public ChatMessage sendMessage(@Payload ChatMessage message) {
// 保存消息到数据库
ChatMessage savedMessage = chatService.saveMessage(message);
// 处理@提醒
handleMentions(savedMessage);
// 更新消息状态为已发送
chatService.updateMessageStatus(savedMessage.getId(), MessageStatus.SENT);
return savedMessage;
}
/**
* 发送私信
*/
@MessageMapping("/chat.sendPrivateMessage")
public void sendPrivateMessage(@Payload ChatMessage message, Principal principal) {
// 保存消息
ChatMessage savedMessage = chatService.saveMessage(message);
// 发送到指定用户
messagingTemplate.convertAndSendToUser(
message.getReceiverId(),
"/queue/messages",
savedMessage
);
// 更新消息状态
chatService.updateMessageStatus(savedMessage.getId(), MessageStatus.SENT);
}
/**
* 发送群聊消息
*/
@MessageMapping("/chat.sendGroupMessage")
public void sendGroupMessage(@Payload ChatMessage message, Principal principal) {
// 验证用户是否有权限发送消息到群聊
if (!chatService.isUserInChatRoom(principal.getName(), message.getRoomId())) {
throw new UnauthorizedException("用户不在群聊中");
}
// 保存消息
ChatMessage savedMessage = chatService.saveMessage(message);
// 发送到群聊
messagingTemplate.convertAndSend(
"/topic/chatroom/" + message.getRoomId(),
savedMessage
);
// 更新消息状态
chatService.updateMessageStatus(savedMessage.getId(), MessageStatus.SENT);
}
/**
* 处理@提醒
*/
private void handleMentions(ChatMessage message) {
if (message.getMentionedUsers() != null && !message.getMentionedUsers().isEmpty()) {
for (String mentionedUserId : message.getMentionedUsers()) {
// 发送提醒通知
ChatMessage mentionNotification = new ChatMessage();
mentionNotification.setType(ChatMessage.MessageType.SYSTEM);
mentionNotification.setContent("您在群聊中被@" + message.getSenderName() + "提及");
mentionNotification.setSenderId("system");
mentionNotification.setReceiverId(mentionedUserId);
mentionNotification.setTimestamp(System.currentTimeMillis());
messagingTemplate.convertAndSendToUser(
mentionedUserId,
"/queue/notifications",
mentionNotification
);
}
}
}
/**
* 消息回执
*/
@MessageMapping("/chat.messageReceipt")
public void messageReceipt(@Payload MessageReceipt receipt, Principal principal) {
// 更新消息状态
chatService.updateMessageStatus(receipt.getMessageId(), receipt.getStatus());
// 如果是已读回执,通知发送方
if (receipt.getStatus() == MessageStatus.READ) {
ChatMessage readNotification = new ChatMessage();
readNotification.setType(ChatMessage.MessageType.SYSTEM);
readNotification.setContent("消息已被阅读");
readNotification.setSenderId(receipt.getUserId());
readNotification.setReceiverId(receipt.getSenderId());
readNotification.setTimestamp(System.currentTimeMillis());
messagingTemplate.convertAndSendToUser(
receipt.getSenderId(),
"/queue/messages",
readNotification
);
}
}
/**
* 用户上线
*/
@EventListener
public void handleWebSocketConnectListener(SessionConnectedEvent event) {
log.info("用户连接: {}", event.getMessage().getHeaders().get("simpSessionId"));
}
/**
* 用户断开连接
*/
@EventListener
public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
String sessionId = event.getSessionId();
String userId = chatService.getUserIdBySessionId(sessionId);
if (userId != null) {
// 更新用户状态为离线
chatService.updateUserStatus(userId, UserStatus.OFFLINE);
// 通知其他用户
ChatMessage statusMessage = new ChatMessage();
statusMessage.setType(ChatMessage.MessageType.SYSTEM);
statusMessage.setContent(userId + " 已离线");
statusMessage.setSenderId("system");
statusMessage.setTimestamp(System.currentTimeMillis());
messagingTemplate.convertAndSend("/topic/public", statusMessage);
}
}
}
4. 聊天服务实现
实现核心的聊天业务逻辑:
@Service
@Transactional
public class ChatService {
@Autowired
private MessageRepository messageRepository;
@Autowired
private UserRepository userRepository;
@Autowired
private ChatRoomRepository chatRoomRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 保存消息
*/
public ChatMessage saveMessage(ChatMessage message) {
message.setId(UUID.randomUUID().toString());
message.setTimestamp(System.currentTimeMillis());
message.setStatus(MessageStatus.SENT);
return messageRepository.save(message);
}
/**
* 更新消息状态
*/
public void updateMessageStatus(String messageId, MessageStatus status) {
messageRepository.updateMessageStatus(messageId, status);
}
/**
* 获取聊天记录
*/
public List<ChatMessage> getChatHistory(String senderId, String receiverId, int page, int size) {
return messageRepository.findChatHistory(senderId, receiverId, page, size);
}
/**
* 获取群聊记录
*/
public List<ChatMessage> getGroupChatHistory(String roomId, int page, int size) {
return messageRepository.findGroupChatHistory(roomId, page, size);
}
/**
* 检查用户是否在群聊中
*/
public boolean isUserInChatRoom(String userId, String roomId) {
return chatRoomRepository.isUserInRoom(userId, roomId);
}
/**
* 获取在线用户
*/
public Set<String> getOnlineUsers() {
// 从Redis中获取在线用户列表
return redisTemplate.opsForSet().members("online_users");
}
/**
* 更新用户状态
*/
public void updateUserStatus(String userId, UserStatus status) {
if (UserStatus.ONLINE.equals(status)) {
redisTemplate.opsForSet().add("online_users", userId);
} else {
redisTemplate.opsForSet().remove("online_users", userId);
}
}
/**
* 获取用户ID通过Session ID
*/
public String getUserIdBySessionId(String sessionId) {
// 从Redis中获取Session到用户ID的映射
return (String) redisTemplate.opsForValue().get("session:" + sessionId);
}
}
5. 安全认证配置
配置WebSocket连接的安全认证:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketSecurityConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
// 连接认证
String token = accessor.getFirstNativeHeader("Authorization");
if (token != null && token.startsWith("Bearer ")) {
String jwt = token.substring(7);
String username = JwtUtil.getUsernameFromToken(jwt);
if (username != null && JwtUtil.validateToken(jwt)) {
// 创建认证对象
UserDetails userDetails = userDetailsService.loadUserByUsername(username);
UsernamePasswordAuthenticationToken authentication =
new UsernamePasswordAuthenticationToken(
userDetails, null, userDetails.getAuthorities()
);
accessor.setUser(authentication);
}
}
}
return message;
}
});
}
}
6. 消息存储优化
使用Redis优化消息存储和推送:
@Service
public class RedisMessageService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 保存离线消息
*/
public void saveOfflineMessage(String userId, ChatMessage message) {
String key = "offline_messages:" + userId;
redisTemplate.opsForList().rightPush(key, message);
// 设置过期时间,避免无限存储
redisTemplate.expire(key, Duration.ofDays(7));
}
/**
* 获取离线消息
*/
public List<ChatMessage> getOfflineMessages(String userId) {
String key = "offline_messages:" + userId;
List<ChatMessage> messages = redisTemplate.opsForList().range(key, 0, -1);
// 清除已获取的离线消息
redisTemplate.delete(key);
return messages != null ? messages : new ArrayList<>();
}
/**
* 保存用户Session映射
*/
public void saveUserSession(String sessionId, String userId) {
redisTemplate.opsForValue().set("session:" + sessionId, userId);
redisTemplate.expire("session:" + sessionId, Duration.ofHours(24));
}
}
7. REST API接口
提供额外的REST API接口:
@RestController
@RequestMapping("/api/chat")
public class ChatRestController {
@Autowired
private ChatService chatService;
@Autowired
private RedisMessageService redisMessageService;
/**
* 获取聊天记录
*/
@GetMapping("/history")
public Result<List<ChatMessage>> getChatHistory(
@RequestParam String userId,
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size) {
List<ChatMessage> history = chatService.getChatHistory(
SecurityUtil.getCurrentUserId(), userId, page, size);
return Result.success(history);
}
/**
* 获取群聊记录
*/
@GetMapping("/group/{roomId}/history")
public Result<List<ChatMessage>> getGroupHistory(
@PathVariable String roomId,
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size) {
List<ChatMessage> history = chatService.getGroupChatHistory(roomId, page, size);
return Result.success(history);
}
/**
* 获取离线消息
*/
@GetMapping("/offline-messages")
public Result<List<ChatMessage>> getOfflineMessages() {
String userId = SecurityUtil.getCurrentUserId();
List<ChatMessage> offlineMessages = redisMessageService.getOfflineMessages(userId);
return Result.success(offlineMessages);
}
/**
* 获取在线用户列表
*/
@GetMapping("/online-users")
public Result<Set<String>> getOnlineUsers() {
Set<String> onlineUsers = chatService.getOnlineUsers();
return Result.success(onlineUsers);
}
}
性能优化策略
1. 消息分页加载
// 前端实现消息分页加载
function loadMoreMessages() {
const currentOffset = messages.length;
fetch(`/api/chat/history?userId=${targetUserId}&page=${currentOffset}&size=20`)
.then(response => response.json())
.then(data => {
messages.unshift(...data.data);
renderMessages();
});
}
2. 消息压缩
// 消息压缩配置
@Configuration
public class MessageCompressionConfig {
@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setObjectMapper(new ObjectMapper()
.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false));
return converter;
}
}
优势分析
相比传统的HTTP轮询方式,WebSocket + STOMP方案的优势明显:
- 实时性:毫秒级消息推送
- 资源消耗低:单个连接处理多个消息
- 扩展性好:支持集群部署
- 功能丰富:支持复杂的聊天功能
- 用户体验佳:接近原生应用体验
注意事项
- 连接管理:需要处理连接断开重连逻辑
- 消息顺序:保证消息的有序性
- 安全防护:防止消息伪造和重放攻击
- 资源限制:控制单个用户的连接数
- 监控告警:建立连接和消息处理监控
总结
通过SpringBoot + WebSocket + STOMP的技术组合,我们可以构建一个功能完整、性能优异的企业IM系统。这不仅能提升团队协作效率,还能为用户提供良好的沟通体验。
在实际项目中,建议根据具体业务需求进行定制化开发,并充分考虑安全性、性能和可扩展性等因素。
服务端技术精选,专注分享后端开发实战技术,助力你的技术成长!
标题:SpringBoot + WebSocket + STOMP:支持群聊、@提醒、消息回执的企业 IM 系统实战
作者:jiangyi
地址:http://jiangyi.space/articles/2026/01/12/1768215056068.html
0 评论