SpringBoot + Redis 大 Key 拆分 + 自动检测告警:单个 Key 超 10MB?自动拆分防阻塞

前言

在使用 Redis 时,大 Key 是一个常见的性能瓶颈。大 Key 指的是占用内存较大的键值对,通常指单个 Key 大小超过 10MB 的情况。大 Key 会带来一系列问题:

  1. 内存占用:大 Key 会占用大量内存,导致内存使用不均衡
  2. 网络传输:大 Key 会增加网络传输时间,影响系统响应速度
  3. 阻塞操作:对大 Key 进行操作时,会阻塞 Redis 服务器,影响其他操作
  4. 过期删除:大 Key 过期时,Redis 会进行同步删除,可能导致服务卡顿

想象一下这样的场景:你的应用在高峰期突然变得响应缓慢,甚至出现服务不可用的情况。通过监控发现,Redis 服务器 CPU 使用率突然飙升,内存使用异常。经过排查,发现是某个 Key 的大小超过了 10MB,导致 Redis 服务器在处理这个 Key 时被阻塞。

如何解决这个问题? 本文将详细介绍如何在 Spring Boot 中实现 Redis 大 Key 的自动检测、拆分和告警,帮助你避免大 Key 带来的性能问题。

一、核心概念

1.1 大 Key

大 Key 是指占用内存较大的键值对,通常有以下几种类型:

  • String 类型:单个字符串值很大,如存储序列化后的大对象
  • List 类型:列表中元素数量过多
  • Hash 类型:哈希表中字段数量过多
  • Set 类型:集合中元素数量过多
  • ZSet 类型:有序集合中元素数量过多

1.2 大 Key 的危害

  • 内存占用不均衡:大 Key 会导致 Redis 实例内存使用不均衡,影响内存管理
  • 网络传输缓慢:大 Key 会增加网络传输时间,影响系统响应速度
  • 阻塞操作:对大 Key 进行操作时,会阻塞 Redis 服务器,影响其他操作
  • 过期删除卡顿:大 Key 过期时,Redis 会进行同步删除,可能导致服务卡顿
  • 主从复制延迟:大 Key 会增加主从复制的时间,导致复制延迟

1.3 大 Key 拆分策略

针对不同类型的大 Key,有不同的拆分策略:

  • String 类型:将大字符串拆分为多个小字符串,使用前缀 + 序号的方式存储
  • List 类型:将大列表拆分为多个小列表,使用前缀 + 序号的方式存储
  • Hash 类型:根据哈希字段的特性,将哈希表拆分为多个小哈希表
  • Set 类型:将大集合拆分为多个小集合
  • ZSet 类型:将大有序集合拆分为多个小有序集合

1.4 大 Key 检测方法

  • Redis 命令:使用 DEBUG OBJECT 命令查看 Key 的大小
  • Redis 监控:使用 Redis 的监控工具,如 Redis Sentinel、Redis Cluster
  • 第三方工具:使用 Redis RDB 分析工具,如 redis-rdb-tools
  • 自定义脚本:编写脚本定期检测 Key 的大小

1.5 告警机制

  • 阈值告警:当 Key 大小超过阈值时,触发告警
  • 趋势告警:当 Key 大小增长过快时,触发告警
  • 自动处理:当检测到大 Key 时,自动进行拆分处理

二、技术方案

2.1 架构设计

Redis 大 Key 拆分和自动检测告警的架构设计主要包括以下几个部分:

  1. 数据层:Redis 存储层,负责存储数据
  2. 服务层:Spring Boot 应用层,负责业务逻辑和大 Key 处理
  3. 监控层:监控 Redis Key 大小,检测大 Key
  4. 告警层:当检测到大 Key 时,触发告警
  5. 处理层:当检测到大 Key 时,自动进行拆分处理

2.2 技术选型

  • Spring Boot:作为基础框架,提供依赖注入、配置管理等功能
  • Spring Data Redis:用于操作 Redis
  • Lettuce:Redis 客户端,提供异步操作能力
  • Redisson:Redis 客户端,提供分布式锁、集合等高级功能
  • ScheduledExecutorService:用于定期检测大 Key
  • Spring Boot Actuator:用于暴露监控端点
  • Prometheus:用于监控系统指标
  • Grafana:用于可视化监控数据
  • 企业微信/钉钉:用于发送告警通知

2.3 核心流程

  1. 数据存储:应用通过 Spring Data Redis 操作 Redis,存储数据
  2. 大 Key 检测:定期检测 Redis Key 的大小,发现大 Key
  3. 告警触发:当检测到大 Key 时,触发告警通知
  4. 自动拆分:当检测到大 Key 时,自动进行拆分处理
  5. 数据访问:应用通过封装的接口访问拆分后的数据

三、Spring Boot Redis 大 Key 拆分实现

3.1 依赖配置

<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Data Redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <!-- Redisson -->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson-spring-boot-starter</artifactId>
        <version>3.17.7</version>
    </dependency>

    <!-- Spring Boot Actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- Micrometer -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3.2 配置文件

spring:
  redis:
    host: localhost
    port: 6379
    password:
    database: 0
    timeout: 10000
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0

# 大 Key 配置
redis:
  big-key:
    threshold: 10485760  # 10MB,单位字节
    check-interval: 60000  # 检查间隔,单位毫秒
    split-enabled: true  # 是否启用自动拆分
    alert-enabled: true  # 是否启用告警
    alert-threshold: 5242880  # 告警阈值,5MB
    notify-url: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key"  # 企业微信告警地址

# 监控配置
management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus

3.3 大 Key 拆分工具类

@Slf4j
@Component
public class RedisBigKeySplitter {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 拆分 String 类型的大 Key
     */
    public void splitStringKey(String key, byte[] value) {
        log.info("开始拆分 String 类型的大 Key: {}", key);

        // 计算拆分数量
        int chunkSize = 1024 * 1024; // 1MB 每块
        int totalChunks = (value.length + chunkSize - 1) / chunkSize;

        // 存储拆分后的数据
        for (int i = 0; i < totalChunks; i++) {
            int start = i * chunkSize;
            int end = Math.min(start + chunkSize, value.length);
            byte[] chunk = Arrays.copyOfRange(value, start, end);
            redisTemplate.opsForValue().set(key + ":chunk:" + i, new String(chunk, StandardCharsets.UTF_8));
        }

        // 存储元数据
        redisTemplate.opsForHash().put(key + ":meta", "type", "string");
        redisTemplate.opsForHash().put(key + ":meta", "chunks", String.valueOf(totalChunks));
        redisTemplate.opsForHash().put(key + ":meta", "originalSize", String.valueOf(value.length));

        // 删除原始大 Key
        redisTemplate.delete(key);

        log.info("String 类型的大 Key 拆分完成: {}, 拆分为 {} 块", key, totalChunks);
    }

    /**
     * 拆分 List 类型的大 Key
     */
    public void splitListKey(String key, List<String> values) {
        log.info("开始拆分 List 类型的大 Key: {}", key);

        // 计算拆分数量
        int chunkSize = 1000; // 每块 1000 个元素
        int totalChunks = (values.size() + chunkSize - 1) / chunkSize;

        // 存储拆分后的数据
        for (int i = 0; i < totalChunks; i++) {
            int start = i * chunkSize;
            int end = Math.min(start + chunkSize, values.size());
            List<String> chunk = values.subList(start, end);
            for (String value : chunk) {
                redisTemplate.opsForList().rightPush(key + ":chunk:" + i, value);
            }
        }

        // 存储元数据
        redisTemplate.opsForHash().put(key + ":meta", "type", "list");
        redisTemplate.opsForHash().put(key + ":meta", "chunks", String.valueOf(totalChunks));
        redisTemplate.opsForHash().put(key + ":meta", "originalSize", String.valueOf(values.size()));

        // 删除原始大 Key
        redisTemplate.delete(key);

        log.info("List 类型的大 Key 拆分完成: {}, 拆分为 {} 块", key, totalChunks);
    }

    /**
     * 拆分 Hash 类型的大 Key
     */
    public void splitHashKey(String key, Map<String, String> values) {
        log.info("开始拆分 Hash 类型的大 Key: {}", key);

        // 计算拆分数量
        int chunkSize = 1000; // 每块 1000 个字段
        List<Map<String, String>> chunks = new ArrayList<>();
        Map<String, String> currentChunk = new HashMap<>();

        for (Map.Entry<String, String> entry : values.entrySet()) {
            if (currentChunk.size() >= chunkSize) {
                chunks.add(currentChunk);
                currentChunk = new HashMap<>();
            }
            currentChunk.put(entry.getKey(), entry.getValue());
        }
        if (!currentChunk.isEmpty()) {
            chunks.add(currentChunk);
        }

        // 存储拆分后的数据
        for (int i = 0; i < chunks.size(); i++) {
            Map<String, String> chunk = chunks.get(i);
            redisTemplate.opsForHash().putAll(key + ":chunk:" + i, chunk);
        }

        // 存储元数据
        redisTemplate.opsForHash().put(key + ":meta", "type", "hash");
        redisTemplate.opsForHash().put(key + ":meta", "chunks", String.valueOf(chunks.size()));
        redisTemplate.opsForHash().put(key + ":meta", "originalSize", String.valueOf(values.size()));

        // 删除原始大 Key
        redisTemplate.delete(key);

        log.info("Hash 类型的大 Key 拆分完成: {}, 拆分为 {} 块", key, chunks.size());
    }

    /**
     * 拆分 Set 类型的大 Key
     */
    public void splitSetKey(String key, Set<String> values) {
        log.info("开始拆分 Set 类型的大 Key: {}", key);

        // 计算拆分数量
        int chunkSize = 1000; // 每块 1000 个元素
        List<Set<String>> chunks = new ArrayList<>();
        Set<String> currentChunk = new HashSet<>();

        for (String value : values) {
            if (currentChunk.size() >= chunkSize) {
                chunks.add(currentChunk);
                currentChunk = new HashSet<>();
            }
            currentChunk.add(value);
        }
        if (!currentChunk.isEmpty()) {
            chunks.add(currentChunk);
        }

        // 存储拆分后的数据
        for (int i = 0; i < chunks.size(); i++) {
            Set<String> chunk = chunks.get(i);
            for (String value : chunk) {
                redisTemplate.opsForSet().add(key + ":chunk:" + i, value);
            }
        }

        // 存储元数据
        redisTemplate.opsForHash().put(key + ":meta", "type", "set");
        redisTemplate.opsForHash().put(key + ":meta", "chunks", String.valueOf(chunks.size()));
        redisTemplate.opsForHash().put(key + ":meta", "originalSize", String.valueOf(values.size()));

        // 删除原始大 Key
        redisTemplate.delete(key);

        log.info("Set 类型的大 Key 拆分完成: {}, 拆分为 {} 块", key, chunks.size());
    }

    /**
     * 拆分 ZSet 类型的大 Key
     */
    public void splitZSetKey(String key, Set<ZSetOperations.TypedTuple<String>> values) {
        log.info("开始拆分 ZSet 类型的大 Key: {}", key);

        // 计算拆分数量
        int chunkSize = 1000; // 每块 1000 个元素
        List<Set<ZSetOperations.TypedTuple<String>>> chunks = new ArrayList<>();
        Set<ZSetOperations.TypedTuple<String>> currentChunk = new HashSet<>();

        for (ZSetOperations.TypedTuple<String> value : values) {
            if (currentChunk.size() >= chunkSize) {
                chunks.add(currentChunk);
                currentChunk = new HashSet<>();
            }
            currentChunk.add(value);
        }
        if (!currentChunk.isEmpty()) {
            chunks.add(currentChunk);
        }

        // 存储拆分后的数据
        for (int i = 0; i < chunks.size(); i++) {
            Set<ZSetOperations.TypedTuple<String>> chunk = chunks.get(i);
            redisTemplate.opsForZSet().add(key + ":chunk:" + i, chunk);
        }

        // 存储元数据
        redisTemplate.opsForHash().put(key + ":meta", "type", "zset");
        redisTemplate.opsForHash().put(key + ":meta", "chunks", String.valueOf(chunks.size()));
        redisTemplate.opsForHash().put(key + ":meta", "originalSize", String.valueOf(values.size()));

        // 删除原始大 Key
        redisTemplate.delete(key);

        log.info("ZSet 类型的大 Key 拆分完成: {}, 拆分为 {} 块", key, chunks.size());
    }

}

3.4 大 Key 检测服务

@Service
@Slf4j
public class RedisBigKeyDetector {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private RedisBigKeySplitter redisBigKeySplitter;

    @Value("${redis.big-key.threshold:10485760}")
    private long threshold;

    @Value("${redis.big-key.alert-threshold:5242880}")
    private long alertThreshold;

    @Value("${redis.big-key.split-enabled:true}")
    private boolean splitEnabled;

    @Value("${redis.big-key.alert-enabled:true}")
    private boolean alertEnabled;

    @Value("${redis.big-key.notify-url}")
    private String notifyUrl;

    /**
     * 检测大 Key
     */
    public void detectBigKeys() {
        log.info("开始检测大 Key");

        try {
            // 获取所有 Key
            Set<String> keys = redisTemplate.keys("*");
            if (keys == null || keys.isEmpty()) {
                log.info("没有找到 Key");
                return;
            }

            for (String key : keys) {
                // 跳过元数据 Key
                if (key.endsWith(":meta")) {
                    continue;
                }

                // 跳过拆分后的 Key
                if (key.contains(":chunk:")) {
                    continue;
                }

                // 获取 Key 的大小
                long size = getKeySize(key);
                log.info("Key: {}, 大小: {} bytes", key, size);

                // 检测是否为大 Key
                if (size > threshold) {
                    log.warn("检测到大 Key: {}, 大小: {} bytes", key, size);

                    // 触发告警
                    if (alertEnabled) {
                        sendAlert(key, size);
                    }

                    // 自动拆分
                    if (splitEnabled) {
                        splitBigKey(key, size);
                    }
                } else if (size > alertThreshold) {
                    log.warn("Key 接近阈值: {}, 大小: {} bytes", key, size);

                    // 触发告警
                    if (alertEnabled) {
                        sendAlert(key, size);
                    }
                }
            }
        } catch (Exception e) {
            log.error("检测大 Key 失败", e);
        }

        log.info("大 Key 检测完成");
    }

    /**
     * 获取 Key 的大小
     */
    private long getKeySize(String key) {
        try {
            // 获取 Key 的类型
            String type = redisTemplate.type(key).getCode();

            switch (type) {
                case "string":
                    return redisTemplate.opsForValue().get(key) != null ? redisTemplate.opsForValue().get(key).toString().getBytes(StandardCharsets.UTF_8).length : 0;
                case "list":
                    return redisTemplate.opsForList().size(key) * 100; // 估算大小
                case "hash":
                    return redisTemplate.opsForHash().size(key) * 100; // 估算大小
                case "set":
                    return redisTemplate.opsForSet().size(key) * 100; // 估算大小
                case "zset":
                    return redisTemplate.opsForZSet().size(key) * 100; // 估算大小
                default:
                    return 0;
            }
        } catch (Exception e) {
            log.error("获取 Key 大小失败: {}", key, e);
            return 0;
        }
    }

    /**
     * 拆分大 Key
     */
    private void splitBigKey(String key, long size) {
        try {
            // 获取 Key 的类型
            String type = redisTemplate.type(key).getCode();

            switch (type) {
                case "string":
                    String value = (String) redisTemplate.opsForValue().get(key);
                    if (value != null) {
                        redisBigKeySplitter.splitStringKey(key, value.getBytes(StandardCharsets.UTF_8));
                    }
                    break;
                case "list":
                    List<String> listValues = redisTemplate.opsForList().range(key, 0, -1);
                    if (listValues != null) {
                        redisBigKeySplitter.splitListKey(key, listValues);
                    }
                    break;
                case "hash":
                    Map<Object, Object> hashValues = redisTemplate.opsForHash().entries(key);
                    if (hashValues != null) {
                        Map<String, String> stringHashValues = new HashMap<>();
                        for (Map.Entry<Object, Object> entry : hashValues.entrySet()) {
                            stringHashValues.put(entry.getKey().toString(), entry.getValue().toString());
                        }
                        redisBigKeySplitter.splitHashKey(key, stringHashValues);
                    }
                    break;
                case "set":
                    Set<Object> setValues = redisTemplate.opsForSet().members(key);
                    if (setValues != null) {
                        Set<String> stringSetValues = new HashSet<>();
                        for (Object value1 : setValues) {
                            stringSetValues.add(value1.toString());
                        }
                        redisBigKeySplitter.splitSetKey(key, stringSetValues);
                    }
                    break;
                case "zset":
                    Set<ZSetOperations.TypedTuple<Object>> zsetValues = redisTemplate.opsForZSet().rangeWithScores(key, 0, -1);
                    if (zsetValues != null) {
                        Set<ZSetOperations.TypedTuple<String>> stringZSetValues = new HashSet<>();
                        for (ZSetOperations.TypedTuple<Object> value1 : zsetValues) {
                            stringZSetValues.add(new DefaultTypedTuple<>(value1.getValue().toString(), value1.getScore()));
                        }
                        redisBigKeySplitter.splitZSetKey(key, stringZSetValues);
                    }
                    break;
                default:
                    log.warn("不支持的 Key 类型: {}", type);
            }
        } catch (Exception e) {
            log.error("拆分大 Key 失败: {}", key, e);
        }
    }

    /**
     * 发送告警
     */
    private void sendAlert(String key, long size) {
        try {
            // 构建告警消息
            String message = String.format("【Redis 大 Key 告警】\nKey: %s\n大小: %s bytes (%.2f MB)\n阈值: %s bytes (%.2f MB)",
                    key, size, size / (1024.0 * 1024.0), threshold, threshold / (1024.0 * 1024.0));

            // 发送告警
            if (StringUtils.isNotBlank(notifyUrl)) {
                HttpClient client = HttpClient.newHttpClient();
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create(notifyUrl))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString("{\"msgtype\":\"text\",\"text\":{\"content\":\"" + message + "\"}}"))
                        .build();
                client.send(request, HttpResponse.BodyHandlers.ofString());
                log.info("告警发送成功: {}", key);
            } else {
                log.info("告警地址未配置,仅记录日志: {}", message);
            }
        } catch (Exception e) {
            log.error("发送告警失败: {}", key, e);
        }
    }

}

3.5 定时任务

@Component
public class RedisBigKeyCheckTask {

    @Autowired
    private RedisBigKeyDetector redisBigKeyDetector;

    @Value("${redis.big-key.check-interval:60000}")
    private long checkInterval;

    @PostConstruct
    public void init() {
        // 启动定时任务
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(() -> {
            try {
                redisBigKeyDetector.detectBigKeys();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, checkInterval, TimeUnit.MILLISECONDS);
    }

}

3.6 大 Key 访问封装

@Service
public class RedisBigKeyService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 获取 String 类型的值
     */
    public String getStringValue(String key) {
        // 检查是否为拆分后的 Key
        if (redisTemplate.hasKey(key + ":meta")) {
            // 获取元数据
            String type = (String) redisTemplate.opsForHash().get(key + ":meta", "type");
            if ("string".equals(type)) {
                // 获取拆分块数
                String chunksStr = (String) redisTemplate.opsForHash().get(key + ":meta", "chunks");
                int chunks = Integer.parseInt(chunksStr);

                // 拼接所有块
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < chunks; i++) {
                    String chunk = (String) redisTemplate.opsForValue().get(key + ":chunk:" + i);
                    if (chunk != null) {
                        sb.append(chunk);
                    }
                }
                return sb.toString();
            }
        }

        // 原始 Key
        return (String) redisTemplate.opsForValue().get(key);
    }

    /**
     * 设置 String 类型的值
     */
    public void setStringValue(String key, String value) {
        // 检查值大小
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > 10485760) { // 10MB
            // 自动拆分
            RedisBigKeySplitter splitter = new RedisBigKeySplitter();
            splitter.splitStringKey(key, bytes);
        } else {
            // 直接存储
            redisTemplate.opsForValue().set(key, value);
        }
    }

    /**
     * 获取 List 类型的值
     */
    public List<String> getListValue(String key) {
        // 检查是否为拆分后的 Key
        if (redisTemplate.hasKey(key + ":meta")) {
            // 获取元数据
            String type = (String) redisTemplate.opsForHash().get(key + ":meta", "type");
            if ("list".equals(type)) {
                // 获取拆分块数
                String chunksStr = (String) redisTemplate.opsForHash().get(key + ":meta", "chunks");
                int chunks = Integer.parseInt(chunksStr);

                // 拼接所有块
                List<String> result = new ArrayList<>();
                for (int i = 0; i < chunks; i++) {
                    List<String> chunk = redisTemplate.opsForList().range(key + ":chunk:" + i, 0, -1);
                    if (chunk != null) {
                        result.addAll(chunk);
                    }
                }
                return result;
            }
        }

        // 原始 Key
        return redisTemplate.opsForList().range(key, 0, -1);
    }

    /**
     * 添加 List 类型的值
     */
    public void addListValue(String key, String value) {
        // 检查是否为拆分后的 Key
        if (redisTemplate.hasKey(key + ":meta")) {
            // 获取元数据
            String type = (String) redisTemplate.opsForHash().get(key + ":meta", "type");
            if ("list".equals(type)) {
                // 获取拆分块数
                String chunksStr = (String) redisTemplate.opsForHash().get(key + ":meta", "chunks");
                int chunks = Integer.parseInt(chunksStr);

                // 添加到最后一个块
                redisTemplate.opsForList().rightPush(key + ":chunk:" + (chunks - 1), value);
                return;
            }
        }

        // 原始 Key
        redisTemplate.opsForList().rightPush(key, value);
    }

    // 其他类型的操作方法...

}

四、自动检测告警实现

4.1 监控配置

# 监控配置
management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus

# Prometheus 配置
micrometer:
  prometheus:
    enabled: true

4.2 自定义指标

@Component
public class RedisBigKeyMetrics {

    @Autowired
    private MeterRegistry meterRegistry;

    private Counter bigKeyCounter;

    @PostConstruct
    public void init() {
        // 初始化大 Key 计数器
        bigKeyCounter = Counter.builder("redis.big.key.count")
                .description("Redis 大 Key 数量")
                .tag("type", "big_key")
                .register(meterRegistry);
    }

    /**
     * 增加大 Key 计数
     */
    public void incrementBigKeyCount() {
        bigKeyCounter.increment();
    }

    /**
     * 记录大 Key 大小
     */
    public void recordBigKeySize(String key, long size) {
        Gauge.builder("redis.big.key.size", size)
                .description("Redis 大 Key 大小")
                .tag("key", key)
                .register(meterRegistry);
    }

}

4.3 告警配置

在 Prometheus 中配置告警规则:

groups:
  - name: redis_big_key_alerts
    rules:
      - alert: RedisBigKeyDetected
        expr: redis_big_key_count > 0
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Redis 大 Key 检测"
          description: "检测到 Redis 大 Key,请及时处理"

      - alert: RedisBigKeySizeExceeded
        expr: redis_big_key_size > 10485760
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Redis 大 Key 大小超限"
          description: "Redis Key {{ $labels.key }} 大小超过 10MB,请及时处理"

4.4 企业微信告警

@Component
public class WechatNotifier {

    @Value("${redis.big-key.notify-url}")
    private String notifyUrl;

    /**
     * 发送企业微信告警
     */
    public void sendWechatNotification(String message) {
        try {
            if (StringUtils.isNotBlank(notifyUrl)) {
                HttpClient client = HttpClient.newHttpClient();
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create(notifyUrl))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString("{\"msgtype\":\"text\",\"text\":{\"content\":\"" + message + "\"}}"))
                        .build();
                HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
                log.info("企业微信告警发送成功: {}", response.body());
            }
        } catch (Exception e) {
            log.error("发送企业微信告警失败", e);
        }
    }

}

五、Spring Boot 完整实现

5.1 项目结构

redis-big-key-demo/
├── src/
│   ├── main/
│   │   ├── java/com/example/redis/  # 源代码
│   │   │   ├── config/             # 配置类
│   │   │   ├── service/            # 服务类
│   │   │   ├── util/               # 工具类
│   │   │   ├── task/               # 定时任务
│   │   │   └── RedisBigKeyDemoApplication.java  # 应用入口
│   │   └── resources/             # 配置文件
│   └── test/                      # 测试代码
└── pom.xml                        # Maven 依赖

5.2 核心配置

spring:
  redis:
    host: localhost
    port: 6379
    password:
    database: 0
    timeout: 10000
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0

# 大 Key 配置
redis:
  big-key:
    threshold: 10485760  # 10MB,单位字节
    check-interval: 60000  # 检查间隔,单位毫秒
    split-enabled: true  # 是否启用自动拆分
    alert-enabled: true  # 是否启用告警
    alert-threshold: 5242880  # 告警阈值,5MB
    notify-url: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key"  # 企业微信告警地址

# 监控配置
management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus

# Prometheus 配置
micrometer:
  prometheus:
    enabled: true

5.3 核心代码

5.3.1 Redis 配置

@Configuration
@EnableRedisRepositories
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

}

5.3.2 大 Key 拆分工具类

@Slf4j
@Component
public class RedisBigKeySplitter {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 拆分 String 类型的大 Key
     */
    public void splitStringKey(String key, byte[] value) {
        log.info("开始拆分 String 类型的大 Key: {}", key);

        // 计算拆分数量
        int chunkSize = 1024 * 1024; // 1MB 每块
        int totalChunks = (value.length + chunkSize - 1) / chunkSize;

        // 存储拆分后的数据
        for (int i = 0; i < totalChunks; i++) {
            int start = i * chunkSize;
            int end = Math.min(start + chunkSize, value.length);
            byte[] chunk = Arrays.copyOfRange(value, start, end);
            redisTemplate.opsForValue().set(key + ":chunk:" + i, new String(chunk, StandardCharsets.UTF_8));
        }

        // 存储元数据
        redisTemplate.opsForHash().put(key + ":meta", "type", "string");
        redisTemplate.opsForHash().put(key + ":meta", "chunks", String.valueOf(totalChunks));
        redisTemplate.opsForHash().put(key + ":meta", "originalSize", String.valueOf(value.length));

        // 删除原始大 Key
        redisTemplate.delete(key);

        log.info("String 类型的大 Key 拆分完成: {}, 拆分为 {} 块", key, totalChunks);
    }

    // 其他类型的拆分方法...

}

5.3.3 大 Key 检测服务

@Service
@Slf4j
public class RedisBigKeyDetector {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private RedisBigKeySplitter redisBigKeySplitter;

    @Autowired
    private RedisBigKeyMetrics redisBigKeyMetrics;

    @Autowired
    private WechatNotifier wechatNotifier;

    @Value("${redis.big-key.threshold:10485760}")
    private long threshold;

    @Value("${redis.big-key.alert-threshold:5242880}")
    private long alertThreshold;

    @Value("${redis.big-key.split-enabled:true}")
    private boolean splitEnabled;

    @Value("${redis.big-key.alert-enabled:true}")
    private boolean alertEnabled;

    /**
     * 检测大 Key
     */
    public void detectBigKeys() {
        log.info("开始检测大 Key");

        try {
            // 获取所有 Key
            Set<String> keys = redisTemplate.keys("*");
            if (keys == null || keys.isEmpty()) {
                log.info("没有找到 Key");
                return;
            }

            for (String key : keys) {
                // 跳过元数据 Key
                if (key.endsWith(":meta")) {
                    continue;
                }

                // 跳过拆分后的 Key
                if (key.contains(":chunk:")) {
                    continue;
                }

                // 获取 Key 的大小
                long size = getKeySize(key);
                log.info("Key: {}, 大小: {} bytes", key, size);

                // 检测是否为大 Key
                if (size > threshold) {
                    log.warn("检测到大 Key: {}, 大小: {} bytes", key, size);

                    // 增加大 Key 计数
                    redisBigKeyMetrics.incrementBigKeyCount();
                    // 记录大 Key 大小
                    redisBigKeyMetrics.recordBigKeySize(key, size);

                    // 触发告警
                    if (alertEnabled) {
                        sendAlert(key, size);
                    }

                    // 自动拆分
                    if (splitEnabled) {
                        splitBigKey(key, size);
                    }
                } else if (size > alertThreshold) {
                    log.warn("Key 接近阈值: {}, 大小: {} bytes", key, size);

                    // 触发告警
                    if (alertEnabled) {
                        sendAlert(key, size);
                    }
                }
            }
        } catch (Exception e) {
            log.error("检测大 Key 失败", e);
        }

        log.info("大 Key 检测完成");
    }

    // 其他方法...

    /**
     * 发送告警
     */
    private void sendAlert(String key, long size) {
        try {
            // 构建告警消息
            String message = String.format("【Redis 大 Key 告警】\nKey: %s\n大小: %s bytes (%.2f MB)\n阈值: %s bytes (%.2f MB)",
                    key, size, size / (1024.0 * 1024.0), threshold, threshold / (1024.0 * 1024.0));

            // 发送企业微信告警
            wechatNotifier.sendWechatNotification(message);
        } catch (Exception e) {
            log.error("发送告警失败: {}", key, e);
        }
    }

}

5.3.4 定时任务

@Component
public class RedisBigKeyCheckTask {

    @Autowired
    private RedisBigKeyDetector redisBigKeyDetector;

    @Value("${redis.big-key.check-interval:60000}")
    private long checkInterval;

    @PostConstruct
    public void init() {
        // 启动定时任务
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(() -> {
            try {
                redisBigKeyDetector.detectBigKeys();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, checkInterval, TimeUnit.MILLISECONDS);
    }

}

5.3.5 大 Key 访问封装

@Service
public class RedisBigKeyService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 获取 String 类型的值
     */
    public String getStringValue(String key) {
        // 检查是否为拆分后的 Key
        if (redisTemplate.hasKey(key + ":meta")) {
            // 获取元数据
            String type = (String) redisTemplate.opsForHash().get(key + ":meta", "type");
            if ("string".equals(type)) {
                // 获取拆分块数
                String chunksStr = (String) redisTemplate.opsForHash().get(key + ":meta", "chunks");
                int chunks = Integer.parseInt(chunksStr);

                // 拼接所有块
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < chunks; i++) {
                    String chunk = (String) redisTemplate.opsForValue().get(key + ":chunk:" + i);
                    if (chunk != null) {
                        sb.append(chunk);
                    }
                }
                return sb.toString();
            }
        }

        // 原始 Key
        return (String) redisTemplate.opsForValue().get(key);
    }

    /**
     * 设置 String 类型的值
     */
    public void setStringValue(String key, String value) {
        // 检查值大小
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > 10485760) { // 10MB
            // 自动拆分
            RedisBigKeySplitter splitter = new RedisBigKeySplitter();
            splitter.splitStringKey(key, bytes);
        } else {
            // 直接存储
            redisTemplate.opsForValue().set(key, value);
        }
    }

    // 其他类型的操作方法...

}

六、最佳实践

6.1 大 Key 预防最佳实践

原则

  • 合理设计 Key:避免使用过大的 Key,合理设计 Key 的结构
  • 数据分片:将大数据分片存储,避免单个 Key 过大
  • 定期清理:定期清理过期数据,避免数据积累
  • 监控预警:建立监控预警机制,及时发现大 Key

建议

  • 使用 Redis 集群,分散数据存储
  • 对大型数据使用分片存储,如使用前缀 + 序号的方式
  • 定期检查 Redis Key 的大小,发现大 Key 及时处理
  • 使用 Redis 的过期机制,自动清理过期数据

6.2 大 Key 拆分最佳实践

原则

  • 类型适配:根据 Key 的类型选择合适的拆分策略
  • 粒度适中:拆分粒度要适中,避免过多的小 Key
  • 元数据管理:合理管理拆分后的元数据,便于数据访问
  • 向后兼容:保持接口的向后兼容,避免影响现有代码

建议

  • String 类型:按 1MB 大小拆分
  • List 类型:按 1000 个元素拆分
  • Hash 类型:按 1000 个字段拆分
  • Set 类型:按 1000 个元素拆分
  • ZSet 类型:按 1000 个元素拆分

6.3 监控告警最佳实践

原则

  • 实时监控:实时监控 Redis Key 的大小
  • 阈值合理:设置合理的告警阈值
  • 多渠道告警:使用多种告警渠道,确保告警及时送达
  • 自动处理:对于大 Key,实现自动拆分处理

建议

  • 使用 Prometheus + Grafana 监控 Redis Key 大小
  • 设置两级告警阈值:预警阈值和处理阈值
  • 使用企业微信、钉钉等渠道发送告警
  • 实现大 Key 的自动拆分处理

6.4 性能优化最佳实践

原则

  • 减少网络传输:减少大 Key 的网络传输
  • 避免阻塞操作:避免对大 Key 进行阻塞操作
  • 优化存储结构:优化数据存储结构,减少内存占用
  • 合理使用数据类型:根据业务场景选择合适的数据类型

建议

  • 使用 Redis Pipeline 批量操作,减少网络往返
  • 对大 Key 的操作使用异步方式,避免阻塞主线程
  • 使用压缩算法,减少数据大小
  • 合理使用 Redis 数据类型,如使用 Hash 存储对象

七、总结

Redis 大 Key 是一个常见的性能瓶颈,会导致内存占用不均衡、网络传输缓慢、阻塞操作、过期删除卡顿等问题。通过本文的实现方案,开发者可以构建一个功能强大的 Redis 大 Key 管理系统,帮助团队更好地管理 Redis 数据,避免大 Key 带来的性能问题。

互动话题

  1. 你在实际项目中遇到过大 Key 问题吗?是如何解决的?
  2. 你认为大 Key 拆分的最佳粒度是多少?
  3. 你有使用过类似的工具或方案吗?

欢迎在评论区留言讨论!更多技术文章,欢迎关注公众号:服务端技术精选


标题:SpringBoot + Redis 大 Key 拆分 + 自动检测告警:单个 Key 超 10MB?自动拆分防阻塞
作者:jiangyi
地址:http://jiangyi.space/articles/2026/04/11/1775830865099.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消