SpringBoot DAG任务编排系统
一个基于SpringBoot的DAG(有向无环图)任务编排系统,支持任务依赖管理、自动触发、并行执行等功能。
功能特性
- DAG任务编排: 支持复杂的任务依赖关系定义
- 自动触发机制: A任务成功后自动触发B、C任务
- 并行执行: 无依赖的任务并行执行,提升效率
- 可视化监控: 提供Web界面实时监控任务执行状态
- 拓扑排序: 自动检测任务依赖关系,确保正确执行顺序
- 失败重试: 支持任务失败后的自动重试机制
- 循环依赖检测: 自动检测并阻止循环依赖的DAG创建
技术栈
- Spring Boot 3.2.0
- Spring Data JPA
- H2 Database(开发环境)
- Redis(可选,用于缓存)
- ECharts(可视化)
- Maven
快速开始
1. 克隆项目
git clone <repository-url>
cd SpringBoot-Task-DAG-Orchestration-Demo
2. 运行项目
mvn spring-boot:run
3. 访问监控页面
打开浏览器访问:http://localhost:8080
API接口
创建任务
POST /api/dag/tasks
Content-Type: application/x-www-form-urlencoded
taskName=任务名称&taskType=任务类型&dagId=DAG_ID&parentTasks=父任务ID列表&childTasks=子任务ID列表
启动DAG执行
POST /api/dag/execute/{dagId}
获取DAG状态
GET /api/dag/status/{dagId}
获取任务状态
GET /api/dag/tasks/{taskId}
取消DAG执行
DELETE /api/dag/execute/{dagId}
获取DAG可视化数据
GET /api/dag/visualization/{dagId}
获取DAG拓扑排序
GET /api/dag/topology/{dagId}
检测循环依赖
GET /api/dag/cycle-check/{dagId}
创建示例DAG
POST /api/test/create-sample-dag
创建并行DAG
POST /api/test/create-parallel-dag
使用示例
示例1: 简单的任务依赖(A -> B, C -> D)
// 创建任务A(根任务)
DagTask taskA = orchestrationService.createTask(
"任务A-数据准备", "DATA_PREPARE", dagId, null, null);
// 创建任务B(依赖A)
DagTask taskB = orchestrationService.createTask(
"任务B-数据分析", "DATA_ANALYSIS", dagId, Arrays.asList(taskA.getTaskId()), null);
// 创建任务C(依赖A)
DagTask taskC = orchestrationService.createTask(
"任务C-数据清洗", "DATA_CLEAN", dagId, Arrays.asList(taskA.getTaskId()), null);
// 创建任务D(依赖B和C)
DagTask taskD = orchestrationService.createTask(
"任务D-报表生成", "REPORT_GENERATE", dagId,
Arrays.asList(taskB.getTaskId(), taskC.getTaskId()), null);
// 启动DAG执行
orchestrationService.startDagExecution(dagId);
示例2: 并行任务执行
// 创建多个独立的根任务,它们将并行执行
DagTask task1 = orchestrationService.createTask(
"并行任务1-数据导出", "DATA_EXPORT", dagId, null, null);
DagTask task2 = orchestrationService.createTask(
"并行任务2-数据备份", "DATA_BACKUP", dagId, null, null);
DagTask task3 = orchestrationService.createTask(
"并行任务3-数据同步", "DATA_SYNC", dagId, null, null);
// 启动DAG执行
orchestrationService.startDagExecution(dagId);
任务状态
- PENDING: 待处理,等待依赖任务完成
- READY: 就绪,依赖任务已完成,可以执行
- PROCESSING: 处理中,正在执行
- SUCCESS: 成功,任务执行成功
- FAILED: 失败,任务执行失败
- SKIPPED: 跳过,依赖任务失败,跳过执行
- CANCELLED: 已取消,任务被取消
配置说明
dag:
orchestration:
enabled: true # 启用DAG编排
pool-size: 10 # 线程池大小
queue-capacity: 100 # 任务队列容量
retry-times: 3 # 失败重试次数
timeout: 300000 # 任务超时时间(毫秒)
监控页面
系统提供了一个Web监控页面,可以:
- 实时查看DAG执行状态
- 可视化展示任务依赖关系图
- 查看任务列表和详细信息
- 创建示例DAG和并行DAG
- 执行和取消DAG任务
核心设计
1. 任务依赖管理
使用parentTasks和childTasks字段存储任务依赖关系,支持多对多依赖。
2. 自动触发机制
任务成功完成后,自动检查并触发所有依赖该任务的子任务。
3. 并行执行
无依赖的任务会并行执行,提高整体执行效率。
4. 拓扑排序
使用Kahn算法进行拓扑排序,确保任务按正确顺序执行。
5. 循环依赖检测
通过DFS算法检测DAG中是否存在循环依赖,防止死锁。
最佳实践
- 合理设计任务粒度: 任务粒度不宜过大或过小
- 设置合理的超时时间: 避免长时间阻塞
- 配置适当的重试次数: 平衡可靠性和效率
- 监控任务执行状态: 及时发现和处理异常
- 避免循环依赖: 设计DAG时确保无环
扩展功能
- 支持WebSocket实时推送任务状态
- 支持任务执行历史记录
- 支持任务执行日志收集
- 支持任务执行时间统计
- 支持任务失败告警通知
源码下载
公众号“服务端技术精选”,回复“任务依赖 DAG 编排”即可获取项目下载链接。`
标题:SpringBoot DAG任务编排系统
作者:jiangyi
地址:http://jiangyi.space/articles/2026/02/28/1772256634039.html
公众号:服务端技术精选
评论
0 评论