SpringBoot + 读写分离 + 事务内强制主库:避免主从延迟导致读取脏数据

导语

在大型应用系统中,为了提升数据库的并发处理能力,通常会采用读写分离的架构。主库负责处理写操作,而从库负责处理读操作。然而,这种架构带来了一个常见的问题:主从复制存在延迟,导致从库读取的数据可能是过期数据。本文将介绍如何在SpringBoot应用中实现读写分离,并针对事务场景提供强制主库的解决方案,确保在事务内读取的数据是最新的,避免因主从延迟导致的脏读问题。

一、读写分离与主从延迟问题

1.1 读写分离架构

1. 架构设计

在读写分离架构中:

  • 主库(Master):负责处理所有的写操作(INSERT、UPDATE、DELETE)
  • 从库(Slave):负责处理所有的读操作(SELECT)
  • 数据复制:主库的数据通过复制机制同步到从库

2. 优势

优势描述
读写负载分离写操作和读操作分别由不同的数据库处理
提高并发能力可以部署多个从库分担读压力
提升读取性能读操作分散到多个从库,减少单库压力
高可用性主库故障时,可以将从库提升为主库

1.2 主从延迟问题

1. 延迟原因

  • 复制机制:MySQL主从复制是异步的,存在延迟
  • 网络问题:主从之间的网络延迟
  • 负载过高:从库处理能力不足
  • 大事务:主库上的大事务会导致延迟

2. 延迟的影响

影响描述
数据不一致从库数据不是最新的
脏读读取到未提交或过期的数据
业务错误基于过期数据做出错误的业务决策
用户体验刚更新的数据查询不到

3. 延迟场景示例

// 场景:用户下单后立即查询订单
@Service
public class OrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    public void createOrder(Order order) {
        // 1. 创建订单(写入主库)
        orderRepository.save(order);
        
        // 2. 立即查询订单(可能从从库读取)
        // 此时从库可能还没有同步到新订单
        Order createdOrder = orderRepository.findById(order.getId());
        // createdOrder 可能是 null!
    }
}

二、SpringBoot集成读写分离

2.1 依赖配置

<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Spring Data JPA -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    
    <!-- MySQL -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    
    <!-- HikariCP 连接池 -->
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
    </dependency>
    
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

2.2 数据源配置

1. 多数据源配置

@Configuration
public class DataSourceConfig {
    
    // 主库数据源
    @Bean(name = "masterDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    // 从库数据源
    @Bean(name = "slaveDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    // 路由数据源
    @Bean(name = "dataSource")
    @Primary
    public DataSource dataSource(@Qualifier("masterDataSource") DataSource masterDataSource,
                                  @Qualifier("slaveDataSource") DataSource slaveDataSource) {
        return new ReadWriteDataSource(masterDataSource, slaveDataSource);
    }
}

2. 读写分离路由数据源

public class ReadWriteDataSource extends AbstractRoutingDataSource {
    
    private static final ThreadLocal<String> DATA_SOURCE_KEY = new ThreadLocal<>();
    
    public ReadWriteDataSource(DataSource masterDataSource, DataSource slaveDataSource) {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource);
        targetDataSources.put("slave", slaveDataSource);
        
        setTargetDataSources(targetDataSources);
        setDefaultTargetDataSource(masterDataSource);
        afterPropertiesSet();
    }
    
    /**
     * 获取当前数据源
     */
    @Override
    protected Object determineCurrentLookupKey() {
        String key = DATA_SOURCE_KEY.get();
        if (key == null) {
            key = "slave"; // 默认使用从库
        }
        return key;
    }
    
    /**
     * 设置读数据源
     */
    public static void setReadDataSource() {
        DATA_SOURCE_KEY.set("slave");
    }
    
    /**
     * 设置写数据源
     */
    public static void setWriteDataSource() {
        DATA_SOURCE_KEY.set("master");
    }
    
    /**
     * 清除数据源设置
     */
    public static void clearDataSource() {
        DATA_SOURCE_KEY.remove();
    }
}

2.3 事务管理器配置

@Configuration
@EnableTransactionManagement
public class TransactionManagerConfig {
    
    @Autowired
    @Qualifier("dataSource")
    private DataSource dataSource;
    
    /**
     * 主库事务管理器
     */
    @Bean(name = "masterTransactionManager")
    public DataSourceTransactionManager masterTransactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }
    
    /**
     * 事务管理器
     */
    @Bean(name = "transactionManager")
    @Primary
    public DataSourceTransactionManager transactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }
}

2.4 AOP切面实现

@Component
@Aspect
@Slf4j
public class DataSourceAspect {
    
    /**
     * 切换到从库
     */
    @Before("execution(* com.example.*.repository..*.find*(..)) || " +
            "execution(* com.example.*.repository..*.get*(..)) || " +
            "execution(* com.example.*.repository..*.count*(..))")
    public void setReadDataSource() {
        ReadWriteDataSource.setReadDataSource();
    }
    
    /**
     * 切换到主库
     */
    @Before("execution(* com.example.*.repository..*.save*(..)) || " +
            "execution(* com.example.*.repository..*.delete*(..)) || " +
            "execution(* com.example.*.repository..*.update*(..))")
    public void setWriteDataSource() {
        ReadWriteDataSource.setWriteDataSource();
    }
    
    /**
     * 清除数据源设置
     */
    @After("execution(* com.example.*.repository..*.*(..))")
    public void clearDataSource() {
        ReadWriteDataSource.clearDataSource();
    }
}

三、事务内强制主库实现

3.1 强制主库注解

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Master {
    /**
     * 是否强制使用主库
     */
    boolean force() default true;
}

3.2 强制主库切面

@Component
@Aspect
@Slf4j
@Order(1) // 确保在数据源切面之前执行
public class MasterForceAspect {
    
    /**
     * 强制主库处理
     */
    @Around("@annotation(Master)")
    public Object forceMaster(ProceedingJoinPoint joinPoint, Master master) throws Throwable {
        try {
            // 强制切换到主库
            ReadWriteDataSource.setWriteDataSource();
            return joinPoint.proceed();
        } finally {
            // 清除数据源设置
            ReadWriteDataSource.clearDataSource();
        }
    }
}

3.3 事务内强制主库服务

@Service
@Slf4j
public class OrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private PlatformTransactionManager transactionManager;
    
    /**
     * 创建订单并立即查询(强制主库)
     */
    @Master(force = true)
    public Order createOrderAndGet(Order order) {
        // 1. 创建订单(强制主库)
        Order savedOrder = orderRepository.save(order);
        
        // 2. 立即查询订单(强制主库,确保读取最新数据)
        Order createdOrder = orderRepository.findById(savedOrder.getId())
            .orElseThrow(() -> new RuntimeException("Order not found"));
        
        return createdOrder;
    }
    
    /**
     * 使用事务强制主库
     */
    public Order createOrderWithTransaction(Order order) {
        TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
        
        return transactionTemplate.execute(status -> {
            // 在事务内,强制使用主库
            ReadWriteDataSource.setWriteDataSource();
            
            // 创建订单
            Order savedOrder = orderRepository.save(order);
            
            // 立即查询
            Order createdOrder = orderRepository.findById(savedOrder.getId())
                .orElseThrow(() -> new RuntimeException("Order not found"));
            
            return createdOrder;
        });
    }
}

3.4 事务同步管理器方案

@Component
public class TransactionContext {
    
    private static final ThreadLocal<Boolean> FORCE_MASTER = new ThreadLocal<>();
    
    /**
     * 标记强制使用主库
     */
    public static void markForceMaster() {
        FORCE_MASTER.set(true);
    }
    
    /**
     * 清除标记
     */
    public static void clear() {
        FORCE_MASTER.remove();
    }
    
    /**
     * 是否强制使用主库
     */
    public static boolean isForceMaster() {
        return Boolean.TRUE.equals(FORCE_MASTER.get());
    }
}

3.5 增强版数据源

public class EnhancedReadWriteDataSource extends AbstractRoutingDataSource {
    
    private static final ThreadLocal<String> DATA_SOURCE_KEY = new ThreadLocal<>();
    
    @Override
    protected Object determineCurrentLookupKey() {
        // 如果强制使用主库,直接返回主库
        if (TransactionContext.isForceMaster()) {
            return "master";
        }
        
        String key = DATA_SOURCE_KEY.get();
        if (key == null) {
            key = "slave"; // 默认使用从库
        }
        return key;
    }
    
    public static void setReadDataSource() {
        DATA_SOURCE_KEY.set("slave");
    }
    
    public static void setWriteDataSource() {
        DATA_SOURCE_KEY.set("master");
    }
    
    public static void clearDataSource() {
        DATA_SOURCE_KEY.remove();
    }
}

四、完整实现方案

4.1 配置类

DataSourceConfiguration.java

@Configuration
public class DataSourceConfiguration {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().type(HikariDataSource.class).build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().type(HikariDataSource.class).build();
    }
    
    @Bean
    @Primary
    public DataSource routingDataSource(@Qualifier("masterDataSource") DataSource masterDataSource,
                                         @Qualifier("slaveDataSource") DataSource slaveDataSource) {
        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        dynamicDataSource.setDefaultTargetDataSource(masterDataSource);
        
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(DataSourceType.MASTER, masterDataSource);
        targetDataSources.put(DataSourceType.SLAVE, slaveDataSource);
        dynamicDataSource.setTargetDataSources(targetDataSources);
        
        return dynamicDataSource;
    }
    
    @Bean
    @Primary
    public DataSource dataSource(@Qualifier("routingDataSource") DataSource routingDataSource) {
        return new TransactionSyncDataSource(routingDataSource);
    }
}

4.2 事务同步数据源

TransactionSyncDataSource.java

public class TransactionSyncDataSource extends DelegatingDataSource {
    
    public TransactionSyncDataSource(DataSource targetDataSource) {
        super(targetDataSource);
    }
    
    @Override
    public Connection getConnection() throws SQLException {
        Connection connection = super.getConnection();
        return new TransactionSyncConnection(connection);
    }
    
    /**
     * 事务同步连接
     */
    private static class TransactionSyncConnection implements Connection {
        
        private final Connection delegate;
        
        public TransactionSyncConnection(Connection delegate) {
            this.delegate = delegate;
        }
        
        @Override
        public Connection getConnection() throws SQLException {
            // 在事务内,强制使用主库
            if (TransactionContext.isForceMaster()) {
                // 切换到主库的逻辑
                DynamicDataSource.setWriteDataSource();
            }
            return delegate;
        }
        
        // 其他方法委托给原始连接
        @Override
        public PreparedStatement prepareStatement(String sql) throws SQLException {
            return delegate.prepareStatement(sql);
        }
        
        // ... 其他委托方法
    }
}

4.3 服务层使用

OrderService.java

@Service
@Slf4j
public class OrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    /**
     * 创建订单(默认主库)
     */
    public Order createOrder(Order order) {
        return orderRepository.save(order);
    }
    
    /**
     * 查询订单列表(默认从库)
     */
    public List<Order> listOrders() {
        return orderRepository.findAll();
    }
    
    /**
     * 创建订单并返回创建结果(强制主库)
     */
    @Master
    public Order createOrderAndReturn(Order order) {
        Order savedOrder = orderRepository.save(order);
        return orderRepository.findById(savedOrder.getId())
            .orElseThrow(() -> new RuntimeException("Order not found"));
    }
    
    /**
     * 创建订单并查询用户(强制主库)
     */
    @Master
    public UserOrderDTO createOrderAndGetUser(Order order, Long userId) {
        Order savedOrder = orderRepository.save(order);
        // 立即查询,确保数据一致性
        Order createdOrder = orderRepository.findById(savedOrder.getId())
            .orElseThrow(() -> new RuntimeException("Order not found"));
        
        // 查询用户信息(强制主库)
        User user = userRepository.findById(userId)
            .orElseThrow(() -> new RuntimeException("User not found"));
        
        return new UserOrderDTO(user, createdOrder);
    }
}

4.4 控制器

OrderController.java

@RestController
@RequestMapping("/api/orders")
public class OrderController {
    
    @Autowired
    private OrderService orderService;
    
    /**
     * 创建订单并返回结果
     */
    @PostMapping
    public ResponseEntity<Order> createOrder(@RequestBody Order order) {
        Order createdOrder = orderService.createOrderAndReturn(order);
        return ResponseEntity.status(HttpStatus.CREATED).body(createdOrder);
    }
    
    /**
     * 创建订单并获取用户信息
     */
    @PostMapping("/with-user")
    public ResponseEntity<UserOrderDTO> createOrderWithUser(@RequestBody Order order, @RequestParam Long userId) {
        UserOrderDTO dto = orderService.createOrderAndGetUser(order, userId);
        return ResponseEntity.status(HttpStatus.CREATED).body(dto);
    }
    
    /**
     * 查询订单列表
     */
    @GetMapping
    public ResponseEntity<List<Order>> listOrders() {
        List<Order> orders = orderService.listOrders();
        return ResponseEntity.ok(orders);
    }
}

五、生产级配置

5.1 应用配置

application.yml

# 应用配置
spring:
  application:
    name: springboot-read-write-separation
  
  # 主库数据源配置
  datasource:
    master:
      jdbc-url: jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC
      username: root
      password: password
      driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        maximum-pool-size: 20
        minimum-idle: 5
        idle-timeout: 30000
        max-lifetime: 1800000
        connection-timeout: 30000
    
    # 从库数据源配置
    slave:
      jdbc-url: jdbc:mysql://localhost:3307/test?useSSL=false&serverTimezone=UTC
      username: root
      password: password
      driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        maximum-pool-size: 30
        minimum-idle: 10
        idle-timeout: 30000
        max-lifetime: 1800000
        connection-timeout: 30000

# JPA 配置
  jpa:
    database-platform: org.hibernate.dialect.MySQL8Dialect
    hibernate:
      ddl-auto: validate
    show-sql: false

# 服务器配置
server:
  port: 8080

# 读写分离配置
read-write:
  # 主从延迟阈值(毫秒)
  replication-latency-threshold: 1000
  # 强制主库的超时时间
  force-master-timeout: 5000

# 日志配置
logging:
  level:
    com.example: info

5.2 主从延迟监控

ReplicationMonitor.java

@Service
@Slf4j
public class ReplicationMonitor {
    
    @Autowired
    private DataSource dataSource;
    
    /**
     * 检查主从延迟
     */
    public long getReplicationLag() {
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement()) {
            
            // 查询从库的延迟时间
            ResultSet rs = statement.executeQuery("SHOW SLAVE STATUS");
            if (rs.next()) {
                return rs.getLong("Seconds_Behind_Master");
            }
        } catch (Exception e) {
            log.error("Failed to get replication lag", e);
        }
        return -1;
    }
    
    /**
     * 检查是否需要强制主库
     */
    public boolean shouldForceMaster() {
        long lag = getReplicationLag();
        return lag > 1000; // 延迟超过1秒,强制使用主库
    }
}

5.3 健康检查

DataSourceHealthIndicator.java

@Component
public class DataSourceHealthIndicator implements HealthIndicator {
    
    @Autowired
    private DataSource dataSource;
    
    @Autowired
    private ReplicationMonitor replicationMonitor;
    
    @Override
    public Health health() {
        try (Connection connection = dataSource.getConnection()) {
            long lag = replicationMonitor.getReplicationLag();
            
            if (lag < 0) {
                return Health.down()
                    .withDetail("replication", "not configured")
                    .build();
            } else if (lag > 1000) {
                return Health.up()
                    .withDetail("replication_lag", lag + "ms")
                    .withDetail("status", "high lag")
                    .build();
            } else {
                return Health.up()
                    .withDetail("replication_lag", lag + "ms")
                    .withDetail("status", "normal")
                    .build();
            }
        } catch (Exception e) {
            return Health.down()
                .withDetail("error", e.getMessage())
                .build();
        }
    }
}

六、最佳实践

6.1 强制主库使用场景

1. 刚写入的数据需要立即读取

  • 用户注册后需要立即返回用户信息
  • 订单创建后需要立即返回订单详情
  • 支付完成后需要立即查询支付结果

2. 事务内的读取操作

  • 同一个事务内,先写后读的场景
  • 需要保证数据一致性的场景

3. 关键业务数据的读取

  • 金融交易数据
  • 库存数据
  • 订单状态

6.2 性能优化

1. 减少强制主库的使用

  • 评估是否真的需要强制主库
  • 使用其他方式保证数据一致性(如消息队列)

2. 优化主库性能

  • 确保主库有足够的处理能力
  • 使用索引优化查询性能

3. 减少主从延迟

  • 优化网络连接
  • 使用半同步复制
  • 减少大事务

6.3 监控与告警

1. 监控指标

  • 主从延迟时间
  • 主库CPU和内存使用率
  • 从库查询延迟

2. 告警配置

  • 延迟超过阈值时告警
  • 主库故障时告警
  • 从库故障时告警

七、案例分析

7.1 案例一:电商订单创建

场景

  • 用户在电商平台下单
  • 创建订单后需要立即返回订单详情
  • 订单状态需要立即查询

问题

  • 创建订单后从从库查询,可能查不到新订单
  • 用户体验差

解决方案

  1. 创建订单使用@Master注解强制主库
  2. 立即查询订单也强制主库
  3. 确保数据一致性

效果

  • 用户下单后能立即看到订单详情
  • 数据一致性得到保证
  • 用户体验提升

7.2 案例二:库存扣减

场景

  • 用户下单时需要扣减库存
  • 扣减后需要立即查询库存数量
  • 防止超卖

问题

  • 从库库存数据可能不是最新的
  • 可能导致超卖

解决方案

  1. 库存扣减使用主库
  2. 扣减后立即查询库存使用主库
  3. 确保库存数据一致性

效果

  • 库存数据一致
  • 防止超卖
  • 系统稳定

7.3 案例三:用户注册

场景

  • 用户注册新账号
  • 注册后需要立即返回用户信息
  • 需要立即使用用户ID进行其他操作

问题

  • 从库可能还没有同步新用户信息
  • 导致后续操作失败

解决方案

  1. 用户注册使用主库
  2. 注册后立即查询使用主库
  3. 确保用户信息可用

效果

  • 用户注册后能立即使用
  • 系统运行正常
  • 用户体验良好

互动话题

  1. 您在项目中遇到过哪些主从延迟的问题?是如何解决的?
  2. 您对本文介绍的强制主库方案有什么改进建议?
  3. 您认为在读写分离架构中,还有哪些场景需要强制使用主库?
  4. 您对未来数据库架构的发展有什么看法?

欢迎在评论区分享您的经验和看法!欢迎关注公众号“服务端技术精选”,获取更多技术文章和建议。


标题:SpringBoot + 读写分离 + 事务内强制主库:避免主从延迟导致读取脏数据
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/15/1773296032786.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消