RocketMQ 事务消息半消息清理:Half Message 堆积导致 Broker 磁盘告警?自动补偿机制!

做过分布式事务开发的朋友肯定都遇到过这个问题:使用 RocketMQ 的事务消息时,由于网络抖动、服务宕机、消费者超时等原因,部分 Half Message(半消息)无法被正确处理,导致在 Broker 上不断堆积。这不仅占用磁盘空间,严重时还会触发磁盘告警,影响整个消息队列的稳定性。

我之前就遇到过这样一个案例:某天凌晨,监控告警显示 RocketMQ Broker 的磁盘使用率突然飙升至 85%,马上就要触达 90% 的告警阈值。排查后发现,是某个服务的数据库在凌晨进行大批量数据迁移时,部分事务消息的本地事务执行失败,但由于网络重试机制的问题,相关的 Half Message 没有被正确回滚,大批量堆积在了 Broker 上。

今天我们就来聊聊 RocketMQ 事务消息 Half Message 的自动清理方案,让您的系统远离磁盘告警的困扰。

Half Message 的产生与堆积原因

1. 事务消息的执行流程

首先,让我们回顾一下 RocketMQ 事务消息的完整流程:

┌─────────────────────────────────────────────────────────────────────┐
│                     RocketMQ 事务消息完整流程                         │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  1. 生产者发送 Half Message(预消息)                                │
│     └──→ Broker 存储预消息,返回 halfOffset                        │
│                                                                     │
│  2. Broker 执行本地事务                                             │
│     └──→ 生产者根据本地事务结果执行 commit/rollback                │
│                                                                     │
│  3. 事务提交/回滚                                                  │
│     ├── commit:Half Message 转为正式消息,投递给消费者             │
│     └── rollback:Half Message 被标记删除,不投递给消费者            │
│                                                                     │
│  4. 事务状态回查(如果长时间未收到事务结果)                         │
│     └──→ Broker 向生产者查询事务状态                                │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2. Half Message 堆积的根本原因

堆积原因分析:

┌─────────────────────────────────────────────────────────────┐
│ 原因分类                    │ 说明                         │
├─────────────────────────────┼──────────────────────────────┤
│ 本地事务执行失败           │ 业务代码抛出异常未回滚       │
│ 网络通信问题               │ commit/rollback 消息发送失败  │
│ 服务宕机                   │ 事务执行中断,未完成提交     │
│ 消费者超时                 │ 消息处理超时,触发重试       │
│ 事务状态回查失败           │ 回查服务不可用               │
│ 事务超时未回查             │ 超时时间设置不合理           │
└─────────────────────────────┴──────────────────────────────┘

3. Half Message 堆积的后果

后果分析:

场景模拟:每小时产生 10000 条事务消息,其中 1% 堆积

1 天后:10000 × 24 × 1% = 2400 条堆积
1 周后:2400 × 7 = 16800 条堆积
1 月后:16800 × 4 = 67200 条堆积

磁盘占用:
- 每条 Half Message 大小:约 1-2KB
- 1 个月后磁盘占用:约 100-200MB(仅这一个 Topic)

多个 Topic 叠加,磁盘告警只是时间问题

解决方案:自动补偿清理机制

1. 核心设计思想

我们的方案核心是三个关键机制:

  1. 定时扫描:定期扫描堆积的 Half Message
  2. 状态回查:主动查询这些消息的原始事务状态
  3. 自动补偿:根据事务状态执行 commit 或 rollback

架构图如下:

┌─────────────────────────────────────────────────────────────────────┐
│                    RocketMQ Half Message 自动清理系统               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────────────┐ │
│  │  Half Message │───→│  定时扫描器   │───→│  状态回查服务       │ │
│  │  (堆积)       │    │  (定期检测)   │    │  (查询原始事务状态)   │ │
│  └──────────────┘    └──────────────┘    └──────────────────────┘ │
│                                                     │                │
│                                                     ▼                │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────────────┐ │
│  │  补偿执行器   │←───│  决策引擎    │←───│  事务状态结果        │ │
│  │  (执行清理)   │    │  (判断处理)   │    │                      │ │
│  └──────────────┘    └──────────────┘    └──────────────────────┘ │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2. 定时扫描策略

扫描策略设计:

┌─────────────────────────────────────────────────────────────┐
│ 扫描参数配置                                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  扫描间隔:每 5 分钟执行一次                                │
│  扫描范围:所有事务 Topic 的 Half Message                  │
│  筛选条件:creationTime 早于(当前时间 - 等待时间)         │
│  批量大小:每次最多处理 100 条                             │
│  并发限制:最多 5 个线程并发处理                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

3. 状态回查机制

状态回查流程:

function queryHalfMessageStatus(halfMessage):
    # 1. 解析 Half Message 获取事务 ID
    transactionId = parseTransactionId(halfMessage)

    # 2. 向生产者查询事务状态
    status = transactionListener.getTransactionStatus(transactionId)

    # 3. 根据状态决定处理方式
    if status == COMMITTED:
        return COMMIT
    else if status == ROLLBACK:
        return ROLLBACK
    else:
        return UNKNOWN

4. 自动补偿决策

补偿决策逻辑:

┌─────────────────────────────────────────────────────────────┐
│ 决策状态机                                                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌───────────┐                                             │
│  │   START   │                                             │
│  └─────┬─────┘                                             │
│        │                                                   │
│        ▼                                                   │
│  ┌───────────┐    ┌──────────────┐                       │
│  │  QUERYING │───→│   SUCCESS    │                       │
│  └─────┬─────┘    └──────┬───────┘                       │
│        │                  │                                │
│        │            ┌─────┴─────┐                         │
│        │            │            │                         │
│        │            ▼            ▼                         │
│        │      ┌──────────┐ ┌──────────┐                  │
│        │      │ COMMIT   │ │ ROLLBACK │                  │
│        │      └──────────┘ └──────────┘                  │
│        │                                                   │
│        ▼                                                   │
│  ┌───────────┐                                            │
│  │   FAILED  │──→ 重试(最多 3 次)                       │
│  └───────────┘                                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

实战方案实现

1. Half Message 扫描器

// Half Message 扫描器
class HalfMessageScanner {
    private ScheduledExecutorService scheduler;
    private RocketMQTemplate rocketMQTemplate;

    function init() {
        scheduler.scheduleAtFixedRate(() -> {
            scanAndProcess();
        }, 0, 5 * 60 * 1000, TimeUnit.MILLISECONDS);
    }

    function scanAndProcess() {
        # 1. 获取所有事务 Topic
        topics = getTransactionTopics();

        # 2. 遍历每个 Topic 扫描 Half Message
        for topic in topics:
            halfMessages = queryHalfMessages(
                topic,
                startTime = now - WAIT_TIME,
                maxCount = 100
            )

            # 3. 并发处理每条消息
            for msg in halfMessages:
                processAsync(msg)
    }
}

2. 事务状态回查服务

// 事务状态回查服务
class TransactionStatusQueryService {

    function queryStatus(halfMessage):
        transactionId = extractTransactionId(halfMessage)

        # 最多重试 3 次
        for attempt in 1..3:
            try:
                status = doQuery(transactionId)
                return status
            catch Exception as e:
                if attempt == 3:
                    log.error("Query failed after 3 attempts: {}", transactionId)
                    return UNKNOWN
                sleep(1000 * attempt)

    private function doQuery(transactionId):
        # 根据事务 ID 查询本地事务状态
        return transactionListener.getTransactionState(transactionId)
}

3. 补偿执行器

// 补偿执行器
class CompensationExecutor {

    function compensate(halfMessage, status):
        switch status:
            case COMMITTED:
                commitMessage(halfMessage)
                break
            case ROLLBACK:
                rollbackMessage(halfMessage)
                break
            case UNKNOWN:
                # 未知状态,放置更长时间后再处理
                scheduleRetry(halfMessage, delay = 1 hour)
                break

    function commitMessage(halfMessage):
        rocketMQTemplate.sendMessage(
            halfMessage.topic,
            MessageBuilder.withPayload(halfMessage.body)
                .setHeader("transactionId", halfMessage.transactionId)
                .build()
        )

    function rollbackMessage(halfMessage):
        rocketMQTemplate.rollback(halfMessage.transactionId)
}

4. 监控告警

监控指标:

┌────────────────────────┬─────────────────────────────────────┐
│ 指标名称               │ 说明                               │
├────────────────────────┼─────────────────────────────────────┤
│ halfMessageCount       │ 当前 Half Message 堆积数量          │
│ scanCount             │ 每次扫描的消息数量                  │
│ commitCount           │ 执行 commit 的数量                  │
│ rollbackCount         │ 执行 rollback 的数量                │
│ queryFailedCount      │ 查询失败的次数                      │
│ diskUsage            │ Broker 磁盘使用率                   │
│ cleanupLatency       │ 清理延迟(从 creation 到清理的时间)  │
└────────────────────────┴─────────────────────────────────────┘

最佳实践与注意事项

1. 合理设置等待时间

等待时间设置建议:

┌─────────────────────────────────────────────────────────────┐
│ 等待时间 = 本地事务超时时间 × 2 + 网络重试时间              │
│                                                             │
│ 例如:                                                      │
│   - 本地事务超时:30 秒                                     │
│   - 网络重试时间:10 秒                                     │
│   - 等待时间 = 30 × 2 + 10 = 70 秒                         │
│                                                             │
│ 建议实际设置:5-10 分钟(留足余量)                         │
└─────────────────────────────────────────────────────────────┘

2. 避免清理正在处理的消息

过滤条件:

# 只清理创建时间超过阈值的消息
creationTime < now - MAX_TRANSACTION_TIME

# 排除正在回查的消息
status != PROCESSING

3. 事务状态存储优化

状态存储建议:

# 使用 Redis 存储事务状态
transaction:status:{transactionId} = COMMITTED | ROLLBACK | PROCESSING

# 设置合理的过期时间
TTL = MAX_TRANSACTION_TIME × 2

4. 优雅关闭

关闭流程:

function shutdown():
    # 1. 停止接收新任务
    scheduler.shutdown()

    # 2. 等待现有任务完成(最多 30 秒)
    scheduler.awaitTermination(30, TimeUnit.SECONDS)

    # 3. 保存当前处理状态
    saveProcessingState()

    # 4. 关闭数据库连接
    closeConnections()

5. 集群部署注意事项

集群部署要点:

┌─────────────────────────────────────────────────────────────┐
│ 1. 只部署一个清理节点(避免重复清理)                        │
│ 2. 使用分布式锁控制清理任务归属                             │
│ 3. 监控告警要联动自动扩容                                   │
│ 4. 定期检查清理任务的健康状态                               │
└─────────────────────────────────────────────────────────────┘

效果对比

方案磁盘占用人工干预可靠性复杂度
人工清理
超时自动删除
状态回查+补偿

总结

RocketMQ 事务消息 Half Message 清理的核心原则:

  1. 定时扫描:定期检测堆积的 Half Message
  2. 状态回查:主动查询原始事务状态,避免误删
  3. 自动补偿:根据状态执行 commit 或 rollback
  4. 监控告警:实时监控堆积量和磁盘使用率
  5. 集群协调:使用分布式锁避免重复清理

记住:Half Message 不可怕,可怕的是不知道如何清理。一个完善的自动清理机制,是 RocketMQ 事务消息的安全保障。


源码获取

文章已同步至小程序博客栏目,需要源码的请关注小程序博客。

公众号:服务端技术精选

小程序码:


标题:RocketMQ 事务消息半消息清理:Half Message 堆积导致 Broker 磁盘告警?自动补偿机制!
作者:jiangyi
地址:http://jiangyi.space/articles/2026/05/24/1779201102484.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消