SpringBoot + 异步任务结果持久化 + 查询接口:用户可随时查看长时间任务进度与结果

前言

你是否遇到过这样的场景:

  • 用户上传一个 1GB 的 Excel 文件,需要 5 分钟才能处理完成
  • 导出 10 万条数据到 Excel,需要等待 2 分钟
  • 批量处理 1000 个订单,每个订单需要调用 3 个第三方接口

这些长时间运行的任务,如果让用户一直等待页面响应,体验极差。更糟糕的是,如果系统崩溃或重启,任务进度全部丢失,用户需要重新提交。

今天要介绍的「异步任务结果持久化 + 查询接口」方案,将彻底解决这个问题——任务进度实时可查,系统重启不丢失


一、传统异步任务的痛点

场景重现

产品经理说:「用户需要导出 10 万条订单数据,这个功能要尽快上线。」

你很快写出了代码:

@RestController
@RequestMapping("/export")
public class ExportController {

    @GetMapping("/orders")
    public void exportOrders(HttpServletResponse response) {
        List<Order> orders = orderService.findAll();
        ExcelUtils.export(response, orders);
    }
}

测试时发现:10 万条数据导出需要 2 分钟,用户页面一直转圈,体验极差。

于是你改成了异步:

@Async
public void exportOrders(String taskId) {
    List<Order> orders = orderService.findAll();
    ExcelUtils.export(orders);
}

问题来了:

问题描述
进度不可见用户不知道任务执行到哪一步了
结果无法获取任务完成后,用户怎么下载文件?
重启即丢失系统重启后,任务进度全部丢失
无法重试任务失败后,用户需要重新提交

传统方案的三大痛点

痛点影响
用户体验差长时间等待,不知道任务进度
数据不安全系统崩溃或重启,任务全部丢失
运维困难无法追踪任务状态,排查问题困难

更糟糕的是:有些系统甚至没有任务管理,用户提交任务后就石沉大海。


二、异步任务持久化:让任务「有迹可循」

核心思路

异步任务持久化的核心思想是:将任务信息存储到数据库,通过任务 ID 追踪任务状态和进度

┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│  用户提交    │────────▶│  创建任务    │────────▶│  异步执行    │
│  任务请求    │  返回   │  记录到DB   │  更新   │  更新进度    │
│  获取任务ID  │  任务ID  │            │  状态   │            │
└─────────────┘         └─────────────┘         └─────────────┘
       │                       │                       │
       │                       │                       │
       ▼                       ▼                       ▼
┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│  轮询查询    │◀────────│  查询任务    │◀────────│  任务完成    │
│  任务进度    │  返回   │  状态和进度  │  通知   │  保存结果    │
│  下载结果    │  状态   │            │         │            │
└─────────────┘         └─────────────┘         └─────────────┘

技术选型

组件作用优势
Spring Boot应用框架生态完善、易于集成
@Async异步执行简单易用、性能优秀
MySQL数据持久化成熟稳定、支持事务
Spring Data JPA数据访问简化数据库操作

三、实现方案详解

1. 任务状态枚举

定义任务的生命周期状态:

public enum TaskStatus {
    PENDING("待执行"),
    RUNNING("执行中"),
    SUCCESS("成功"),
    FAILED("失败"),
    CANCELLED("已取消");

    private final String description;

    TaskStatus(String description) {
        this.description = description;
    }

    public String getDescription() {
        return description;
    }
}

2. 任务实体类

定义任务表结构:

@Entity
@Table(name = "async_task")
@Data
public class AsyncTask {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String taskId;

    @Column(nullable = false)
    private String taskType;

    @Column(nullable = false)
    private TaskStatus status;

    private Integer progress;

    private String message;

    @Lob
    private String result;

    @Column(nullable = false)
    private LocalDateTime createTime;

    private LocalDateTime updateTime;

    private LocalDateTime completeTime;

    @PrePersist
    protected void onCreate() {
        createTime = LocalDateTime.now();
        updateTime = LocalDateTime.now();
    }

    @PreUpdate
    protected void onUpdate() {
        updateTime = LocalDateTime.now();
    }
}

3. 任务仓储接口

@Repository
public interface AsyncTaskRepository extends JpaRepository<AsyncTask, Long> {

    Optional<AsyncTask> findByTaskId(String taskId);

    List<AsyncTask> findByStatus(TaskStatus status);
}

4. 异步任务服务

核心服务类,管理任务的生命周期:

@Service
@Slf4j
public class AsyncTaskService {

    @Autowired
    private AsyncTaskRepository taskRepository;

    @Autowired
    private TaskExecutor taskExecutor;

    public AsyncTask createTask(String taskType, Map<String, Object> params) {
        AsyncTask task = new AsyncTask();
        task.setTaskId(UUID.randomUUID().toString());
        task.setTaskType(taskType);
        task.setStatus(TaskStatus.PENDING);
        task.setProgress(0);
        
        AsyncTask savedTask = taskRepository.save(task);
        log.info("创建任务: taskId={}, type={}", savedTask.getTaskId(), taskType);
        
        return savedTask;
    }

    public void updateTaskProgress(String taskId, Integer progress, String message) {
        AsyncTask task = taskRepository.findByTaskId(taskId)
            .orElseThrow(() -> new RuntimeException("任务不存在: " + taskId));
        
        task.setProgress(progress);
        task.setMessage(message);
        task.setStatus(TaskStatus.RUNNING);
        
        taskRepository.save(task);
        log.info("更新任务进度: taskId={}, progress={}, message={}", taskId, progress, message);
    }

    public void completeTask(String taskId, String result) {
        AsyncTask task = taskRepository.findByTaskId(taskId)
            .orElseThrow(() -> new RuntimeException("任务不存在: " + taskId));
        
        task.setStatus(TaskStatus.SUCCESS);
        task.setProgress(100);
        task.setResult(result);
        task.setCompleteTime(LocalDateTime.now());
        
        taskRepository.save(task);
        log.info("任务完成: taskId={}", taskId);
    }

    public void failTask(String taskId, String errorMessage) {
        AsyncTask task = taskRepository.findByTaskId(taskId)
            .orElseThrow(() -> new RuntimeException("任务不存在: " + taskId));
        
        task.setStatus(TaskStatus.FAILED);
        task.setMessage(errorMessage);
        task.setCompleteTime(LocalDateTime.now());
        
        taskRepository.save(task);
        log.error("任务失败: taskId={}, error={}", taskId, errorMessage);
    }

    public AsyncTask getTask(String taskId) {
        return taskRepository.findByTaskId(taskId)
            .orElseThrow(() -> new RuntimeException("任务不存在: " + taskId));
    }
}

5. 异步任务执行器

@Component
@Slf4j
public class AsyncTaskExecutor {

    @Autowired
    private AsyncTaskService taskService;

    @Async("taskExecutor")
    public void executeExportTask(String taskId, Map<String, Object> params) {
        try {
            taskService.updateTaskProgress(taskId, 0, "开始执行导出任务");
            
            List<Order> orders = orderService.findAll();
            int total = orders.size();
            
            for (int i = 0; i < total; i++) {
                Order order = orders.get(i);
                processOrder(order);
                
                int progress = (int) ((i + 1) * 100.0 / total);
                taskService.updateTaskProgress(taskId, progress, 
                    String.format("正在处理第 %d/%d 条数据", i + 1, total));
                
                Thread.sleep(10);
            }
            
            String result = "导出完成,共处理 " + total + " 条数据";
            taskService.completeTask(taskId, result);
            
        } catch (Exception e) {
            taskService.failTask(taskId, "任务执行失败: " + e.getMessage());
        }
    }
}

6. 控制器:任务提交和查询

@RestController
@RequestMapping("/api/tasks")
public class TaskController {

    @Autowired
    private AsyncTaskService taskService;

    @Autowired
    private AsyncTaskExecutor taskExecutor;

    @PostMapping("/export")
    public Map<String, Object> submitExportTask(@RequestBody Map<String, Object> params) {
        AsyncTask task = taskService.createTask("EXPORT_ORDERS", params);
        
        taskExecutor.executeExportTask(task.getTaskId(), params);
        
        return Map.of(
            "taskId", task.getTaskId(),
            "status", task.getStatus(),
            "message", "任务已提交,请使用 taskId 查询进度"
        );
    }

    @GetMapping("/{taskId}")
    public AsyncTask getTask(@PathVariable String taskId) {
        return taskService.getTask(taskId);
    }

    @GetMapping("/{taskId}/result")
    public Map<String, Object> getTaskResult(@PathVariable String taskId) {
        AsyncTask task = taskService.getTask(taskId);
        
        if (task.getStatus() != TaskStatus.SUCCESS) {
            throw new RuntimeException("任务尚未完成");
        }
        
        return Map.of(
            "taskId", taskId,
            "result", task.getResult(),
            "completeTime", task.getCompleteTime()
        );
    }
}

四、实战演示

场景一:导出订单数据

步骤 1:提交任务

POST /api/tasks/export
Content-Type: application/json

{
  "startDate": "2024-01-01",
  "endDate": "2024-12-31"
}

响应:

{
  "taskId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "status": "PENDING",
  "message": "任务已提交,请使用 taskId 查询进度"
}

步骤 2:查询进度

GET /api/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890

响应:

{
  "taskId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "taskType": "EXPORT_ORDERS",
  "status": "RUNNING",
  "progress": 45,
  "message": "正在处理第 4500/10000 条数据",
  "createTime": "2024-01-15T10:00:00",
  "updateTime": "2024-01-15T10:00:30"
}

步骤 3:获取结果

GET /api/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890/result

响应:

{
  "taskId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "result": "导出完成,共处理 10000 条数据",
  "completeTime": "2024-01-15T10:01:00"
}

场景二:批量处理订单

@Async("taskExecutor")
public void executeBatchProcessTask(String taskId, List<Long> orderIds) {
    try {
        taskService.updateTaskProgress(taskId, 0, "开始批量处理订单");
        
        int total = orderIds.size();
        
        for (int i = 0; i < total; i++) {
            Long orderId = orderIds.get(i);
            processOrder(orderId);
            
            int progress = (int) ((i + 1) * 100.0 / total);
            taskService.updateTaskProgress(taskId, progress, 
                String.format("正在处理订单 %d/%d", i + 1, total));
        }
        
        taskService.completeTask(taskId, "批量处理完成,共处理 " + total + " 个订单");
        
    } catch (Exception e) {
        taskService.failTask(taskId, "批量处理失败: " + e.getMessage());
    }
}

五、进阶功能

1. 任务取消

支持用户主动取消任务:

public void cancelTask(String taskId) {
    AsyncTask task = taskRepository.findByTaskId(taskId)
        .orElseThrow(() -> new RuntimeException("任务不存在: " + taskId));
    
    if (task.getStatus() == TaskStatus.RUNNING) {
        task.setStatus(TaskStatus.CANCELLED);
        task.setCompleteTime(LocalDateTime.now());
        taskRepository.save(task);
        log.info("任务已取消: taskId={}", taskId);
    }
}

2. 任务重试

支持失败任务自动重试:

@Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000))
@Async("taskExecutor")
public void executeTaskWithRetry(String taskId, Map<String, Object> params) {
    try {
        executeTask(taskId, params);
    } catch (Exception e) {
        taskService.failTask(taskId, "任务执行失败: " + e.getMessage());
    }
}

3. 任务超时控制

设置任务超时时间:

@Async("taskExecutor")
@Timeout(value = 5, unit = TimeUnit.MINUTES)
public void executeTaskWithTimeout(String taskId, Map<String, Object> params) {
    try {
        executeTask(taskId, params);
    } catch (TimeoutException e) {
        taskService.failTask(taskId, "任务执行超时");
    }
}

4. 实时进度推送

使用 WebSocket 或 SSE 实时推送进度:

@Async("taskExecutor")
public void executeTaskWithProgress(String taskId, Map<String, Object> params) {
    try {
        for (int i = 0; i <= 100; i++) {
            taskService.updateTaskProgress(taskId, i, "处理中...");
            webSocketService.sendProgress(taskId, i);
            Thread.sleep(100);
        }
        
        taskService.completeTask(taskId, "任务完成");
    } catch (Exception e) {
        taskService.failTask(taskId, e.getMessage());
    }
}

六、最佳实践

1. 数据库设计

字段类型说明
idBIGINT主键
task_idVARCHAR(64)任务 ID,唯一索引
task_typeVARCHAR(50)任务类型
statusVARCHAR(20)任务状态
progressINT进度(0-100)
messageVARCHAR(500)任务消息
resultTEXT任务结果
create_timeDATETIME创建时间
update_timeDATETIME更新时间
complete_timeDATETIME完成时间

2. 进度更新策略

策略适用场景优缺点
实时更新需要精确进度优点:进度准确;缺点:数据库压力大
批量更新大批量数据优点:减少数据库操作;缺点:进度不够精确
分阶段更新多阶段任务优点:平衡性能和准确性;缺点:实现复杂

3. 异常处理

@Async("taskExecutor")
public void executeTask(String taskId, Map<String, Object> params) {
    try {
        doExecute(taskId, params);
    } catch (BusinessException e) {
        taskService.failTask(taskId, "业务异常: " + e.getMessage());
    } catch (Exception e) {
        log.error("任务执行异常: taskId={}", taskId, e);
        taskService.failTask(taskId, "系统异常,请联系管理员");
    }
}

4. 性能优化

优化点方案效果
批量更新每 100 条更新一次进度减少数据库操作
异步更新使用消息队列异步更新进度提升响应速度
缓存优化使用 Redis 缓存任务状态减少数据库查询
索引优化为 task_id 创建唯一索引提升查询性能

七、监控告警

1. 任务监控

@Component
@Slf4j
public class TaskMonitor {

    @Autowired
    private AsyncTaskRepository taskRepository;

    @Scheduled(fixedRate = 60000)
    public void monitorRunningTasks() {
        List<AsyncTask> runningTasks = taskRepository.findByStatus(TaskStatus.RUNNING);
        
        for (AsyncTask task : runningTasks) {
            Duration duration = Duration.between(task.getUpdateTime(), LocalDateTime.now());
            
            if (duration.toMinutes() > 30) {
                log.warn("任务执行时间过长: taskId={}, duration={}分钟", 
                    task.getTaskId(), duration.toMinutes());
            }
        }
    }
}

2. 失败任务告警

@Scheduled(fixedRate = 300000)
public void monitorFailedTasks() {
    List<AsyncTask> failedTasks = taskRepository.findByStatus(TaskStatus.FAILED);
    
    if (!failedTasks.isEmpty()) {
        log.error("发现 {} 个失败任务", failedTasks.size());
        alertService.sendAlert("任务失败告警", failedTasks.size());
    }
}

八、总结

异步任务结果持久化 + 查询接口的方案,彻底解决了传统异步任务的痛点:

进度可查:用户随时查看任务执行进度
结果可取:任务完成后可获取结果
重启不丢:任务信息持久化到数据库
状态可追:完整的任务生命周期记录
体验提升:用户无需长时间等待

让长时间任务变得透明、可控、可靠!


互动话题

你的项目中是如何处理长时间任务的?有没有遇到过任务丢失的问题?欢迎在评论区分享你的经验。


更多技术文章,欢迎关注公众号服务端技术精选,及时获取最新动态。


标题:SpringBoot + 异步任务结果持久化 + 查询接口:用户可随时查看长时间任务进度与结果
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/16/1773467095227.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消