SpringBoot + 任务执行链路追踪 + TraceID 透传:从调度到完成,全链路可观测
导语
在分布式系统中,任务的执行往往跨越多个服务和组件,如何追踪任务的完整执行链路,了解每一步的执行状态和耗时,是系统可观测性的重要组成部分。本文将介绍如何在SpringBoot应用中实现任务执行的链路追踪和TraceID透传,从任务调度到执行完成,实现全链路的可观测性。通过这种方式,您可以实时监控任务的执行状态,快速定位问题,提高系统的可靠性和可维护性。
一、任务执行链路追踪的概念
1.1 什么是链路追踪
链路追踪(Distributed Tracing)是一种用于监控和观察分布式系统中请求或任务执行过程的技术。它通过为每个请求或任务分配一个唯一的标识符(TraceID),并在整个执行过程中传递这个标识符,从而实现对整个执行链路的追踪。
1.2 任务执行链路的特点
1. 跨服务
- 任务执行可能涉及多个微服务
- 不同服务之间需要传递上下文信息
- 需要追踪任务在不同服务中的执行状态
2. 异步执行
- 任务可能是异步执行的
- 执行过程可能涉及消息队列
- 需要追踪异步操作的完整链路
3. 长时间运行
- 任务可能运行时间较长
- 需要实时监控任务的执行状态
- 需要记录任务的执行历史
1.3 链路追踪的价值
| 价值 | 描述 |
|---|---|
| 问题定位 | 快速定位任务执行过程中的问题 |
| 性能分析 | 分析任务执行的性能瓶颈 |
| 依赖分析 | 了解任务之间的依赖关系 |
| 系统监控 | 实时监控系统的运行状态 |
| 容量规划 | 基于历史数据进行容量规划 |
二、TraceID 透传的实现
2.1 TraceID 的概念
TraceID 是链路追踪中的核心概念,它是一个全局唯一的标识符,用于标识一个完整的请求或任务执行链路。通过TraceID,我们可以将不同服务、不同组件中的日志和监控数据关联起来,形成一个完整的执行链路视图。
2.2 实现方式
1. 线程本地存储(ThreadLocal)
- 使用ThreadLocal存储当前线程的TraceID
- 适用于同一线程内的TraceID传递
- 简单易用,但不适用于跨线程场景
2. 上下文传递
- 手动将TraceID作为参数传递
- 适用于跨线程、跨服务的场景
- 代码侵入性较高
3. 消息头传递
- 在消息队列中通过消息头传递TraceID
- 适用于异步消息处理场景
- 需要消息队列支持消息头
4. MDC(Mapped Diagnostic Context)
- 使用Log4j或SLF4J的MDC机制
- 自动将TraceID添加到日志中
- 适用于日志关联
2.3 TraceID 生成策略
1. UUID
- 全局唯一
- 实现简单
- 长度较长
2. 雪花算法
- 有序,便于排序
- 长度较短
- 可能存在时钟回拨问题
3. 组合ID
- 结合时间戳、机器ID、序列号
- 可读性较好
- 唯一性有保障
三、SpringBoot 集成链路追踪
3.1 依赖配置
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Micrometer Tracing -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing</artifactId>
</dependency>
<!-- Micrometer Prometheus -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- Sleuth (可选,提供更完整的链路追踪) -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<!-- Log4j2 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
3.2 TraceID 工具类
@Component
public class TraceIdUtil {
private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();
private static final String TRACE_ID_HEADER = "X-Trace-Id";
/**
* 生成TraceID
*/
public static String generateTraceId() {
return UUID.randomUUID().toString().replace("-", "");
}
/**
* 设置TraceID
*/
public static void setTraceId(String traceId) {
TRACE_ID.set(traceId);
// 设置到MDC
MDC.put("traceId", traceId);
}
/**
* 获取TraceID
*/
public static String getTraceId() {
String traceId = TRACE_ID.get();
if (traceId == null) {
traceId = generateTraceId();
setTraceId(traceId);
}
return traceId;
}
/**
* 清除TraceID
*/
public static void clearTraceId() {
TRACE_ID.remove();
MDC.remove("traceId");
}
/**
* 从请求头获取TraceID
*/
public static String getTraceIdFromHeader(HttpServletRequest request) {
return request.getHeader(TRACE_ID_HEADER);
}
/**
* 设置TraceID到响应头
*/
public static void setTraceIdToHeader(HttpServletResponse response) {
response.setHeader(TRACE_ID_HEADER, getTraceId());
}
}
3.3 拦截器配置
@Component
public class TraceIdInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 从请求头获取TraceID,如果没有则生成新的
String traceId = TraceIdUtil.getTraceIdFromHeader(request);
if (traceId == null) {
traceId = TraceIdUtil.generateTraceId();
}
// 设置TraceID
TraceIdUtil.setTraceId(traceId);
// 设置到响应头
TraceIdUtil.setTraceIdToHeader(response);
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
// 清理TraceID
TraceIdUtil.clearTraceId();
}
}
3.4 任务执行器配置
@Configuration
public class TaskExecutorConfig {
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("task-executor-");
// 自定义线程工厂,传递TraceID
executor.setThreadFactory(new TraceIdThreadFactory());
executor.initialize();
return executor;
}
/**
* 带有TraceID的线程工厂
*/
private static class TraceIdThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
@Override
public Thread newThread(Runnable r) {
final String traceId = TraceIdUtil.getTraceId();
Thread thread = defaultFactory.newThread(() -> {
try {
TraceIdUtil.setTraceId(traceId);
r.run();
} finally {
TraceIdUtil.clearTraceId();
}
});
thread.setName("task-executor-" + threadNumber.getAndIncrement());
return thread;
}
}
}
四、任务执行链路追踪实现
4.1 任务执行服务
@Service
@Slf4j
public class TaskExecutionService {
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private TaskRepository taskRepository;
/**
* 提交任务
*/
public String submitTask(TaskRequest request) {
String taskId = UUID.randomUUID().toString();
String traceId = TraceIdUtil.getTraceId();
// 保存任务信息
Task task = new Task();
task.setTaskId(taskId);
task.setTraceId(traceId);
task.setStatus(TaskStatus.PENDING);
task.setCreatedTime(LocalDateTime.now());
task.setTaskType(request.getTaskType());
task.setTaskData(request.getTaskData());
taskRepository.save(task);
// 异步执行任务
taskExecutor.execute(() -> {
try {
// 设置TraceID
TraceIdUtil.setTraceId(traceId);
// 执行任务
executeTask(taskId, request);
} catch (Exception e) {
log.error("Task execution failed", e);
updateTaskStatus(taskId, TaskStatus.FAILED, e.getMessage());
} finally {
TraceIdUtil.clearTraceId();
}
});
log.info("Task submitted: taskId={}, traceId={}", taskId, traceId);
return taskId;
}
/**
* 执行任务
*/
private void executeTask(String taskId, TaskRequest request) {
log.info("Starting task execution: taskId={}", taskId);
// 更新任务状态为执行中
updateTaskStatus(taskId, TaskStatus.RUNNING, null);
try {
// 模拟任务执行
Thread.sleep(1000);
// 执行子任务
executeSubTask(taskId, "subTask1");
executeSubTask(taskId, "subTask2");
// 模拟任务完成
Thread.sleep(500);
// 更新任务状态为完成
updateTaskStatus(taskId, TaskStatus.COMPLETED, null);
log.info("Task completed: taskId={}", taskId);
} catch (Exception e) {
log.error("Task execution failed", e);
updateTaskStatus(taskId, TaskStatus.FAILED, e.getMessage());
throw new RuntimeException("Task execution failed", e);
}
}
/**
* 执行子任务
*/
private void executeSubTask(String taskId, String subTaskName) throws InterruptedException {
log.info("Starting sub task: taskId={}, subTask={}", taskId, subTaskName);
// 模拟子任务执行
Thread.sleep(500);
log.info("Sub task completed: taskId={}, subTask={}", taskId, subTaskName);
}
/**
* 更新任务状态
*/
private void updateTaskStatus(String taskId, TaskStatus status, String errorMessage) {
Task task = taskRepository.findByTaskId(taskId);
if (task != null) {
task.setStatus(status);
task.setErrorMessage(errorMessage);
task.setUpdatedTime(LocalDateTime.now());
taskRepository.save(task);
}
}
/**
* 查询任务状态
*/
public TaskStatus getTaskStatus(String taskId) {
Task task = taskRepository.findByTaskId(taskId);
return task != null ? task.getStatus() : TaskStatus.NOT_FOUND;
}
/**
* 查询任务详情
*/
public Task getTask(String taskId) {
return taskRepository.findByTaskId(taskId);
}
}
4.2 消息队列集成
@Service
@Slf4j
public class TaskMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.exchange.task}")
private String taskExchange;
@Value("${rabbitmq.routing-key.task}")
private String taskRoutingKey;
/**
* 发送任务消息
*/
public void sendTaskMessage(TaskRequest request) {
String traceId = TraceIdUtil.getTraceId();
// 创建消息头,包含TraceID
MessageProperties properties = new MessageProperties();
properties.setHeader("X-Trace-Id", traceId);
// 创建消息
TaskMessage message = new TaskMessage();
message.setTaskId(UUID.randomUUID().toString());
message.setTraceId(traceId);
message.setTaskType(request.getTaskType());
message.setTaskData(request.getTaskData());
message.setCreatedTime(LocalDateTime.now());
try {
// 序列化消息
byte[] body = new ObjectMapper().writeValueAsBytes(message);
Message rabbitMessage = new Message(body, properties);
// 发送消息
rabbitTemplate.send(taskExchange, taskRoutingKey, rabbitMessage);
log.info("Task message sent: taskId={}, traceId={}", message.getTaskId(), traceId);
} catch (Exception e) {
log.error("Failed to send task message", e);
throw new RuntimeException("Failed to send task message", e);
}
}
/**
* 接收任务消息
*/
@RabbitListener(queues = "${rabbitmq.queue.task}")
public void receiveTaskMessage(Message message) {
try {
// 从消息头获取TraceID
String traceId = (String) message.getMessageProperties().getHeaders().get("X-Trace-Id");
if (traceId == null) {
traceId = TraceIdUtil.generateTraceId();
}
// 设置TraceID
TraceIdUtil.setTraceId(traceId);
// 反序列化消息
TaskMessage taskMessage = new ObjectMapper().readValue(message.getBody(), TaskMessage.class);
log.info("Received task message: taskId={}, traceId={}", taskMessage.getTaskId(), traceId);
// 处理任务
processTaskMessage(taskMessage);
} catch (Exception e) {
log.error("Failed to process task message", e);
} finally {
TraceIdUtil.clearTraceId();
}
}
/**
* 处理任务消息
*/
private void processTaskMessage(TaskMessage message) {
log.info("Processing task message: taskId={}", message.getTaskId());
// 模拟任务处理
try {
Thread.sleep(1000);
log.info("Task message processed: taskId={}", message.getTaskId());
} catch (InterruptedException e) {
log.error("Task processing interrupted", e);
}
}
}
4.3 控制器实现
@RestController
@RequestMapping("/api/tasks")
public class TaskController {
@Autowired
private TaskExecutionService taskService;
@Autowired
private TaskMessageService messageService;
/**
* 提交任务
*/
@PostMapping
public ResponseEntity<String> submitTask(@RequestBody TaskRequest request) {
String taskId = taskService.submitTask(request);
return ResponseEntity.status(HttpStatus.CREATED).body(taskId);
}
/**
* 通过消息队列提交任务
*/
@PostMapping("/message")
public ResponseEntity<String> submitTaskByMessage(@RequestBody TaskRequest request) {
messageService.sendTaskMessage(request);
return ResponseEntity.status(HttpStatus.ACCEPTED).body("Task message sent");
}
/**
* 查询任务状态
*/
@GetMapping("/{taskId}/status")
public ResponseEntity<TaskStatus> getTaskStatus(@PathVariable String taskId) {
TaskStatus status = taskService.getTaskStatus(taskId);
return ResponseEntity.ok(status);
}
/**
* 查询任务详情
*/
@GetMapping("/{taskId}")
public ResponseEntity<Task> getTask(@PathVariable String taskId) {
Task task = taskService.getTask(taskId);
if (task == null) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(task);
}
}
五、全链路可观测性实现
5.1 监控指标
@Service
public class TaskMetricsService {
private final MeterRegistry meterRegistry;
private final Counter taskSubmittedCounter;
private final Counter taskCompletedCounter;
private final Counter taskFailedCounter;
private final Timer taskExecutionTimer;
public TaskMetricsService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// 任务提交计数器
this.taskSubmittedCounter = Counter.builder("task.submitted")
.description("Number of tasks submitted")
.tag("type", "task")
.register(meterRegistry);
// 任务完成计数器
this.taskCompletedCounter = Counter.builder("task.completed")
.description("Number of tasks completed")
.tag("type", "task")
.register(meterRegistry);
// 任务失败计数器
this.taskFailedCounter = Counter.builder("task.failed")
.description("Number of tasks failed")
.tag("type", "task")
.register(meterRegistry);
// 任务执行计时器
this.taskExecutionTimer = Timer.builder("task.execution.time")
.description("Task execution time")
.tag("type", "task")
.register(meterRegistry);
}
/**
* 记录任务提交
*/
public void recordTaskSubmitted() {
taskSubmittedCounter.increment();
}
/**
* 记录任务完成
*/
public void recordTaskCompleted() {
taskCompletedCounter.increment();
}
/**
* 记录任务失败
*/
public void recordTaskFailed() {
taskFailedCounter.increment();
}
/**
* 记录任务执行时间
*/
public <T> T recordTaskExecution(Supplier<T> task) {
return taskExecutionTimer.record(task);
}
/**
* 记录任务执行时间
*/
public void recordTaskExecution(Runnable task) {
taskExecutionTimer.record(task);
}
}
5.2 日志配置
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%X{traceId}] %logger{36} - %msg%n"/>
</Console>
<RollingFile name="File" fileName="logs/task-execution.log" filePattern="logs/task-execution-%d{yyyy-MM-dd}.log.gz">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%X{traceId}] %logger{36} - %msg%n"/>
<Policies>
<TimeBasedTriggeringPolicy interval="1" modulate="true"/>
</Policies>
<DefaultRolloverStrategy max="10"/>
</RollingFile>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
<AppenderRef ref="File"/>
</Root>
</Loggers>
</Configuration>
5.3 监控面板
Grafana 面板
创建一个Grafana面板,包含以下指标:
-
任务执行统计
- 任务提交数
- 任务完成数
- 任务失败数
-
任务执行时间
- 平均执行时间
- 95%分位数执行时间
- 99%分位数执行时间
-
任务状态分布
- 待处理任务数
- 执行中任务数
- 完成任务数
- 失败任务数
-
链路追踪
- 基于TraceID的任务执行链路
- 子任务执行时间分布
六、生产级实现
6.1 配置文件
application.yml
# 应用配置
spring:
application:
name: springboot-task-tracing
# RabbitMQ 配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 数据源配置
datasource:
url: jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC
username: root
password: password
driver-class-name: com.mysql.cj.jdbc.Driver
# JPA 配置
jpa:
database-platform: org.hibernate.dialect.MySQL8Dialect
hibernate:
ddl-auto: update
show-sql: false
# 服务器配置
server:
port: 8080
servlet:
context-path: /
# RabbitMQ 配置
rabbitmq:
exchange:
task: task.exchange
routing-key:
task: task.routing.key
queue:
task: task.queue
# 任务执行配置
task:
executor:
core-pool-size: 10
max-pool-size: 50
queue-capacity: 100
# 监控配置
monitor:
enabled: true
interval: 60000 # 60秒
# 监控配置
management:
endpoints:
web:
exposure:
include: "health,info,metrics,prometheus"
endpoint:
health:
show-details: always
# 日志配置
logging:
config: classpath:log4j2.xml
level:
com.example.task: info
6.2 错误处理
@ControllerAdvice
@Slf4j
public class GlobalExceptionHandler {
@ExceptionHandler(Exception.class)
public ResponseEntity<ErrorResponse> handleException(Exception e) {
String traceId = TraceIdUtil.getTraceId();
log.error("Error occurred", e);
ErrorResponse response = new ErrorResponse();
response.setTraceId(traceId);
response.setMessage(e.getMessage());
response.setTimestamp(LocalDateTime.now());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
@Data
static class ErrorResponse {
private String traceId;
private String message;
private LocalDateTime timestamp;
}
}
6.3 安全配置
@Configuration
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.csrf().disable()
.authorizeRequests()
.antMatchers("/api/tasks/**").permitAll()
.antMatchers("/actuator/**").hasRole("ADMIN")
.anyRequest().authenticated()
.and()
.httpBasic();
}
}
七、最佳实践
7.1 TraceID 管理
1. 生成策略
- 使用UUID确保全局唯一
- 保持TraceID的一致性
- 在系统边界处生成TraceID
2. 传递方式
- HTTP请求:通过请求头传递
- 消息队列:通过消息头传递
- 线程池:通过线程工厂传递
- 跨服务:通过服务间调用传递
3. 存储方式
- ThreadLocal:适用于同一线程内
- MDC:适用于日志关联
- 数据库:适用于持久化
7.2 任务执行监控
1. 关键指标
- 任务提交率
- 任务完成率
- 任务失败率
- 任务执行时间
- 任务状态分布
2. 告警配置
- 任务失败率超过阈值
- 任务执行时间过长
- 任务队列积压
3. 日志记录
- 记录任务开始和结束
- 记录任务执行过程中的关键步骤
- 记录任务失败的详细信息
- 关联TraceID便于追踪
7.3 性能优化
1. 任务执行
- 合理配置线程池
- 避免任务阻塞
- 优化任务执行逻辑
2. 监控开销
- 减少监控指标的数量
- 合理设置监控采样率
- 避免监控对系统性能的影响
3. 存储优化
- 合理设置任务数据的存储策略
- 定期清理历史任务数据
- 使用缓存提高查询性能
八、案例分析
8.1 案例一:电商订单处理
场景:
- 用户下单后,系统需要处理订单、扣减库存、生成物流单等多个任务
- 这些任务可能分布在不同的服务中
- 需要追踪整个订单处理的完整链路
解决方案:
- 为每个订单分配一个TraceID
- 在订单处理的各个环节传递TraceID
- 通过TraceID关联各个服务的日志和监控数据
- 构建完整的订单处理链路视图
效果:
- 快速定位订单处理过程中的问题
- 分析订单处理的性能瓶颈
- 优化订单处理流程
- 提高系统的可靠性
8.2 案例二:数据ETL处理
场景:
- 系统需要定期执行数据ETL任务
- ETL任务包含多个步骤:提取、转换、加载
- 任务执行时间较长,需要实时监控
解决方案:
- 为每个ETL任务分配一个TraceID
- 记录每个步骤的执行状态和耗时
- 通过TraceID追踪整个ETL过程
- 实时监控ETL任务的执行情况
效果:
- 实时了解ETL任务的执行状态
- 快速定位ETL过程中的问题
- 优化ETL任务的执行效率
- 提高数据处理的可靠性
8.3 案例三:微服务调用链
场景:
- 一个请求需要调用多个微服务
- 每个微服务都有自己的处理逻辑
- 需要追踪整个调用链的执行情况
解决方案:
- 为每个请求分配一个TraceID
- 在微服务之间传递TraceID
- 记录每个微服务的调用情况
- 构建完整的调用链视图
效果:
- 了解请求在各个微服务中的执行情况
- 快速定位微服务调用中的问题
- 分析微服务之间的性能瓶颈
- 优化微服务的调用关系
互动话题
- 您在项目中使用过哪些链路追踪工具?有什么经验分享?
- 您对本文介绍的TraceID透传方案有什么改进建议?
- 您认为在微服务架构中,链路追踪还有哪些挑战?
- 您对未来链路追踪技术的发展有什么看法?
欢迎在评论区分享您的经验和看法!欢迎关注公众号“服务端技术精选”,获取更多技术文章和建议。
标题:SpringBoot + 任务执行链路追踪 + TraceID 透传:从调度到完成,全链路可观测
作者:jiangyi
地址:http://jiangyi.space/articles/2026/03/15/1773296405543.html
公众号:服务端技术精选
- 导语
- 一、任务执行链路追踪的概念
- 1.1 什么是链路追踪
- 1.2 任务执行链路的特点
- 1.3 链路追踪的价值
- 二、TraceID 透传的实现
- 2.1 TraceID 的概念
- 2.2 实现方式
- 2.3 TraceID 生成策略
- 三、SpringBoot 集成链路追踪
- 3.1 依赖配置
- 3.2 TraceID 工具类
- 3.3 拦截器配置
- 3.4 任务执行器配置
- 四、任务执行链路追踪实现
- 4.1 任务执行服务
- 4.2 消息队列集成
- 4.3 控制器实现
- 五、全链路可观测性实现
- 5.1 监控指标
- 5.2 日志配置
- 5.3 监控面板
- 六、生产级实现
- 6.1 配置文件
- 6.2 错误处理
- 6.3 安全配置
- 七、最佳实践
- 7.1 TraceID 管理
- 7.2 任务执行监控
- 7.3 性能优化
- 八、案例分析
- 8.1 案例一:电商订单处理
- 8.2 案例二:数据ETL处理
- 8.3 案例三:微服务调用链
- 互动话题
评论
0 评论