SpringBoot + FFmpeg + Redis:视频转码、截图、水印异步处理平台搭建

引言:视频处理的那些坑

各位服务端的兄弟们,你们有没有遇到过这样的场景:用户上传了一个大视频,你直接在当前线程里处理,结果导致接口响应超时,用户体验极差?或者视频处理过程中服务器CPU飙升,影响了其他服务的正常运行?再或者多个视频同时处理,直接把服务器搞崩了?

视频处理是典型的CPU密集型任务,如果处理不当,很容易成为系统的性能瓶颈。今天我们就来聊聊如何用SpringBoot + FFmpeg + Redis搭建一个高性能的视频异步处理平台,让视频处理不再成为系统的负担。

为什么需要异步处理?

先说说为什么视频处理必须异步。

想象一下,你是一家在线教育平台的后端工程师。老师上传了一个2小时的课程视频,如果同步处理,用户要等2个小时才能看到处理结果,这显然是不可接受的。而且,在处理视频的过程中,服务器资源被大量占用,其他用户的服务质量也会受到影响。

异步处理的优势:

  1. 用户体验好:用户提交任务后立即返回,不需要等待
  2. 资源利用合理:避免长时间占用服务器资源
  3. 可扩展性强:可以通过增加处理节点来提升处理能力
  4. 容错性好:单个任务失败不影响其他任务

技术选型:为什么选择这些技术?

FFmpeg:视频处理的瑞士军刀

FFmpeg是业界最强大的多媒体处理工具,几乎支持所有主流的音视频格式。它可以完成:

  • 视频转码(MP4转AVI、H.264转H.265等)
  • 视频截图
  • 添加水印
  • 视频剪辑
  • 格式转换

Redis:任务队列的完美选择

Redis的发布订阅功能和数据结构非常适合做任务队列:

  • 高性能:内存操作,速度极快
  • 原子性:保证任务处理的原子性
  • 持久化:防止任务丢失
  • 灵活:支持多种数据结构

SpringBoot:快速开发的利器

SpringBoot提供了:

  • 自动配置:快速集成各种组件
  • 异步支持:内置异步处理能力
  • 监控:丰富的监控指标

系统架构设计

我们的异步处理平台主要包括以下几个模块:

  1. 任务提交模块:接收用户上传的视频和处理请求
  2. 任务队列模块:使用Redis存储待处理任务
  3. 任务分发模块:将任务分发给可用的处理节点
  4. 任务处理模块:使用FFmpeg执行具体的视频处理任务
  5. 状态查询模块:用户可以查询任务处理进度
  6. 结果存储模块:处理完成后存储结果

核心实现思路

1. 任务模型设计

首先定义一个任务模型:

public class VideoProcessTask {
    private String taskId;           // 任务ID
    private String originalUrl;      // 原始视频URL
    private List<ProcessType> operations; // 处理操作列表
    private String callbackUrl;      // 回调URL
    private String status;           // 任务状态
    private String resultUrl;        // 处理结果URL
    private long createTime;         // 创建时间
    private long updateTime;         // 更新时间
}

2. 任务队列实现

使用Redis的List数据结构实现任务队列:

@Component
public class TaskQueueService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 添加任务到队列
    public void addTask(VideoProcessTask task) {
        redisTemplate.opsForList().leftPush("video_process_queue", task);
    }
    
    // 从队列获取任务
    public VideoProcessTask getTask() {
        return (VideoProcessTask) redisTemplate.opsForList()
            .rightPop("video_process_queue", 30, TimeUnit.SECONDS);
    }
    
    // 更新任务状态
    public void updateTaskStatus(String taskId, String status) {
        String key = "task:" + taskId;
        redisTemplate.opsForHash().put(key, "status", status);
        redisTemplate.opsForHash().put(key, "updateTime", System.currentTimeMillis());
    }
}

3. 异步处理任务

使用Spring的@Async注解实现异步处理:

@Service
public class VideoProcessService {
    
    @Async("taskExecutor")
    public CompletableFuture<String> processVideo(VideoProcessTask task) {
        try {
            // 更新任务状态为处理中
            taskQueueService.updateTaskStatus(task.getTaskId(), "processing");
            
            // 执行FFmpeg命令
            String result = executeFFmpegCommand(task);
            
            // 更新任务状态为完成
            taskQueueService.updateTaskStatus(task.getTaskId(), "completed");
            taskQueueService.updateTaskStatus(task.getTaskId(), "resultUrl", result);
            
            // 发送回调
            sendCallback(task.getCallbackUrl(), task.getTaskId(), "completed", result);
            
            return CompletableFuture.completedFuture("success");
        } catch (Exception e) {
            // 更新任务状态为失败
            taskQueueService.updateTaskStatus(task.getTaskId(), "failed");
            return CompletableFuture.completedFuture("failed");
        }
    }
    
    private String executeFFmpegCommand(VideoProcessTask task) {
        // 构建FFmpeg命令
        List<String> command = new ArrayList<>();
        command.add("ffmpeg");
        command.add("-i");
        command.add(task.getOriginalUrl());
        
        // 根据处理类型添加参数
        for (ProcessType operation : task.getOperations()) {
            switch (operation) {
                case TRANSCODE:
                    command.add("-c:v");
                    command.add("libx264");
                    command.add("-c:a");
                    command.add("aac");
                    break;
                case SCREENSHOT:
                    command.add("-ss");
                    command.add("00:00:10"); // 截取第10秒的画面
                    command.add("-vframes");
                    command.add("1");
                    break;
                case WATERMARK:
                    command.add("-i");
                    command.add("watermark.png");
                    command.add("-filter_complex");
                    command.add("[1][0]overlay=10:10");
                    break;
            }
        }
        
        // 输出文件
        String outputPath = generateOutputPath(task.getTaskId());
        command.add(outputPath);
        
        // 执行命令
        ProcessBuilder processBuilder = new ProcessBuilder(command);
        Process process = processBuilder.start();
        
        // 等待处理完成
        int exitCode = process.waitFor();
        
        if (exitCode != 0) {
            throw new RuntimeException("FFmpeg processing failed");
        }
        
        return outputPath;
    }
}

4. 任务调度器

创建一个任务调度器,持续从队列中取出任务进行处理:

@Component
public class TaskScheduler {
    
    @Autowired
    private TaskQueueService taskQueueService;
    
    @Autowired
    private VideoProcessService videoProcessService;
    
    @Scheduled(fixedRate = 1000) // 每秒检查一次
    public void scheduleTasks() {
        // 检查是否有可用的处理线程
        if (isProcessorAvailable()) {
            VideoProcessTask task = taskQueueService.getTask();
            if (task != null) {
                videoProcessService.processVideo(task);
            }
        }
    }
    
    private boolean isProcessorAvailable() {
        // 检查当前正在处理的任务数量
        // 避免过多并发导致服务器负载过高
        return getCurrentProcessingCount() < MAX_CONCURRENT_TASKS;
    }
}

高级特性实现

1. 任务优先级

可以为不同类型的任务设置不同的优先级:

public enum TaskPriority {
    LOW(1),      // 普通任务
    MEDIUM(2),   // 中等优先级
    HIGH(3);     // 高优先级
    
    private final int value;
    
    TaskPriority(int value) {
        this.value = value;
    }
    
    public int getValue() {
        return value;
    }
}

2. 任务进度跟踪

通过FFmpeg的进度回调来跟踪处理进度:

private void executeFFmpegWithProgress(VideoProcessTask task) throws IOException, InterruptedException {
    ProcessBuilder processBuilder = new ProcessBuilder("ffmpeg", "-i", task.getOriginalUrl(), ...);
    
    Process process = processBuilder.start();
    
    // 读取FFmpeg的进度信息
    BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
    String line;
    while ((line = reader.readLine()) != null) {
        if (line.contains("frame=")) {
            // 解析进度信息并更新到Redis
            updateProgress(task.getTaskId(), parseProgress(line));
        }
    }
    
    process.waitFor();
}

3. 容错与重试机制

实现任务失败后的重试机制:

@Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000))
public String processVideoWithRetry(VideoProcessTask task) {
    return executeFFmpegCommand(task);
}

@Recover
public String recover(Exception ex, VideoProcessTask task) {
    // 重试失败后的处理逻辑
    taskQueueService.updateTaskStatus(task.getTaskId(), "failed_after_retry");
    return "failed";
}

性能优化建议

  1. 合理设置并发数:根据服务器CPU核心数和内存大小设置合适的并发处理数
  2. 文件存储优化:使用分布式文件系统存储视频文件
  3. 内存管理:及时清理处理完成的临时文件
  4. 监控告警:监控任务队列长度、处理时间等关键指标

最佳实践

  1. 任务拆分:对于复杂的视频处理任务,可以拆分成多个子任务
  2. 资源隔离:为视频处理任务设置专门的资源池
  3. 降级策略:在高负载情况下,可以暂时停止非关键的视频处理任务
  4. 安全考虑:对上传的视频文件进行安全检查,防止恶意文件

总结

通过SpringBoot + FFmpeg + Redis的组合,我们可以搭建一个高性能的视频异步处理平台。关键在于:

  1. 合理架构:任务队列、异步处理、状态管理
  2. 性能优化:控制并发、资源管理、监控告警
  3. 容错处理:重试机制、降级策略、异常处理

记住,视频处理虽然复杂,但通过合理的架构设计和异步处理,完全可以做到高性能、高可用。掌握了这些技巧,你就能轻松应对各种视频处理需求,再也不用担心视频处理拖垮服务器了。


标题:SpringBoot + FFmpeg + Redis:视频转码、截图、水印异步处理平台搭建
作者:jiangyi
地址:http://jiangyi.space/articles/2025/12/30/1767073184185.html

    0 评论
avatar