SpringBoot + 数据库连接泄漏检测 + 自动回收:连接未关闭?自动追踪泄漏点并告警

前言

在数据库应用中,数据库连接是一种宝贵的资源。然而,在实际开发中,由于代码编写不当或其他原因,可能会导致数据库连接泄漏。连接泄漏是指应用程序获取数据库连接后,没有正确关闭,导致连接池中的连接被耗尽,最终影响应用程序的性能和可用性。

想象一下这样的场景:你的应用在生产环境中运行一段时间后,突然出现性能下降,响应时间变长。通过查看日志,你发现数据库连接池中的连接数量持续增加,最终达到最大值,导致新的连接请求被拒绝。你需要手动重启应用才能恢复正常。

如何解决这个问题? 本文将详细介绍如何在 Spring Boot 中实现数据库连接泄漏检测和自动回收,帮助你及时发现和处理连接泄漏问题。

一、核心概念

1.1 数据库连接池

数据库连接池是一种管理数据库连接的机制,它可以:

  • 预先创建一定数量的数据库连接
  • 管理连接的分配和回收
  • 监控连接的使用情况
  • 提高数据库操作的性能

常见的数据库连接池包括:HikariCP、Druid、C3P0 等。

1.2 连接泄漏

连接泄漏是指应用程序获取数据库连接后,没有正确关闭,导致连接池中的连接被耗尽。连接泄漏的常见原因包括:

  • 代码中忘记调用 close() 方法关闭连接
  • 异常处理不当,导致 close() 方法没有被执行
  • 连接被长时间占用,没有及时释放

1.3 连接泄漏检测

连接泄漏检测是指通过监控数据库连接的使用情况,发现并识别连接泄漏的过程。常见的检测方法包括:

  • 监控连接的使用时间
  • 跟踪连接的获取和释放位置
  • 分析连接池的状态

1.4 自动回收

自动回收是指当检测到连接泄漏时,自动回收泄漏的连接,避免连接池被耗尽。常见的回收方法包括:

  • 强制关闭长时间占用的连接
  • 定期检查连接池的状态,回收空闲连接

二、技术方案

2.1 架构设计

数据库连接泄漏检测和自动回收的架构设计主要包括以下几个部分:

  1. 数据层:数据库,存储业务数据
  2. 连接池层:数据库连接池,管理数据库连接
  3. 监控层:监控连接池的状态和连接的使用情况
  4. 检测层:检测连接泄漏
  5. 回收层:自动回收泄漏的连接
  6. 告警层:当检测到连接泄漏时,触发告警通知

2.2 技术选型

  • Spring Boot:作为基础框架,提供依赖注入、配置管理等功能
  • HikariCP:作为数据库连接池,提供高性能的连接管理
  • Spring Data JPA:用于操作数据库
  • MySQL:作为数据库存储
  • Spring Boot Actuator:用于暴露监控端点
  • Prometheus:用于监控系统指标
  • 企业微信/钉钉:用于发送告警通知

2.3 核心流程

  1. 连接获取:应用程序从连接池获取数据库连接
  2. 连接使用:应用程序使用数据库连接执行数据库操作
  3. 连接释放:应用程序使用完毕后,释放数据库连接
  4. 连接监控:监控连接的使用情况,包括获取时间、使用时间、释放时间等
  5. 泄漏检测:检测连接是否泄漏,例如连接使用时间过长
  6. 自动回收:当检测到连接泄漏时,自动回收泄漏的连接
  7. 告警触发:当检测到连接泄漏时,触发告警通知

三、Spring Boot 数据库连接泄漏检测实现

3.1 依赖配置

<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Data JPA -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>

    <!-- MySQL -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>

    <!-- HikariCP -->
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
    </dependency>

    <!-- Spring Boot Actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- Micrometer -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3.2 配置文件

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/test_db?useSSL=false&serverTimezone=UTC
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      pool-name: test-hikari-pool
      maximum-pool-size: 10
      minimum-idle: 5
      idle-timeout: 30000
      max-lifetime: 1800000
      connection-timeout: 30000
      leak-detection-threshold: 60000  # 60秒,检测连接泄漏的阈值

  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true
    properties:
      hibernate.format_sql: true

# 监控配置
management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus

# 连接泄漏检测配置
connection:
  leak:
    detection:
      enabled: true
      check-interval: 60000  # 检查间隔,单位毫秒
      alert-enabled: true  # 是否启用告警
      notify-url: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key"  # 企业微信告警地址

# Prometheus 配置
micrometer:
  prometheus:
    enabled: true

# 应用配置
server:
  port: 8080

3.3 数据源配置

@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource() {
        return new HikariDataSource();
    }

}

3.4 连接泄漏检测服务

@Service
@Slf4j
public class ConnectionLeakDetector {

    @Autowired
    private DataSource dataSource;

    @Value("${connection.leak.detection.check-interval:60000}")
    private long checkInterval;

    @Value("${connection.leak.detection.alert-enabled:true}")
    private boolean alertEnabled;

    @Value("${connection.leak.detection.notify-url}")
    private String notifyUrl;

    @PostConstruct
    public void init() {
        // 启动定时任务,检测连接泄漏
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(() -> {
            try {
                detectConnectionLeaks();
            } catch (Exception e) {
                log.error("检测连接泄漏失败", e);
            }
        }, 0, checkInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * 检测连接泄漏
     */
    public void detectConnectionLeaks() {
        log.info("开始检测连接泄漏");

        try {
            // 获取 Hikari 数据源
            if (dataSource instanceof HikariDataSource) {
                HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
                HikariPoolMXBean poolMXBean = hikariDataSource.getHikariPoolMXBean();

                // 获取连接池状态
                int activeConnections = poolMXBean.getActiveConnections();
                int idleConnections = poolMXBean.getIdleConnections();
                int totalConnections = poolMXBean.getTotalConnections();
                int threadsAwaitingConnection = poolMXBean.getThreadsAwaitingConnection();

                log.info("连接池状态: 活跃连接={}, 空闲连接={}, 总连接={}, 等待连接的线程数={}",
                        activeConnections, idleConnections, totalConnections, threadsAwaitingConnection);

                // 检查是否有连接泄漏
                if (activeConnections > 0) {
                    // 这里可以添加更详细的连接泄漏检测逻辑
                    // 例如,检查每个连接的使用时间,是否超过阈值
                    checkConnectionUsageTime();
                }
            }
        } catch (Exception e) {
            log.error("检测连接泄漏失败", e);
        }

        log.info("连接泄漏检测完成");
    }

    /**
     * 检查连接使用时间
     */
    private void checkConnectionUsageTime() {
        // 这里可以实现更详细的连接使用时间检查
        // 例如,通过 AOP 或代理方式,记录每个连接的获取时间和使用时间
        // 当连接使用时间超过阈值时,触发告警
    }

    /**
     * 发送告警
     */
    private void sendAlert(String message) {
        try {
            // 发送告警
            if (alertEnabled && StringUtils.isNotBlank(notifyUrl)) {
                HttpClient client = HttpClient.newHttpClient();
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create(notifyUrl))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString("{\"msgtype\":\"text\",\"text\":{\"content\":\"" + message + "\"}}"))
                        .build();
                client.send(request, HttpResponse.BodyHandlers.ofString());
                log.info("告警发送成功");
            } else {
                log.info("告警地址未配置,仅记录日志: {}", message);
            }
        } catch (Exception e) {
            log.error("发送告警失败", e);
        }
    }

}

3.5 连接代理

为了更详细地监控连接的使用情况,我们可以创建一个连接代理,记录连接的获取时间、使用时间和释放时间:

public class ConnectionProxy implements Connection {

    private final Connection delegate;
    private final long acquireTime;
    private final String stackTrace;

    public ConnectionProxy(Connection delegate) {
        this.delegate = delegate;
        this.acquireTime = System.currentTimeMillis();
        this.stackTrace = generateStackTrace();
    }

    /**
     * 生成堆栈跟踪信息
     */
    private String generateStackTrace() {
        StringBuilder sb = new StringBuilder();
        StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
        for (int i = 2; i < stackTraceElements.length; i++) {
            sb.append(stackTraceElements[i]).append("\n");
        }
        return sb.toString();
    }

    /**
     * 获取连接获取时间
     */
    public long getAcquireTime() {
        return acquireTime;
    }

    /**
     * 获取连接使用时间
     */
    public long getUsageTime() {
        return System.currentTimeMillis() - acquireTime;
    }

    /**
     * 获取堆栈跟踪信息
     */
    public String getStackTrace() {
        return stackTrace;
    }

    // 实现 Connection 接口的方法,委托给原始连接
    @Override
    public Statement createStatement() throws SQLException {
        return delegate.createStatement();
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return delegate.prepareStatement(sql);
    }

    // 其他方法实现...

    @Override
    public void close() throws SQLException {
        // 记录连接释放时间
        long releaseTime = System.currentTimeMillis();
        long usageTime = releaseTime - acquireTime;
        System.out.println("连接使用时间: " + usageTime + "ms");
        delegate.close();
    }

}

3.6 数据源代理

为了使用连接代理,我们需要创建一个数据源代理,在获取连接时返回连接代理:

public class DataSourceProxy implements DataSource {

    private final DataSource delegate;

    public DataSourceProxy(DataSource delegate) {
        this.delegate = delegate;
    }

    @Override
    public Connection getConnection() throws SQLException {
        Connection connection = delegate.getConnection();
        return new ConnectionProxy(connection);
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        Connection connection = delegate.getConnection(username, password);
        return new ConnectionProxy(connection);
    }

    // 其他方法实现...

}

3.7 数据源配置修改

修改数据源配置,使用数据源代理:

@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource() {
        HikariDataSource hikariDataSource = new HikariDataSource();
        return new DataSourceProxy(hikariDataSource);
    }

}

3.8 连接泄漏检测增强

增强连接泄漏检测服务,使用连接代理来获取更详细的连接使用信息:

@Service
@Slf4j
public class ConnectionLeakDetector {

    @Autowired
    private DataSource dataSource;

    // 存储所有活跃的连接
    private final Map<Connection, ConnectionProxy> activeConnections = new ConcurrentHashMap<>();

    @Value("${connection.leak.detection.check-interval:60000}")
    private long checkInterval;

    @Value("${connection.leak.detection.alert-enabled:true}")
    private boolean alertEnabled;

    @Value("${connection.leak.detection.notify-url}")
    private String notifyUrl;

    @Value("${spring.datasource.hikari.leak-detection-threshold:60000}")
    private long leakDetectionThreshold;

    @PostConstruct
    public void init() {
        // 启动定时任务,检测连接泄漏
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(() -> {
            try {
                detectConnectionLeaks();
            } catch (Exception e) {
                log.error("检测连接泄漏失败", e);
            }
        }, 0, checkInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * 注册活跃连接
     */
    public void registerConnection(Connection connection) {
        if (connection instanceof ConnectionProxy) {
            activeConnections.put(connection, (ConnectionProxy) connection);
        }
    }

    /**
     * 移除活跃连接
     */
    public void removeConnection(Connection connection) {
        activeConnections.remove(connection);
    }

    /**
     * 检测连接泄漏
     */
    public void detectConnectionLeaks() {
        log.info("开始检测连接泄漏");

        try {
            // 检查活跃连接
            for (Map.Entry<Connection, ConnectionProxy> entry : activeConnections.entrySet()) {
                Connection connection = entry.getKey();
                ConnectionProxy connectionProxy = entry.getValue();
                long usageTime = connectionProxy.getUsageTime();

                // 检查连接使用时间是否超过阈值
                if (usageTime > leakDetectionThreshold) {
                    log.warn("检测到连接泄漏: 使用时间={}ms, 阈值={}ms", usageTime, leakDetectionThreshold);
                    log.warn("连接获取堆栈: {}", connectionProxy.getStackTrace());

                    // 触发告警
                    if (alertEnabled) {
                        String message = String.format("【数据库连接泄漏告警】\n使用时间: %dms\n阈值: %dms\n获取堆栈: %s",
                                usageTime, leakDetectionThreshold, connectionProxy.getStackTrace());
                        sendAlert(message);
                    }

                    // 尝试回收连接
                    try {
                        connection.close();
                        removeConnection(connection);
                        log.info("已回收泄漏的连接");
                    } catch (SQLException e) {
                        log.error("回收连接失败", e);
                    }
                }
            }

            // 获取 Hikari 数据源状态
            if (dataSource instanceof DataSourceProxy) {
                DataSourceProxy dataSourceProxy = (DataSourceProxy) dataSource;
                // 这里可以获取原始数据源并检查状态
            }
        } catch (Exception e) {
            log.error("检测连接泄漏失败", e);
        }

        log.info("连接泄漏检测完成");
    }

    /**
     * 发送告警
     */
    private void sendAlert(String message) {
        try {
            // 发送告警
            if (alertEnabled && StringUtils.isNotBlank(notifyUrl)) {
                HttpClient client = HttpClient.newHttpClient();
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create(notifyUrl))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString("{\"msgtype\":\"text\",\"text\":{\"content\":\"" + message + "\"}}"))
                        .build();
                client.send(request, HttpResponse.BodyHandlers.ofString());
                log.info("告警发送成功");
            } else {
                log.info("告警地址未配置,仅记录日志: {}", message);
            }
        } catch (Exception e) {
            log.error("发送告警失败", e);
        }
    }

}

3.9 连接代理修改

修改连接代理,在创建和关闭连接时注册和移除连接:

public class ConnectionProxy implements Connection {

    private final Connection delegate;
    private final long acquireTime;
    private final String stackTrace;
    private final ConnectionLeakDetector leakDetector;

    public ConnectionProxy(Connection delegate, ConnectionLeakDetector leakDetector) {
        this.delegate = delegate;
        this.acquireTime = System.currentTimeMillis();
        this.stackTrace = generateStackTrace();
        this.leakDetector = leakDetector;
        // 注册活跃连接
        leakDetector.registerConnection(this);
    }

    // 其他方法...

    @Override
    public void close() throws SQLException {
        // 记录连接释放时间
        long releaseTime = System.currentTimeMillis();
        long usageTime = releaseTime - acquireTime;
        System.out.println("连接使用时间: " + usageTime + "ms");
        // 移除活跃连接
        leakDetector.removeConnection(this);
        delegate.close();
    }

}

3.10 数据源代理修改

修改数据源代理,在创建连接代理时注入连接泄漏检测器:

public class DataSourceProxy implements DataSource {

    private final DataSource delegate;
    private final ConnectionLeakDetector leakDetector;

    public DataSourceProxy(DataSource delegate, ConnectionLeakDetector leakDetector) {
        this.delegate = delegate;
        this.leakDetector = leakDetector;
    }

    @Override
    public Connection getConnection() throws SQLException {
        Connection connection = delegate.getConnection();
        return new ConnectionProxy(connection, leakDetector);
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        Connection connection = delegate.getConnection(username, password);
        return new ConnectionProxy(connection, leakDetector);
    }

    // 其他方法...

}

3.11 数据源配置修改

修改数据源配置,注入连接泄漏检测器:

@Configuration
public class DataSourceConfig {

    @Bean
    public ConnectionLeakDetector connectionLeakDetector() {
        return new ConnectionLeakDetector();
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource(ConnectionLeakDetector connectionLeakDetector) {
        HikariDataSource hikariDataSource = new HikariDataSource();
        return new DataSourceProxy(hikariDataSource, connectionLeakDetector);
    }

}

四、自动回收实现

4.1 连接回收服务

@Service
@Slf4j
public class ConnectionRecycler {

    @Autowired
    private ConnectionLeakDetector connectionLeakDetector;

    @Value("${connection.leak.detection.check-interval:60000}")
    private long checkInterval;

    @PostConstruct
    public void init() {
        // 启动定时任务,回收泄漏的连接
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(() -> {
            try {
                recycleConnections();
            } catch (Exception e) {
                log.error("回收连接失败", e);
            }
        }, 0, checkInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * 回收泄漏的连接
     */
    public void recycleConnections() {
        log.info("开始回收泄漏的连接");

        try {
            // 调用连接泄漏检测器检测并回收连接
            connectionLeakDetector.detectConnectionLeaks();
        } catch (Exception e) {
            log.error("回收连接失败", e);
        }

        log.info("连接回收完成");
    }

}

4.2 监控指标

@Component
public class ConnectionMetrics {

    @Autowired
    private MeterRegistry meterRegistry;

    private Counter connectionAcquireCounter;
    private Counter connectionReleaseCounter;
    private Counter connectionLeakCounter;
    private Gauge activeConnectionsGauge;

    @PostConstruct
    public void init() {
        // 初始化连接获取计数器
        connectionAcquireCounter = Counter.builder("connection.acquire.count")
                .description("数据库连接获取数量")
                .tag("type", "connection")
                .register(meterRegistry);

        // 初始化连接释放计数器
        connectionReleaseCounter = Counter.builder("connection.release.count")
                .description("数据库连接释放数量")
                .tag("type", "connection")
                .register(meterRegistry);

        // 初始化连接泄漏计数器
        connectionLeakCounter = Counter.builder("connection.leak.count")
                .description("数据库连接泄漏数量")
                .tag("type", "connection")
                .register(meterRegistry);

        // 初始化活跃连接 gauge
        activeConnectionsGauge = Gauge.builder("connection.active.count", () -> {
            // 这里可以返回当前活跃连接数
            return 0;
        })
                .description("当前活跃连接数")
                .tag("type", "connection")
                .register(meterRegistry);
    }

    /**
     * 增加连接获取计数
     */
    public void incrementConnectionAcquireCount() {
        connectionAcquireCounter.increment();
    }

    /**
     * 增加连接释放计数
     */
    public void incrementConnectionReleaseCount() {
        connectionReleaseCounter.increment();
    }

    /**
     * 增加连接泄漏计数
     */
    public void incrementConnectionLeakCount() {
        connectionLeakCounter.increment();
    }

    /**
     * 更新活跃连接数
     */
    public void updateActiveConnectionsCount(int count) {
        // 这里可以更新活跃连接数
    }

}

五、实战案例

5.1 业务场景

假设我们有一个电商系统,需要处理用户的订单和支付请求。系统使用 Spring Boot + JPA + MySQL 构建,使用 HikariCP 作为数据库连接池。

5.2 实现方案

  1. 配置连接池:配置 HikariCP,设置连接池大小、空闲超时时间和泄漏检测阈值
  2. 创建连接代理:创建连接代理,记录连接的获取时间、使用时间和释放时间
  3. 创建数据源代理:创建数据源代理,在获取连接时返回连接代理
  4. 实现连接泄漏检测:实现连接泄漏检测服务,定期检测连接泄漏
  5. 实现自动回收:实现连接回收服务,自动回收泄漏的连接
  6. 实现告警机制:当检测到连接泄漏时,触发告警通知

5.3 代码实现

5.3.1 数据源配置

@Configuration
public class DataSourceConfig {

    @Bean
    public ConnectionLeakDetector connectionLeakDetector() {
        return new ConnectionLeakDetector();
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource(ConnectionLeakDetector connectionLeakDetector) {
        HikariDataSource hikariDataSource = new HikariDataSource();
        return new DataSourceProxy(hikariDataSource, connectionLeakDetector);
    }

}

5.3.2 连接代理

public class ConnectionProxy implements Connection {

    private final Connection delegate;
    private final long acquireTime;
    private final String stackTrace;
    private final ConnectionLeakDetector leakDetector;
    private final ConnectionMetrics metrics;

    public ConnectionProxy(Connection delegate, ConnectionLeakDetector leakDetector, ConnectionMetrics metrics) {
        this.delegate = delegate;
        this.acquireTime = System.currentTimeMillis();
        this.stackTrace = generateStackTrace();
        this.leakDetector = leakDetector;
        this.metrics = metrics;
        // 注册活跃连接
        leakDetector.registerConnection(this);
        // 增加连接获取计数
        metrics.incrementConnectionAcquireCount();
    }

    /**
     * 生成堆栈跟踪信息
     */
    private String generateStackTrace() {
        StringBuilder sb = new StringBuilder();
        StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
        for (int i = 2; i < stackTraceElements.length; i++) {
            sb.append(stackTraceElements[i]).append("\n");
        }
        return sb.toString();
    }

    /**
     * 获取连接获取时间
     */
    public long getAcquireTime() {
        return acquireTime;
    }

    /**
     * 获取连接使用时间
     */
    public long getUsageTime() {
        return System.currentTimeMillis() - acquireTime;
    }

    /**
     * 获取堆栈跟踪信息
     */
    public String getStackTrace() {
        return stackTrace;
    }

    // 实现 Connection 接口的方法,委托给原始连接
    @Override
    public Statement createStatement() throws SQLException {
        return delegate.createStatement();
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return delegate.prepareStatement(sql);
    }

    // 其他方法实现...

    @Override
    public void close() throws SQLException {
        // 记录连接释放时间
        long releaseTime = System.currentTimeMillis();
        long usageTime = releaseTime - acquireTime;
        System.out.println("连接使用时间: " + usageTime + "ms");
        // 移除活跃连接
        leakDetector.removeConnection(this);
        // 增加连接释放计数
        metrics.incrementConnectionReleaseCount();
        delegate.close();
    }

}

5.3.3 数据源代理

public class DataSourceProxy implements DataSource {

    private final DataSource delegate;
    private final ConnectionLeakDetector leakDetector;
    private final ConnectionMetrics metrics;

    public DataSourceProxy(DataSource delegate, ConnectionLeakDetector leakDetector, ConnectionMetrics metrics) {
        this.delegate = delegate;
        this.leakDetector = leakDetector;
        this.metrics = metrics;
    }

    @Override
    public Connection getConnection() throws SQLException {
        Connection connection = delegate.getConnection();
        return new ConnectionProxy(connection, leakDetector, metrics);
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        Connection connection = delegate.getConnection(username, password);
        return new ConnectionProxy(connection, leakDetector, metrics);
    }

    // 其他方法实现...

}

5.3.4 连接泄漏检测服务

@Service
@Slf4j
public class ConnectionLeakDetector {

    @Autowired
    private DataSource dataSource;

    @Autowired
    private ConnectionMetrics metrics;

    // 存储所有活跃的连接
    private final Map<Connection, ConnectionProxy> activeConnections = new ConcurrentHashMap<>();

    @Value("${connection.leak.detection.check-interval:60000}")
    private long checkInterval;

    @Value("${connection.leak.detection.alert-enabled:true}")
    private boolean alertEnabled;

    @Value("${connection.leak.detection.notify-url}")
    private String notifyUrl;

    @Value("${spring.datasource.hikari.leak-detection-threshold:60000}")
    private long leakDetectionThreshold;

    @PostConstruct
    public void init() {
        // 启动定时任务,检测连接泄漏
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(() -> {
            try {
                detectConnectionLeaks();
            } catch (Exception e) {
                log.error("检测连接泄漏失败", e);
            }
        }, 0, checkInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * 注册活跃连接
     */
    public void registerConnection(Connection connection) {
        if (connection instanceof ConnectionProxy) {
            activeConnections.put(connection, (ConnectionProxy) connection);
            // 更新活跃连接数
            metrics.updateActiveConnectionsCount(activeConnections.size());
        }
    }

    /**
     * 移除活跃连接
     */
    public void removeConnection(Connection connection) {
        activeConnections.remove(connection);
        // 更新活跃连接数
        metrics.updateActiveConnectionsCount(activeConnections.size());
    }

    /**
     * 检测连接泄漏
     */
    public void detectConnectionLeaks() {
        log.info("开始检测连接泄漏");

        try {
            // 检查活跃连接
            for (Map.Entry<Connection, ConnectionProxy> entry : activeConnections.entrySet()) {
                Connection connection = entry.getKey();
                ConnectionProxy connectionProxy = entry.getValue();
                long usageTime = connectionProxy.getUsageTime();

                // 检查连接使用时间是否超过阈值
                if (usageTime > leakDetectionThreshold) {
                    log.warn("检测到连接泄漏: 使用时间={}ms, 阈值={}ms", usageTime, leakDetectionThreshold);
                    log.warn("连接获取堆栈: {}", connectionProxy.getStackTrace());

                    // 增加连接泄漏计数
                    metrics.incrementConnectionLeakCount();

                    // 触发告警
                    if (alertEnabled) {
                        String message = String.format("【数据库连接泄漏告警】\n使用时间: %dms\n阈值: %dms\n获取堆栈: %s",
                                usageTime, leakDetectionThreshold, connectionProxy.getStackTrace());
                        sendAlert(message);
                    }

                    // 尝试回收连接
                    try {
                        connection.close();
                        removeConnection(connection);
                        log.info("已回收泄漏的连接");
                    } catch (SQLException e) {
                        log.error("回收连接失败", e);
                    }
                }
            }

            // 获取 Hikari 数据源状态
            if (dataSource instanceof DataSourceProxy) {
                DataSourceProxy dataSourceProxy = (DataSourceProxy) dataSource;
                // 这里可以获取原始数据源并检查状态
            }
        } catch (Exception e) {
            log.error("检测连接泄漏失败", e);
        }

        log.info("连接泄漏检测完成");
    }

    /**
     * 发送告警
     */
    private void sendAlert(String message) {
        try {
            // 发送告警
            if (alertEnabled && StringUtils.isNotBlank(notifyUrl)) {
                HttpClient client = HttpClient.newHttpClient();
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create(notifyUrl))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString("{\"msgtype\":\"text\",\"text\":{\"content\":\"" + message + "\"}}"))
                        .build();
                client.send(request, HttpResponse.BodyHandlers.ofString());
                log.info("告警发送成功");
            } else {
                log.info("告警地址未配置,仅记录日志: {}", message);
            }
        } catch (Exception e) {
            log.error("发送告警失败", e);
        }
    }

}

5.3.5 连接回收服务

@Service
@Slf4j
public class ConnectionRecycler {

    @Autowired
    private ConnectionLeakDetector connectionLeakDetector;

    @Value("${connection.leak.detection.check-interval:60000}")
    private long checkInterval;

    @PostConstruct
    public void init() {
        // 启动定时任务,回收泄漏的连接
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(() -> {
            try {
                recycleConnections();
            } catch (Exception e) {
                log.error("回收连接失败", e);
            }
        }, 0, checkInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * 回收泄漏的连接
     */
    public void recycleConnections() {
        log.info("开始回收泄漏的连接");

        try {
            // 调用连接泄漏检测器检测并回收连接
            connectionLeakDetector.detectConnectionLeaks();
        } catch (Exception e) {
            log.error("回收连接失败", e);
        }

        log.info("连接回收完成");
    }

}

5.3.6 监控指标

@Component
public class ConnectionMetrics {

    @Autowired
    private MeterRegistry meterRegistry;

    private Counter connectionAcquireCounter;
    private Counter connectionReleaseCounter;
    private Counter connectionLeakCounter;
    private AtomicInteger activeConnectionsCount = new AtomicInteger(0);

    @PostConstruct
    public void init() {
        // 初始化连接获取计数器
        connectionAcquireCounter = Counter.builder("connection.acquire.count")
                .description("数据库连接获取数量")
                .tag("type", "connection")
                .register(meterRegistry);

        // 初始化连接释放计数器
        connectionReleaseCounter = Counter.builder("connection.release.count")
                .description("数据库连接释放数量")
                .tag("type", "connection")
                .register(meterRegistry);

        // 初始化连接泄漏计数器
        connectionLeakCounter = Counter.builder("connection.leak.count")
                .description("数据库连接泄漏数量")
                .tag("type", "connection")
                .register(meterRegistry);

        // 初始化活跃连接 gauge
        Gauge.builder("connection.active.count", activeConnectionsCount, AtomicInteger::get)
                .description("当前活跃连接数")
                .tag("type", "connection")
                .register(meterRegistry);
    }

    /**
     * 增加连接获取计数
     */
    public void incrementConnectionAcquireCount() {
        connectionAcquireCounter.increment();
    }

    /**
     * 增加连接释放计数
     */
    public void incrementConnectionReleaseCount() {
        connectionReleaseCounter.increment();
    }

    /**
     * 增加连接泄漏计数
     */
    public void incrementConnectionLeakCount() {
        connectionLeakCounter.increment();
    }

    /**
     * 更新活跃连接数
     */
    public void updateActiveConnectionsCount(int count) {
        activeConnectionsCount.set(count);
    }

}

5.4 测试场景

5.4.1 测试连接泄漏

  1. 创建一个测试接口,故意不关闭数据库连接:
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private DataSource dataSource;

    @GetMapping("/leak")
    public String testConnectionLeak() throws SQLException {
        // 获取连接但不关闭
        Connection connection = dataSource.getConnection();
        System.out.println("获取连接: " + connection);
        // 这里故意不关闭连接,模拟连接泄漏
        return "测试连接泄漏";
    }

}
  1. 调用测试接口,然后查看日志,确认连接泄漏被检测到并回收:
# 调用测试接口
curl http://localhost:8080/test/leak

# 查看日志
# 应该看到连接泄漏被检测到,并且触发告警

5.4.2 测试正常连接使用

  1. 创建一个测试接口,正常使用并关闭数据库连接:
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private DataSource dataSource;

    @GetMapping("/normal")
    public String testNormalConnection() throws SQLException {
        // 获取连接
        Connection connection = dataSource.getConnection();
        System.out.println("获取连接: " + connection);
        // 使用连接
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("SELECT 1");
        if (resultSet.next()) {
            System.out.println("查询结果: " + resultSet.getInt(1));
        }
        // 关闭连接
        resultSet.close();
        statement.close();
        connection.close();
        System.out.println("关闭连接: " + connection);
        return "测试正常连接使用";
    }

}
  1. 调用测试接口,然后查看日志,确认连接正常关闭:
# 调用测试接口
curl http://localhost:8080/test/normal

# 查看日志
# 应该看到连接正常获取、使用和关闭

六、最佳实践

6.1 连接池配置最佳实践

原则

  • 合理设置连接池大小:根据系统的并发量和数据库的处理能力,设置合理的连接池大小
  • 设置空闲超时时间:设置合理的空闲超时时间,避免连接长时间空闲
  • 设置最大生命周期:设置连接的最大生命周期,避免连接老化
  • 启用泄漏检测:启用连接泄漏检测,及时发现连接泄漏问题

建议

  • 连接池大小:一般设置为 CPU 核心数的 2-4 倍
  • 空闲超时时间:一般设置为 30-60 秒
  • 最大生命周期:一般设置为 1-3 小时
  • 泄漏检测阈值:一般设置为 60 秒

6.2 连接使用最佳实践

原则

  • 使用 try-with-resources:使用 try-with-resources 语句,确保连接自动关闭
  • 避免长时间占用连接:避免在连接上执行长时间运行的操作
  • 及时释放连接:使用完毕后,及时释放连接
  • 处理异常:正确处理异常,确保连接在异常情况下也能被关闭

建议

  • 使用 try-with-resources 语句来管理连接、语句和结果集
  • 将数据库操作封装在事务中,确保操作的原子性
  • 避免在循环中获取和释放连接,应该在循环外获取连接,循环内使用,循环外释放

6.3 连接泄漏检测最佳实践

原则

  • 定期检测:定期检测连接泄漏,及时发现问题
  • 详细记录:记录连接的获取位置和使用时间,便于定位泄漏点
  • 自动回收:当检测到连接泄漏时,自动回收泄漏的连接
  • 及时告警:当检测到连接泄漏时,及时触发告警,通知运维人员

建议

  • 检测间隔:一般设置为 60 秒
  • 告警方式:使用企业微信、钉钉等渠道发送告警
  • 告警级别:根据连接泄漏的严重程度,设置不同的告警级别

6.4 监控最佳实践

原则

  • 实时监控:实时监控连接池的状态和连接的使用情况
  • 可视化:使用 Prometheus + Grafana 等工具,可视化监控数据
  • 告警阈值:设置合理的告警阈值,避免误报
  • 历史数据:存储历史监控数据,便于分析趋势

建议

  • 监控指标:活跃连接数、空闲连接数、总连接数、等待连接的线程数、连接获取时间、连接使用时间等
  • 告警阈值:当活跃连接数超过连接池大小的 80% 时,触发告警
  • 监控频率:一般设置为 15-60 秒

七、总结

数据库连接泄漏是一个常见的问题,它会导致连接池中的连接被耗尽,最终影响应用程序的性能和可用性。通过本文的实现方案,开发者可以构建一个可靠的数据库连接管理系统,及时发现和处理连接泄漏问题,提高应用程序的稳定性和可靠性。

互动话题

  1. 你在实际项目中遇到过数据库连接泄漏的问题吗?是如何解决的?
  2. 你认为连接池配置的最佳实践是什么?
  3. 你有使用过其他数据库连接泄漏检测的解决方案吗?

欢迎在评论区留言讨论!更多技术文章,欢迎关注公众号:服务端技术精选


标题:SpringBoot + 数据库连接泄漏检测 + 自动回收:连接未关闭?自动追踪泄漏点并告警
作者:jiangyi
地址:http://jiangyi.space/articles/2026/04/12/1775889972872.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消