RocketMQ 实战指南:从入门到原理到生产实战、八股面试

引言:为什么你需要掌握 RocketMQ?

还记得去年双十一,我们公司核心交易系统因为消息队列性能瓶颈导致订单处理延迟,差点酿成重大事故。事后复盘发现,问题的根源在于团队对消息队列的理解停留在"会用"层面,缺乏深入原理和调优经验。

消息队列作为分布式系统的核心组件,承载着异步解耦、流量削峰、数据分发等关键职责。RocketMQ 作为阿里巴巴开源的分布式消息中间件,凭借其高吞吐量、高可用性、丰富的消息特性,已成为国内互联网公司的首选方案。

本文将从入门到原理,从实战到面试,带你全面掌握 RocketMQ。


一、RocketMQ 入门:10分钟快速上手

1.1 什么是 RocketMQ?

RocketMQ 是阿里巴巴于2012年开源的第三代分布式消息中间件,2016年成为 Apache 顶级项目。它借鉴了 Kafka 的高吞吐设计,同时解决了 Kafka 在事务消息、延迟消息、消息轨迹等方面的不足。

核心特点:

  • 高吞吐量:单机写入性能可达10万+ TPS
  • 高可用性:支持多 Master 多 Slave 架构,自动故障切换
  • 丰富的消息类型:普通消息、顺序消息、事务消息、延迟消息
  • 消息轨迹:支持消息全链路追踪
  • 灵活的消息过滤:支持 Tag 过滤和 SQL92 过滤

1.2 核心概念

在深入使用之前,先理解 RocketMQ 的核心概念:

┌─────────────────────────────────────────────────────────────┐
│                      RocketMQ 架构图                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────┐    ┌─────────┐    ┌─────────┐                │
│   │Producer │    │Producer │    │Producer │                │
│   └────┬────┘    └────┬────┘    └────┬────┘                │
│        │              │              │                      │
│        └──────────────┼──────────────┘                      │
│                       │                                     │
│              ┌────────┴────────┐                           │
│              │   NameServer    │  ← 服务注册与发现          │
│              │   (集群部署)     │                           │
│              └────────┬────────┘                           │
│                       │                                     │
│        ┌──────────────┼──────────────┐                     │
│        │              │              │                      │
│   ┌────┴────┐    ┌────┴────┐    ┌────┴────┐               │
│   │ Broker  │    │ Broker  │    │ Broker  │  ← 消息存储    │
│   │ Master  │    │ Master  │    │ Master  │               │
│   │  +Slave │    │  +Slave │    │  +Slave │               │
│   └────┬────┘    └────┬────┘    └────┬────┘               │
│        │              │              │                      │
│        └──────────────┼──────────────┘                      │
│                       │                                     │
│        ┌──────────────┼──────────────┐                     │
│        │              │              │                      │
│   ┌────┴────┐    ┌────┴────┐    ┌────┴────┐               │
│   │Consumer │    │Consumer │    │Consumer │                │
│   └─────────┘    └─────────┘    └─────────┘               │
│                                                             │
└─────────────────────────────────────────────────────────────┘

核心组件:

组件作用类比
NameServer服务注册与发现,管理 Broker 路由信息类似 ZooKeeper,但更轻量
Broker消息存储和转发,核心处理单元类似 Kafka 的 Broker
Producer消息生产者,负责发送消息消息的发送方
Consumer消息消费者,负责消费消息消息的接收方
Topic消息主题,逻辑上的消息分类类似数据库的表
Queue消息队列,物理上的存储单元Topic 的分片

1.3 消息模型

RocketMQ 支持两种消息模型:

集群消费(Clustering)

  • 一条消息只会被消费组内的一个消费者消费
  • 适用于大部分业务场景
  • 消费者组内的消费者共同分担消息消费压力
// 集群消费模式(默认)
consumer.setMessageModel(MessageModel.CLUSTERING);

广播消费(Broadcasting)

  • 一条消息会被消费组内的所有消费者消费
  • 适用于需要所有消费者都收到消息的场景
  • 例如:配置同步、缓存刷新
// 广播消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);

二、RocketMQ 原理深度解析

2.1 存储机制:为什么 RocketMQ 这么快?

RocketMQ 的高性能很大程度上得益于其精巧的存储设计。

2.1.1 存储架构

┌─────────────────────────────────────────────────────────────┐
│                    RocketMQ 存储架构                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                  CommitLog(提交日志)                │   │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐   │   │
│  │  │Message 1│ │Message 2│ │Message 3│ │Message 4│   │   │
│  │  │ 1KB     │ │ 2KB     │ │ 1.5KB   │ │ 3KB     │   │   │
│  │  └─────────┘ └─────────┘ └─────────┘ └─────────┘   │   │
│  │                                                     │   │
│  │  特点:顺序写入、定长存储(默认1GB)、刷盘策略可配置   │   │
│  └─────────────────────────────────────────────────────┘   │
│                            │                                │
│                            ▼                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                  ConsumeQueue(消费队列)             │   │
│  │  ┌─────────────────────────────────────────────┐   │   │
│  │  │ CommitLog Offset | Size | Message Tag Hash  │   │   │
│  │  │      8 bytes     |4 bytes|    8 bytes        │   │   │
│  │  └─────────────────────────────────────────────┘   │   │
│  │                                                     │   │
│  │  特点:定长存储(20字节/条)、快速定位消息           │   │
│  └─────────────────────────────────────────────────────┘   │
│                            │                                │
│                            ▼                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                  IndexFile(索引文件)                │   │
│  │  ┌─────────────────────────────────────────────┐   │   │
│  │  │ Key Hash | CommitLog Offset | Timestamp     │   │   │
│  │  └─────────────────────────────────────────────┘   │   │
│  │                                                     │   │
│  │  特点:支持按消息Key查询、Hash索引结构              │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

核心设计思想:

  1. 顺序写入:CommitLog 采用顺序写入方式,相比随机写入性能提升10倍以上
  2. 内存映射(Mmap):利用操作系统的 PageCache,实现高效的文件读写
  3. 零拷贝:发送消息时直接从 PageCache 发送到网卡,减少数据拷贝
  4. 读写分离:ConsumeQueue 存储消息索引,CommitLog 存储消息内容

2.1.2 刷盘机制

RocketMQ 支持两种刷盘模式:

同步刷盘(SYNC_FLUSH)

  • 消息写入 PageCache 后立即刷盘
  • 数据可靠性高,但性能相对较低
  • 适用于金融等对数据可靠性要求极高的场景

异步刷盘(ASYNC_FLUSH)

  • 消息先写入 PageCache,由后台线程异步刷盘
  • 性能高,但存在少量数据丢失风险
  • 适用于大部分互联网业务场景
// Broker 配置
flushDiskType = SYNC_FLUSH  // 同步刷盘
flushDiskType = ASYNC_FLUSH // 异步刷盘(默认)

2.1.3 主从复制

RocketMQ 支持两种主从复制模式:

同步复制(SYNC_MASTER)

  • Master 和 Slave 都写入成功后才返回成功
  • 数据可靠性高,但写入延迟增加

异步复制(ASYNC_MASTER)

  • Master 写入成功后立即返回,异步同步到 Slave
  • 写入性能好,但存在主从切换时的数据丢失风险
// Broker 配置
brokerRole = SYNC_MASTER  // 同步复制
brokerRole = ASYNC_MASTER // 异步复制(默认)

生产环境推荐配置:

场景刷盘方式复制方式说明
金融支付SYNC_FLUSHSYNC_MASTER最高可靠性,性能最低
电商订单ASYNC_FLUSHSYNC_MASTER平衡可靠性和性能
日志收集ASYNC_FLUSHASYNC_MASTER最高性能,可接受少量丢失

2.2 消息发送流程

┌─────────────────────────────────────────────────────────────┐
│                    消息发送流程                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. Producer 从 NameServer 获取 Topic 路由信息               │
│     └─> 缓存路由信息,定期更新                               │
│                                                             │
│  2. Producer 根据负载均衡策略选择 Queue                       │
│     └─> 轮询、随机、按消息 Key 哈希等策略                    │
│                                                             │
│  3. Producer 发送消息到 Broker                               │
│     └─> 同步发送 / 异步发送 / 单向发送                       │
│                                                             │
│  4. Broker 接收消息并存储                                    │
│     └─> 写入 CommitLog                                     │
│     └─> 构建 ConsumeQueue 索引                               │
│     └─> 根据配置刷盘和复制                                   │
│                                                             │
│  5. Broker 返回发送结果                                     │
│     └─> SEND_OK / FLUSH_DISK_TIMEOUT / FLUSH_SLAVE_TIMEOUT   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

三种发送方式:

// 1. 同步发送(最常用)
SendResult sendResult = producer.send(msg);

// 2. 异步发送(高吞吐场景)
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // 发送成功处理
    }
    @Override
    public void onException(Throwable e) {
        // 发送失败处理
    }
});

// 3. 单向发送(日志收集等场景)
producer.sendOneway(msg);

2.3 消息消费流程

┌─────────────────────────────────────────────────────────────┐
│                    消息消费流程                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. Consumer 从 NameServer 获取 Topic 路由信息               │
│                                                             │
│  2. Consumer 向 Broker 发送拉取消息请求(Pull 模式)          │
│     或 Broker 主动推送消息(Push 模式,实际是长轮询)          │
│                                                             │
│  3. Broker 根据 ConsumeQueue 定位消息                        │
│     └─> 根据 offset 查找 ConsumeQueue                       │
│     └─> 获取 CommitLog Offset 和 Size                       │
│     └─> 从 CommitLog 读取消息内容                            │
│                                                             │
│  4. Consumer 处理消息                                        │
│     └─> 业务逻辑处理                                        │
│     └─> 消费成功返回 CONSUME_SUCCESS                         │
│     └─> 消费失败返回 RECONSUME_LATER(进入重试队列)          │
│                                                             │
│  5. Consumer 更新消费进度(Offset)                           │
│     └─> 本地存储(广播模式)                                 │
│     └─> Broker 存储(集群模式)                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

消费模式对比:

特性Push 模式Pull 模式
实现方式长轮询(本质是 Pull)主动拉取
实时性取决于拉取频率
吞吐量取决于拉取策略
灵活性低(封装程度高)高(可自定义策略)
适用场景大部分业务场景特殊业务逻辑

2.4 消息可靠性保障

RocketMQ 通过多重机制保障消息可靠性:

2.4.1 生产端可靠性

发送重试机制:

// 设置发送失败时的重试次数(默认2次)
producer.setRetryTimesWhenSendFailed(3);

// 设置异步发送失败时的重试次数
producer.setRetryTimesWhenSendAsyncFailed(3);

失败策略:

  • 失败时自动选择其他 Broker 重试
  • 可通过 setRetryAnotherBrokerWhenNotStoreOK 控制是否切换到其他 Broker

2.4.2 Broker 端可靠性

  • 刷盘机制:同步刷盘保证数据落盘
  • 主从复制:同步复制保证多副本
  • 文件恢复:启动时自动恢复未刷盘的数据

2.4.3 消费端可靠性

消费重试机制:

消费失败 ──> 返回 RECONSUME_LATER ──> 进入 %RETRY% + ConsumerGroup 队列
                                              │
                                              ▼
                                    延迟一段时间后重新消费
                                              │
                                              ▼
                                    超过最大重试次数(默认16次)
                                              │
                                              ▼
                                    进入死信队列 %DLQ% + ConsumerGroup

重试延迟级别:

重试次数延迟时间
110秒
230秒
31分钟
42分钟
53分钟
......
162小时

三、RocketMQ 高级特性

3.1 顺序消息

全局顺序 vs 分区顺序:

全局顺序(性能低)              分区顺序(性能高)
┌─────────┐                   ┌─────────┬─────────┬─────────┐
│ Order 1 │                   │Queue 0  │Queue 1  │Queue 2  │
│ Order 2 │                   ├─────────┼─────────┼─────────┤
│ Order 3 │                   │Order 1  │Order 2  │Order 3  │
│ Order 4 │                   │Order 4  │Order 5  │Order 6  │
│ Order 5 │                   │Order 7  │Order 8  │Order 9  │
└─────────┘                   └─────────┴─────────┴─────────┘
      │                              │
      ▼                              ▼
  单队列,单消费者              多队列,按Key哈希
  吞吐量受限                  相同Key的消息有序

分区顺序消息实现:

// 生产者:使用 MessageQueueSelector 选择队列
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Long orderId = (Long) arg;
        // 根据 orderId 选择队列,保证相同 orderId 的消息进入同一队列
        long index = orderId % mqs.size();
        return mqs.get((int) index);
    }
}, orderId);

// 消费者:使用 MessageListenerOrderly
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // 同一队列的消息串行消费,保证顺序
        for (MessageExt msg : msgs) {
            // 处理消息
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

3.2 事务消息

RocketMQ 的事务消息实现了最终一致性,解决了分布式事务问题。

┌─────────────────────────────────────────────────────────────┐
│                    事务消息流程                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Producer              Broker              Consumer          │
│     │                    │                    │             │
│     │ 1.发送半消息        │                    │             │
│     ├───────────────────>│                    │             │
│     │                    │                    │             │
│     │ 2.执行本地事务      │                    │             │
│     │ [业务逻辑+本地事务]  │                    │             │
│     │                    │                    │             │
│     │ 3.发送确认/回滚     │                    │             │
│     ├───────────────────>│                    │             │
│     │                    │                    │             │
│     │                    │ 4.投递消息          │             │
│     │                    ├───────────────────>│             │
│     │                    │                    │             │
│     │                    │ 5.消费消息          │             │
│     │                    │<───────────────────┤             │
│     │                    │                    │             │
│     │ 6.回查(如果需要)  │                    │             │
│     │<───────────────────┤                    │             │
│     │                    │                    │             │
└─────────────────────────────────────────────────────────────┘

事务消息实现:

// 生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地事务
            boolean success = executeBusinessTransaction(msg, arg);
            if (success) {
                return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
            } else {
                return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息
            }
        } catch (Exception e) {
            return LocalTransactionState.UNKNOW; // 未知状态,等待回查
        }
    }
    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 事务回查
        boolean isCommitted = checkTransactionStatus(msg);
        if (isCommitted) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
});

3.3 延迟消息

RocketMQ 支持18个级别的延迟消息:

// 设置延迟级别(1-18)
msg.setDelayTimeLevel(3); // 延迟1分钟

// 延迟级别对应表
// 1: 1s, 2: 5s, 3: 10s, 4: 30s, 5: 1m, 6: 2m, 7: 3m, 8: 4m, 9: 5m
// 10: 6m, 11: 7m, 12: 8m, 13: 9m, 14: 10m, 15: 20m, 16: 30m, 17: 1h, 18: 2h

自定义延迟时间(RocketMQ 5.0+):

// RocketMQ 5.0 支持任意延迟时间
msg.setDelayTimeMs(5000); // 延迟5秒

3.4 消息过滤

Tag 过滤(服务端过滤):

// 生产者设置 Tag
Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello".getBytes());

// 消费者订阅指定 Tag
consumer.subscribe("TopicTest", "TagA || TagB"); // 订阅 TagA 或 TagB
consumer.subscribe("TopicTest", "*"); // 订阅所有 Tag

SQL92 过滤(服务端过滤):

// 生产者设置属性
msg.putUserProperty("a", "1");
msg.putUserProperty("b", "2");

// 消费者使用 SQL92 过滤
consumer.subscribe("TopicTest", MessageSelector.bySql("a > 5 AND b = '2'"));

四、生产实战:从0到1搭建高可用集群

4.1 集群架构设计

┌─────────────────────────────────────────────────────────────┐
│                  生产环境推荐架构                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                 NameServer 集群                      │   │
│  │           ┌─────────┐ ┌─────────┐ ┌─────────┐      │   │
│  │           │  ns-1   │ │  ns-2   │ │  ns-3   │      │   │
│  │           │:9876    │ │:9876    │ │:9876    │      │   │
│  │           └─────────┘ └─────────┘ └─────────┘      │   │
│  │                                                     │   │
│  │  部署建议:2个及以上节点,无状态,可水平扩展          │   │
│  └─────────────────────────────────────────────────────┘   │
│                            │                                │
│                            ▼                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                  Broker 集群                         │   │
│  │                                                     │   │
│  │  ┌─────────────────┐  ┌─────────────────┐          │   │
│  │  │   Broker-a      │  │   Broker-b      │          │   │
│  │  │ ┌─────┐┌─────┐ │  │ ┌─────┐┌─────┐ │          │   │
│  │  │ │Master││Slave│ │  │ │Master││Slave│ │          │   │
│  │  │ │:10911││:10912│ │  │ │:10911││:10912│ │          │   │
│  │  │ └─────┘└─────┘ │  │ └─────┘└─────┘ │          │   │
│  │  └─────────────────┘  └─────────────────┘          │   │
│  │                                                     │   │
│  │  部署建议:2主2从,主从交叉部署在不同机器            │   │
│  │  Broker-a-Master 和 Broker-b-Slave 在同一台机器    │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

4.2 配置文件详解

NameServer 配置:

# 启动 NameServer
nohup sh bin/mqnamesrv &

# 查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log

Broker 配置(broker-a.properties):

# Broker 标识
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0  # 0=Master, >0=Slave

# NameServer 地址
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876

# 监听端口
listenPort=10911

# 存储路径
storePathRootDir=/data/rocketmq/store-a
storePathCommitLog=/data/rocketmq/store-a/commitlog

# 刷盘和复制配置
flushDiskType=ASYNC_FLUSH
brokerRole=SYNC_MASTER

# 自动创建 Topic(生产环境建议关闭)
autoCreateTopicEnable=false

# 消息大小限制
defaultTopicQueueNums=4
maxMessageSize=4194304  # 4MB

# 发送线程池配置
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16

Broker Slave 配置(broker-a-s.properties):

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1  # Slave

namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
listenPort=10912

storePathRootDir=/data/rocketmq/store-a-s
storePathCommitLog=/data/rocketmq/store-a-s/commitlog

flushDiskType=ASYNC_FLUSH
brokerRole=SLAVE

4.3 监控与运维

关键监控指标:

指标说明告警阈值
tps每秒处理消息数低于预期的50%
rt响应时间P99 > 100ms
store存储使用率> 80%
memory内存使用率> 80%
cpuCPU使用率> 80%

常用运维命令:

# 查看集群状态
sh bin/mqadmin clusterList -n localhost:9876

# 查看 Topic 列表
sh bin/mqadmin topicList -n localhost:9876

# 查看 Topic 路由信息
sh bin/mqadmin topicRoute -n localhost:9876 -t TopicTest

# 查看消费进度
sh bin/mqadmin consumerProgress -n localhost:9876 -g consumer_group

# 重置消费位点
sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g consumer_group -t TopicTest -s "2024-01-15#10:00:00"

# 查看消息
sh bin/mqadmin queryMsgById -n localhost:9876 -i <msgId>

4.4 性能调优

JVM 参数优化:

# Broker JVM 参数
JAVA_OPT="-server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m"
JAVA_OPT="${JAVA_OPT} -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30"

操作系统优化:

# 1. 文件描述符限制
vi /etc/security/limits.conf
* soft nofile 655350
* hard nofile 655350

# 2. 内核参数优化
vi /etc/sysctl.conf
vm.overcommit_memory=1
vm.swappiness=10
net.core.rmem_max=134217728
net.core.wmem_max=134217728

Broker 配置优化:

# 增加发送线程池
sendMessageThreadPoolNums=32

# 启用 transientStorePool(TransientStorePoolEnable=true)
transientStorePoolEnable=true

# 增加 CommitLog 文件大小(默认1GB,可增加到2GB)
mappedFileSizeCommitLog=2147483648

# 启用文件预热
warmMapedFileEnable=true

五、八股面试:高频考点与深度解析

5.1 基础篇

Q1:RocketMQ 和 Kafka 有什么区别?

特性RocketMQKafka
开发语言JavaScala/Java
消息延迟支持18个级别不支持原生延迟消息
事务消息支持支持(但实现复杂)
消息轨迹原生支持需自行实现
消息过滤Tag + SQL92仅支持 Topic
单机吞吐10万+ TPS百万级 TPS
社区生态国内活跃国际活跃
适用场景电商、金融日志、大数据

Q2:RocketMQ 为什么这么快?

  1. 顺序写入:CommitLog 顺序写入,避免磁盘随机寻址
  2. 内存映射:使用 Mmap 技术,读写操作直接操作内存
  3. 零拷贝:sendfile 系统调用,数据直接从 PageCache 发送到网卡
  4. 读写分离:ConsumeQueue 存储索引,CommitLog 存储内容
  5. 批量处理:支持消息批量发送和消费

Q3:RocketMQ 如何保证消息不丢失?

生产端:

  • 同步发送 + 失败重试
  • 失败时记录日志,人工补偿

Broker 端:

  • 同步刷盘(SYNC_FLUSH)
  • 同步复制(SYNC_MASTER)
  • 主从自动切换

消费端:

  • 先处理业务逻辑,再返回消费成功
  • 消费失败进入重试队列
  • 超过重试次数进入死信队列

5.2 进阶篇

Q4:RocketMQ 的事务消息是如何实现的?

事务消息采用两阶段提交思想:

  1. 第一阶段:Producer 发送半消息(Half Message)到 Broker
  2. 第二阶段:Producer 执行本地事务,然后发送 Commit 或 Rollback
  3. 回查机制:Broker 定期回查本地事务状态,根据结果决定提交或回滚

关键点:

  • 半消息对消费者不可见
  • 回查机制保证最终一致性
  • 本地事务执行成功但确认消息丢失时,通过回查保证一致性

Q5:RocketMQ 的 Rebalance 机制是什么?

Rebalance(再平衡)是指当消费者组内的消费者数量变化时,重新分配 Queue 的过程。

触发时机:

  • 消费者启动或停止
  • Topic 的 Queue 数量变化
  • 消费者订阅关系变化

分配策略:

  • 平均分配(默认):AllocateMessageQueueAveragely
  • 环形分配:AllocateMessageQueueAveragelyByCircle
  • 一致性哈希:AllocateMessageQueueConsistentHash

Rebalance 问题:

  • 消费暂停:Rebalance 期间暂停消费
  • 重复消费:Rebalance 过程中可能重复消费
  • 解决方案:减少 Rebalance 频率,使用一致性哈希策略

Q6:RocketMQ 如何处理消息积压?

原因分析:

  1. 消费者处理能力不足
  2. 消费者异常导致消费停滞
  3. 网络问题导致消费缓慢

解决方案:

// 1. 增加消费者实例
// 水平扩展消费者,注意消费者数量不要超过 Queue 数量

// 2. 优化消费逻辑
// - 批量消费
consumer.setConsumeMessageBatchMaxSize(50);
// - 多线程消费
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(50);

// 3. 跳过非重要消息
consumer.setMaxReconsumeTimes(3);  // 减少重试次数

// 4. 临时扩容
// 启动临时消费者组,快速消费积压消息

紧急处理方案:

# 查看消费积压情况
sh bin/mqadmin consumerProgress -n localhost:9876 -g consumer_group

# 重置消费位点(跳过积压消息)
sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g consumer_group -t TopicTest -s "now"

5.3 高级篇

Q7:RocketMQ 的存储文件是如何清理的?

RocketMQ 使用文件过期删除机制

CommitLog 文件清理流程:
1. 每天凌晨4点触发清理任务
2. 检查文件最后修改时间
3. 如果文件超过72小时(默认)且所有消息都已消费,则删除
4. 如果磁盘使用率超过75%,则立即触发清理
5. 如果磁盘使用率超过85%,则强制清理(不管是否已消费)
6. 如果磁盘使用率超过90%,则拒绝写入

配置参数:

# 文件保留时间(小时)
fileReservedTime=72

# 磁盘使用率阈值
diskMaxUsedSpaceRatio=75

Q8:RocketMQ 如何实现高可用?

NameServer 高可用:

  • 多节点部署,无状态设计
  • 客户端缓存路由信息,NameServer 宕机不影响已有连接

Broker 高可用:

  • Master-Slave 架构
  • 同步复制保证数据一致性
  • 主从自动切换(配合 NameServer 的路由更新)

Producer 高可用:

  • 失败重试机制
  • 自动选择其他 Broker 发送

Consumer 高可用:

  • 消费者组内多实例
  • 自动 Rebalance
  • 消费失败重试

Q9:RocketMQ 的消息幂等性如何保证?

RocketMQ 不保证消息不重复,幂等性需要业务方保证:

// 方案1:数据库唯一索引
// 消息中包含业务唯一键,插入数据库时利用唯一索引去重

// 方案2:Redis Setnx
String key = "msg:" + msg.getKeys();
Boolean success = redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS);
if (!success) {
    // 消息已处理,直接返回
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

// 方案3:数据库状态机
// 根据业务状态判断是否已经处理
Order order = orderMapper.selectById(orderId);
if (order.getStatus() != OrderStatus.PENDING) {
    // 订单已处理,直接返回
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

5.4 场景设计题

Q10:设计一个秒杀系统的消息队列方案

秒杀系统消息队列设计:

┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│  用户请求 │───>│  Nginx  │───>│  Gateway │───>│  秒杀服务 │
└─────────┘    └─────────┘    └─────────┘    └────┬────┘
                                                   │
                                                   │ 1. 校验(库存、用户)
                                                   │ 2. 发送 MQ
                                                   ▼
                                            ┌─────────┐
                                            │ RocketMQ│
                                            │ (削峰)  │
                                            └────┬────┘
                                                 │
                                                 ▼
                                          ┌─────────────┐
                                          │  订单消费者  │
                                          │  (异步处理)  │
                                          └──────┬──────┘
                                                 │
                                                 ▼
                                          ┌─────────────┐
                                          │  数据库      │
                                          │ (扣减库存)   │
                                          └─────────────┘

关键点:
1. 秒杀服务只做校验和发消息,快速返回
2. RocketMQ 削峰填谷,保护数据库
3. 消费者异步处理订单,保证最终一致性
4. 设置合理的消费线程数,避免数据库压力过大

Q11:如何设计一个可靠的消息通知系统?

消息通知系统设计:

┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│ 业务系统 │───>│ 消息中心 │───>│ RocketMQ │───>│ 通知服务 │
└─────────┘    └─────────┘    └─────────┘    └────┬────┘
                                                   │
                      ┌────────────────────────────┼────────────────────────────┐
                      │                            │                            │
                      ▼                            ▼                            ▼
                ┌─────────┐                 ┌─────────┐                  ┌─────────┐
                │ 短信通知 │                 │ 邮件通知 │                  │ Push通知 │
                └─────────┘                 └─────────┘                  └─────────┘
                      │                            │                            │
                      ▼                            ▼                            ▼
                ┌─────────┐                 ┌─────────┐                  ┌─────────┐
                │ 短信网关 │                 │ 邮件网关 │                  │ Push网关 │
                └─────────┘                 └─────────┘                  └─────────┘

可靠性保障:
1. 消息持久化到数据库(发送中状态)
2. 发送成功后更新状态(已发送)
3. 定时任务扫描发送中的消息,超过5分钟重试
4. 超过3次发送失败,标记为失败,人工介入
5. 消费端幂等性保证

六、总结与展望

6.1 学习路线

入门阶段:

  1. 理解消息队列的基本概念和作用
  2. 掌握 RocketMQ 的核心组件和架构
  3. 学会基本的生产者和消费者开发

进阶阶段:

  1. 深入理解存储机制和消息发送/消费流程
  2. 掌握顺序消息、事务消息、延迟消息等高级特性
  3. 学会集群搭建和性能调优

高级阶段:

  1. 阅读源码,理解底层实现原理
  2. 掌握生产环境的问题排查和性能优化
  3. 能够设计高可用的消息队列架构

6.2 版本演进

RocketMQ 4.x:

  • 稳定版本,功能完善
  • 广泛应用于生产环境

RocketMQ 5.x:

  • 全新架构,支持 Proxy 模式
  • 支持任意延迟时间
  • 更好的云原生支持

6.3 生态工具

  • RocketMQ Console:可视化管理控制台
  • RocketMQ Exporter:Prometheus 监控导出器
  • RocketMQ Flink Connector:与 Flink 集成
  • RocketMQ Spring Boot Starter:Spring Boot 集成

参考资料

  1. RocketMQ 官方文档
  2. RocketMQ 源码
  3. RocketMQ 最佳实践

如果本文对你有帮助,欢迎关注「服务端技术精选」公众号,获取更多后端技术干货。


思考题:

  1. 在你的项目中,RocketMQ 主要解决了什么问题?遇到了哪些坑?
  2. 如果让你设计一个支持百万 TPS 的消息系统,你会如何设计?
  3. RocketMQ 5.0 的 Proxy 模式解决了什么问题?

欢迎在评论区分享你的想法和经验,我们一起交流学习!


标题:RocketMQ 实战指南:从入门到原理到生产实战、八股面试
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/06/1772639336911.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消