消息推送总延迟?这7步架构设计让你的推送秒到用户手机!

消息推送总延迟?这7步架构设计让你的推送秒到用户手机!

作为一名后端开发,经历过太多消息推送的"惨案":

  • 某电商大促期间,优惠券推送延迟2小时,用户错过了最佳抢购时机,客服电话被打爆
  • 某社交平台重要通知推送延迟,用户错过了好友消息,投诉量暴涨300%
  • 某金融APP的到账通知推送失败,用户以为钱丢了,差点报警

消息推送,看似简单,实则暗藏杀机。今天就结合自己踩过的坑,跟大家聊聊实时订阅推送到底是怎么实现的,让你的推送秒到用户手机!

一、实时推送到底是个啥?为啥大家都在用?

实时推送的核心就是:把消息实时送到用户设备,让用户第一时间收到重要信息。

为啥实时推送这么香?

  • 用户体验好:重要消息秒到,用户不会错过关键信息
  • 业务价值高:电商大促推送能提升30%转化率,社交平台推送能增加50%用户活跃度
  • 技术逼格高:看起来就很高级,用户觉得你很专业

二、实时推送的7步架构设计,一步都不能错!

实时推送就像送快递,得按流程来,一步走错就GG。

第1步:消息接入层,就像"快递收件点"

消息先进入接入层,这里要扛住高并发,不能让用户发消息就卡死。

核心代码

@RestController
public class MessageReceiveController {
    
    @Autowired
    private KafkaProducer kafkaProducer;
    
    @PostMapping("/api/message/send")
    public SendResponse sendMessage(@RequestBody MessageRequest request) {
        try {
            // 参数校验
            validateMessage(request);
            
            // 生成消息ID
            String messageId = UUID.randomUUID().toString();
            
            // 封装消息
            PushMessage message = PushMessage.builder()
                .messageId(messageId)
                .userId(request.getUserId())
                .content(request.getContent())
                .messageType(request.getType())
                .priority(request.getPriority())
                .timestamp(System.currentTimeMillis())
                .build();
            
            // 发送到Kafka,按用户ID分区保证顺序性
            kafkaProducer.send("message-topic", 
                message.getUserId().hashCode() % 10, 
                messageId, 
                message);
            
            return SendResponse.success(messageId);
            
        } catch (Exception e) {
            log.error("发送消息失败", e);
            return SendResponse.fail("消息发送失败");
        }
    }
}

第2步:消息路由层,就像"快递分拣中心"

根据消息类型和用户标签,把消息分到不同的处理队列。

消息路由逻辑

@Component
public class MessageRouter {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    public void routeMessage(PushMessage message) {
        String userId = message.getUserId();
        
        // 查询用户订阅配置
        UserSubscription subscription = getUserSubscription(userId);
        
        // 检查用户是否订阅了此类消息
        if (!subscription.isSubscribed(message.getMessageType())) {
            log.info("用户未订阅此类消息: {}", message.getMessageType());
            return;
        }
        
        // 根据消息优先级路由到不同队列
        String queueName = determineQueueName(message);
        
        // 发送到对应队列
        redisTemplate.opsForList().rightPush(queueName, message);
        
        // 记录消息统计
        recordMessageStats(message);
    }
    
    private String determineQueueName(PushMessage message) {
        // 高优先级消息走快速通道
        if (message.getPriority() >= 8) {
            return "high-priority-queue";
        }
        // 普通消息按用户分片
        int shard = message.getUserId().hashCode() % 20;
        return "normal-queue-" + shard;
    }
}

第3步:用户连接管理,就像"快递地址簿"

管理所有在线用户的WebSocket连接,支持千万级并发。

连接管理核心代码

@Component
public class WebSocketConnectionManager {
    
    // 用户ID -> WebSocket会话
    private final Map<String, WebSocketSession> userSessions = 
        new ConcurrentHashMap<>();
    
    // Redis存储在线状态
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    public void addConnection(String userId, WebSocketSession session) {
        // 存储WebSocket连接
        userSessions.put(userId, session);
        
        // 标记用户在线状态
        redisTemplate.opsForValue().set(
            "online:" + userId, 
            "1", 
            Duration.ofMinutes(5)
        );
        
        log.info("用户连接建立: {}", userId);
    }
    
    public void removeConnection(String userId) {
        userSessions.remove(userId);
        redisTemplate.delete("online:" + userId);
        log.info("用户连接断开: {}", userId);
    }
    
    public boolean isUserOnline(String userId) {
        return redisTemplate.hasKey("online:" + userId);
    }
    
    public void sendToUser(String userId, String message) {
        WebSocketSession session = userSessions.get(userId);
        if (session != null && session.isOpen()) {
            try {
                session.sendMessage(new TextMessage(message));
                log.info("消息推送成功: {}", userId);
            } catch (Exception e) {
                log.error("消息推送失败: {}", userId, e);
                removeConnection(userId);
            }
        }
    }
}

第4步:消息推送引擎,就像"快递派送员"

核心推送逻辑,支持多种推送策略和失败重试。

推送引擎实现

@Service
public class PushEngine {
    
    @Autowired
    private WebSocketConnectionManager connectionManager;
    
    @Autowired
    private PushRetryService retryService;
    
    @Scheduled(fixedDelay = 100)
    public void processMessages() {
        // 处理高优先级消息
        processHighPriorityMessages();
        
        // 处理普通消息
        processNormalMessages();
    }
    
    private void processHighPriorityMessages() {
        String messageJson = redisTemplate.opsForList().leftPop("high-priority-queue");
        if (messageJson != null) {
            PushMessage message = JSON.parseObject(messageJson, PushMessage.class);
            pushToUser(message);
        }
    }
    
    private void pushToUser(PushMessage message) {
        String userId = message.getUserId();
        
        // 检查用户是否在线
        if (!connectionManager.isUserOnline(userId)) {
            // 用户不在线,加入重试队列
            retryService.addToRetryQueue(message);
            return;
        }
        
        try {
            // 构造推送消息
            PushNotification notification = PushNotification.builder()
                .title(getMessageTitle(message))
                .content(message.getContent())
                .messageId(message.getMessageId())
                .timestamp(message.getTimestamp())
                .build();
            
            // 发送消息
            connectionManager.sendToUser(userId, JSON.toJSONString(notification));
            
            // 记录推送成功
            recordPushSuccess(message);
            
        } catch (Exception e) {
            log.error("推送失败,加入重试队列: {}", message.getMessageId(), e);
            retryService.addToRetryQueue(message);
        }
    }
}

第5步:离线消息存储,就像"快递代收点"

用户不在线时,消息要保存起来,等用户上线再推送。

离线消息存储

@Component
public class OfflineMessageStorage {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    private static final String OFFLINE_KEY_PREFIX = "offline:";
    private static final int MAX_OFFLINE_MESSAGES = 1000;
    
    public void storeOfflineMessage(String userId, PushMessage message) {
        String key = OFFLINE_KEY_PREFIX + userId;
        
        // 使用Redis List存储离线消息,限制数量
        Long size = redisTemplate.opsForList().size(key);
        if (size >= MAX_OFFLINE_MESSAGES) {
            // 消息过多,移除最早的消息
            redisTemplate.opsForList().leftPop(key);
        }
        
        // 存储消息
        redisTemplate.opsForList().rightPush(key, JSON.toJSONString(message));
        
        // 设置过期时间(7天)
        redisTemplate.expire(key, Duration.ofDays(7));
    }
    
    public List<PushMessage> getOfflineMessages(String userId) {
        String key = OFFLINE_KEY_PREFIX + userId;
        List<String> messages = redisTemplate.opsForList().range(key, 0, -1);
        
        if (messages == null) {
            return Collections.emptyList();
        }
        
        return messages.stream()
            .map(msg -> JSON.parseObject(msg, PushMessage.class))
            .collect(Collectors.toList());
    }
    
    public void clearOfflineMessages(String userId) {
        redisTemplate.delete(OFFLINE_KEY_PREFIX + userId);
    }
}

第6步:消息重试机制,就像"快递二次派送"

推送失败的消息要有重试机制,确保最终送达。

重试机制实现

@Service
public class PushRetryService {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    private PushEngine pushEngine;
    
    private static final String RETRY_QUEUE_PREFIX = "retry:";
    private static final int[] RETRY_DELAYS = {1, 5, 30, 300, 1800}; // 秒
    
    public void addToRetryQueue(PushMessage message) {
        String retryKey = RETRY_QUEUE_PREFIX + message.getMessageId();
        
        // 记录重试次数
        int retryCount = 0;
        message.setRetryCount(retryCount);
        
        // 设置首次重试时间
        long nextRetryTime = System.currentTimeMillis() + 
            RETRY_DELAYS[retryCount] * 1000;
        
        redisTemplate.opsForZSet().add(
            "retry-queue", 
            JSON.toJSONString(message), 
            nextRetryTime
        );
    }
    
    @Scheduled(fixedDelay = 1000)
    public void processRetryMessages() {
        long now = System.currentTimeMillis();
        
        // 获取需要重试的消息
        Set<String> messages = redisTemplate.opsForZSet()
            .rangeByScore("retry-queue", 0, now);
        
        if (messages != null) {
            for (String messageJson : messages) {
                PushMessage message = JSON.parseObject(messageJson, PushMessage.class);
                
                // 从重试队列移除
                redisTemplate.opsForZSet().remove("retry-queue", messageJson);
                
                // 检查重试次数
                if (message.getRetryCount() >= RETRY_DELAYS.length) {
                    // 重试次数用尽,标记为失败
                    markMessageFailed(message);
                    continue;
                }
                
                // 重新推送
                pushEngine.pushToUser(message);
                
                // 增加重试次数,再次入队
                message.setRetryCount(message.getRetryCount() + 1);
                long nextRetryTime = System.currentTimeMillis() + 
                    RETRY_DELAYS[message.getRetryCount()] * 1000;
                
                redisTemplate.opsForZSet().add(
                    "retry-queue", 
                    JSON.toJSONString(message), 
                    nextRetryTime
                );
            }
        }
    }
}

第7步:监控告警,就像"快递跟踪系统"

实时监控推送成功率、延迟等指标,出问题立即告警。

监控告警实现

@Component
public class PushMonitor {
    
    private final MeterRegistry meterRegistry;
    private final Counter pushSuccessCounter;
    private final Counter pushFailureCounter;
    private final Timer pushLatencyTimer;
    
    public PushMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.pushSuccessCounter = Counter.builder("push_success_total")
            .description("推送成功的消息总数")
            .register(meterRegistry);
        this.pushFailureCounter = Counter.builder("push_failure_total")
            .description("推送失败的消息总数")
            .register(meterRegistry);
        this.pushLatencyTimer = Timer.builder("push_latency")
            .description("推送延迟时间")
            .register(meterRegistry);
    }
    
    public void recordPushSuccess(PushMessage message, long latencyMs) {
        pushSuccessCounter.increment();
        pushLatencyTimer.record(latencyMs, TimeUnit.MILLISECONDS);
        
        // 记录用户维度的推送成功
        meterRegistry.counter("push_success_user", "user_id", message.getUserId())
            .increment();
    }
    
    public void recordPushFailure(PushMessage message, String reason) {
        pushFailureCounter.increment();
        
        // 按失败原因统计
        meterRegistry.counter("push_failure_reason", "reason", reason)
            .increment();
    }
    
    @Scheduled(fixedDelay = 30000)
    public void checkMetrics() {
        // 检查推送成功率
        double successRate = getSuccessRate();
        if (successRate < 0.95) {
            // 告警:推送成功率低于95%
            sendAlert("推送成功率异常:" + successRate);
        }
        
        // 检查平均延迟
        double avgLatency = getAverageLatency();
        if (avgLatency > 1000) {
            // 告警:平均延迟超过1秒
            sendAlert("推送延迟异常:" + avgLatency + "ms");
        }
    }
}

三、实战案例:某电商平台的实时推送架构演进

下面分享一个真实的电商平台从日活1万到500万的推送架构演进案例。

1. 第一阶段:单机推送(日活1万)

架构特点

  • 单机WebSocket服务器
  • Redis存储在线状态
  • 单机处理推送

遇到的问题

  • 单机只能支撑1万并发
  • 服务器重启导致所有连接断开
  • 消息丢失严重

2. 第二阶段:集群推送(日活10万)

架构升级

# Nginx负载均衡配置
upstream websocket_backend {
    server 192.168.1.10:8080 weight=3;
    server 192.168.1.11:8080 weight=3;
    server 192.168.1.12:8080 weight=2;
    ip_hash;  # 保证同一用户的连接落在同一台服务器
}

server {
    listen 80;
    server_name push.yourdomain.com;
    
    location /ws {
        proxy_pass http://websocket_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
    }
}

3. 第三阶段:分布式推送(日活100万)

技术方案

  • Kafka消息队列解耦
  • Redis集群存储状态
  • 多机房部署

Redis集群配置

# Redis集群配置
spring:
  redis:
    cluster:
      nodes:
        - 192.168.1.21:7000
        - 192.168.1.22:7000
        - 192.168.1.23:7000
        - 192.168.1.24:7000
        - 192.168.1.25:7000
        - 192.168.1.26:7000
      max-redirects: 3
    timeout: 3000
    lettuce:
      pool:
        max-active: 1000
        max-idle: 100
        min-idle: 50

4. 第四阶段:亿级推送架构(日活500万)

最终架构

  • 全球多机房部署
  • 消息分级处理
  • 智能路由
  • 实时监控

性能指标

  • 支持500万日活用户同时在线
  • 消息推送延迟<100ms
  • 推送成功率>99.9%
  • 峰值QPS 10万

四、7个避坑指南,90%的人都踩过!

1. WebSocket连接管理坑

问题:连接泄露导致内存溢出
解决方案

// 定期清理无效连接
@Scheduled(fixedDelay = 60000)
public void cleanupConnections() {
    userSessions.entrySet().removeIf(entry -> {
        WebSocketSession session = entry.getValue();
        return !session.isOpen();
    });
}

2. Redis集群脑裂坑

问题:Redis集群脑裂导致状态不一致
解决方案

  • 使用哨兵模式
  • 设置合理的超时时间
  • 降级到单机模式

3. 消息顺序性坑

问题:同一用户的消息乱序到达
解决方案

  • Kafka按用户ID分区
  • 使用消息序号
  • 客户端本地排序

4. 消息幂等性坑

问题:重复推送同一条消息
解决方案

// 消息去重
public boolean isDuplicate(String messageId) {
    String key = "msg:dedup:" + messageId;
    Boolean result = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofHours(1));
    return result != null && !result;
}

5. 客户端断线重连坑

问题:客户端断线后疯狂重连导致服务器崩溃
解决方案

  • 指数退避重连
  • 设置最大重连次数
  • 使用心跳机制

6. 消息体大小坑

问题:推送大消息导致网络阻塞
解决方案

  • 限制单条消息<1KB
  • 大消息使用CDN链接
  • 分批推送

7. 跨域问题坑

问题:WebSocket跨域连接失败
解决方案

@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
    
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(pushHandler(), "/ws")
                .setAllowedOrigins("*")  // 允许跨域
                .addInterceptors(new HandshakeInterceptor());
    }
}

五、5个核心监控指标,缺一不可!

1. 在线用户数监控

// 实时监控在线用户数
Gauge.builder("websocket.online.users")
    .register(meterRegistry, () -> connectionManager.getOnlineUserCount());

2. 消息推送成功率

  • 目标值:>99.9%
  • 告警阈值:<99%

3. 消息推送延迟

  • 目标值:<100ms
  • 告警阈值:>500ms

4. WebSocket连接数

  • 单机最大连接数:5万
  • 集群总连接数:100万

5. 系统资源使用率

  • CPU使用率:<70%
  • 内存使用率:<80%
  • 网络带宽:<70%

六、总结:实时推送架构的4个关键点

  1. 高可用:多机房部署,故障自动切换
  2. 高性能:消息队列解耦,异步处理
  3. 高并发:WebSocket集群,负载均衡
  4. 可监控:全链路监控,实时告警

这套架构我们已经稳定运行2年,支撑了从日活1万到500万的业务增长。记住:推送系统的核心是可靠性,宁可慢一点,也不能丢消息!

如果你也在做推送系统,欢迎留言交流踩坑经验!


标题:消息推送总延迟?这7步架构设计让你的推送秒到用户手机!
作者:jiangyi
地址:http://jiangyi.space/articles/2025/12/21/1766304288464.html

    0 评论
avatar