MQ消息堆积到系统崩溃?这5个排查技巧让你10秒定位问题!

MQ消息堆积到系统崩溃?这5个排查技巧让你10秒定位问题!

作为一名后端开发,经历过太多因为MQ消息堆积导致的"血案":

  • 某电商大促期间,订单消息堆积了500万条,用户下单后2小时还没收到确认短信,客服电话被打爆
  • 某支付系统因为消息堆积,导致用户支付成功但订单状态一直未更新,老板差点把我祭天
  • 某社交平台的消息通知系统,堆积了1000万条消息,用户私信延迟了整整一天才收到

消息堆积,可以说是分布式系统中最让人闻风丧胆的问题之一。今天就结合自己踩过的坑,跟大家聊聊如何快速排查和解决MQ消息堆积问题,让你的系统稳如老狗。

一、消息堆积到底是个啥?为啥这么可怕?

简单来说,消息堆积就是生产者发送消息的速度远大于消费者处理消息的速度,导致消息在队列中越积越多。

堆积的恐怖后果

  • 内存暴涨:消息堆积会把MQ服务器的内存打满,最终导致OOM
  • 磁盘爆满:持久化消息会把磁盘写满,影响整个服务器
  • 系统雪崩:消息处理延迟越来越高,最终整个系统不可用
  • 用户体验差:用户操作后迟迟得不到响应,直接投诉

二、消息堆积的3大元凶,你肯定遇到过

在讲排查技巧之前,我们先得搞清楚消息为什么会堆积。根据我多年的踩坑经验,主要有以下3个原因:

1. 消费者处理太慢:最常见的元凶

症状:消息处理时间越来越长,消费TPS持续下降

真实案例:我们有个订单系统,消费者需要调用3个外部接口:库存接口、支付接口、物流接口。有一次物流接口响应从100ms变成了5秒,直接导致消息处理速度从1000TPS降到了50TPS,消息疯狂堆积。

2. 消费者数量不足:小马拉大车

症状:单机处理能力达到上限,但集群资源利用率很低

真实案例:某次大促活动,我们预估QPS会增长10倍,但只增加了2台消费者服务器。结果消息处理能力只提升了2倍,剩下8倍的流量直接堆积在MQ中,差点把服务器撑爆。

3. 消息生产异常:洪水猛兽般的冲击

症状:某个时间段消息量突然暴增,远超正常水平

真实案例:某次系统上线bug,导致一个循环发送消息的逻辑失控,1分钟内发送了50万条消息,而消费者正常处理能力只有1000TPS,直接GG。

三、5个核心排查技巧,10秒定位问题根源

技巧1:三板斧快速定位堆积位置

第一板斧:看监控面板

# RocketMQ查看消息堆积数量
sh mqadmin consumerProgress -n localhost:9876 -g your_consumer_group

# RabbitMQ查看队列消息数量
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged

第二板斧:看消费TPS

# 查看消费者TPS
sh mqadmin statsAll -n localhost:9876 | grep your_topic

第三板斧:看消费者状态

# 查看消费者连接情况
sh mqadmin consumerConnection -n localhost:9876 -g your_consumer_group

技巧2:线程Dump分析,揪出慢处理元凶

当消费者处理慢时,第一时间打线程Dump:

# 快速获取线程Dump
jstack [消费者进程PID] > consumer_thread_dump.txt

# 或者直接使用Arthas
arthas> thread -n 10

实战案例:有次发现消费者TPS从1000降到了100,打线程Dump发现90%的线程都在等待一个Redis响应,原来是Redis连接池配置过小,导致大量线程阻塞。

技巧3:消息轨迹追踪,找到问题消息

RocketMQ的消息轨迹功能特别好用:

// 开启消息轨迹
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_group");
consumer.setEnableMsgTrace(true);
consumer.setTraceTopic("RMQ_SYS_TRACE_TOPIC");

// 查看消息轨迹
sh mqadmin queryMsgTraceById -n localhost:9876 -i your_message_id

真实场景:通过消息轨迹发现,有10%的消息处理时间特别长,进一步分析发现这些消息都来自同一个用户,该用户的数据存在异常,导致处理逻辑卡死。

技巧4:资源监控,发现系统瓶颈

CPU监控

# 查看CPU使用率
top -p [消费者PID]

# 查看GC情况
jstat -gc [消费者PID] 1000

内存监控

# 查看内存使用
free -h

# 查看JVM内存
jmap -heap [消费者PID]

网络监控

# 查看网络连接
netstat -an | grep ESTABLISHED | wc -l

# 查看网络流量
iftop -i eth0

技巧5:日志分析,还原案发现场

关键日志位置

// 在消费者中添加详细日志
@Slf4j
@Component
public class MessageConsumer {
    
    @RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-consumer"
    )
    public void onMessage(String message) {
        long startTime = System.currentTimeMillis();
        String messageId = MessageHelper.getMessageId(message);
        
        try {
            log.info("开始处理消息: {}, 时间: {}", messageId, startTime);
            
            // 业务处理
            processBusinessLogic(message);
            
            long costTime = System.currentTimeMillis() - startTime;
            log.info("消息处理完成: {}, 耗时: {}ms", messageId, costTime);
            
            // 如果处理时间超过1秒,记录警告
            if (costTime > 1000) {
                log.warn("消息处理超时: {}, 耗时: {}ms", messageId, costTime);
            }
            
        } catch (Exception e) {
            log.error("消息处理失败: {}, 错误: {}", messageId, e.getMessage(), e);
            throw e;
        }
    }
}

四、实战案例:某电商平台大促期间消息堆积全记录

下面分享一个真实的电商平台大促期间消息堆积排查案例。

1. 事故发生

大促开始1小时后,监控系统疯狂报警:

  • 订单消息堆积数量:从0暴涨到200万条
  • 消费TPS:从正常的2000TPS降到了200TPS
  • 消息延迟:从1秒增加到5分钟

2. 紧急排查过程

第一步:快速定位问题

# 1. 查看消费者状态
sh mqadmin consumerProgress -n localhost:9876 -g order-consumer-group
# 结果显示:未消费消息200万,延迟5分钟

# 2. 查看消费者连接
sh mqadmin consumerConnection -n localhost:9876 -g order-consumer-group
# 结果显示:只有5台消费者在线,应该有20台

第二步:分析消费者日志
发现大量如下异常:

2024-01-15 14:32:15 ERROR [OrderConsumer] - 调用库存服务超时: Read timed out after 3000ms
2024-01-15 14:32:16 ERROR [OrderConsumer] - 消息处理失败,准备重试

第三步:查看外部依赖
发现库存服务的响应时间从正常的100ms增加到了3秒以上,导致消费者处理时间大幅增加。

3. 紧急处理方案

临时方案(5分钟内实施):

  1. 扩容消费者:紧急启动15台消费者服务器,消费者数量从5台增加到20台
  2. 调整超时时间:将库存服务调用超时时间从3秒调整为1秒,快速失败
  3. 开启批量消费:将单条消费改为批量消费,一次处理10条消息

永久方案(2天内实施):

  1. 库存服务优化:增加库存服务服务器数量,优化数据库查询
  2. 缓存预热:大促前预热库存缓存,减少数据库查询
  3. 异步处理:将非核心的库存扣减改为异步处理

4. 效果验证

  • 消息堆积数量:2小时内从200万降到0
  • 消费TPS:提升到5000TPS,是原来的2.5倍
  • 消息延迟:稳定在1秒以内

五、6个避坑小贴士,让你远离消息堆积

1. 监控告警要做全

# RocketMQ监控配置示例
rocketmq:
  consumer:
    metrics:
      enabled: true
      delay-level: 2  # 2分钟延迟就报警
      accumulation: 10000  # 堆积1万条就报警

2. 消费者要支持水平扩展

// 使用容器化部署,支持快速扩容
@Component
public class MessageConsumer {
    
    @Value("${consumer.thread-count:10}")
    private int consumeThreadCount;
    
    @RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-consumer",
        consumeThreadNumber = "#{${consumer.thread-count}}"
    )
    public void onMessage(String message) {
        // 处理逻辑
    }
}

3. 设置合理的消费超时时间

// RocketMQ配置
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer");
consumer.setConsumeTimeout(15);  // 15秒超时,避免长时间阻塞

// RabbitMQ配置
container.setDefaultRequeueRejected(false);  // 失败后直接进入死信队列

4. 消息分级处理

// 高优先级消息优先处理
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer",
    selectorExpression = "orderType='urgent'"  // 紧急订单优先
)
public void processUrgentOrder(String message) {
    // 紧急订单处理逻辑
}

5. 定期清理过期消息

# 定期清理死信队列中的过期消息
sh mqadmin deleteTopic -n localhost:9876 -t DLQ_order-consumer

6. 做好压力测试

// 压测脚本示例
@Test
public void testMessageAccumulation() {
    // 模拟10倍流量冲击
    for (int i = 0; i < 1000000; i++) {
        rocketMQTemplate.asyncSend("order-topic", buildOrderMessage());
    }
    
    // 验证消费者处理能力
    await().atMost(5, TimeUnit.MINUTES)
        .until(() -> getAccumulatedMessageCount() == 0);
}

六、总结:消息堆积处理的黄金法则

  1. 监控先行:没有监控就没有发言权,实时掌握消息堆积情况
  2. 快速定位:三板斧(看监控、打Dump、查日志)快速找到问题
  3. 先治标再治本:先扩容缓解,再优化根治
  4. 预防为主:压测、限流、熔断,把问题消灭在萌芽状态

最后再提醒一句:消息堆积不可怕,可怕的是没有预案。建议每个季度都做一次消息堆积的演练,确保真正出问题的时候能够快速响应。

觉得有用的话,别忘了关注我的公众号【服务端技术精选】,点赞、在看、转发三连哦!咱们下期见~


标题:MQ消息堆积到系统崩溃?这5个排查技巧让你10秒定位问题!
作者:jiangyi
地址:http://jiangyi.space/articles/2025/12/21/1766304278157.html

    0 评论
avatar