RabbitMQ 消费者内存暴涨防护:未 ACK 消息堆积撑爆 JVM?Prefetch 限制 + 自动降级策略!
隔壁组一个 RabbitMQ 消费者服务,每隔几天就 OOM 重启一次。查了 heap dump,发现 Delivery 对象占了 80% 的堆内存——全是未 ACK 的消息。根因是他们用的是默认的自动 ACK,后来改成了手动 ACK 防止消息丢失,但忘了设 prefetch。RabbitMQ 一股脑把所有消息都推给了 Consumer,Consumer 处理不过来,消息在堆内存里越堆越多,直到撑爆。
RabbitMQ 的 Push 模式有个很容易忽略的坑:如果你不设 prefetch,Broker 会用最快的速度把所有消息推给 Consumer。 Consumer 处理慢了,消息就在堆内存里排队等处理。每条消息至少占几百字节,10 万条消息就是几十 MB,轻松打满 JVM。
今天聊聊怎么用 prefetch 限流和自动降级,让 Consumer 不会把自己撑死。
问题出在哪:Push 模式的无限制投递
RabbitMQ 默认的行为是:Consumer 一连接上,Broker 就把当前队列里所有的消息一口气推过去。Consumer 处理不过来没关系——消息已经在你的 JVM 堆里了。
问题在手动 ACK 模式下尤其严重:
Broker 有 10 万条消息排队
Consumer 连接 → Broker 推送 10 万条 → Consumer 堆里积压 10 万条
↓ 每秒只能处理 100 条
↓ 堆内存持续增长
↓ OOM
自动 ACK 反而不会出这个问题——因为消息一推过来就 ACK 了,Broker 认为消息已处理完,Consumer 本地不积压。但自动 ACK 有丢消息的风险:Consumer 崩溃了,已推给它但还没处理的消息全丢。
所以大多数场景用的是手动 ACK。但手动 ACK + 不设 prefetch,等于把 Broker 的积压转移到了 Consumer 的内存里。
Prefetch:给 Consumer 设一个"最多同时处理多少条"的上限
Prefetch 就是告诉 Broker:"别一口气全给我,一次最多给 N 条,等我处理完了再给新的。"
spring:
rabbitmq:
listener:
simple:
prefetch: 50 # 最多同时处理 50 条
acknowledge-mode: manual
设成 50 之后的行为:
Broker 有 10 万条消息排队
Consumer 连接 → Broker 推送 50 条 → Consumer 处理
↓ 处理完 1 条,ACK
↓ Broker 补推 1 条
↓ Consumer 堆里始终只有 ≤ 50 条
↓ 内存稳定
Broker 变成了一个"缓冲区"——消息在 Broker 的磁盘上排队,而不是在 Consumer 的堆里。RabbitMQ 的磁盘存储比你的 JVM 堆大得多,也稳定得多。
Prefetch 设多少合适
设太低——每次只拿 1 条,Consumer 主频大量的时间花在等网络 IO 上,吞吐量上不去。
设太高——跟没设一样,积压照样来。
建议从这几个维度考量:
| 场景 | 建议值 | 理由 |
|---|---|---|
| 轻量处理(<1ms) | 200~500 | 处理快,可以多拿 |
| 中等处理(1~50ms) | 50~200 | 平衡吞吐和内存 |
| 重量处理(>50ms) | 10~50 | 处理慢,少拿防积压 |
| 严格顺序消费 | 1 | 单条保证顺序 |
| 多 Consumer 竞争 | 1~10 | 防止某 Consumer 独吞 |
另外,如果每条消息大小差异很大(有的几百字节,有的几 MB),建议设小一点——一条大消息顶几十条小的,堆内存占用不好估算。
自动降级:当处理速度跟不上时保护自己
Prefetch 解决了"Broker 推太快"的问题,但还有一个场景:Consumer 自己的处理速度变慢了。比如依赖的下游服务响应变慢,处理时间从 10ms 变成 500ms。这时即使 prefetch 只设了 50,堆里也会慢慢积压。
解决思路:动态调整 prefetch,当检测到积压时自动降低。
// 定时检查未 ACK 消息数,超过阈值自动降级
@Scheduled(fixedDelay = 10_000)
public void adaptivePrefetch() {
int unacked = getUnackedMessageCount();
int currentPrefetch = getCurrentPrefetch();
if (unacked > currentPrefetch * 0.8) {
// 积压超过 80% → 降级,减少预取
int newPrefetch = Math.max(1, currentPrefetch / 2);
adjustPrefetch(newPrefetch);
log.warn("{} 未 ACK 积压,prefetch {} → {}",
unacked, currentPrefetch, newPrefetch);
} else if (unacked < currentPrefetch * 0.3) {
// 积压低于 30% → 可以适当提高
int newPrefetch = Math.min(500, currentPrefetch + 10);
adjustPrefetch(newPrefetch);
log.info("{} 处理正常,prefetch {} → {}",
unacked, currentPrefetch, newPrefetch);
}
}
核心逻辑就是:积压了就少拿一点,顺畅了就多拿一点。像一个自动阀门,Consumer 处理慢了自动拧紧,处理快了自动拧松。
如果需要更彻底的降级,可以把消息转发到死信队列或者暂存到 Redis,等 Consumer 恢复了再拉回来处理。
一个容易被忽略的配置:默认 Prefetch 是 unlimited
RabbitMQ 不同版本的默认 prefetch 不一样。SimpleMessageListenerContainer 在 Spring AMQP 2.x 里默认是 250,但在某些场景下如果没显式配置,可能回退到 0(unlimited)。
建议所有环境都显式配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 100
acknowledge-mode: manual
# 一个 Consumer 同时处理的未 ACK 上限
# 生产环境不要依赖默认值
总结
RabbitMQ 消费者 OOM,根因就一条:消息在 Consumer 堆里积压了,而且没有上限。
三条防线:
Prefetch 设上限——Broker 一次最多推 N 条,消息在 Broker 磁盘上排队而不在 Consumer 堆里。按处理耗时设值:慢的设 10~50,快的设 100~300。
手动 ACK 记得及时——处理完一条立刻 ACK 一条,别攒到一批再批量 ACK。每条 ACK 都在告诉 Broker"我处理完了,可以再给我一条"。
动态降级保底——定时检查 unacked 数量,超过阈值自动拧小 prefetch。让 Consumer 像一个有弹性的水管,压力大了自己收窄。
三条防线配齐,RabbitMQ Consumer 的内存曲线就是一条平稳的直线,不会出现"隔几天 OOM 一次"的规律性爆炸。
有用的话转给还在用默认 prefetch 的同事。
标题:RabbitMQ 消费者内存暴涨防护:未 ACK 消息堆积撑爆 JVM?Prefetch 限制 + 自动降级策略!
作者:jiangyi
地址:http://jiangyi.space/articles/2026/06/10/1780757199355.html
公众号:服务端技术精选
评论