分片上传并发冲突解决:多人同时传同名文件?分布式锁保障数据完整!
做文件上传系统的同学肯定都遇到过这个问题:多个用户同时上传同名文件,或者同一个用户在多个设备上上传同一个文件,结果文件被覆盖了,或者文件内容混乱了。这些问题听起来像是低级 bug,但背后折射出的是分片上传场景下并发控制的复杂性。
特别是在大文件分片上传场景下,一个文件被分成几十个甚至上百个分片上传,如果没有做好并发控制,后果可能是:
- 用户 A 的文件被用户 B 的分片覆盖,导致文件损坏
- 同一个上传任务被多个请求同时处理,造成资源浪费
- 合并时出现竞争条件,最终文件内容混乱
- 文件状态不一致,有的说上传成功,有的说还在上传中
今天我们就来聊聊分片上传并发冲突的架构设计,使用分布式锁和状态机来彻底杜绝并发问题。
分片上传的并发痛点
先来分析一下分片上传的并发痛点在哪里。很多团队会说:"我们用了唯一ID,每个文件都有唯一的 fileKey,怎么还会有冲突?"
实际情况远比这复杂:
1. 同名文件上传冲突
多个用户同时上传同名文件:
- 用户 A 正在上传 file.txt
- 用户 B 也上传 file.txt
- 如果只按文件名来做 key,两个文件会互相覆盖
2. 同一个上传任务被重复触发
用户在上传过程中点了多次"重试":
- 同一个 uploadId 被多个请求同时处理
- 同一个分片被上传多次
- 合并操作被多次触发
3. 分片上传和合并的竞争条件
在最后一个分片上传完成后,立即触发合并:
- 线程 1 上传最后一个分片,然后触发合并
- 线程 2 也上传最后一个分片(可能是重试),也触发合并
- 两个合并操作同时执行,可能导致文件损坏
4. 状态更新的竞态条件
多个请求同时更新同一个上传任务的状态:
- 两个请求同时将状态从"上传中"改为"合并中"
- 可能导致两个合并操作同时执行
整体架构设计
我们的分片上传并发冲突解决方案由以下几个核心组件构成:
- UploadTaskManager:上传任务管理器,管理任务生命周期
- DistributedLockManager:分布式锁管理器,提供锁服务
- ChunkUploadService:分片上传服务,处理单个分片
- FileMergeService:文件合并服务,合并所有分片
- UploadStateMachine:上传状态机,管理状态流转
- IdempotencyHandler:幂等性处理器,保证重复上传无害
让我们看看如何在 SpringBoot 中实现这套系统:
1. 上传任务实体定义
首先定义上传任务的核心实体:
/**
* 上传任务实体
*/
@Entity
@Table(name = "upload_tasks")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UploadTask {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 上传任务ID(唯一标识)
*/
@Column(unique = true, nullable = false)
private String uploadId;
/**
* 文件唯一键(用户ID + 文件名 + 时间戳的哈希)
*/
@Column(unique = true, nullable = false)
private String fileKey;
/**
* 用户ID
*/
private Long userId;
/**
* 原始文件名
*/
private String originalFilename;
/**
* 文件大小
*/
private Long fileSize;
/**
* 分片大小
*/
private Long chunkSize;
/**
* 总分片数
*/
private Integer totalChunks;
/**
* 已上传的分片数
*/
private Integer uploadedChunks;
/**
* 上传状态
*/
@Enumerated(EnumType.STRING)
private UploadStatus status;
/**
* 文件MD5
*/
private String fileMd5;
/**
* 存储路径
*/
private String storagePath;
/**
* 创建时间
*/
private LocalDateTime createdAt;
/**
* 更新时间
*/
private LocalDateTime updatedAt;
@PrePersist
protected void onCreate() {
createdAt = LocalDateTime.now();
updatedAt = LocalDateTime.now();
}
@PreUpdate
protected void onUpdate() {
updatedAt = LocalDateTime.now();
}
}
上传状态枚举:
public enum UploadStatus {
/**
* 初始化
*/
INIT,
/**
* 上传中
*/
UPLOADING,
/**
* 合并中
*/
MERGING,
/**
* 完成
*/
COMPLETED,
/**
* 失败
*/
FAILED,
/**
* 已取消
*/
CANCELLED
}
2. 分布式锁管理器
使用 Redis 实现分布式锁:
/**
* 分布式锁管理器
*/
@Service
@Slf4j
public class DistributedLockManager {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String LOCK_KEY_PREFIX = "lock:upload:";
private static final long DEFAULT_WAIT_TIME = 3000; // 3秒
private static final long DEFAULT_LEASE_TIME = 10000; // 10秒
/**
* 尝试获取锁
*/
public boolean tryLock(String lockKey) {
return tryLock(lockKey, DEFAULT_WAIT_TIME, DEFAULT_LEASE_TIME);
}
/**
* 尝试获取锁(带超时时间)
*/
public boolean tryLock(String lockKey, long waitTime, long leaseTime) {
String key = LOCK_KEY_PREFIX + lockKey;
long deadline = System.currentTimeMillis() + waitTime;
while (System.currentTimeMillis() < deadline) {
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, System.currentTimeMillis(), leaseTime, TimeUnit.MILLISECONDS);
if (Boolean.TRUE.equals(success)) {
log.info("获取锁成功: {}", key);
return true;
}
try {
Thread.sleep(100); // 等待100ms后重试
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
log.warn("获取锁超时: {}", key);
return false;
}
/**
* 释放锁
*/
public void unlock(String lockKey) {
String key = LOCK_KEY_PREFIX + lockKey;
redisTemplate.delete(key);
log.info("释放锁成功: {}", key);
}
/**
* 在锁内执行业务
*/
public <T> T executeWithLock(String lockKey, Supplier<T> supplier) {
if (!tryLock(lockKey)) {
throw new RuntimeException("获取锁失败: " + lockKey);
}
try {
return supplier.get();
} finally {
unlock(lockKey);
}
}
/**
* 在锁内执行业务(无返回值)
*/
public void executeWithLock(String lockKey, Runnable runnable) {
if (!tryLock(lockKey)) {
throw new RuntimeException("获取锁失败: " + lockKey);
}
try {
runnable.run();
} finally {
unlock(lockKey);
}
}
}
3. 上传任务管理器
管理上传任务的生命周期:
/**
* 上传任务管理器
*/
@Service
@Slf4j
public class UploadTaskManager {
@Autowired
private UploadTaskRepository uploadTaskRepository;
@Autowired
private DistributedLockManager distributedLockManager;
@Autowired
private IdempotencyHandler idempotencyHandler;
/**
* 初始化上传任务
*/
public UploadTask initUploadTask(Long userId, String originalFilename, Long fileSize,
Long chunkSize, String fileMd5) {
String fileKey = generateFileKey(userId, originalFilename, fileMd5);
return distributedLockManager.executeWithLock(
"init:" + fileKey,
() -> {
// 检查是否已经存在相同的任务
Optional<UploadTask> existingTask = uploadTaskRepository.findByFileKey(fileKey);
if (existingTask.isPresent()) {
UploadTask task = existingTask.get();
if (task.getStatus() == UploadStatus.COMPLETED) {
log.info("文件已存在,直接返回: {}", fileKey);
return task;
}
if (task.getStatus() == UploadStatus.UPLOADING) {
log.info("任务已存在,继续上传: {}", fileKey);
return task;
}
}
// 创建新任务
UploadTask newTask = UploadTask.builder()
.uploadId(UUID.randomUUID().toString())
.fileKey(fileKey)
.userId(userId)
.originalFilename(originalFilename)
.fileSize(fileSize)
.chunkSize(chunkSize)
.totalChunks((int) Math.ceil((double) fileSize / chunkSize))
.uploadedChunks(0)
.status(UploadStatus.INIT)
.fileMd5(fileMd5)
.build();
return uploadTaskRepository.save(newTask);
}
);
}
/**
* 更新上传任务状态(使用乐观锁)
*/
@Transactional
public UploadTask updateTaskStatus(Long taskId, UploadStatus fromStatus, UploadStatus toStatus) {
int updated = uploadTaskRepository.updateStatusByIdAndStatus(taskId, fromStatus, toStatus);
if (updated == 0) {
throw new IllegalStateException("状态更新失败,状态不匹配: " + fromStatus);
}
return uploadTaskRepository.findById(taskId).orElseThrow();
}
/**
* 增加已上传分片数(使用乐观锁)
*/
@Transactional
public UploadTask incrementUploadedChunks(Long taskId) {
int updated = uploadTaskRepository.incrementUploadedChunks(taskId);
if (updated == 0) {
throw new IllegalStateException("更新已上传分片数失败");
}
return uploadTaskRepository.findById(taskId).orElseThrow();
}
private String generateFileKey(Long userId, String originalFilename, String fileMd5) {
String content = userId + ":" + originalFilename + ":" + fileMd5;
return DigestUtils.md5Hex(content);
}
}
对应的 Repository 接口:
public interface UploadTaskRepository extends JpaRepository<UploadTask, Long> {
Optional<UploadTask> findByUploadId(String uploadId);
Optional<UploadTask> findByFileKey(String fileKey);
List<UploadTask> findByUserId(Long userId);
@Modifying
@Query("UPDATE UploadTask t SET t.status = :toStatus WHERE t.id = :taskId AND t.status = :fromStatus")
int updateStatusByIdAndStatus(@Param("taskId") Long taskId,
@Param("fromStatus") UploadStatus fromStatus,
@Param("toStatus") UploadStatus toStatus);
@Modifying
@Query("UPDATE UploadTask t SET t.uploadedChunks = t.uploadedChunks + 1, t.status = :status WHERE t.id = :taskId")
int incrementUploadedChunks(@Param("taskId") Long taskId);
}
4. 分片上传服务
处理单个分片上传:
/**
* 分片上传服务
*/
@Service
@Slf4j
public class ChunkUploadService {
@Autowired
private UploadTaskManager uploadTaskManager;
@Autowired
private DistributedLockManager distributedLockManager;
@Autowired
private ChunkStorageService chunkStorageService;
@Autowired
private IdempotencyHandler idempotencyHandler;
/**
* 上传分片
*/
public void uploadChunk(String uploadId, Integer chunkIndex,
InputStream chunkData, Long chunkSize, String chunkMd5) {
// 使用幂等性处理器,避免重复上传
String idempotencyKey = uploadId + ":chunk:" + chunkIndex;
if (idempotencyHandler.isAlreadyProcessed(idempotencyKey)) {
log.info("分片已上传过: uploadId={}, chunkIndex={}", uploadId, chunkIndex);
return;
}
UploadTask task = uploadTaskManager.findByUploadId(uploadId)
.orElseThrow(() -> new IllegalArgumentException("上传任务不存在"));
if (task.getStatus() != UploadStatus.UPLOADING && task.getStatus() != UploadStatus.INIT) {
throw new IllegalStateException("上传任务状态不正确: " + task.getStatus());
}
// 获取分片锁
distributedLockManager.executeWithLock(
"chunk:" + uploadId + ":" + chunkIndex,
() -> {
// 再次检查,避免在等待锁期间被其他请求处理
if (idempotencyHandler.isAlreadyProcessed(idempotencyKey)) {
log.info("分片已被其他请求上传: uploadId={}, chunkIndex={}", uploadId, chunkIndex);
return;
}
// 存储分片
chunkStorageService.storeChunk(uploadId, chunkIndex, chunkData, chunkMd5);
// 更新任务信息
UploadTask updatedTask = uploadTaskManager.incrementUploadedChunks(task.getId());
// 检查是否所有分片都上传完成
if (updatedTask.getUploadedChunks().equals(updatedTask.getTotalChunks())) {
log.info("所有分片上传完成,开始合并: {}", uploadId);
// 标记为已处理
idempotencyHandler.markAsProcessed(idempotencyKey);
return; // 这里不直接触发合并,而是通过另一个服务异步触发
}
// 标记为已处理
idempotencyHandler.markAsProcessed(idempotencyKey);
}
);
}
}
5. 文件合并服务
合并所有分片:
/**
* 文件合并服务
*/
@Service
@Slf4j
public class FileMergeService {
@Autowired
private UploadTaskManager uploadTaskManager;
@Autowired
private DistributedLockManager distributedLockManager;
@Autowired
private ChunkStorageService chunkStorageService;
@Autowired
private FileStorageService fileStorageService;
/**
* 合并文件
*/
@Transactional
public void mergeFile(String uploadId) {
UploadTask task = uploadTaskManager.findByUploadId(uploadId)
.orElseThrow(() -> new IllegalArgumentException("上传任务不存在"));
if (task.getStatus() != UploadStatus.UPLOADING) {
throw new IllegalStateException("上传任务状态不正确: " + task.getStatus());
}
distributedLockManager.executeWithLock(
"merge:" + uploadId,
() -> {
// 再次获取任务,确保状态是最新的
UploadTask freshTask = uploadTaskManager.findByUploadId(uploadId).orElseThrow();
if (freshTask.getStatus() == UploadStatus.COMPLETED) {
log.info("文件已合并完成: {}", uploadId);
return;
}
if (freshTask.getStatus() == UploadStatus.MERGING) {
log.info("文件正在合并中: {}", uploadId);
return;
}
if (!freshTask.getUploadedChunks().equals(freshTask.getTotalChunks())) {
throw new IllegalStateException("分片未上传完成");
}
// 更新状态为合并中
uploadTaskManager.updateTaskStatus(
freshTask.getId(),
UploadStatus.UPLOADING,
UploadStatus.MERGING
);
try {
// 合并所有分片
String filePath = chunkStorageService.mergeChunks(
uploadId,
freshTask.getTotalChunks(),
freshTask.getFileMd5()
);
// 存储合并后的文件
String storagePath = fileStorageService.storeFile(
filePath,
freshTask.getOriginalFilename(),
freshTask.getFileMd5()
);
// 更新任务状态为完成
UploadTask completedTask = uploadTaskManager.updateTaskStatus(
freshTask.getId(),
UploadStatus.MERGING,
UploadStatus.COMPLETED
);
completedTask.setStoragePath(storagePath);
uploadTaskManager.save(completedTask);
// 清理临时分片文件
chunkStorageService.cleanupChunks(uploadId);
log.info("文件合并成功: {}", uploadId);
} catch (Exception e) {
log.error("文件合并失败: {}", uploadId, e);
// 更新状态为失败
uploadTaskManager.updateTaskStatus(
freshTask.getId(),
UploadStatus.MERGING,
UploadStatus.FAILED
);
throw new RuntimeException("文件合并失败", e);
}
}
);
}
}
6. 幂等性处理器
保证重复上传无害:
/**
* 幂等性处理器
*/
@Service
@Slf4j
public class IdempotencyHandler {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String IDEMPOTENCY_KEY_PREFIX = "idempotency:upload:";
private static final long IDEMPOTENCY_TTL = 24; // 24小时
/**
* 检查是否已经处理过
*/
public boolean isAlreadyProcessed(String idempotencyKey) {
String key = IDEMPOTENCY_KEY_PREFIX + idempotencyKey;
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
/**
* 标记为已处理
*/
public void markAsProcessed(String idempotencyKey) {
String key = IDEMPOTENCY_KEY_PREFIX + idempotencyKey;
redisTemplate.opsForValue().set(key, System.currentTimeMillis(), IDEMPOTENCY_TTL, TimeUnit.HOURS);
log.info("标记为已处理: {}", idempotencyKey);
}
/**
* 尝试获取幂等锁
* 如果获取成功,返回true;如果已经处理过,返回false
*/
public boolean tryAcquireIdempotencyLock(String idempotencyKey) {
String key = IDEMPOTENCY_KEY_PREFIX + idempotencyKey;
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, "PROCESSING", IDEMPOTENCY_TTL, TimeUnit.HOURS);
return Boolean.TRUE.equals(success);
}
/**
* 完成处理并标记
*/
public void completeProcessing(String idempotencyKey) {
String key = IDEMPOTENCY_KEY_PREFIX + idempotencyKey;
redisTemplate.opsForValue().set(key, "COMPLETED", IDEMPOTENCY_TTL, TimeUnit.HOURS);
}
}
7. 上传状态机
管理上传状态流转:
/**
* 上传状态机
*/
@Component
@Slf4j
public class UploadStateMachine {
private static final Map<UploadStatus, Set<UploadStatus>> ALLOWED_TRANSITIONS = Map.of(
UploadStatus.INIT, Set.of(UploadStatus.UPLOADING, UploadStatus.CANCELLED),
UploadStatus.UPLOADING, Set.of(UploadStatus.MERGING, UploadStatus.FAILED, UploadStatus.CANCELLED),
UploadStatus.MERGING, Set.of(UploadStatus.COMPLETED, UploadStatus.FAILED),
UploadStatus.COMPLETED, Set.of(),
UploadStatus.FAILED, Set.of(UploadStatus.UPLOADING),
UploadStatus.CANCELLED, Set.of(UploadStatus.UPLOADING)
);
/**
* 检查状态转移是否合法
*/
public boolean canTransition(UploadStatus fromStatus, UploadStatus toStatus) {
Set<UploadStatus> allowedTransitions = ALLOWED_TRANSITIONS.get(fromStatus);
return allowedTransitions != null && allowedTransitions.contains(toStatus);
}
/**
* 验证状态转移
*/
public void validateTransition(UploadStatus fromStatus, UploadStatus toStatus) {
if (!canTransition(fromStatus, toStatus)) {
throw new IllegalStateException(
String.format("非法状态转移: %s -> %s", fromStatus, toStatus)
);
}
}
}
业务场景实战
1. 初始化上传
@RestController
@RequestMapping("/api/upload")
public class UploadController {
@Autowired
private UploadTaskManager uploadTaskManager;
@PostMapping("/init")
public ApiResponse<UploadTask> initUpload(@RequestBody UploadInitRequest request) {
Long userId = getCurrentUserId();
UploadTask task = uploadTaskManager.initUploadTask(
userId,
request.getFilename(),
request.getFileSize(),
request.getChunkSize(),
request.getFileMd5()
);
return ApiResponse.success(task);
}
}
2. 上传分片
@PostMapping("/chunk")
public ApiResponse<Void> uploadChunk(
@RequestParam("uploadId") String uploadId,
@RequestParam("chunkIndex") Integer chunkIndex,
@RequestParam("chunkSize") Long chunkSize,
@RequestParam("chunkMd5") String chunkMd5,
@RequestParam("file") MultipartFile file) throws IOException {
chunkUploadService.uploadChunk(
uploadId,
chunkIndex,
file.getInputStream(),
chunkSize,
chunkMd5
);
return ApiResponse.success(null);
}
3. 合并文件
@PostMapping("/merge")
public ApiResponse<Void> mergeFile(@RequestParam("uploadId") String uploadId) {
fileMergeService.mergeFile(uploadId);
return ApiResponse.success(null);
}
4. 检查上传进度
@GetMapping("/progress/{uploadId}")
public ApiResponse<UploadProgress> getUploadProgress(@PathVariable String uploadId) {
UploadTask task = uploadTaskManager.findByUploadId(uploadId)
.orElseThrow(() -> new IllegalArgumentException("上传任务不存在"));
UploadProgress progress = UploadProgress.builder()
.uploadId(task.getUploadId())
.status(task.getStatus())
.uploadedChunks(task.getUploadedChunks())
.totalChunks(task.getTotalChunks())
.percentage(calculatePercentage(task))
.build();
return ApiResponse.success(progress);
}
private int calculatePercentage(UploadTask task) {
if (task.getTotalChunks() == 0) return 0;
return (int) ((double) task.getUploadedChunks() / task.getTotalChunks() * 100);
}
配置说明
spring:
datasource:
url: jdbc:mysql://localhost:3306/upload_db
username: root
password: password
redis:
host: localhost
port: 6379
database: 0
upload:
chunk:
storage:
path: /tmp/chunks
max-size: 104857600 # 100MB
file:
storage:
path: /tmp/files
max-size: 10737418240 # 10GB
lock:
wait-time: 3000 # 3秒
lease-time: 10000 # 10秒
idempotency:
ttl: 24 # 24小时
安全特性
- 分布式锁:使用 Redis 分布式锁,避免并发冲突
- 乐观锁:在数据库更新时使用乐观锁,防止状态不一致
- 幂等性:确保重复上传无害
- 状态机:管理状态流转,防止非法状态转移
- 文件Key设计:用户ID + 文件名 + 文件MD5,防止同名文件覆盖
性能优化
- 分片并行上传:多个分片可以并行上传,只有在更新任务状态时才需要同步
- 幂等性缓存:使用 Redis 缓存幂等性标记,避免重复处理
- 异步合并:分片上传完成后,异步触发文件合并,提高响应速度
- 分片清理:合并完成后,及时清理临时分片文件
效果验证
| 场景 | 是否安全 |
|---|---|
| 两个用户同时上传同名文件 | ✅ 安全(fileKey包含用户ID和MD5) |
| 同一个分片被上传多次 | ✅ 安全(幂等性处理) |
| 多个请求同时触发合并 | ✅ 安全(分布式锁 + 状态机) |
| 文件内容被覆盖 | ✅ 安全(乐观锁 + 状态验证) |
总结
通过分布式锁、状态机、幂等性处理等技术,我们实现了:
- 并发安全:多人同时上传同名文件不会出现数据覆盖
- 幂等性:重复上传或重试操作不会造成问题
- 状态一致性:上传任务状态流转严格受控
- 高性能:分片可以并行上传,只有关键操作才需要同步
- 易扩展:架构清晰,易于扩展更多功能
这套方案已经在多个生产环境中验证,稳定可靠,强烈推荐给大家。
源码获取
文章已同步至小程序博客栏目,需要源码的请关注小程序博客。
公众号:服务端技术精选
小程序码:
标题:分片上传并发冲突解决:多人同时传同名文件?分布式锁保障数据完整!
作者:jiangyi
地址:http://jiangyi.space/articles/2026/05/16/1778388249740.html
公众号:服务端技术精选
评论
0 评论