SpringBoot + 读写分离 + 事务内强制主库:避免主从延迟导致读取脏数据
导语
在大型应用系统中,为了提升数据库的并发处理能力,通常会采用读写分离的架构。主库负责处理写操作,而从库负责处理读操作。然而,这种架构带来了一个常见的问题:主从复制存在延迟,导致从库读取的数据可能是过期数据。本文将介绍如何在SpringBoot应用中实现读写分离,并针对事务场景提供强制主库的解决方案,确保在事务内读取的数据是最新的,避免因主从延迟导致的脏读问题。
一、读写分离与主从延迟问题
1.1 读写分离架构
1. 架构设计
在读写分离架构中:
- 主库(Master):负责处理所有的写操作(INSERT、UPDATE、DELETE)
- 从库(Slave):负责处理所有的读操作(SELECT)
- 数据复制:主库的数据通过复制机制同步到从库
2. 优势
| 优势 | 描述 |
|---|---|
| 读写负载分离 | 写操作和读操作分别由不同的数据库处理 |
| 提高并发能力 | 可以部署多个从库分担读压力 |
| 提升读取性能 | 读操作分散到多个从库,减少单库压力 |
| 高可用性 | 主库故障时,可以将从库提升为主库 |
1.2 主从延迟问题
1. 延迟原因
- 复制机制:MySQL主从复制是异步的,存在延迟
- 网络问题:主从之间的网络延迟
- 负载过高:从库处理能力不足
- 大事务:主库上的大事务会导致延迟
2. 延迟的影响
| 影响 | 描述 |
|---|---|
| 数据不一致 | 从库数据不是最新的 |
| 脏读 | 读取到未提交或过期的数据 |
| 业务错误 | 基于过期数据做出错误的业务决策 |
| 用户体验 | 刚更新的数据查询不到 |
3. 延迟场景示例
// 场景:用户下单后立即查询订单
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
public void createOrder(Order order) {
// 1. 创建订单(写入主库)
orderRepository.save(order);
// 2. 立即查询订单(可能从从库读取)
// 此时从库可能还没有同步到新订单
Order createdOrder = orderRepository.findById(order.getId());
// createdOrder 可能是 null!
}
}
二、SpringBoot集成读写分离
2.1 依赖配置
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Data JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- MySQL -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- HikariCP 连接池 -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
2.2 数据源配置
1. 多数据源配置
@Configuration
public class DataSourceConfig {
// 主库数据源
@Bean(name = "masterDataSource")
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDataSource() {
return DataSourceBuilder.create().build();
}
// 从库数据源
@Bean(name = "slaveDataSource")
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSource slaveDataSource() {
return DataSourceBuilder.create().build();
}
// 路由数据源
@Bean(name = "dataSource")
@Primary
public DataSource dataSource(@Qualifier("masterDataSource") DataSource masterDataSource,
@Qualifier("slaveDataSource") DataSource slaveDataSource) {
return new ReadWriteDataSource(masterDataSource, slaveDataSource);
}
}
2. 读写分离路由数据源
public class ReadWriteDataSource extends AbstractRoutingDataSource {
private static final ThreadLocal<String> DATA_SOURCE_KEY = new ThreadLocal<>();
public ReadWriteDataSource(DataSource masterDataSource, DataSource slaveDataSource) {
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("master", masterDataSource);
targetDataSources.put("slave", slaveDataSource);
setTargetDataSources(targetDataSources);
setDefaultTargetDataSource(masterDataSource);
afterPropertiesSet();
}
/**
* 获取当前数据源
*/
@Override
protected Object determineCurrentLookupKey() {
String key = DATA_SOURCE_KEY.get();
if (key == null) {
key = "slave"; // 默认使用从库
}
return key;
}
/**
* 设置读数据源
*/
public static void setReadDataSource() {
DATA_SOURCE_KEY.set("slave");
}
/**
* 设置写数据源
*/
public static void setWriteDataSource() {
DATA_SOURCE_KEY.set("master");
}
/**
* 清除数据源设置
*/
public static void clearDataSource() {
DATA_SOURCE_KEY.remove();
}
}
2.3 事务管理器配置
@Configuration
@EnableTransactionManagement
public class TransactionManagerConfig {
@Autowired
@Qualifier("dataSource")
private DataSource dataSource;
/**
* 主库事务管理器
*/
@Bean(name = "masterTransactionManager")
public DataSourceTransactionManager masterTransactionManager() {
return new DataSourceTransactionManager(dataSource);
}
/**
* 事务管理器
*/
@Bean(name = "transactionManager")
@Primary
public DataSourceTransactionManager transactionManager() {
return new DataSourceTransactionManager(dataSource);
}
}
2.4 AOP切面实现
@Component
@Aspect
@Slf4j
public class DataSourceAspect {
/**
* 切换到从库
*/
@Before("execution(* com.example.*.repository..*.find*(..)) || " +
"execution(* com.example.*.repository..*.get*(..)) || " +
"execution(* com.example.*.repository..*.count*(..))")
public void setReadDataSource() {
ReadWriteDataSource.setReadDataSource();
}
/**
* 切换到主库
*/
@Before("execution(* com.example.*.repository..*.save*(..)) || " +
"execution(* com.example.*.repository..*.delete*(..)) || " +
"execution(* com.example.*.repository..*.update*(..))")
public void setWriteDataSource() {
ReadWriteDataSource.setWriteDataSource();
}
/**
* 清除数据源设置
*/
@After("execution(* com.example.*.repository..*.*(..))")
public void clearDataSource() {
ReadWriteDataSource.clearDataSource();
}
}
三、事务内强制主库实现
3.1 强制主库注解
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Master {
/**
* 是否强制使用主库
*/
boolean force() default true;
}
3.2 强制主库切面
@Component
@Aspect
@Slf4j
@Order(1) // 确保在数据源切面之前执行
public class MasterForceAspect {
/**
* 强制主库处理
*/
@Around("@annotation(Master)")
public Object forceMaster(ProceedingJoinPoint joinPoint, Master master) throws Throwable {
try {
// 强制切换到主库
ReadWriteDataSource.setWriteDataSource();
return joinPoint.proceed();
} finally {
// 清除数据源设置
ReadWriteDataSource.clearDataSource();
}
}
}
3.3 事务内强制主库服务
@Service
@Slf4j
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private PlatformTransactionManager transactionManager;
/**
* 创建订单并立即查询(强制主库)
*/
@Master(force = true)
public Order createOrderAndGet(Order order) {
// 1. 创建订单(强制主库)
Order savedOrder = orderRepository.save(order);
// 2. 立即查询订单(强制主库,确保读取最新数据)
Order createdOrder = orderRepository.findById(savedOrder.getId())
.orElseThrow(() -> new RuntimeException("Order not found"));
return createdOrder;
}
/**
* 使用事务强制主库
*/
public Order createOrderWithTransaction(Order order) {
TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
return transactionTemplate.execute(status -> {
// 在事务内,强制使用主库
ReadWriteDataSource.setWriteDataSource();
// 创建订单
Order savedOrder = orderRepository.save(order);
// 立即查询
Order createdOrder = orderRepository.findById(savedOrder.getId())
.orElseThrow(() -> new RuntimeException("Order not found"));
return createdOrder;
});
}
}
3.4 事务同步管理器方案
@Component
public class TransactionContext {
private static final ThreadLocal<Boolean> FORCE_MASTER = new ThreadLocal<>();
/**
* 标记强制使用主库
*/
public static void markForceMaster() {
FORCE_MASTER.set(true);
}
/**
* 清除标记
*/
public static void clear() {
FORCE_MASTER.remove();
}
/**
* 是否强制使用主库
*/
public static boolean isForceMaster() {
return Boolean.TRUE.equals(FORCE_MASTER.get());
}
}
3.5 增强版数据源
public class EnhancedReadWriteDataSource extends AbstractRoutingDataSource {
private static final ThreadLocal<String> DATA_SOURCE_KEY = new ThreadLocal<>();
@Override
protected Object determineCurrentLookupKey() {
// 如果强制使用主库,直接返回主库
if (TransactionContext.isForceMaster()) {
return "master";
}
String key = DATA_SOURCE_KEY.get();
if (key == null) {
key = "slave"; // 默认使用从库
}
return key;
}
public static void setReadDataSource() {
DATA_SOURCE_KEY.set("slave");
}
public static void setWriteDataSource() {
DATA_SOURCE_KEY.set("master");
}
public static void clearDataSource() {
DATA_SOURCE_KEY.remove();
}
}
四、完整实现方案
4.1 配置类
DataSourceConfiguration.java
@Configuration
public class DataSourceConfiguration {
@Bean
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSource slaveDataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}
@Bean
@Primary
public DataSource routingDataSource(@Qualifier("masterDataSource") DataSource masterDataSource,
@Qualifier("slaveDataSource") DataSource slaveDataSource) {
DynamicDataSource dynamicDataSource = new DynamicDataSource();
dynamicDataSource.setDefaultTargetDataSource(masterDataSource);
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(DataSourceType.MASTER, masterDataSource);
targetDataSources.put(DataSourceType.SLAVE, slaveDataSource);
dynamicDataSource.setTargetDataSources(targetDataSources);
return dynamicDataSource;
}
@Bean
@Primary
public DataSource dataSource(@Qualifier("routingDataSource") DataSource routingDataSource) {
return new TransactionSyncDataSource(routingDataSource);
}
}
4.2 事务同步数据源
TransactionSyncDataSource.java
public class TransactionSyncDataSource extends DelegatingDataSource {
public TransactionSyncDataSource(DataSource targetDataSource) {
super(targetDataSource);
}
@Override
public Connection getConnection() throws SQLException {
Connection connection = super.getConnection();
return new TransactionSyncConnection(connection);
}
/**
* 事务同步连接
*/
private static class TransactionSyncConnection implements Connection {
private final Connection delegate;
public TransactionSyncConnection(Connection delegate) {
this.delegate = delegate;
}
@Override
public Connection getConnection() throws SQLException {
// 在事务内,强制使用主库
if (TransactionContext.isForceMaster()) {
// 切换到主库的逻辑
DynamicDataSource.setWriteDataSource();
}
return delegate;
}
// 其他方法委托给原始连接
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
return delegate.prepareStatement(sql);
}
// ... 其他委托方法
}
}
4.3 服务层使用
OrderService.java
@Service
@Slf4j
public class OrderService {
@Autowired
private OrderRepository orderRepository;
/**
* 创建订单(默认主库)
*/
public Order createOrder(Order order) {
return orderRepository.save(order);
}
/**
* 查询订单列表(默认从库)
*/
public List<Order> listOrders() {
return orderRepository.findAll();
}
/**
* 创建订单并返回创建结果(强制主库)
*/
@Master
public Order createOrderAndReturn(Order order) {
Order savedOrder = orderRepository.save(order);
return orderRepository.findById(savedOrder.getId())
.orElseThrow(() -> new RuntimeException("Order not found"));
}
/**
* 创建订单并查询用户(强制主库)
*/
@Master
public UserOrderDTO createOrderAndGetUser(Order order, Long userId) {
Order savedOrder = orderRepository.save(order);
// 立即查询,确保数据一致性
Order createdOrder = orderRepository.findById(savedOrder.getId())
.orElseThrow(() -> new RuntimeException("Order not found"));
// 查询用户信息(强制主库)
User user = userRepository.findById(userId)
.orElseThrow(() -> new RuntimeException("User not found"));
return new UserOrderDTO(user, createdOrder);
}
}
4.4 控制器
OrderController.java
@RestController
@RequestMapping("/api/orders")
public class OrderController {
@Autowired
private OrderService orderService;
/**
* 创建订单并返回结果
*/
@PostMapping
public ResponseEntity<Order> createOrder(@RequestBody Order order) {
Order createdOrder = orderService.createOrderAndReturn(order);
return ResponseEntity.status(HttpStatus.CREATED).body(createdOrder);
}
/**
* 创建订单并获取用户信息
*/
@PostMapping("/with-user")
public ResponseEntity<UserOrderDTO> createOrderWithUser(@RequestBody Order order, @RequestParam Long userId) {
UserOrderDTO dto = orderService.createOrderAndGetUser(order, userId);
return ResponseEntity.status(HttpStatus.CREATED).body(dto);
}
/**
* 查询订单列表
*/
@GetMapping
public ResponseEntity<List<Order>> listOrders() {
List<Order> orders = orderService.listOrders();
return ResponseEntity.ok(orders);
}
}
五、生产级配置
5.1 应用配置
application.yml
# 应用配置
spring:
application:
name: springboot-read-write-separation
# 主库数据源配置
datasource:
master:
jdbc-url: jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC
username: root
password: password
driver-class-name: com.mysql.cj.jdbc.Driver
hikari:
maximum-pool-size: 20
minimum-idle: 5
idle-timeout: 30000
max-lifetime: 1800000
connection-timeout: 30000
# 从库数据源配置
slave:
jdbc-url: jdbc:mysql://localhost:3307/test?useSSL=false&serverTimezone=UTC
username: root
password: password
driver-class-name: com.mysql.cj.jdbc.Driver
hikari:
maximum-pool-size: 30
minimum-idle: 10
idle-timeout: 30000
max-lifetime: 1800000
connection-timeout: 30000
# JPA 配置
jpa:
database-platform: org.hibernate.dialect.MySQL8Dialect
hibernate:
ddl-auto: validate
show-sql: false
# 服务器配置
server:
port: 8080
# 读写分离配置
read-write:
# 主从延迟阈值(毫秒)
replication-latency-threshold: 1000
# 强制主库的超时时间
force-master-timeout: 5000
# 日志配置
logging:
level:
com.example: info
5.2 主从延迟监控
ReplicationMonitor.java
@Service
@Slf4j
public class ReplicationMonitor {
@Autowired
private DataSource dataSource;
/**
* 检查主从延迟
*/
public long getReplicationLag() {
try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
// 查询从库的延迟时间
ResultSet rs = statement.executeQuery("SHOW SLAVE STATUS");
if (rs.next()) {
return rs.getLong("Seconds_Behind_Master");
}
} catch (Exception e) {
log.error("Failed to get replication lag", e);
}
return -1;
}
/**
* 检查是否需要强制主库
*/
public boolean shouldForceMaster() {
long lag = getReplicationLag();
return lag > 1000; // 延迟超过1秒,强制使用主库
}
}
5.3 健康检查
DataSourceHealthIndicator.java
@Component
public class DataSourceHealthIndicator implements HealthIndicator {
@Autowired
private DataSource dataSource;
@Autowired
private ReplicationMonitor replicationMonitor;
@Override
public Health health() {
try (Connection connection = dataSource.getConnection()) {
long lag = replicationMonitor.getReplicationLag();
if (lag < 0) {
return Health.down()
.withDetail("replication", "not configured")
.build();
} else if (lag > 1000) {
return Health.up()
.withDetail("replication_lag", lag + "ms")
.withDetail("status", "high lag")
.build();
} else {
return Health.up()
.withDetail("replication_lag", lag + "ms")
.withDetail("status", "normal")
.build();
}
} catch (Exception e) {
return Health.down()
.withDetail("error", e.getMessage())
.build();
}
}
}
六、最佳实践
6.1 强制主库使用场景
1. 刚写入的数据需要立即读取
- 用户注册后需要立即返回用户信息
- 订单创建后需要立即返回订单详情
- 支付完成后需要立即查询支付结果
2. 事务内的读取操作
- 同一个事务内,先写后读的场景
- 需要保证数据一致性的场景
3. 关键业务数据的读取
- 金融交易数据
- 库存数据
- 订单状态
6.2 性能优化
1. 减少强制主库的使用
- 评估是否真的需要强制主库
- 使用其他方式保证数据一致性(如消息队列)
2. 优化主库性能
- 确保主库有足够的处理能力
- 使用索引优化查询性能
3. 减少主从延迟
- 优化网络连接
- 使用半同步复制
- 减少大事务
6.3 监控与告警
1. 监控指标
- 主从延迟时间
- 主库CPU和内存使用率
- 从库查询延迟
2. 告警配置
- 延迟超过阈值时告警
- 主库故障时告警
- 从库故障时告警
七、案例分析
7.1 案例一:电商订单创建
场景:
- 用户在电商平台下单
- 创建订单后需要立即返回订单详情
- 订单状态需要立即查询
问题:
- 创建订单后从从库查询,可能查不到新订单
- 用户体验差
解决方案:
- 创建订单使用@Master注解强制主库
- 立即查询订单也强制主库
- 确保数据一致性
效果:
- 用户下单后能立即看到订单详情
- 数据一致性得到保证
- 用户体验提升
7.2 案例二:库存扣减
场景:
- 用户下单时需要扣减库存
- 扣减后需要立即查询库存数量
- 防止超卖
问题:
- 从库库存数据可能不是最新的
- 可能导致超卖
解决方案:
- 库存扣减使用主库
- 扣减后立即查询库存使用主库
- 确保库存数据一致性
效果:
- 库存数据一致
- 防止超卖
- 系统稳定
7.3 案例三:用户注册
场景:
- 用户注册新账号
- 注册后需要立即返回用户信息
- 需要立即使用用户ID进行其他操作
问题:
- 从库可能还没有同步新用户信息
- 导致后续操作失败
解决方案:
- 用户注册使用主库
- 注册后立即查询使用主库
- 确保用户信息可用
效果:
- 用户注册后能立即使用
- 系统运行正常
- 用户体验良好
互动话题
- 您在项目中遇到过哪些主从延迟的问题?是如何解决的?
- 您对本文介绍的强制主库方案有什么改进建议?
- 您认为在读写分离架构中,还有哪些场景需要强制使用主库?
- 您对未来数据库架构的发展有什么看法?
欢迎在评论区分享您的经验和看法!欢迎关注公众号“服务端技术精选”,获取更多技术文章和建议。
标题:SpringBoot + 读写分离 + 事务内强制主库:避免主从延迟导致读取脏数据
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/15/1773296032786.html
公众号:服务端技术精选
- 导语
- 一、读写分离与主从延迟问题
- 1.1 读写分离架构
- 1.2 主从延迟问题
- 二、SpringBoot集成读写分离
- 2.1 依赖配置
- 2.2 数据源配置
- 2.3 事务管理器配置
- 2.4 AOP切面实现
- 三、事务内强制主库实现
- 3.1 强制主库注解
- 3.2 强制主库切面
- 3.3 事务内强制主库服务
- 3.4 事务同步管理器方案
- 3.5 增强版数据源
- 四、完整实现方案
- 4.1 配置类
- 4.2 事务同步数据源
- 4.3 服务层使用
- 4.4 控制器
- 五、生产级配置
- 5.1 应用配置
- 5.2 主从延迟监控
- 5.3 健康检查
- 六、最佳实践
- 6.1 强制主库使用场景
- 6.2 性能优化
- 6.3 监控与告警
- 七、案例分析
- 7.1 案例一:电商订单创建
- 7.2 案例二:库存扣减
- 7.3 案例三:用户注册
- 互动话题
评论
0 评论