SpringBoot + FFmpeg + Redis:视频转码、截图、水印异步处理平台搭建
引言:视频处理的那些坑
各位服务端的兄弟们,你们有没有遇到过这样的场景:用户上传了一个大视频,你直接在当前线程里处理,结果导致接口响应超时,用户体验极差?或者视频处理过程中服务器CPU飙升,影响了其他服务的正常运行?再或者多个视频同时处理,直接把服务器搞崩了?
视频处理是典型的CPU密集型任务,如果处理不当,很容易成为系统的性能瓶颈。今天我们就来聊聊如何用SpringBoot + FFmpeg + Redis搭建一个高性能的视频异步处理平台,让视频处理不再成为系统的负担。
为什么需要异步处理?
先说说为什么视频处理必须异步。
想象一下,你是一家在线教育平台的后端工程师。老师上传了一个2小时的课程视频,如果同步处理,用户要等2个小时才能看到处理结果,这显然是不可接受的。而且,在处理视频的过程中,服务器资源被大量占用,其他用户的服务质量也会受到影响。
异步处理的优势:
- 用户体验好:用户提交任务后立即返回,不需要等待
- 资源利用合理:避免长时间占用服务器资源
- 可扩展性强:可以通过增加处理节点来提升处理能力
- 容错性好:单个任务失败不影响其他任务
技术选型:为什么选择这些技术?
FFmpeg:视频处理的瑞士军刀
FFmpeg是业界最强大的多媒体处理工具,几乎支持所有主流的音视频格式。它可以完成:
- 视频转码(MP4转AVI、H.264转H.265等)
- 视频截图
- 添加水印
- 视频剪辑
- 格式转换
Redis:任务队列的完美选择
Redis的发布订阅功能和数据结构非常适合做任务队列:
- 高性能:内存操作,速度极快
- 原子性:保证任务处理的原子性
- 持久化:防止任务丢失
- 灵活:支持多种数据结构
SpringBoot:快速开发的利器
SpringBoot提供了:
- 自动配置:快速集成各种组件
- 异步支持:内置异步处理能力
- 监控:丰富的监控指标
系统架构设计
我们的异步处理平台主要包括以下几个模块:
- 任务提交模块:接收用户上传的视频和处理请求
- 任务队列模块:使用Redis存储待处理任务
- 任务分发模块:将任务分发给可用的处理节点
- 任务处理模块:使用FFmpeg执行具体的视频处理任务
- 状态查询模块:用户可以查询任务处理进度
- 结果存储模块:处理完成后存储结果
核心实现思路
1. 任务模型设计
首先定义一个任务模型:
public class VideoProcessTask {
private String taskId; // 任务ID
private String originalUrl; // 原始视频URL
private List<ProcessType> operations; // 处理操作列表
private String callbackUrl; // 回调URL
private String status; // 任务状态
private String resultUrl; // 处理结果URL
private long createTime; // 创建时间
private long updateTime; // 更新时间
}
2. 任务队列实现
使用Redis的List数据结构实现任务队列:
@Component
public class TaskQueueService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 添加任务到队列
public void addTask(VideoProcessTask task) {
redisTemplate.opsForList().leftPush("video_process_queue", task);
}
// 从队列获取任务
public VideoProcessTask getTask() {
return (VideoProcessTask) redisTemplate.opsForList()
.rightPop("video_process_queue", 30, TimeUnit.SECONDS);
}
// 更新任务状态
public void updateTaskStatus(String taskId, String status) {
String key = "task:" + taskId;
redisTemplate.opsForHash().put(key, "status", status);
redisTemplate.opsForHash().put(key, "updateTime", System.currentTimeMillis());
}
}
3. 异步处理任务
使用Spring的@Async注解实现异步处理:
@Service
public class VideoProcessService {
@Async("taskExecutor")
public CompletableFuture<String> processVideo(VideoProcessTask task) {
try {
// 更新任务状态为处理中
taskQueueService.updateTaskStatus(task.getTaskId(), "processing");
// 执行FFmpeg命令
String result = executeFFmpegCommand(task);
// 更新任务状态为完成
taskQueueService.updateTaskStatus(task.getTaskId(), "completed");
taskQueueService.updateTaskStatus(task.getTaskId(), "resultUrl", result);
// 发送回调
sendCallback(task.getCallbackUrl(), task.getTaskId(), "completed", result);
return CompletableFuture.completedFuture("success");
} catch (Exception e) {
// 更新任务状态为失败
taskQueueService.updateTaskStatus(task.getTaskId(), "failed");
return CompletableFuture.completedFuture("failed");
}
}
private String executeFFmpegCommand(VideoProcessTask task) {
// 构建FFmpeg命令
List<String> command = new ArrayList<>();
command.add("ffmpeg");
command.add("-i");
command.add(task.getOriginalUrl());
// 根据处理类型添加参数
for (ProcessType operation : task.getOperations()) {
switch (operation) {
case TRANSCODE:
command.add("-c:v");
command.add("libx264");
command.add("-c:a");
command.add("aac");
break;
case SCREENSHOT:
command.add("-ss");
command.add("00:00:10"); // 截取第10秒的画面
command.add("-vframes");
command.add("1");
break;
case WATERMARK:
command.add("-i");
command.add("watermark.png");
command.add("-filter_complex");
command.add("[1][0]overlay=10:10");
break;
}
}
// 输出文件
String outputPath = generateOutputPath(task.getTaskId());
command.add(outputPath);
// 执行命令
ProcessBuilder processBuilder = new ProcessBuilder(command);
Process process = processBuilder.start();
// 等待处理完成
int exitCode = process.waitFor();
if (exitCode != 0) {
throw new RuntimeException("FFmpeg processing failed");
}
return outputPath;
}
}
4. 任务调度器
创建一个任务调度器,持续从队列中取出任务进行处理:
@Component
public class TaskScheduler {
@Autowired
private TaskQueueService taskQueueService;
@Autowired
private VideoProcessService videoProcessService;
@Scheduled(fixedRate = 1000) // 每秒检查一次
public void scheduleTasks() {
// 检查是否有可用的处理线程
if (isProcessorAvailable()) {
VideoProcessTask task = taskQueueService.getTask();
if (task != null) {
videoProcessService.processVideo(task);
}
}
}
private boolean isProcessorAvailable() {
// 检查当前正在处理的任务数量
// 避免过多并发导致服务器负载过高
return getCurrentProcessingCount() < MAX_CONCURRENT_TASKS;
}
}
高级特性实现
1. 任务优先级
可以为不同类型的任务设置不同的优先级:
public enum TaskPriority {
LOW(1), // 普通任务
MEDIUM(2), // 中等优先级
HIGH(3); // 高优先级
private final int value;
TaskPriority(int value) {
this.value = value;
}
public int getValue() {
return value;
}
}
2. 任务进度跟踪
通过FFmpeg的进度回调来跟踪处理进度:
private void executeFFmpegWithProgress(VideoProcessTask task) throws IOException, InterruptedException {
ProcessBuilder processBuilder = new ProcessBuilder("ffmpeg", "-i", task.getOriginalUrl(), ...);
Process process = processBuilder.start();
// 读取FFmpeg的进度信息
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
String line;
while ((line = reader.readLine()) != null) {
if (line.contains("frame=")) {
// 解析进度信息并更新到Redis
updateProgress(task.getTaskId(), parseProgress(line));
}
}
process.waitFor();
}
3. 容错与重试机制
实现任务失败后的重试机制:
@Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000))
public String processVideoWithRetry(VideoProcessTask task) {
return executeFFmpegCommand(task);
}
@Recover
public String recover(Exception ex, VideoProcessTask task) {
// 重试失败后的处理逻辑
taskQueueService.updateTaskStatus(task.getTaskId(), "failed_after_retry");
return "failed";
}
性能优化建议
- 合理设置并发数:根据服务器CPU核心数和内存大小设置合适的并发处理数
- 文件存储优化:使用分布式文件系统存储视频文件
- 内存管理:及时清理处理完成的临时文件
- 监控告警:监控任务队列长度、处理时间等关键指标
最佳实践
- 任务拆分:对于复杂的视频处理任务,可以拆分成多个子任务
- 资源隔离:为视频处理任务设置专门的资源池
- 降级策略:在高负载情况下,可以暂时停止非关键的视频处理任务
- 安全考虑:对上传的视频文件进行安全检查,防止恶意文件
总结
通过SpringBoot + FFmpeg + Redis的组合,我们可以搭建一个高性能的视频异步处理平台。关键在于:
- 合理架构:任务队列、异步处理、状态管理
- 性能优化:控制并发、资源管理、监控告警
- 容错处理:重试机制、降级策略、异常处理
记住,视频处理虽然复杂,但通过合理的架构设计和异步处理,完全可以做到高性能、高可用。掌握了这些技巧,你就能轻松应对各种视频处理需求,再也不用担心视频处理拖垮服务器了。
标题:SpringBoot + FFmpeg + Redis:视频转码、截图、水印异步处理平台搭建
作者:jiangyi
地址:http://jiangyi.space/articles/2025/12/30/1767073184185.html
0 评论