SpringBoot + Canal + RabbitMQ MySQL数据同步示例项目
项目简介
基于SpringBoot + Canal + RabbitMQ实现MySQL数据变更的实时同步到缓存和搜索系统。通过监听MySQL的binlog日志,实时捕获数据变更,并通过RabbitMQ将变更消息发送到下游系统进行处理。
核心功能
- 实时数据同步:基于Canal监听MySQL binlog实现数据变更的实时捕获
- 消息队列解耦:通过RabbitMQ实现数据变更消息的异步处理
- 缓存更新:自动更新Redis缓存,保证缓存与数据库的一致性
- 高可用设计:支持集群部署,确保服务的高可用性
技术栈
- SpringBoot 2.6.3
- Canal Client 1.1.5
- RabbitMQ
- Redis
- MySQL
- FastJSON
项目结构
src/main/java/com/example/canal
├── config/ # 配置类
│ ├── CanalConfig.java # Canal配置类
│ └── RabbitMQConfig.java # RabbitMQ配置类
├── service/ # 业务服务类
│ └── DataSyncService.java # 数据同步服务类
└── CanalApplication.java # 主启动类
环境要求
- Java 8+
- Maven 3.6+
- MySQL 5.7+
- RabbitMQ 3.8+
- Redis 3.0+
- Canal Server 1.1.5+
配置说明
MySQL配置
首先需要在MySQL中开启binlog:
-- my.cnf配置
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
然后创建Canal专用账户:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
application.yml配置
server:
port: 8080
spring:
application:
name: canal-rabbitmq-sync
# RabbitMQ配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 10
# Redis配置
redis:
host: localhost
port: 6379
database: 0
timeout: 2000ms
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
# Canal配置
canal:
client:
# Canal服务器地址
hostname: 127.0.0.1
# Canal服务端口
port: 11111
# 目标数据库实例
destination: example
# 用户名
username:
# 密码
password:
# 批量获取数据的数量
batchSize: 1000
核心实现原理
1. Canal数据监听
通过Canal Client连接到Canal Server,订阅指定数据库的变更消息:
@Bean
public CanalConnector canalConnector() {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(hostname, port),
destination,
username,
password);
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
return connector;
}
2. 数据变更处理
在DataSyncService中处理数据变更:
private void processEntries(List<Entry> entries) {
for (Entry entry : entries) {
if (entry.getEntryType() == EntryType.ROWDATA) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
// 处理行数据变更
processRowData(entry.getHeader(), rowChange);
} catch (Exception e) {
log.error("解析行数据异常", e);
}
}
}
}
3. 消息发送与缓存更新
将数据变更消息发送到RabbitMQ,并更新Redis缓存:
// 发送到RabbitMQ
rabbitTemplate.convertAndSend(exchange, routingKey, data);
// 更新Redis缓存
redisTemplate.opsForValue().set(key, value, 30, TimeUnit.MINUTES);
部署步骤
- 启动MySQL并配置binlog
- 启动Canal Server
- 启动RabbitMQ服务
- 启动Redis服务
- 配置application.yml中的连接信息
- 运行SpringBoot应用
扩展建议
- 数据过滤:可以根据业务需求对特定表或字段进行过滤
- 格式转换:可以增加数据格式转换功能,适应不同的下游系统
- 监控告警:可以集成监控系统,实时监控数据同步状态
- 失败重试:可以增加失败消息的重试机制,提高系统可靠性
注意事项
- 确保MySQL的binlog格式为ROW模式
- Canal Server需要能够访问MySQL的binlog
- 生产环境中建议使用持久化队列,确保消息不丢失
- 根据实际业务场景调整批量处理大小和并发数
项目地址
联系方式
如有任何问题,请联系!
标题:SpringBoot + Canal + RabbitMQ MySQL数据同步示例项目
作者:jiangyi
地址:http://jiangyi.space/articles/2025/12/21/1766328138906.html
0 评论