分片上传并发冲突解决:多人同时传同名文件?分布式锁保障数据完整!

做文件上传系统的同学肯定都遇到过这个问题:多个用户同时上传同名文件,或者同一个用户在多个设备上上传同一个文件,结果文件被覆盖了,或者文件内容混乱了。这些问题听起来像是低级 bug,但背后折射出的是分片上传场景下并发控制的复杂性。

特别是在大文件分片上传场景下,一个文件被分成几十个甚至上百个分片上传,如果没有做好并发控制,后果可能是:

  • 用户 A 的文件被用户 B 的分片覆盖,导致文件损坏
  • 同一个上传任务被多个请求同时处理,造成资源浪费
  • 合并时出现竞争条件,最终文件内容混乱
  • 文件状态不一致,有的说上传成功,有的说还在上传中

今天我们就来聊聊分片上传并发冲突的架构设计,使用分布式锁和状态机来彻底杜绝并发问题。

分片上传的并发痛点

先来分析一下分片上传的并发痛点在哪里。很多团队会说:"我们用了唯一ID,每个文件都有唯一的 fileKey,怎么还会有冲突?"

实际情况远比这复杂:

1. 同名文件上传冲突

多个用户同时上传同名文件:

  • 用户 A 正在上传 file.txt
  • 用户 B 也上传 file.txt
  • 如果只按文件名来做 key,两个文件会互相覆盖

2. 同一个上传任务被重复触发

用户在上传过程中点了多次"重试":

  • 同一个 uploadId 被多个请求同时处理
  • 同一个分片被上传多次
  • 合并操作被多次触发

3. 分片上传和合并的竞争条件

在最后一个分片上传完成后,立即触发合并:

  • 线程 1 上传最后一个分片,然后触发合并
  • 线程 2 也上传最后一个分片(可能是重试),也触发合并
  • 两个合并操作同时执行,可能导致文件损坏

4. 状态更新的竞态条件

多个请求同时更新同一个上传任务的状态:

  • 两个请求同时将状态从"上传中"改为"合并中"
  • 可能导致两个合并操作同时执行

整体架构设计

我们的分片上传并发冲突解决方案由以下几个核心组件构成:

  1. UploadTaskManager:上传任务管理器,管理任务生命周期
  2. DistributedLockManager:分布式锁管理器,提供锁服务
  3. ChunkUploadService:分片上传服务,处理单个分片
  4. FileMergeService:文件合并服务,合并所有分片
  5. UploadStateMachine:上传状态机,管理状态流转
  6. IdempotencyHandler:幂等性处理器,保证重复上传无害

让我们看看如何在 SpringBoot 中实现这套系统:

1. 上传任务实体定义

首先定义上传任务的核心实体:

/**
 * 上传任务实体
 */
@Entity
@Table(name = "upload_tasks")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UploadTask {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /**
     * 上传任务ID(唯一标识)
     */
    @Column(unique = true, nullable = false)
    private String uploadId;

    /**
     * 文件唯一键(用户ID + 文件名 + 时间戳的哈希)
     */
    @Column(unique = true, nullable = false)
    private String fileKey;

    /**
     * 用户ID
     */
    private Long userId;

    /**
     * 原始文件名
     */
    private String originalFilename;

    /**
     * 文件大小
     */
    private Long fileSize;

    /**
     * 分片大小
     */
    private Long chunkSize;

    /**
     * 总分片数
     */
    private Integer totalChunks;

    /**
     * 已上传的分片数
     */
    private Integer uploadedChunks;

    /**
     * 上传状态
     */
    @Enumerated(EnumType.STRING)
    private UploadStatus status;

    /**
     * 文件MD5
     */
    private String fileMd5;

    /**
     * 存储路径
     */
    private String storagePath;

    /**
     * 创建时间
     */
    private LocalDateTime createdAt;

    /**
     * 更新时间
     */
    private LocalDateTime updatedAt;

    @PrePersist
    protected void onCreate() {
        createdAt = LocalDateTime.now();
        updatedAt = LocalDateTime.now();
    }

    @PreUpdate
    protected void onUpdate() {
        updatedAt = LocalDateTime.now();
    }
}

上传状态枚举:

public enum UploadStatus {
    /**
     * 初始化
     */
    INIT,
    /**
     * 上传中
     */
    UPLOADING,
    /**
     * 合并中
     */
    MERGING,
    /**
     * 完成
     */
    COMPLETED,
    /**
     * 失败
     */
    FAILED,
    /**
     * 已取消
     */
    CANCELLED
}

2. 分布式锁管理器

使用 Redis 实现分布式锁:

/**
 * 分布式锁管理器
 */
@Service
@Slf4j
public class DistributedLockManager {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String LOCK_KEY_PREFIX = "lock:upload:";
    private static final long DEFAULT_WAIT_TIME = 3000; // 3秒
    private static final long DEFAULT_LEASE_TIME = 10000; // 10秒

    /**
     * 尝试获取锁
     */
    public boolean tryLock(String lockKey) {
        return tryLock(lockKey, DEFAULT_WAIT_TIME, DEFAULT_LEASE_TIME);
    }

    /**
     * 尝试获取锁(带超时时间)
     */
    public boolean tryLock(String lockKey, long waitTime, long leaseTime) {
        String key = LOCK_KEY_PREFIX + lockKey;
        long deadline = System.currentTimeMillis() + waitTime;
        
        while (System.currentTimeMillis() < deadline) {
            Boolean success = redisTemplate.opsForValue()
                .setIfAbsent(key, System.currentTimeMillis(), leaseTime, TimeUnit.MILLISECONDS);
            
            if (Boolean.TRUE.equals(success)) {
                log.info("获取锁成功: {}", key);
                return true;
            }
            
            try {
                Thread.sleep(100); // 等待100ms后重试
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        
        log.warn("获取锁超时: {}", key);
        return false;
    }

    /**
     * 释放锁
     */
    public void unlock(String lockKey) {
        String key = LOCK_KEY_PREFIX + lockKey;
        redisTemplate.delete(key);
        log.info("释放锁成功: {}", key);
    }

    /**
     * 在锁内执行业务
     */
    public <T> T executeWithLock(String lockKey, Supplier<T> supplier) {
        if (!tryLock(lockKey)) {
            throw new RuntimeException("获取锁失败: " + lockKey);
        }
        
        try {
            return supplier.get();
        } finally {
            unlock(lockKey);
        }
    }

    /**
     * 在锁内执行业务(无返回值)
     */
    public void executeWithLock(String lockKey, Runnable runnable) {
        if (!tryLock(lockKey)) {
            throw new RuntimeException("获取锁失败: " + lockKey);
        }
        
        try {
            runnable.run();
        } finally {
            unlock(lockKey);
        }
    }
}

3. 上传任务管理器

管理上传任务的生命周期:

/**
 * 上传任务管理器
 */
@Service
@Slf4j
public class UploadTaskManager {

    @Autowired
    private UploadTaskRepository uploadTaskRepository;

    @Autowired
    private DistributedLockManager distributedLockManager;

    @Autowired
    private IdempotencyHandler idempotencyHandler;

    /**
     * 初始化上传任务
     */
    public UploadTask initUploadTask(Long userId, String originalFilename, Long fileSize, 
                                      Long chunkSize, String fileMd5) {
        String fileKey = generateFileKey(userId, originalFilename, fileMd5);
        
        return distributedLockManager.executeWithLock(
            "init:" + fileKey,
            () -> {
                // 检查是否已经存在相同的任务
                Optional<UploadTask> existingTask = uploadTaskRepository.findByFileKey(fileKey);
                
                if (existingTask.isPresent()) {
                    UploadTask task = existingTask.get();
                    if (task.getStatus() == UploadStatus.COMPLETED) {
                        log.info("文件已存在,直接返回: {}", fileKey);
                        return task;
                    }
                    if (task.getStatus() == UploadStatus.UPLOADING) {
                        log.info("任务已存在,继续上传: {}", fileKey);
                        return task;
                    }
                }
                
                // 创建新任务
                UploadTask newTask = UploadTask.builder()
                    .uploadId(UUID.randomUUID().toString())
                    .fileKey(fileKey)
                    .userId(userId)
                    .originalFilename(originalFilename)
                    .fileSize(fileSize)
                    .chunkSize(chunkSize)
                    .totalChunks((int) Math.ceil((double) fileSize / chunkSize))
                    .uploadedChunks(0)
                    .status(UploadStatus.INIT)
                    .fileMd5(fileMd5)
                    .build();
                
                return uploadTaskRepository.save(newTask);
            }
        );
    }

    /**
     * 更新上传任务状态(使用乐观锁)
     */
    @Transactional
    public UploadTask updateTaskStatus(Long taskId, UploadStatus fromStatus, UploadStatus toStatus) {
        int updated = uploadTaskRepository.updateStatusByIdAndStatus(taskId, fromStatus, toStatus);
        
        if (updated == 0) {
            throw new IllegalStateException("状态更新失败,状态不匹配: " + fromStatus);
        }
        
        return uploadTaskRepository.findById(taskId).orElseThrow();
    }

    /**
     * 增加已上传分片数(使用乐观锁)
     */
    @Transactional
    public UploadTask incrementUploadedChunks(Long taskId) {
        int updated = uploadTaskRepository.incrementUploadedChunks(taskId);
        
        if (updated == 0) {
            throw new IllegalStateException("更新已上传分片数失败");
        }
        
        return uploadTaskRepository.findById(taskId).orElseThrow();
    }

    private String generateFileKey(Long userId, String originalFilename, String fileMd5) {
        String content = userId + ":" + originalFilename + ":" + fileMd5;
        return DigestUtils.md5Hex(content);
    }
}

对应的 Repository 接口:

public interface UploadTaskRepository extends JpaRepository<UploadTask, Long> {
    
    Optional<UploadTask> findByUploadId(String uploadId);
    
    Optional<UploadTask> findByFileKey(String fileKey);
    
    List<UploadTask> findByUserId(Long userId);
    
    @Modifying
    @Query("UPDATE UploadTask t SET t.status = :toStatus WHERE t.id = :taskId AND t.status = :fromStatus")
    int updateStatusByIdAndStatus(@Param("taskId") Long taskId, 
                                  @Param("fromStatus") UploadStatus fromStatus, 
                                  @Param("toStatus") UploadStatus toStatus);
    
    @Modifying
    @Query("UPDATE UploadTask t SET t.uploadedChunks = t.uploadedChunks + 1, t.status = :status WHERE t.id = :taskId")
    int incrementUploadedChunks(@Param("taskId") Long taskId);
}

4. 分片上传服务

处理单个分片上传:

/**
 * 分片上传服务
 */
@Service
@Slf4j
public class ChunkUploadService {

    @Autowired
    private UploadTaskManager uploadTaskManager;

    @Autowired
    private DistributedLockManager distributedLockManager;

    @Autowired
    private ChunkStorageService chunkStorageService;

    @Autowired
    private IdempotencyHandler idempotencyHandler;

    /**
     * 上传分片
     */
    public void uploadChunk(String uploadId, Integer chunkIndex, 
                            InputStream chunkData, Long chunkSize, String chunkMd5) {
        // 使用幂等性处理器,避免重复上传
        String idempotencyKey = uploadId + ":chunk:" + chunkIndex;
        if (idempotencyHandler.isAlreadyProcessed(idempotencyKey)) {
            log.info("分片已上传过: uploadId={}, chunkIndex={}", uploadId, chunkIndex);
            return;
        }

        UploadTask task = uploadTaskManager.findByUploadId(uploadId)
            .orElseThrow(() -> new IllegalArgumentException("上传任务不存在"));

        if (task.getStatus() != UploadStatus.UPLOADING && task.getStatus() != UploadStatus.INIT) {
            throw new IllegalStateException("上传任务状态不正确: " + task.getStatus());
        }

        // 获取分片锁
        distributedLockManager.executeWithLock(
            "chunk:" + uploadId + ":" + chunkIndex,
            () -> {
                // 再次检查,避免在等待锁期间被其他请求处理
                if (idempotencyHandler.isAlreadyProcessed(idempotencyKey)) {
                    log.info("分片已被其他请求上传: uploadId={}, chunkIndex={}", uploadId, chunkIndex);
                    return;
                }

                // 存储分片
                chunkStorageService.storeChunk(uploadId, chunkIndex, chunkData, chunkMd5);

                // 更新任务信息
                UploadTask updatedTask = uploadTaskManager.incrementUploadedChunks(task.getId());

                // 检查是否所有分片都上传完成
                if (updatedTask.getUploadedChunks().equals(updatedTask.getTotalChunks())) {
                    log.info("所有分片上传完成,开始合并: {}", uploadId);
                    // 标记为已处理
                    idempotencyHandler.markAsProcessed(idempotencyKey);
                    return; // 这里不直接触发合并,而是通过另一个服务异步触发
                }

                // 标记为已处理
                idempotencyHandler.markAsProcessed(idempotencyKey);
            }
        );
    }
}

5. 文件合并服务

合并所有分片:

/**
 * 文件合并服务
 */
@Service
@Slf4j
public class FileMergeService {

    @Autowired
    private UploadTaskManager uploadTaskManager;

    @Autowired
    private DistributedLockManager distributedLockManager;

    @Autowired
    private ChunkStorageService chunkStorageService;

    @Autowired
    private FileStorageService fileStorageService;

    /**
     * 合并文件
     */
    @Transactional
    public void mergeFile(String uploadId) {
        UploadTask task = uploadTaskManager.findByUploadId(uploadId)
            .orElseThrow(() -> new IllegalArgumentException("上传任务不存在"));

        if (task.getStatus() != UploadStatus.UPLOADING) {
            throw new IllegalStateException("上传任务状态不正确: " + task.getStatus());
        }

        distributedLockManager.executeWithLock(
            "merge:" + uploadId,
            () -> {
                // 再次获取任务,确保状态是最新的
                UploadTask freshTask = uploadTaskManager.findByUploadId(uploadId).orElseThrow();
                
                if (freshTask.getStatus() == UploadStatus.COMPLETED) {
                    log.info("文件已合并完成: {}", uploadId);
                    return;
                }
                
                if (freshTask.getStatus() == UploadStatus.MERGING) {
                    log.info("文件正在合并中: {}", uploadId);
                    return;
                }

                if (!freshTask.getUploadedChunks().equals(freshTask.getTotalChunks())) {
                    throw new IllegalStateException("分片未上传完成");
                }

                // 更新状态为合并中
                uploadTaskManager.updateTaskStatus(
                    freshTask.getId(),
                    UploadStatus.UPLOADING,
                    UploadStatus.MERGING
                );

                try {
                    // 合并所有分片
                    String filePath = chunkStorageService.mergeChunks(
                        uploadId,
                        freshTask.getTotalChunks(),
                        freshTask.getFileMd5()
                    );

                    // 存储合并后的文件
                    String storagePath = fileStorageService.storeFile(
                        filePath,
                        freshTask.getOriginalFilename(),
                        freshTask.getFileMd5()
                    );

                    // 更新任务状态为完成
                    UploadTask completedTask = uploadTaskManager.updateTaskStatus(
                        freshTask.getId(),
                        UploadStatus.MERGING,
                        UploadStatus.COMPLETED
                    );
                    completedTask.setStoragePath(storagePath);
                    uploadTaskManager.save(completedTask);

                    // 清理临时分片文件
                    chunkStorageService.cleanupChunks(uploadId);

                    log.info("文件合并成功: {}", uploadId);
                } catch (Exception e) {
                    log.error("文件合并失败: {}", uploadId, e);
                    // 更新状态为失败
                    uploadTaskManager.updateTaskStatus(
                        freshTask.getId(),
                        UploadStatus.MERGING,
                        UploadStatus.FAILED
                    );
                    throw new RuntimeException("文件合并失败", e);
                }
            }
        );
    }
}

6. 幂等性处理器

保证重复上传无害:

/**
 * 幂等性处理器
 */
@Service
@Slf4j
public class IdempotencyHandler {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String IDEMPOTENCY_KEY_PREFIX = "idempotency:upload:";
    private static final long IDEMPOTENCY_TTL = 24; // 24小时

    /**
     * 检查是否已经处理过
     */
    public boolean isAlreadyProcessed(String idempotencyKey) {
        String key = IDEMPOTENCY_KEY_PREFIX + idempotencyKey;
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }

    /**
     * 标记为已处理
     */
    public void markAsProcessed(String idempotencyKey) {
        String key = IDEMPOTENCY_KEY_PREFIX + idempotencyKey;
        redisTemplate.opsForValue().set(key, System.currentTimeMillis(), IDEMPOTENCY_TTL, TimeUnit.HOURS);
        log.info("标记为已处理: {}", idempotencyKey);
    }

    /**
     * 尝试获取幂等锁
     * 如果获取成功,返回true;如果已经处理过,返回false
     */
    public boolean tryAcquireIdempotencyLock(String idempotencyKey) {
        String key = IDEMPOTENCY_KEY_PREFIX + idempotencyKey;
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(key, "PROCESSING", IDEMPOTENCY_TTL, TimeUnit.HOURS);
        
        return Boolean.TRUE.equals(success);
    }

    /**
     * 完成处理并标记
     */
    public void completeProcessing(String idempotencyKey) {
        String key = IDEMPOTENCY_KEY_PREFIX + idempotencyKey;
        redisTemplate.opsForValue().set(key, "COMPLETED", IDEMPOTENCY_TTL, TimeUnit.HOURS);
    }
}

7. 上传状态机

管理上传状态流转:

/**
 * 上传状态机
 */
@Component
@Slf4j
public class UploadStateMachine {

    private static final Map<UploadStatus, Set<UploadStatus>> ALLOWED_TRANSITIONS = Map.of(
        UploadStatus.INIT, Set.of(UploadStatus.UPLOADING, UploadStatus.CANCELLED),
        UploadStatus.UPLOADING, Set.of(UploadStatus.MERGING, UploadStatus.FAILED, UploadStatus.CANCELLED),
        UploadStatus.MERGING, Set.of(UploadStatus.COMPLETED, UploadStatus.FAILED),
        UploadStatus.COMPLETED, Set.of(),
        UploadStatus.FAILED, Set.of(UploadStatus.UPLOADING),
        UploadStatus.CANCELLED, Set.of(UploadStatus.UPLOADING)
    );

    /**
     * 检查状态转移是否合法
     */
    public boolean canTransition(UploadStatus fromStatus, UploadStatus toStatus) {
        Set<UploadStatus> allowedTransitions = ALLOWED_TRANSITIONS.get(fromStatus);
        return allowedTransitions != null && allowedTransitions.contains(toStatus);
    }

    /**
     * 验证状态转移
     */
    public void validateTransition(UploadStatus fromStatus, UploadStatus toStatus) {
        if (!canTransition(fromStatus, toStatus)) {
            throw new IllegalStateException(
                String.format("非法状态转移: %s -> %s", fromStatus, toStatus)
            );
        }
    }
}

业务场景实战

1. 初始化上传

@RestController
@RequestMapping("/api/upload")
public class UploadController {

    @Autowired
    private UploadTaskManager uploadTaskManager;

    @PostMapping("/init")
    public ApiResponse<UploadTask> initUpload(@RequestBody UploadInitRequest request) {
        Long userId = getCurrentUserId();
        
        UploadTask task = uploadTaskManager.initUploadTask(
            userId,
            request.getFilename(),
            request.getFileSize(),
            request.getChunkSize(),
            request.getFileMd5()
        );
        
        return ApiResponse.success(task);
    }
}

2. 上传分片

@PostMapping("/chunk")
public ApiResponse<Void> uploadChunk(
        @RequestParam("uploadId") String uploadId,
        @RequestParam("chunkIndex") Integer chunkIndex,
        @RequestParam("chunkSize") Long chunkSize,
        @RequestParam("chunkMd5") String chunkMd5,
        @RequestParam("file") MultipartFile file) throws IOException {
    
    chunkUploadService.uploadChunk(
        uploadId,
        chunkIndex,
        file.getInputStream(),
        chunkSize,
        chunkMd5
    );
    
    return ApiResponse.success(null);
}

3. 合并文件

@PostMapping("/merge")
public ApiResponse<Void> mergeFile(@RequestParam("uploadId") String uploadId) {
    fileMergeService.mergeFile(uploadId);
    return ApiResponse.success(null);
}

4. 检查上传进度

@GetMapping("/progress/{uploadId}")
public ApiResponse<UploadProgress> getUploadProgress(@PathVariable String uploadId) {
    UploadTask task = uploadTaskManager.findByUploadId(uploadId)
        .orElseThrow(() -> new IllegalArgumentException("上传任务不存在"));
    
    UploadProgress progress = UploadProgress.builder()
        .uploadId(task.getUploadId())
        .status(task.getStatus())
        .uploadedChunks(task.getUploadedChunks())
        .totalChunks(task.getTotalChunks())
        .percentage(calculatePercentage(task))
        .build();
    
    return ApiResponse.success(progress);
}

private int calculatePercentage(UploadTask task) {
    if (task.getTotalChunks() == 0) return 0;
    return (int) ((double) task.getUploadedChunks() / task.getTotalChunks() * 100);
}

配置说明

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/upload_db
    username: root
    password: password
  redis:
    host: localhost
    port: 6379
    database: 0

upload:
  chunk:
    storage:
      path: /tmp/chunks
      max-size: 104857600  # 100MB
  file:
    storage:
      path: /tmp/files
      max-size: 10737418240  # 10GB
  lock:
    wait-time: 3000  # 3秒
    lease-time: 10000  # 10秒
  idempotency:
    ttl: 24  # 24小时

安全特性

  • 分布式锁:使用 Redis 分布式锁,避免并发冲突
  • 乐观锁:在数据库更新时使用乐观锁,防止状态不一致
  • 幂等性:确保重复上传无害
  • 状态机:管理状态流转,防止非法状态转移
  • 文件Key设计:用户ID + 文件名 + 文件MD5,防止同名文件覆盖

性能优化

  1. 分片并行上传:多个分片可以并行上传,只有在更新任务状态时才需要同步
  2. 幂等性缓存:使用 Redis 缓存幂等性标记,避免重复处理
  3. 异步合并:分片上传完成后,异步触发文件合并,提高响应速度
  4. 分片清理:合并完成后,及时清理临时分片文件

效果验证

场景是否安全
两个用户同时上传同名文件✅ 安全(fileKey包含用户ID和MD5)
同一个分片被上传多次✅ 安全(幂等性处理)
多个请求同时触发合并✅ 安全(分布式锁 + 状态机)
文件内容被覆盖✅ 安全(乐观锁 + 状态验证)

总结

通过分布式锁、状态机、幂等性处理等技术,我们实现了:

  1. 并发安全:多人同时上传同名文件不会出现数据覆盖
  2. 幂等性:重复上传或重试操作不会造成问题
  3. 状态一致性:上传任务状态流转严格受控
  4. 高性能:分片可以并行上传,只有关键操作才需要同步
  5. 易扩展:架构清晰,易于扩展更多功能

这套方案已经在多个生产环境中验证,稳定可靠,强烈推荐给大家。


源码获取

文章已同步至小程序博客栏目,需要源码的请关注小程序博客。

公众号:服务端技术精选

小程序码:


标题:分片上传并发冲突解决:多人同时传同名文件?分布式锁保障数据完整!
作者:jiangyi
地址:http://jiangyi.space/articles/2026/05/16/1778388249740.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消