那天下午,库存服务一次 fullGC 把消费者线程全停了。GC 持续了 28 分钟,消费者一直没心跳,RocketMQ 以为它还活着就没触发重平衡。等 GC 结束消费者恢复,队列里已经堆了 800 万条订单消息。
按正常消费速度,一个消费者每秒 200 条,800 万条要 11 个小时才能消化完。下游的物流、短信、积分服务全在等这条消费者的结果——11 个小时的延迟等于业务停摆。
这不是"能不能消费完"的问题。这是"等你消费完,用户早就不关心了"的问题。
我们当时做了一个临时救急:在不停原有消费者的前提下,多开了 10 个临时订阅组来瓜分积压。
一、为什么不能直接加消费者实例
第一反应是给原消费者组多加几个 Pod。但因为原有消费者的队列分区是固定的——假设 8 个 MessageQueue,原消费者组 8 个 Pod,每个 Pod 独占 1 个 Queue,已经分配好了。
直接加 Pod 到同一个组里没用——RocketMQ 是 Queue 粒度分配,8 个 Queue 最多 8 个消费者实例。加第 9 个 Pod 它会被晾在那,一个 Queue 也分不到。
也不能直接增加 Queue 数量——RocketMQ 的 Queue 创建时定下来,运行时不能改。
更不能 kill 原消费者重建——已经消费了一半,重建后从哪开始都是问题。
二、紧急预案:临时多订阅组
思路不复杂:原消费者组继续消费自己的 Queue,临建一组新的消费者,用不同的 groupId 订阅同一个 Topic,从最新 offset 开始拉消息。 新旧两组同时消费,把积压均摊。
@Component
public class EmergencyConsumerDeployer {
@Autowired private DefaultMQPushConsumer originalConsumer;
// 临时消费者——独立订阅组,独立消费进度
private final List<DefaultMQPushConsumer> emergencyConsumers = new ArrayList<>();
/**
* 紧急扩容:
* @param count 临时消费者数量
*/
public void deployEmergencyConsumers(int count) {
for (int i = 0; i < count; i++) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"emergency-group-" + System.currentTimeMillis() + "-" + i
);
consumer.setNamesrvAddr(originalConsumer.getNamesrvAddr());
consumer.subscribe("order-topic", "*");
// 关键:从最新 offset 开始,不碰旧消息
consumer.setConsumeFromWhere(
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
);
consumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
processOrder(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
);
consumer.start();
emergencyConsumers.add(consumer);
log.info("临时消费者启动: group={}", consumer.getConsumerGroup());
}
}
/**
* 积压消费完毕后,优雅下线临时消费者
*/
public void shutdownEmergencyConsumers() {
for (DefaultMQPushConsumer consumer : emergencyConsumers) {
consumer.shutdown();
}
emergencyConsumers.clear();
log.info("临时消费者已全部下线");
}
}
为什么从最新 offset 开始?因为原消费者正在从旧 offset 慢慢往前追。临时消费者如果也从旧 offset 开始,两个人抢同一段消息——重复消费。所以分工:旧消费者啃历史积压,临时消费者消化新进来的消息。
效果立竿见影——临时消费者上线 5 分钟后,Topic 的 produceLag 开始从 800 万往下掉,因为新消息不再堆积到同一个消费者身上了。
三、并行消费——加速消化历史积压
但光让新消费者扛新消息还不够——历史 800 万条旧消息还得消化。
正常消费者是单线程顺序消费(保证顺序),但积压场景下顺序不重要了,速度才重要。
临时改成并行消费:
public class ParallelMessageListener implements MessageListenerConcurrently {
// 临时并行消费线程池
private final ExecutorService parallelExecutor = new ThreadPoolExecutor(
20, 50, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 临时关闭顺序消费,所有消息并行处理
List<CompletableFuture<Void>> futures = msgs.stream()
.map(msg -> CompletableFuture.runAsync(() -> {
processSingleMessage(msg);
}, parallelExecutor))
.collect(Collectors.toList());
CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
).join();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
/**
* 积压清除后销毁并行线程池
*/
public void destroy() {
parallelExecutor.shutdown();
}
}
从单线程消费 200 QPS 飙到 20 线程并行 3500 QPS。800 万条从 11 小时缩短到 40 分钟。
注意事项:
- 并行消费的前提是消息之间没有严格的顺序依赖。如果订单 A 必须在订单 B 之前处理,并行会出问题
- 线程池
LinkedBlockingQueue设了容量 10000,避免积压问题转移 - 这只是应急模式——积压清完就切回正常模式
四、Kafka 的同类方案
如果是 Kafka,情况略有不同。Kafka 的分区天然支持多消费者并行——每个分区一个消费者。临时扩容直接加消费者实例即可:
// Kafka 临时消费者:修改 group.id 即新建订阅组
Properties props = new Properties();
props.put("group.id", "emergency-group-" + System.currentTimeMillis());
props.put("enable.auto.commit", "false");
// 从最新开始,不碰旧分区
props.put("auto.offset.reset", "latest");
KafkaConsumer<String, String> emergencyConsumer =
new KafkaConsumer<>(props);
emergencyConsumer.subscribe(Arrays.asList("order-topic"));
但 Kafka 的痛点不同——分区数是固定的。如果 8 个分区只能并行 8 条消费,扩容也无法突破分区上限。唯一的办法是增加 max.poll.records 或在应用层做并行。
五、自动化——别等着人来救
手动开临时消费者太慢了。发现积压→看监控→开会讨论→写脚本→执行→确认效果,半小时过去了。应该做成自动触发:
@Component
public class AutoEmergencyScaler {
@Autowired private EmergencyConsumerDeployer deployer;
/**
* 每 10 秒检查一次积压
*/
@Scheduled(fixedDelay = 10_000)
public void checkAndScale() {
long totalLag = getTotalLag("order-topic");
if (totalLag > 5_000_000 && deployer.getEmergencyCount() == 0) {
log.warn("检测到严重积压: {} 条,触发紧急扩容", totalLag);
deployer.deployEmergencyConsumers(10);
alertService.critical("消息积压紧急扩容",
"Topic: order-topic, Lag: " + totalLag + ", 已启动 10 个临时消费者");
}
if (totalLag < 100_000 && deployer.getEmergencyCount() > 0) {
log.info("积压已降至安全线: {} 条,回收临时消费者", totalLag);
deployer.shutdownEmergencyConsumers();
}
}
private long getTotalLag(String topic) {
// 查询 RocketMQ 或 Kafka 的 consumer lag
return mqAdminClient.queryConsumerLag(topic);
}
}
告警 → 自动判断 → 自动扩容 → 积压清除 → 自动回收。全程不需要人介入。人能做的——事后复盘:为什么 fullGC 干了 28 分钟。
六、效果对比
| 指标 | 无预案 | 手动救急 | 自动扩容 |
|---|---|---|---|
| 积压峰值 | 800 万 | 800 万 | 500 万(触发阈值) |
| 消化完毕耗时 | 11 小时 | 40 分钟 | 25 分钟(触发更早) |
| 业务延迟 | 6 小时以上 | 15 分钟(新消息优先) | 5 分钟 |
| 人工介入 | 必须 | 必须 | 0 |
| 临时线程/消费者 | 0 | 手写命令行 | 自动创建+自动回收 |
七、注意事项
注意一:临时消费者的 groupId 必须唯一。 用了时间戳做后缀确保不会跟已有消费者组冲突。并且下线后要 clean 掉——RocketMQ 的消费者组即使没有活跃消费者也会保留 offset 信息。
注意二:CPU 和内存上限。 临时开 10 个消费者 + 并行 20 线程,CPU 使用率从 30% 涨到 85%。如果机器本身已经接近满载,扩容反而打挂自己。扩容脚本里加一条检查:
double cpuUsage = getSystemCpuUsage();
if (cpuUsage > 0.8) {
alertService.warn("机器 CPU 过高,拒绝自动扩容: " + cpuUsage);
return;
}
注意三:不要扩容过度。 新消费者太多反而跟原消费者抢资源、抢数据库连接池。先扩 5 个,观察效果,不够再加。
注意四:消息消费完了不等于问题解决了。 写一条事件记录:谁在什么时间、因为什么原因、采取了什么扩容措施、恢复了没有。下次复盘才能从根源上解决而不是每次靠扩容兜底。
消息积压这件事,第一次碰到都会手忙脚乱。有预案之后就从"紧急事故"变成"自动处理"——不用凌晨爬起来开临时消费者。
你们的 RocketMQ 或 Kafka 有没有处理过紧急积压?临时扩容方案是什么?
