Kang YiKai

从批处理到精细化任务 - 如何重构错误追踪机制

· 8 min read

有一种经典的场景是,为了提升系统的可维护性和可观测性,我们需要将系统从基于批次的状态(Tasks Batch Status)转向至单个任务(Single Task)的精细化错误追踪。

为了实现这个目标,我们可以让 Data Object 自己携带目标,而不是通过方法的参数来统一传递状态,从而使代码的变动最小,并保持优雅。

当前的情况

  1. 获取任务:业务逻辑层(use case layer)指挥服务层(service layer),按批次(Batch)从数据库中拉取待处理的任务。
  2. 获取数据和调用:基础设施层(infra)发送请求,每个请求会携带批量信息。
  3. 状态转换:服务层根据外部服务的返回结果或抛出的异常,将结果统一包装成一个自定义的 BatchStatus(比如 SUCCESS, PARTIAL_SUCCESS, FAILED
  4. 批量更新:业务逻辑层根据 BatchStatus,对这一批任务进行分类,并循环调用 Repository 层的方法更新数据库状态。

核心的目标

我们希望提升服务在批量任务处理中的可观测性与精细化排障能力。

当底层服务调用失败的时候,系统不仅能够为任务标记一个笼统的 FAILEDRETRY 状态,还能够将底层错误的原生信息,例如 HTTP 状态码、具体的 Client Error Code、Service Response Message 等等,精准地穿透并传递,并持久化到每一个具体任务的数据库记录中。

遇到的问题

在当前的机制下,服务有严重的错误上下文丢失(Context Dropping)问题:

  • 信息断层:当底层的基础设施捕获了错误码和错误信息,但这些 debug 信息仅仅被打印在了日志中,随后就被丢弃了。
  • 状态扁平:服务层只包装了一个干瘪的 BatchStatus 和受到影响的任务 ID 集合。
  • Debug 成本高:数据库中,所有的失败任务看起来千篇一律。开发人员如果想要知道某个具体任务的原因时,必须去拿着任务的 ID 去日志系统中搜索。

代码示例

我们需要从持久层中获取 Task,然后请求外部 Client 并获得结果,并将包含着结果或错误的 Result 传递回 Task,最后更新持久层。

第一步:让 BatchProcessResult 具备携带错误的能力

假设当前有一个 BatchProcessResult,里面包含了请求外部 Client 后的统一结果 BatchStatus,里面包含了多个 ItemId

以前只能返回一个简单的 BatchStatus,现在我们希望记录每一个 item 的具体细节。为此,我们引入一个新的 ErrorInfo 记录,并在 Result 中建立项目 Id 和错误信息的映射。

// 1. 新增 record 来封装对于每个 item 的错误细节
public record ErrorInfo(
    String failedReason,
    String errorLabel,
    String errorDetails
) {}


// 2. 改造批处理返回结果类
public class BatchProcessResult {
	// 原来只有Batch的结果
    private BatchStatus batchStatus;
    private final Set<String> failedItemIds = new HashSet<>();

    // 【新增】用于存储哪个 Item 发生了什么错误
    private final Map<String, ErrorInfo> errorInfoByItemId = new HashMap<>();

    // ... 省略 getter 和 setter ...

    public void addFailedItemWithError(String itemId, ErrorInfo errorInfo) {
        failedItemIds.add(itemId);
        errorInfoByItemId.put(itemId, errorInfo);
    }

    public ErrorInfo getErrorInfo(String itemId) {
        return errorInfoByItemId.getOrDefault(itemId, null);
    }
}

第二步:在底层捕获异常

在发起网络调用的位置(infra/service layer),我们要改变过去「只打印日志,返回模糊 FAILED」的做法。我们需要捕获 HTTP 状态码,三方系统返回的 Message 等等信息,包装进 ErrorInfo 中,并向上传递。

// 服务层,与client交互并获得 result
public BatchProcessResult processBatch(String clientId, Set<String> itemIds) {

	RemoteClient client = ClientRegistry.get(clientId);
	if (client == null) {
		//【新增】如果 client 错误,则该 batch 中所有的 items 都会有统一的错误
		ErrorInfo errorInfo = new ErrorInfo(
			"CLIENT_CONFIG_ERROR",
			"NO_CLIENT",
			"more details"
		);
		return createBatchResultWithError(BatchStatus.FAILED, itemIds, errorInfo);
	}

	try {
		Response response = client.sendItems(itemIds);

		if (response == null) {
			return createBatchResult(BatchStatus.SUCCESS);
		}
		//【新增】可以返回包含不同 item 错误的 BatchProcessResult
		return createBatchResultForPartialSuccess(itemIds, response);
	} catch (ClientRequestException e) {
		// 进一步的丰富 client 的错误返回信息
		ErrorInfo errorInfo = new ErrorInfo(
			"CLIENT_REQUEST_ERROR",
			e.getErrorCode(),
			e.getMessage()
		);
		return createBatchResultWithError(BatchStatus.RETRY, itemIds, errorInfo);
	} catch(Exception e) {
		ErrorInfo errorInfo = new ErrorInfo(
			"SYSTEM_ERROR",
			"UNKNOWN_EXCEPTION",
			e.getMessage()
		);
		return createBatchResultWithError(BatchStatus.UNKNOWN, itemIds, errorInfo);
	}
}

第三步:业务逻辑层调度组织,持久化前缝合数据

这是整合,或对齐 taskresult 的步骤。在业务逻辑层,拿到了底层返回的 BatchProcessResult,我们不直接丢给 Repository,而是根据 result 丰富 task 信息:将底层传上来的错误细节,根据 itemId 赋给 Task 实体(enrichTasksWithErrorInfo)。

// 业务逻辑层,负责编排服务
public void processBatchTasks(String clientId) {
	// 获取 task (或说 Entity)
	List<Task> tasks = taskService.getNextTasks();
	if (tasks.isEmpty()) return;

	Set<String> itemIds = extractItemIds(tasks);

	BatchProcessResult result = batchProcessor.processBatch(clientId, itemIds);

	// 由于 batch 成功,没有错误,则不需要绑定到 task
	if (result.getBatchStatus() == BatchStatus.SUCCESS) {
        taskService.updateTasks(tasks, TaskStatus.SUCCESS);
        return;
    }

    // 【新增】 在持久化前,把 result 的错误细节缝合到 task 上
    enrichTasksWithErrorInfo(tasks, result);

    // 基于 BatchStatus 分发 TaskStatus
    switch (result.getBatchStatus()) {
        case PARTIAL_SUCCESS -> handlePartialSuccess(tasks, result); // 需要对 failed item 做额外的处理
        case RETRY -> taskService.updateTasks(tasks, TaskStatus.RETRY);
        default -> taskService.updateTasks(tasks, TaskStatus.FAILED);
    }
}

// 缝合 task 和 result 的方法
private void enrichTasksWithErrorInfo(List<Task> tasks, BatchProcessResult result) {
    for (Task task : tasks) {
        ErrorInfo errorInfo = result.getErrorInfo(task.getItemId());
        if (errorInfo != null) {
            task.setFailedReason(errorInfo.failedReason());
            task.setErrorLabel(errorInfo.errorLabel());
            task.setErrorDetails(errorInfo.errorDetails());
        }
    }
}

第四步:干净的服务层,去做持久化

该服务层可以专注于和 task 有关的信息处理,而不会掺杂 result 里面的 failedReason 之类的信息。

// 1. 现在的 Service 层更新方法变得极其干净
public void updateTasks(List<Task> tasks, TaskStatus status) {
    // tasks 里面已经包含了 errorLabel 和 errorDetails, Repository 执行 update 时会自然而然地把它们刷进数据库
    // 这里根据 TaskStatus 做最后的处理
    switch (status) {
        case SUCCESS -> taskRepository.deleteTasks(tasks);
        case FAILED -> handleFailed(tasks);
        default -> handleRetry(tasks);
    }
}

// 2. Task 实体类准备接收这些数据
@Entity
@Table(name = "tasks")
public class Task {
    @Column(name = "error_label", length = 255)
    private String errorLabel;

    @Column(name = "error_details", length = 255)
    private String errorDetails;

    // ...
}

总结

通过引入了额外的 Context 载体 ErrorInfo,我们解耦了「捕获异常」与「持久化异常」的生命周期。底层只需要如实记录,服务层和业务逻辑层只管按图索骥赋值。

这保证了方法和模块的单一职责,又极大地提升了系统在线上环境的可观测性。

免责声明: 代码示例已简化并通用化,仅用于教学目的,侧重于架构模式而非具体项目实现。

感谢您的阅读!您的支持是我的动力。

如果您喜欢这篇文章,不妨请我喝杯咖啡。 ☕️

wechat