RocketMQ Tag/SQL92 过滤实战:客户端拉取无效消息浪费带宽?Broker 端精准过滤,网络开销降 70%!

公司做物流系统,订单状态变更通过 RocketMQ 广播。刚开始只有一个消费者,所有消息照单全收。后来业务拆分,多了十几个微服务各自订阅感兴趣的消息。问题来了——每个服务都从 Broker 拉全量消息,然后自己过滤。一天几千万条消息,每个服务实际需要的不到 10%,90% 拉过来就扔了。内网带宽跑满,消息堆积,消费延迟越来越大。

这个问题特别容易被忽视。因为消息队列用起来太简单了,Producer 发、Consumer 收,中间不用管。但当你有了十几个 Consumer 各自只需要不同子集的消息时,全量拉取就是在用带宽换便利。

今天聊聊 RocketMQ 的 Tag 和 SQL92 两种 Broker 端过滤方式,把过滤逻辑从消费端前移到 Broker,让不需要的消息根本不出 Broker 的门。


客户端过滤的问题在哪

默认的 Push Consumer 模式,客户端从 Broker 拉消息的流程是这样的:

Broker ──拉取──→ Consumer(全量消息)
                     │
                     ├─ 需要的消息(10%)→ 处理
                     └─ 不需要的消息(90%)→ 丢弃

看起来没什么问题?你丢你的,又不占硬盘。但问题出在网络传输——那 90% 的消息已经从 Broker 经过内网传到了 Consumer。内网带宽不要钱,但不是无限的。

一个极端案例:日志消息每秒 10 万条,平均 2KB 一条。三个不同的微服务各自订阅不同的日志级别。全量拉取的话,每个服务每秒拉 200MB,三个就是 600MB/s。千兆网卡直接打满。

如果把过滤推到 Broker 端:

Broker ──拉取──→ Consumer(只返回匹配的消息)
     │
     └─ 过滤掉 90% 的消息(不传输)

还是每秒 10 万条消息,但每个 Consumer 只拉到 1 万条,带宽从 200MB/s 降到 20MB/s。


Tag 过滤:最简单的 Broker 端过滤

RocketMQ 每条消息可以打一个 Tag。Tag 是字符串,Producer 发消息时指定,Consumer 订阅时指定。

// Producer:发消息时打 Tag
Message msg = new Message("order-topic", "PAID", body);
producer.send(msg);

// Consumer:订阅时指定 Tag
consumer.subscribe("order-topic", "PAID || SHIPPED");

Tag 过滤的逻辑发生在 Broker 端。Broker 在返回消息给 Consumer 之前,先按 Tag 筛一遍,不符合的直接跳过。

Tag 的限制:一条消息只能有一个 Tag。如果你的过滤条件复杂——比如"订单金额大于 1000 且状态为 PAID",Tag 就搞不定了。


SQL92 过滤:表达式级别的过滤

SQL92 过滤是 Tag 的超集。它允许在消息属性上执行 SQL 风格的表达式过滤。

// Producer:发消息时设置属性
Message msg = new Message("order-topic", "ORDER", body);
msg.putUserProperty("amount", "1500");
msg.putUserProperty("status", "PAID");
msg.putUserProperty("city", "北京");
producer.send(msg);

// Consumer:SQL92 表达式过滤
consumer.subscribe("order-topic",
    MessageSelector.bySql("amount > 1000 AND status = 'PAID'"));

这条消息只会在 amount > 1000 AND status = 'PAID' 时才会被推给 Consumer。不符合条件的消息,从文件读取但不传输。网络带宽省下来了。

SQL92 支持的语法包括:比较运算符、AND/OR、IS NULL、BETWEEN。不支持函数、子查询。毕竟它的定位是消息过滤,不是替代数据库查询。

需要注意的是,Broker 需要开启 SQL92 支持:

# broker.conf
enablePropertyFilter=true

如果不开启,SQL92 表达式会被当成普通 Tag 字符串处理,过滤结果完全不对。


什么时候用 Tag,什么时候用 SQL92

场景推荐方式例子
单一条件,值离散Tag订单状态:PAID/SHIPPED/CANCELLED
多条件组合SQL92status='PAID' AND amount>1000
条件涉及数值比较SQL92金额 > 5000
条件涉及范围SQL92createTime BETWEEN 0 AND 9999999
简单分类,无需属性Tag日志级别:ERROR/WARN/INFO

一个折中方案:Tag 做粗筛,SQL92 做细筛。

Tag: ORDER(消息类型)
SQL92: amount > 1000 AND city = '北京'(业务过滤)

Tag 先筛掉 90% 不相关类型的消息,SQL92 在剩下 10% 里再精筛。两层过滤下来,Consumer 收到的消息基本都是它需要的。


注意:过滤在 Broker 端,但属性值要 Consumer 自己理解

一个常见的错误:SQL92 过滤了 amount > 1000,Consumer 拿到消息后直接用。没问题。但如果 Producer 把 amount 存成了字符串 "1500",而 Consumer 期望的是数字类型,SQL92 的数值比较可能不符合预期。

RocketMQ 的 SQL92 过滤把 userProperty 的值当字符串处理。amount > 1000 这个表达式,如果 amount 的值是 "800"(字符串),比较结果跟数字比较不一样。建议所有用于过滤的属性都用纯数字字符串,避免类型歧义。

另外,SQL92 过滤对 Broker 的 CPU 有一点点开销——每条消息都要做表达式求值。如果消息量极大(每秒百万级),建议先用 Tag 做第一层快速过滤,Tag 过滤是哈希匹配,几乎零开销。


最佳实践

Producer 侧:把过滤字段放 userProperty

msg.putUserProperty("orderStatus", "PAID");
msg.putUserProperty("amount", String.valueOf(order.getAmount()));
msg.putUserProperty("city", order.getCity());

不要把这些字段放 body 里。Broker 只读 userProperty,不解析 body。

Consumer 侧:用 MessageSelector

consumer.subscribe("order-topic",
    MessageSelector.bySql("orderStatus = 'PAID' AND amount >= 1000"));

Broker 侧:打开开关

enablePropertyFilter=true

总结

消息过滤这件事,核心就一句话:把不需要的消息挡在 Broker 里,别让它们占网卡。

Tag 是轻量级的标签过滤,一条消息一个 Tag,简单快速。SQL92 是属性表达式过滤,支持组合条件和数值比较。

大多数场景,Tag + SQL92 两层过滤就够了——Tag 粗筛消息类型,SQL92 细筛业务属性。Consumer 拉到的消息量从全量降到精准子集,带宽开销降 70% 是很保守的估计。


有用的话转给还在 Consumer 端 if-else 过滤全量消息的同事。


标题:RocketMQ Tag/SQL92 过滤实战:客户端拉取无效消息浪费带宽?Broker 端精准过滤,网络开销降 70%!
作者:jiangyi
地址:http://jiangyi.space/articles/2026/06/10/1780755999779.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消