SpringBoot + Canal + RabbitMQ MySQL数据同步示例项目

项目简介

基于SpringBoot + Canal + RabbitMQ实现MySQL数据变更的实时同步到缓存和搜索系统。通过监听MySQL的binlog日志,实时捕获数据变更,并通过RabbitMQ将变更消息发送到下游系统进行处理。

核心功能

  1. 实时数据同步:基于Canal监听MySQL binlog实现数据变更的实时捕获
  2. 消息队列解耦:通过RabbitMQ实现数据变更消息的异步处理
  3. 缓存更新:自动更新Redis缓存,保证缓存与数据库的一致性
  4. 高可用设计:支持集群部署,确保服务的高可用性

技术栈

  • SpringBoot 2.6.3
  • Canal Client 1.1.5
  • RabbitMQ
  • Redis
  • MySQL
  • FastJSON

项目结构

src/main/java/com/example/canal
├── config/ # 配置类
│ ├── CanalConfig.java # Canal配置类
│ └── RabbitMQConfig.java # RabbitMQ配置类
├── service/ # 业务服务类
│ └── DataSyncService.java # 数据同步服务类
└── CanalApplication.java # 主启动类

环境要求

  • Java 8+
  • Maven 3.6+
  • MySQL 5.7+
  • RabbitMQ 3.8+
  • Redis 3.0+
  • Canal Server 1.1.5+

配置说明

MySQL配置

首先需要在MySQL中开启binlog:

-- my.cnf配置
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

然后创建Canal专用账户:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

application.yml配置

server:
  port: 8080

spring:
  application:
    name: canal-rabbitmq-sync
  
  # RabbitMQ配置
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 5
        max-concurrency: 10
        
  # Redis配置
  redis:
    host: localhost
    port: 6379
    database: 0
    timeout: 2000ms
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0

# Canal配置
canal:
  client:
    # Canal服务器地址
    hostname: 127.0.0.1
    # Canal服务端口
    port: 11111
    # 目标数据库实例
    destination: example
    # 用户名
    username: 
    # 密码
    password: 
    # 批量获取数据的数量
    batchSize: 1000

核心实现原理

1. Canal数据监听

通过Canal Client连接到Canal Server,订阅指定数据库的变更消息:

@Bean
public CanalConnector canalConnector() {
    CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(hostname, port), 
            destination, 
            username, 
            password);
    
    connector.connect();
    connector.subscribe(".*\\..*");
    connector.rollback();
    
    return connector;
}

2. 数据变更处理

在DataSyncService中处理数据变更:

private void processEntries(List<Entry> entries) {
    for (Entry entry : entries) {
        if (entry.getEntryType() == EntryType.ROWDATA) {
            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                // 处理行数据变更
                processRowData(entry.getHeader(), rowChange);
            } catch (Exception e) {
                log.error("解析行数据异常", e);
            }
        }
    }
}

3. 消息发送与缓存更新

将数据变更消息发送到RabbitMQ,并更新Redis缓存:

// 发送到RabbitMQ
rabbitTemplate.convertAndSend(exchange, routingKey, data);

// 更新Redis缓存
redisTemplate.opsForValue().set(key, value, 30, TimeUnit.MINUTES);

部署步骤

  1. 启动MySQL并配置binlog
  2. 启动Canal Server
  3. 启动RabbitMQ服务
  4. 启动Redis服务
  5. 配置application.yml中的连接信息
  6. 运行SpringBoot应用

扩展建议

  1. 数据过滤:可以根据业务需求对特定表或字段进行过滤
  2. 格式转换:可以增加数据格式转换功能,适应不同的下游系统
  3. 监控告警:可以集成监控系统,实时监控数据同步状态
  4. 失败重试:可以增加失败消息的重试机制,提高系统可靠性

注意事项

  1. 确保MySQL的binlog格式为ROW模式
  2. Canal Server需要能够访问MySQL的binlog
  3. 生产环境中建议使用持久化队列,确保消息不丢失
  4. 根据实际业务场景调整批量处理大小和并发数

项目地址

源码下载

联系方式

如有任何问题,请联系!


标题:SpringBoot + Canal + RabbitMQ MySQL数据同步示例项目
作者:jiangyi
地址:http://jiangyi.space/articles/2025/12/21/1766328138906.html

    0 评论
avatar