SpringBoot + Redis Pipeline + 批量操作:百次查询合并一次往返,性能提升 5 倍

前言

在高并发场景下,Redis 作为缓存中间件被广泛使用。然而,许多开发者在使用 Redis 时往往忽略了网络开销带来的性能损耗。当需要执行大量 Redis 命令时,逐条发送命令会导致大量的网络往返,严重影响系统性能。

Redis Pipeline(管道)技术可以将多条命令打包一次性发送,将百次网络往返合并为一次,性能提升可达 5 倍以上。本文将深入讲解 Redis Pipeline 的原理、使用场景和最佳实践。

一、为什么需要 Pipeline?

1. 传统方式的问题

┌─────────────────────────────────────────────────────────────┐
│                    传统逐条命令执行                           │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  客户端                        Redis 服务器                  │
│    │                              │                         │
│    │──── GET key1 ────────────────▶│                        │
│    │◀─── value1 ──────────────────│  RTT ~1ms              │
│    │                              │                         │
│    │──── GET key2 ────────────────▶│                        │
│    │◀─── value2 ──────────────────│  RTT ~1ms              │
│    │                              │                         │
│    │──── GET key3 ────────────────▶│                        │
│    │◀─── value3 ──────────────────│  RTT ~1ms              │
│    │                              │                         │
│    │         ... 重复 100 次 ...    │                        │
│    │                              │                         │
│  总耗时 ≈ 100 × 1ms = 100ms                                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2. Pipeline 的优势

┌─────────────────────────────────────────────────────────────┐
│                    Pipeline 批量执行                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  客户端                        Redis 服务器                  │
│    │                              │                         │
│    │──── GET key1                │                         │
│    │──── GET key2                │                         │
│    │──── GET key3                │                         │
│    │──── ...                     │  一次性发送              │
│    │──── GET key100 ─────────────▶│                        │
│    │                              │                         │
│    │◀─── value1                  │                         │
│    │◀─── value2                  │                         │
│    │◀─── value3                  │  一次性返回              │
│    │◀─── ...                     │                         │
│    │◀─── value100 ───────────────│                        │
│    │                              │                         │
│  总耗时 ≈ 1 × 1ms = 1ms(仅一次网络往返)                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘

3. 性能对比

操作方式100 次命令耗时1000 次命令耗时性能提升
逐条执行~100ms~1000ms基准
Pipeline~2ms~10ms50-100 倍
Lua 脚本~1ms~5ms100-200 倍

二、Pipeline 原理深入

1. 工作机制

┌─────────────────────────────────────────────────────────────┐
│                    Pipeline 工作原理                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   客户端                             │   │
│  │  ┌─────────────────────────────────────────────┐   │   │
│  │  │              命令缓冲区                       │   │   │
│  │  │  SET key1 val1                              │   │   │
│  │  │  SET key2 val2                              │   │   │
│  │  │  SET key3 val3                              │   │   │
│  │  │  ...                                        │   │   │
│  │  └─────────────────────────────────────────────┘   │   │
│  │                        │                            │   │
│  │                        ▼                            │   │
│  │              一次性发送所有命令                      │   │
│  └─────────────────────────────────────────────────────┘   │
│                           │                                 │
│                           ▼                                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                  Redis 服务器                        │   │
│  │                                                     │   │
│  │  1. 接收所有命令                                    │   │
│  │  2. 依次执行(非原子)                               │   │
│  │  3. 缓存所有结果                                    │   │
│  │  4. 一次性返回                                      │   │
│  │                                                     │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2. 与事务的区别

特性Pipeline事务(MULTI/EXEC)
原子性不保证保证
网络往返1 次1 次(需配合 Pipeline)
命令执行顺序执行原子执行
错误处理单条失败不影响其他语法错误全部回滚
适用场景批量读写、无依赖操作需要原子性的操作

3. 注意事项

┌─────────────────────────────────────────────────────────────┐
│                    Pipeline 使用注意事项                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 内存消耗                                                 │
│     - Pipeline 会缓存所有命令和结果                          │
│     - 不建议单次发送过多命令(建议分批,每批 100-500 条)      │
│                                                             │
│  2. 非原子性                                                 │
│     - Pipeline 中的命令可能被其他客户端命令插队               │
│     - 需要原子性请使用事务或 Lua 脚本                        │
│                                                             │
│  3. 超时设置                                                 │
│     - 大量命令可能导致执行时间过长                            │
│     - 需要合理设置超时时间                                   │
│                                                             │
│  4. 错误处理                                                 │
│     - 单条命令失败不影响其他命令                              │
│     - 需要检查每条命令的返回结果                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

三、Spring Boot 集成实现

1. 项目依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>
</dependencies>

2. Redis 配置

spring:
  redis:
    host: localhost
    port: 6379
    password: 
    database: 0
    lettuce:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5
        max-wait: 3000ms

3. 核心代码实现

3.1 Pipeline 服务封装

@Service
@Slf4j
public class RedisPipelineService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public List<Object> executePipeline(List<RedisCallback<Object>> callbacks) {
        return redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                for (RedisCallback<Object> callback : callbacks) {
                    operations.execute(callback);
                }
                return null;
            }
        });
    }

    public void batchSet(Map<String, Object> data) {
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                data.forEach((key, value) -> {
                    operations.opsForValue().set(key, value);
                });
                return null;
            }
        });
        log.info("批量写入完成: count={}", data.size());
    }

    public List<Object> batchGet(List<String> keys) {
        List<Object> results = redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                for (String key : keys) {
                    operations.opsForValue().get(key);
                }
                return null;
            }
        });
        log.info("批量读取完成: count={}", keys.size());
        return results;
    }

    public void batchDelete(List<String> keys) {
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                for (String key : keys) {
                    operations.delete(key);
                }
                return null;
            }
        });
        log.info("批量删除完成: count={}", keys.size());
    }

    public void batchSetWithExpire(Map<String, Object> data, long expireSeconds) {
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                data.forEach((key, value) -> {
                    operations.opsForValue().set(key, value, 
                        expireSeconds, TimeUnit.SECONDS);
                });
                return null;
            }
        });
        log.info("批量写入(带过期时间)完成: count={}, expire={}s", 
                data.size(), expireSeconds);
    }

}

3.2 批量 Hash 操作

@Service
public class RedisHashPipelineService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void batchHashSet(String key, Map<String, Object> fieldValues) {
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                operations.opsForHash().putAll(key, fieldValues);
                return null;
            }
        });
    }

    public List<Object> batchHashGet(String key, List<String> fields) {
        return redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                for (String field : fields) {
                    operations.opsForHash().get(key, field);
                }
                return null;
            }
        });
    }

    public void batchMultiHashSet(Map<String, Map<String, Object>> data) {
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                data.forEach((key, fieldValues) -> {
                    operations.opsForHash().putAll(key, fieldValues);
                });
                return null;
            }
        });
    }

}

3.3 分批处理工具

@Component
public class PipelineBatchProcessor {

    @Autowired
    private RedisPipelineService pipelineService;

    private static final int DEFAULT_BATCH_SIZE = 500;

    public void batchSetInChunks(Map<String, Object> data, int batchSize) {
        List<Map<String, Object>> chunks = splitIntoChunks(data, batchSize);
        
        for (int i = 0; i < chunks.size(); i++) {
            Map<String, Object> chunk = chunks.get(i);
            pipelineService.batchSet(chunk);
            log.info("处理第 {}/{} 批数据, size={}", i + 1, chunks.size(), chunk.size());
        }
    }

    public List<Object> batchGetInChunks(List<String> keys, int batchSize) {
        List<List<String>> chunks = splitIntoChunks(keys, batchSize);
        List<Object> allResults = new ArrayList<>();

        for (int i = 0; i < chunks.size(); i++) {
            List<String> chunk = chunks.get(i);
            List<Object> results = pipelineService.batchGet(chunk);
            allResults.addAll(results);
            log.info("处理第 {}/{} 批数据, size={}", i + 1, chunks.size(), chunk.size());
        }

        return allResults;
    }

    private <T> List<List<T>> splitIntoChunks(List<T> list, int batchSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < list.size(); i += batchSize) {
            int end = Math.min(i + batchSize, list.size());
            chunks.add(list.subList(i, end));
        }
        return chunks;
    }

    private <K, V> List<Map<K, V>> splitIntoChunks(Map<K, V> map, int batchSize) {
        List<Map<K, V>> chunks = new ArrayList<>();
        List<Map.Entry<K, V>> entries = new ArrayList<>(map.entrySet());
        
        for (int i = 0; i < entries.size(); i += batchSize) {
            int end = Math.min(i + batchSize, entries.size());
            Map<K, V> chunk = new HashMap<>();
            for (int j = i; j < end; j++) {
                Map.Entry<K, V> entry = entries.get(j);
                chunk.put(entry.getKey(), entry.getValue());
            }
            chunks.add(chunk);
        }
        return chunks;
    }

}

4. 性能测试对比

@Service
@Slf4j
public class PerformanceTestService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private RedisPipelineService pipelineService;

    public PerformanceResult comparePerformance(int count) {
        Map<String, Object> testData = generateTestData(count);
        List<String> keys = new ArrayList<>(testData.keySet());

        PerformanceResult result = new PerformanceResult();
        result.setCount(count);

        long normalSetTime = testNormalSet(testData);
        result.setNormalSetTime(normalSetTime);

        long pipelineSetTime = testPipelineSet(testData);
        result.setPipelineSetTime(pipelineSetTime);

        long normalGetTime = testNormalGet(keys);
        result.setNormalGetTime(normalGetTime);

        long pipelineGetTime = testPipelineGet(keys);
        result.setPipelineGetTime(pipelineGetTime);

        result.setSetImprovement((double) normalSetTime / pipelineSetTime);
        result.setGetImprovement((double) normalGetTime / pipelineGetTime);

        return result;
    }

    private long testNormalSet(Map<String, Object> data) {
        long start = System.currentTimeMillis();
        data.forEach((key, value) -> redisTemplate.opsForValue().set(key, value));
        return System.currentTimeMillis() - start;
    }

    private long testPipelineSet(Map<String, Object> data) {
        long start = System.currentTimeMillis();
        pipelineService.batchSet(data);
        return System.currentTimeMillis() - start;
    }

    private long testNormalGet(List<String> keys) {
        long start = System.currentTimeMillis();
        keys.forEach(key -> redisTemplate.opsForValue().get(key));
        return System.currentTimeMillis() - start;
    }

    private long testPipelineGet(List<String> keys) {
        long start = System.currentTimeMillis();
        pipelineService.batchGet(keys);
        return System.currentTimeMillis() - start;
    }

    private Map<String, Object> generateTestData(int count) {
        Map<String, Object> data = new HashMap<>();
        for (int i = 0; i < count; i++) {
            data.put("test:key:" + i, "value_" + i);
        }
        return data;
    }

}

@Data
class PerformanceResult {
    private int count;
    private long normalSetTime;
    private long pipelineSetTime;
    private long normalGetTime;
    private long pipelineGetTime;
    private double setImprovement;
    private double getImprovement;
}

四、高级应用场景

1. 批量缓存预热

@Service
public class CacheWarmupService {

    @Autowired
    private RedisPipelineService pipelineService;

    @Autowired
    private UserRepository userRepository;

    public void warmupUserCache() {
        log.info("开始缓存预热...");

        List<User> users = userRepository.findAll();
        Map<String, Object> cacheData = new HashMap<>();

        for (User user : users) {
            String key = "user:" + user.getId();
            cacheData.put(key, user);
        }

        pipelineService.batchSetWithExpire(cacheData, 3600);

        log.info("缓存预热完成: count={}", users.size());
    }

}

2. 批量数据同步

@Service
public class DataSyncService {

    @Autowired
    private RedisPipelineService pipelineService;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void syncFromDbToCache(List<Product> products) {
        Map<String, Object> cacheData = products.stream()
            .collect(Collectors.toMap(
                p -> "product:" + p.getId(),
                p -> p
            ));

        pipelineService.batchSet(cacheData);
    }

    public List<Product> batchGetProducts(List<Long> productIds) {
        List<String> keys = productIds.stream()
            .map(id -> "product:" + id)
            .collect(Collectors.toList());

        List<Object> results = pipelineService.batchGet(keys);

        return results.stream()
            .filter(Objects::nonNull)
            .map(obj -> (Product) obj)
            .collect(Collectors.toList());
    }

}

3. 批量计数器更新

@Service
public class CounterBatchService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void batchIncrement(Map<String, Long> counters) {
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                counters.forEach((key, increment) -> {
                    operations.opsForValue().increment(key, increment);
                });
                return null;
            }
        });
    }

    public Map<String, Long> batchGetCounters(List<String> keys) {
        List<Object> results = redisTemplate.executePipelined(
            new SessionCallback<Object>() {
                @Override
                public Object execute(RedisOperations operations) {
                    for (String key : keys) {
                        operations.opsForValue().get(key);
                    }
                    return null;
                }
            }
        );

        Map<String, Long> counters = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            Object value = results.get(i);
            counters.put(keys.get(i), value != null ? Long.parseLong(value.toString()) : 0L);
        }
        return counters;
    }

}

4. Pipeline + 事务组合

@Service
public class PipelineTransactionService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void executeWithTransaction(List<String> keys, Map<String, Object> updates) {
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                operations.multi();
                
                try {
                    for (String key : keys) {
                        operations.delete(key);
                    }
                    
                    updates.forEach((key, value) -> {
                        operations.opsForValue().set(key, value);
                    });
                    
                    operations.exec();
                } catch (Exception e) {
                    operations.discard();
                    throw e;
                }
                
                return null;
            }
        });
    }

}

五、最佳实践

1. 批量大小选择

数据量建议批量大小说明
< 100全部一次发送网络开销可忽略
100 - 1000100 - 500平衡性能和内存
1000 - 10000500 - 1000分批处理
> 100001000多批次处理

2. 错误处理策略

@Service
public class SafePipelineService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public PipelineResult safeBatchSet(Map<String, Object> data) {
        PipelineResult result = new PipelineResult();

        try {
            List<Object> results = redisTemplate.executePipelined(
                new SessionCallback<Object>() {
                    @Override
                    public Object execute(RedisOperations operations) {
                        data.forEach((key, value) -> {
                            try {
                                operations.opsForValue().set(key, value);
                            } catch (Exception e) {
                                result.addError(key, e.getMessage());
                            }
                        });
                        return null;
                    }
                }
            );

            result.setSuccessCount(data.size() - result.getErrors().size());

        } catch (Exception e) {
            result.setGlobalError(e.getMessage());
        }

        return result;
    }

}

@Data
class PipelineResult {
    private int successCount;
    private Map<String, String> errors = new HashMap<>();
    private String globalError;

    public void addError(String key, String error) {
        errors.put(key, error);
    }
}

3. 监控指标

指标说明告警阈值
Pipeline 执行时间单次 Pipeline 执行耗时> 100ms
批量大小单次 Pipeline 命令数> 1000
失败率Pipeline 执行失败比例> 1%
内存使用Redis 内存使用率> 80%

六、常见问题

Q1: Pipeline 和 mget/mset 有什么区别?

A:

  • mget/mset 是 Redis 原生命令,只支持单一操作类型
  • Pipeline 支持混合多种命令类型
  • Pipeline 更灵活,可以包含条件逻辑

Q2: Pipeline 会阻塞 Redis 吗?

A:

  • Pipeline 命令会占用 Redis 线程
  • 大量命令可能导致其他请求延迟
  • 建议控制批量大小,避免一次性发送过多命令

Q3: 如何处理 Pipeline 中的部分失败?

A:

  • Pipeline 不保证原子性
  • 需要检查每条命令的返回结果
  • 对于关键操作,考虑使用事务或 Lua 脚本

Q4: Pipeline 适合什么场景?

A:

  • 批量数据导入/导出
  • 缓存预热
  • 批量查询
  • 数据同步

七、性能测试结果

测试环境

  • Redis 7.0
  • Spring Boot 3.2
  • 本地网络(RTT < 1ms)

测试结果

操作数据量普通方式Pipeline提升倍数
SET10015ms2ms7.5x
SET1000150ms8ms18.75x
SET100001500ms70ms21.4x
GET10012ms2ms6x
GET1000120ms7ms17.1x
GET100001200ms65ms18.5x

八、总结

Redis Pipeline 是提升 Redis 批量操作性能的利器:

核心优势:

  • 减少网络往返:百次命令合并一次发送
  • 显著性能提升:5-20 倍性能提升
  • 实现简单:Spring Boot 开箱即用

适用场景:

  • 批量数据读写
  • 缓存预热
  • 数据同步
  • 批量计数器更新

注意事项:

  • 不保证原子性
  • 控制批量大小
  • 合理设置超时
  • 做好错误处理

更多技术文章,欢迎关注公众号"服务端技术精选",及时获取最新动态。


标题:SpringBoot + Redis Pipeline + 批量操作:百次查询合并一次往返,性能提升 5 倍
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/28/1774262062702.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消