文章 548
评论 5
浏览 173088
消费者挂了半小时,RocketMQ里堆了800万条消息,恢复后消费了6个小时

消费者挂了半小时,RocketMQ里堆了800万条消息,恢复后消费了6个小时

那天下午,库存服务一次 fullGC 把消费者线程全停了。GC 持续了 28 分钟,消费者一直没心跳,RocketMQ 以为它还活着就没触发重平衡。等 GC 结束消费者恢复,队列里已经堆了 800 万条订单消息。

按正常消费速度,一个消费者每秒 200 条,800 万条要 11 个小时才能消化完。下游的物流、短信、积分服务全在等这条消费者的结果——11 个小时的延迟等于业务停摆。

这不是"能不能消费完"的问题。这是"等你消费完,用户早就不关心了"的问题。

我们当时做了一个临时救急:在不停原有消费者的前提下,多开了 10 个临时订阅组来瓜分积压。


一、为什么不能直接加消费者实例

第一反应是给原消费者组多加几个 Pod。但因为原有消费者的队列分区是固定的——假设 8 个 MessageQueue,原消费者组 8 个 Pod,每个 Pod 独占 1 个 Queue,已经分配好了。

直接加 Pod 到同一个组里没用——RocketMQ 是 Queue 粒度分配,8 个 Queue 最多 8 个消费者实例。加第 9 个 Pod 它会被晾在那,一个 Queue 也分不到。

也不能直接增加 Queue 数量——RocketMQ 的 Queue 创建时定下来,运行时不能改。

更不能 kill 原消费者重建——已经消费了一半,重建后从哪开始都是问题。


二、紧急预案:临时多订阅组

思路不复杂:原消费者组继续消费自己的 Queue,临建一组新的消费者,用不同的 groupId 订阅同一个 Topic,从最新 offset 开始拉消息。 新旧两组同时消费,把积压均摊。

@Component
public class EmergencyConsumerDeployer {
    
    @Autowired private DefaultMQPushConsumer originalConsumer;
    
    // 临时消费者——独立订阅组,独立消费进度
    private final List<DefaultMQPushConsumer> emergencyConsumers = new ArrayList<>();
    
    /**
     * 紧急扩容:
     * @param count 临时消费者数量
     */
    public void deployEmergencyConsumers(int count) {
        for (int i = 0; i < count; i++) {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "emergency-group-" + System.currentTimeMillis() + "-" + i
            );
            
            consumer.setNamesrvAddr(originalConsumer.getNamesrvAddr());
            consumer.subscribe("order-topic", "*");
            
            // 关键:从最新 offset 开始,不碰旧消息
            consumer.setConsumeFromWhere(
                ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
            );
            
            consumer.registerMessageListener((MessageListenerConcurrently) 
                (msgs, context) -> {
                    processOrder(msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            );
            
            consumer.start();
            emergencyConsumers.add(consumer);
            
            log.info("临时消费者启动: group={}", consumer.getConsumerGroup());
        }
    }
    
    /**
     * 积压消费完毕后,优雅下线临时消费者
     */
    public void shutdownEmergencyConsumers() {
        for (DefaultMQPushConsumer consumer : emergencyConsumers) {
            consumer.shutdown();
        }
        emergencyConsumers.clear();
        log.info("临时消费者已全部下线");
    }
}

为什么从最新 offset 开始?因为原消费者正在从旧 offset 慢慢往前追。临时消费者如果也从旧 offset 开始,两个人抢同一段消息——重复消费。所以分工:旧消费者啃历史积压,临时消费者消化新进来的消息。

效果立竿见影——临时消费者上线 5 分钟后,Topic 的 produceLag 开始从 800 万往下掉,因为新消息不再堆积到同一个消费者身上了。


三、并行消费——加速消化历史积压

但光让新消费者扛新消息还不够——历史 800 万条旧消息还得消化。

正常消费者是单线程顺序消费(保证顺序),但积压场景下顺序不重要了,速度才重要。

临时改成并行消费:

public class ParallelMessageListener implements MessageListenerConcurrently {
    
    // 临时并行消费线程池
    private final ExecutorService parallelExecutor = new ThreadPoolExecutor(
        20, 50, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(10000),
        new ThreadPoolExecutor.CallerRunsPolicy()
    );
    
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        
        // 临时关闭顺序消费,所有消息并行处理
        List<CompletableFuture<Void>> futures = msgs.stream()
            .map(msg -> CompletableFuture.runAsync(() -> {
                processSingleMessage(msg);
            }, parallelExecutor))
            .collect(Collectors.toList());
        
        CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        ).join();
        
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    
    /**
     * 积压清除后销毁并行线程池
     */
    public void destroy() {
        parallelExecutor.shutdown();
    }
}

从单线程消费 200 QPS 飙到 20 线程并行 3500 QPS。800 万条从 11 小时缩短到 40 分钟。

注意事项:

  • 并行消费的前提是消息之间没有严格的顺序依赖。如果订单 A 必须在订单 B 之前处理,并行会出问题
  • 线程池 LinkedBlockingQueue 设了容量 10000,避免积压问题转移
  • 这只是应急模式——积压清完就切回正常模式

四、Kafka 的同类方案

如果是 Kafka,情况略有不同。Kafka 的分区天然支持多消费者并行——每个分区一个消费者。临时扩容直接加消费者实例即可:

// Kafka 临时消费者:修改 group.id 即新建订阅组
Properties props = new Properties();
props.put("group.id", "emergency-group-" + System.currentTimeMillis());
props.put("enable.auto.commit", "false");
// 从最新开始,不碰旧分区
props.put("auto.offset.reset", "latest");

KafkaConsumer<String, String> emergencyConsumer = 
    new KafkaConsumer<>(props);
emergencyConsumer.subscribe(Arrays.asList("order-topic"));

但 Kafka 的痛点不同——分区数是固定的。如果 8 个分区只能并行 8 条消费,扩容也无法突破分区上限。唯一的办法是增加 max.poll.records 或在应用层做并行。


五、自动化——别等着人来救

手动开临时消费者太慢了。发现积压→看监控→开会讨论→写脚本→执行→确认效果,半小时过去了。应该做成自动触发:

@Component
public class AutoEmergencyScaler {
    
    @Autowired private EmergencyConsumerDeployer deployer;
    
    /**
     * 每 10 秒检查一次积压
     */
    @Scheduled(fixedDelay = 10_000)
    public void checkAndScale() {
        long totalLag = getTotalLag("order-topic");
        
        if (totalLag > 5_000_000 && deployer.getEmergencyCount() == 0) {
            log.warn("检测到严重积压: {} 条,触发紧急扩容", totalLag);
            deployer.deployEmergencyConsumers(10);
            alertService.critical("消息积压紧急扩容", 
                "Topic: order-topic, Lag: " + totalLag + ", 已启动 10 个临时消费者");
        }
        
        if (totalLag < 100_000 && deployer.getEmergencyCount() > 0) {
            log.info("积压已降至安全线: {} 条,回收临时消费者", totalLag);
            deployer.shutdownEmergencyConsumers();
        }
    }
    
    private long getTotalLag(String topic) {
        // 查询 RocketMQ 或 Kafka 的 consumer lag
        return mqAdminClient.queryConsumerLag(topic);
    }
}

告警 → 自动判断 → 自动扩容 → 积压清除 → 自动回收。全程不需要人介入。人能做的——事后复盘:为什么 fullGC 干了 28 分钟。


六、效果对比

指标无预案手动救急自动扩容
积压峰值800 万800 万500 万(触发阈值)
消化完毕耗时11 小时40 分钟25 分钟(触发更早)
业务延迟6 小时以上15 分钟(新消息优先)5 分钟
人工介入必须必须0
临时线程/消费者0手写命令行自动创建+自动回收

七、注意事项

注意一:临时消费者的 groupId 必须唯一。 用了时间戳做后缀确保不会跟已有消费者组冲突。并且下线后要 clean 掉——RocketMQ 的消费者组即使没有活跃消费者也会保留 offset 信息。

注意二:CPU 和内存上限。 临时开 10 个消费者 + 并行 20 线程,CPU 使用率从 30% 涨到 85%。如果机器本身已经接近满载,扩容反而打挂自己。扩容脚本里加一条检查:

double cpuUsage = getSystemCpuUsage();
if (cpuUsage > 0.8) {
    alertService.warn("机器 CPU 过高,拒绝自动扩容: " + cpuUsage);
    return;
}

注意三:不要扩容过度。 新消费者太多反而跟原消费者抢资源、抢数据库连接池。先扩 5 个,观察效果,不够再加。

注意四:消息消费完了不等于问题解决了。 写一条事件记录:谁在什么时间、因为什么原因、采取了什么扩容措施、恢复了没有。下次复盘才能从根源上解决而不是每次靠扩容兜底。


消息积压这件事,第一次碰到都会手忙脚乱。有预案之后就从"紧急事故"变成"自动处理"——不用凌晨爬起来开临时消费者。

你们的 RocketMQ 或 Kafka 有没有处理过紧急积压?临时扩容方案是什么?


标题:消费者挂了半小时,RocketMQ里堆了800万条消息,恢复后消费了6个小时
作者:jiangyi
地址:http://jiangyi.space/articles/2026/06/26/1782027247939.html
公众号:服务端技术精选

服务端开发博客:后端架构、高并发、性能优化与微服务实战教程

取消