SpringBoot + 任务依赖 DAG 编排:A 任务成功后自动触发 B、C,并行执行提效率

导语

在日常开发中,你是否遇到过这样的场景:

  • 数据导入任务完成后,需要自动触发数据清洗、数据分析等多个后续任务
  • 报表生成需要依赖多个数据源的准备完成
  • 某些任务可以并行执行,而另一些必须串行等待

传统的定时任务或简单的队列很难优雅地处理这种复杂的任务依赖关系。今天,我们就来聊聊如何用SpringBoot + DAG(有向无环图)实现一个强大的任务编排系统,让A任务成功后自动触发B、C任务,并行执行大幅提升效率。


一、为什么需要DAG任务编排?

1.1 传统方案的痛点

串行执行的问题:

// 传统串行执行
public void executeTasks() {
    taskA.execute();  // 耗时5分钟
    taskB.execute();  // 耗时3分钟
    taskC.execute();  // 耗时4分钟
    taskD.execute();  // 耗时2分钟
    // 总耗时:14分钟
}

问题:

  • B和C其实可以并行执行,但串行方式白白浪费了时间
  • 任务依赖关系硬编码,难以维护和扩展
  • 某个任务失败,后续任务无法自动处理

1.2 DAG的优势

DAG(Directed Acyclic Graph,有向无环图) 是一种图形结构,其中:

  • 节点 代表任务
  • 代表任务之间的依赖关系
  • 有向 表示依赖方向
  • 无环 确保不会出现循环依赖

优势:

  • 自动依赖解析:系统自动识别哪些任务可以并行执行
  • 可视化依赖关系:通过图形直观展示任务流程
  • 失败自动处理:某个任务失败可以自动重试或跳过依赖任务
  • 灵活扩展:新增任务只需配置依赖关系,无需修改代码

1.3 实际应用场景

场景任务依赖关系DAG优势
数据ETL数据抽取 → 数据清洗 → 数据转换 → 数据加载清洗和转换可以并行
报表生成数据源A准备 → 报表生成 ← 数据源B准备多个数据源并行准备
订单处理订单创建 → 库存扣减 + 支付处理 → 发货库存和支付并行处理
机器学习数据预处理 → 特征工程 → 模型训练 → 模型评估特征工程可并行

二、核心设计思路

2.1 系统架构

┌─────────────────────────────────────────────────────────────┐
│                    DAG任务编排系统                           │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │   任务创建   │  │   DAG执行   │  │     可视化监控       │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │  任务调度器   │  │  依赖检查器   │  │     状态管理器       │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │   任务存储   │  │   DAG定义    │  │     执行日志        │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

2.2 核心组件

1. 任务定义(DagTask)

@Entity
public class DagTask {
    private String taskId;      // 任务唯一标识
    private String taskName;    // 任务名称
    private String taskType;    // 任务类型
    private String status;      // 任务状态
    private String dagId;       // 所属DAG
    private String parentTasks; // 父任务ID列表
    private String childTasks;  // 子任务ID列表
    // ... 其他字段
}

2. 任务状态机

PENDING → READY → PROCESSING → SUCCESS
                              ↓
                           FAILED → RETRY
                              ↓
                           SKIPPED

3. 依赖检查逻辑

private boolean checkDependenciesCompleted(DagTask task) {
    if (task.getParentTasks() == null || task.getParentTasks().isEmpty()) {
        return true; // 无父任务,可以直接执行
    }

    String[] parentTaskIds = task.getParentTasks().split(",");
    for (String parentTaskId : parentTaskIds) {
        DagTask parentTask = taskRepository.findByTaskId(parentTaskId);
        if (!parentTask.getStatus().equals("SUCCESS")) {
            return false; // 有父任务未完成
        }
    }
    return true; // 所有父任务都已完成
}

2.3 执行流程

1. 创建DAG任务
   ↓
2. 启动DAG执行
   ↓
3. 识别根任务(无父任务)
   ↓
4. 并行执行根任务
   ↓
5. 任务完成 → 检查子任务依赖
   ↓
6. 依赖满足 → 触发子任务执行
   ↓
7. 重复5-6直到所有任务完成

三、实战:构建DAG任务编排系统

3.1 项目结构

SpringBoot-Task-DAG-Orchestration-Demo/
├── src/main/java/com/example/dag/
│   ├── entity/              # 实体类
│   │   ├── DagTask.java     # 任务实体
│   │   ├── TaskStatus.java  # 任务状态枚举
│   │   └── DagDefinition.java # DAG定义
│   ├── repository/          # 数据访问层
│   │   ├── DagTaskRepository.java
│   │   └── DagDefinitionRepository.java
│   ├── service/             # 业务逻辑层
│   │   ├── DagOrchestrationService.java  # 核心编排服务
│   │   └── DagVisualizationService.java  # 可视化服务
│   ├── controller/          # 控制器层
│   │   ├── DagController.java
│   │   └── TestController.java
│   └── config/              # 配置类
│       ├── DagOrchestrationProperties.java
│       └── AsyncConfig.java
├── src/main/resources/
│   ├── static/index.html    # 监控页面
│   └── application.yml      # 配置文件
└── pom.xml

3.2 核心实现代码

任务编排服务(核心)

@Service
@Slf4j
public class DagOrchestrationService {

    /**
     * 启动DAG执行
     */
    @Transactional
    public void startDagExecution(String dagId) {
        // 获取所有根任务(没有父任务的任务)
        List<DagTask> rootTasks = taskRepository.findRootTasks(dagId);
        
        // 将所有根任务标记为就绪状态
        for (DagTask task : rootTasks) {
            task.setStatus(TaskStatus.READY.name());
            taskRepository.save(task);
            
            // 异步执行就绪的任务
            executeTaskAsync(task.getTaskId());
        }
    }

    /**
     * 执行任务(核心逻辑)
     */
    @Transactional
    public void executeTask(String taskId) {
        DagTask task = taskRepository.findByTaskId(taskId)
                .orElseThrow(() -> new RuntimeException("任务不存在"));

        // 检查依赖任务是否完成
        if (!checkDependenciesCompleted(task)) {
            log.info("任务 {} 的依赖任务未完成,等待中", taskId);
            return;
        }

        // 更新任务状态为处理中
        task.setStatus(TaskStatus.PROCESSING.name());
        task.setStartTime(LocalDateTime.now());
        taskRepository.save(task);

        try {
            // 执行业务逻辑
            processTaskLogic(task);

            // 任务成功完成
            task.setStatus(TaskStatus.SUCCESS.name());
            task.setEndTime(LocalDateTime.now());
            taskRepository.save(task);

            // 触发子任务执行
            triggerChildTasks(task);

        } catch (Exception e) {
            handleTaskFailure(task, e);
        }
    }

    /**
     * 触发子任务执行
     */
    private void triggerChildTasks(DagTask parentTask) {
        if (parentTask.getChildTasks() == null) {
            return;
        }

        String[] childTaskIds = parentTask.getChildTasks().split(",");
        
        for (String childTaskId : childTaskIds) {
            DagTask childTask = taskRepository.findByTaskId(childTaskId.trim())
                    .orElse(null);
            
            if (childTask != null && checkDependenciesCompleted(childTask)) {
                childTask.setStatus(TaskStatus.READY.name());
                taskRepository.save(childTask);
                
                // 异步执行子任务
                executeTaskAsync(childTask.getTaskId());
            }
        }
    }
}

拓扑排序实现

@Service
public class DagVisualizationService {

    /**
     * 获取DAG的拓扑排序(Kahn算法)
     */
    public List<String> getTopologicalOrder(String dagId) {
        List<DagTask> allTasks = taskRepository.findByDagId(dagId);
        Map<String, List<String>> adjacencyList = new HashMap<>();
        Map<String, Integer> inDegree = new HashMap<>();

        // 构建邻接表和入度表
        for (DagTask task : allTasks) {
            adjacencyList.putIfAbsent(task.getTaskId(), new ArrayList<>());
            inDegree.putIfAbsent(task.getTaskId(), 0);

            if (task.getChildTasks() != null) {
                String[] children = task.getChildTasks().split(",");
                for (String child : children) {
                    String childId = child.trim();
                    adjacencyList.get(task.getTaskId()).add(childId);
                    inDegree.put(childId, inDegree.getOrDefault(childId, 0) + 1);
                }
            }
        }

        // 拓扑排序
        Queue<String> queue = new LinkedList<>();
        List<String> result = new ArrayList<>();

        // 入度为0的节点入队(根任务)
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                queue.offer(entry.getKey());
            }
        }

        while (!queue.isEmpty()) {
            String current = queue.poll();
            result.add(current);

            for (String neighbor : adjacencyList.getOrDefault(current, new ArrayList<>())) {
                inDegree.put(neighbor, inDegree.get(neighbor) - 1);
                if (inDegree.get(neighbor) == 0) {
                    queue.offer(neighbor);
                }
            }
        }

        return result;
    }
}

3.3 可视化监控

系统提供了一个Web监控页面,可以:

  1. 实时查看DAG执行状态:任务状态、执行时间、依赖关系
  2. 可视化展示任务依赖关系图:使用ECharts展示DAG图
  3. 创建示例DAG:一键创建示例任务流
  4. 执行和取消DAG任务:手动控制任务执行

监控页面截图示意:

┌─────────────────────────────────────────────────────────────┐
│              DAG任务编排监控平台                              │
├─────────────────────────────────────────────────────────────┤
│  系统状态: ● 正常    操作面板: [创建示例DAG] [创建并行DAG]    │
├─────────────────────────────────────────────────────────────┤
│  DAG可视化图(ECharts展示)                                   │
│                                                             │
│        ┌─────┐                                              │
│        │  A  │                                              │
│        └──┬──┘                                              │
│           │                                                 │
│      ┌────┴────┐                                            │
│      ▼         ▼                                            │
│   ┌─────┐   ┌─────┐                                         │
│   │  B  │   │  C  │                                         │
│   └──┬──┘   └─────┘                                         │
│      │                                                      │
│      ▼                                                      │
│   ┌─────┐                                                   │
│   │  D  │                                                   │
│   └─────┘                                                   │
│                                                             │
├─────────────────────────────────────────────────────────────┤
│  任务列表                                                    │
│  ID | 任务ID | 任务名称 | 状态 | 操作                        │
│  1  | xxx   | 任务A    | 成功 | [查看]                       │
│  2  | yyy   | 任务B    | 处理中| [查看]                       │
│  3  | zzz   | 任务C    | 就绪  | [查看]                       │
└─────────────────────────────────────────────────────────────┘

四、实战效果验证

4.1 性能对比

场景:数据ETL流程

  • 任务A:数据抽取(5分钟)
  • 任务B:数据清洗(3分钟)
  • 任务C:数据转换(4分钟)
  • 任务D:数据加载(2分钟)

串行执行:

总耗时 = 5 + 3 + 4 + 2 = 14分钟

DAG并行执行:

时间线:
0-5分钟:   A执行
5-8分钟:   B执行(依赖A完成)
5-9分钟:   C执行(依赖A完成,与B并行)
9-11分钟:  D执行(依赖B和C完成)

总耗时 = 11分钟
效率提升 = (14-11)/14 = 21.4%

4.2 实际案例

电商订单处理流程:

订单创建 → [库存扣减 + 支付处理] → 发货 → 通知

优化前(串行):

  • 订单创建:100ms
  • 库存扣减:200ms
  • 支付处理:500ms
  • 发货:300ms
  • 通知:100ms
  • 总耗时:1200ms

优化后(DAG并行):

  • 订单创建:100ms
  • 库存扣减 + 支付处理:max(200, 500) = 500ms(并行)
  • 发货:300ms
  • 通知:100ms
  • 总耗时:1000ms
  • 性能提升:16.7%

五、最佳实践与注意事项

5.1 任务设计原则

1. 合理划分任务粒度

  • 任务粒度不宜过大,否则失去并行优势
  • 任务粒度不宜过小,否则增加调度开销
  • 建议:单个任务执行时间在1-10分钟之间

2. 明确任务依赖关系

// 好的设计:明确依赖
A -> B -> C
A -> D -> C

// 避免:隐式依赖
A -> B -> C
A -> D(D内部调用C)

3. 设置合理的超时和重试

dag:
  orchestration:
    retry-times: 3        # 失败重试3次
    timeout: 300000       # 5分钟超时

5.2 常见问题处理

1. 循环依赖检测

public boolean hasCycle(String dagId) {
    try {
        getTopologicalOrder(dagId);
        return false;
    } catch (Exception e) {
        return true; // 存在循环依赖
    }
}

2. 任务失败处理

private void handleTaskFailure(DagTask task, Exception e) {
    if (task.getRetryCount() < maxRetryTimes) {
        // 重试
        retryTask(task);
    } else {
        // 跳过子任务
        skipChildTasks(task);
    }
}

3. 并发控制

@Bean("dagTaskExecutor")
public Executor dagTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);    // 核心线程数
    executor.setMaxPoolSize(20);     // 最大线程数
    executor.setQueueCapacity(100);  // 队列容量
    return executor;
}

5.3 监控与告警

关键监控指标:

  • DAG执行成功率
  • 平均执行时间
  • 并行度(同时执行的任务数)
  • 任务失败率
  • 重试次数分布

告警策略:

  • DAG执行失败立即告警
  • 任务执行时间超过阈值告警
  • 连续失败次数过多告警

六、总结与展望

6.1 核心收益

  1. 效率提升:通过并行执行,整体任务执行时间显著缩短
  2. 可维护性:依赖关系配置化,无需修改代码
  3. 可视化:任务流程一目了然,便于理解和维护
  4. 容错性:自动重试和失败处理机制

6.2 适用场景

  • 数据ETL流程
  • 报表生成
  • 订单处理
  • 机器学习流水线
  • CI/CD流程
  • 工作流引擎

6.3 未来扩展

1. 分布式DAG

  • 支持跨服务的任务编排
  • 使用分布式锁保证一致性

2. 动态DAG

  • 运行时动态调整任务依赖
  • 支持条件分支和循环

3. 智能调度

  • 基于历史数据预测任务执行时间
  • 智能分配资源,优化并行度

七、源码获取

完整项目代码已上传,包含:

  • 完整的SpringBoot项目
  • 可视化监控页面
  • 详细的API文档
  • 单元测试和集成测试

项目地址: 公众号“服务端技术精选”,回复“任务依赖 DAG 编排”即可获取项目下载链接。


互动话题

  1. 你在项目中遇到过哪些复杂的任务依赖场景?
  2. 除了DAG,你还用过哪些任务编排方案?
  3. 对于分布式任务编排,你有什么经验和建议?

欢迎在评论区留言讨论!如果觉得文章对你有帮助,别忘了点赞、在看、转发三连支持一下~


标题:SpringBoot + 任务依赖 DAG 编排:A 任务成功后自动触发 B、C,并行执行提效率
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/01/1772256519859.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消