SpringBoot DAG任务编排系统

一个基于SpringBoot的DAG(有向无环图)任务编排系统,支持任务依赖管理、自动触发、并行执行等功能。

功能特性

  • DAG任务编排: 支持复杂的任务依赖关系定义
  • 自动触发机制: A任务成功后自动触发B、C任务
  • 并行执行: 无依赖的任务并行执行,提升效率
  • 可视化监控: 提供Web界面实时监控任务执行状态
  • 拓扑排序: 自动检测任务依赖关系,确保正确执行顺序
  • 失败重试: 支持任务失败后的自动重试机制
  • 循环依赖检测: 自动检测并阻止循环依赖的DAG创建

技术栈

  • Spring Boot 3.2.0
  • Spring Data JPA
  • H2 Database(开发环境)
  • Redis(可选,用于缓存)
  • ECharts(可视化)
  • Maven

快速开始

1. 克隆项目

git clone <repository-url>
cd SpringBoot-Task-DAG-Orchestration-Demo

2. 运行项目

mvn spring-boot:run

3. 访问监控页面

打开浏览器访问:http://localhost:8080

API接口

创建任务

POST /api/dag/tasks
Content-Type: application/x-www-form-urlencoded

taskName=任务名称&taskType=任务类型&dagId=DAG_ID&parentTasks=父任务ID列表&childTasks=子任务ID列表

启动DAG执行

POST /api/dag/execute/{dagId}

获取DAG状态

GET /api/dag/status/{dagId}

获取任务状态

GET /api/dag/tasks/{taskId}

取消DAG执行

DELETE /api/dag/execute/{dagId}

获取DAG可视化数据

GET /api/dag/visualization/{dagId}

获取DAG拓扑排序

GET /api/dag/topology/{dagId}

检测循环依赖

GET /api/dag/cycle-check/{dagId}

创建示例DAG

POST /api/test/create-sample-dag

创建并行DAG

POST /api/test/create-parallel-dag

使用示例

示例1: 简单的任务依赖(A -> B, C -> D)

// 创建任务A(根任务)
DagTask taskA = orchestrationService.createTask(
    "任务A-数据准备", "DATA_PREPARE", dagId, null, null);

// 创建任务B(依赖A)
DagTask taskB = orchestrationService.createTask(
    "任务B-数据分析", "DATA_ANALYSIS", dagId, Arrays.asList(taskA.getTaskId()), null);

// 创建任务C(依赖A)
DagTask taskC = orchestrationService.createTask(
    "任务C-数据清洗", "DATA_CLEAN", dagId, Arrays.asList(taskA.getTaskId()), null);

// 创建任务D(依赖B和C)
DagTask taskD = orchestrationService.createTask(
    "任务D-报表生成", "REPORT_GENERATE", dagId, 
    Arrays.asList(taskB.getTaskId(), taskC.getTaskId()), null);

// 启动DAG执行
orchestrationService.startDagExecution(dagId);

示例2: 并行任务执行

// 创建多个独立的根任务,它们将并行执行
DagTask task1 = orchestrationService.createTask(
    "并行任务1-数据导出", "DATA_EXPORT", dagId, null, null);

DagTask task2 = orchestrationService.createTask(
    "并行任务2-数据备份", "DATA_BACKUP", dagId, null, null);

DagTask task3 = orchestrationService.createTask(
    "并行任务3-数据同步", "DATA_SYNC", dagId, null, null);

// 启动DAG执行
orchestrationService.startDagExecution(dagId);

任务状态

  • PENDING: 待处理,等待依赖任务完成
  • READY: 就绪,依赖任务已完成,可以执行
  • PROCESSING: 处理中,正在执行
  • SUCCESS: 成功,任务执行成功
  • FAILED: 失败,任务执行失败
  • SKIPPED: 跳过,依赖任务失败,跳过执行
  • CANCELLED: 已取消,任务被取消

配置说明

dag:
  orchestration:
    enabled: true          # 启用DAG编排
    pool-size: 10          # 线程池大小
    queue-capacity: 100    # 任务队列容量
    retry-times: 3         # 失败重试次数
    timeout: 300000        # 任务超时时间(毫秒)

监控页面

系统提供了一个Web监控页面,可以:

  • 实时查看DAG执行状态
  • 可视化展示任务依赖关系图
  • 查看任务列表和详细信息
  • 创建示例DAG和并行DAG
  • 执行和取消DAG任务

访问地址:http://localhost:8080

核心设计

1. 任务依赖管理

使用parentTaskschildTasks字段存储任务依赖关系,支持多对多依赖。

2. 自动触发机制

任务成功完成后,自动检查并触发所有依赖该任务的子任务。

3. 并行执行

无依赖的任务会并行执行,提高整体执行效率。

4. 拓扑排序

使用Kahn算法进行拓扑排序,确保任务按正确顺序执行。

5. 循环依赖检测

通过DFS算法检测DAG中是否存在循环依赖,防止死锁。

最佳实践

  1. 合理设计任务粒度: 任务粒度不宜过大或过小
  2. 设置合理的超时时间: 避免长时间阻塞
  3. 配置适当的重试次数: 平衡可靠性和效率
  4. 监控任务执行状态: 及时发现和处理异常
  5. 避免循环依赖: 设计DAG时确保无环

扩展功能

  • 支持WebSocket实时推送任务状态
  • 支持任务执行历史记录
  • 支持任务执行日志收集
  • 支持任务执行时间统计
  • 支持任务失败告警通知

源码下载

公众号“服务端技术精选”,回复“任务依赖 DAG 编排”即可获取项目下载链接。`


标题:SpringBoot DAG任务编排系统
作者:jiangyi
地址:http://jiangyi.space/articles/2026/02/28/1772256634039.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消