本文是 事件驱动的 ASR → LLM → ES 回写:设计拆解与实践 系列的第三篇,重点讲解 ES 回写的状态流转与失败补偿设计。

当分析链路已经跑起来之后,最后一个关键问题就是:结果怎么可靠落到 ES 上。

这一步看起来像”写一次索引”这么简单,但真正工程里,它往往决定了整个链路是否稳定。因为只要 ES 写入失败、顺序乱掉或者状态不同步,整个分析链路就会出现”看起来完成了,实际上没完成”的问题。

为什么 ES 回写要单独设计

很多人会把 ES 当成一个普通存储,但在分析系统里,它更像一个”可检索的结果视图”。

这意味着:

  • 前面的分析阶段要不断把结果回填进去
  • 某些字段会在不同阶段被多次更新
  • 状态字段需要和业务状态保持一致
  • 一旦写失败,不能简单丢掉

所以 ES 回写必须独立设计,而不能散在各个分析步骤里。

先改状态,再写结果

一个比较稳妥的方式是:

  1. 先把状态改成”正在分析”
  2. 再逐步写入各阶段结果
  3. 全部完成后再进入结束态

这样做的价值是让外部能一眼看懂当前进度。

比如:

  • 待分析
  • 正在分析
  • 分析完成
  • 分析失败
  • 待重试

这些状态并不是装饰,而是整个链路的”进度条”。

统一写入接口的意义

不管是新增、更新还是删除,最好都走统一的 ES 访问层。

这样做的原因有三个:

  • 错误处理可以统一
  • 日志可以统一
  • 失败补偿可以统一

如果每个地方都直接操作 ES,后面一旦需要加重试、加监控、加失败记录,改动范围会很大。

统一访问层的思路是:

  • 主流程只发出”写入请求”
  • 访问层负责真正执行写操作
  • 失败时统一记录上下文

失败记录不是补丁,而是设计的一部分

很多系统到了出问题的时候,才临时补一层重试。但更好的方式,是从一开始就把失败记录设计进去。

失败记录通常至少要保存:

  • 文档 ID
  • 索引名
  • 操作类型
  • 失败原因
  • 触发来源
  • 原始数据摘要

这样后续不管是人工排查还是自动补偿,都有依据。

从学习者角度看,这里最值得理解的是:

失败记录不是额外负担,而是系统可靠性的一部分。

补偿任务怎么思考

补偿任务的目标不是”无脑重试”,而是”有条件地修正”。

一个更合理的思路是:

  • 先筛选可重试的数据
  • 再分批处理
  • 对失败次数做限制
  • 对高频失败进行告警

如果不加控制,补偿任务很容易变成重试风暴,反而把系统拖慢。

所以补偿任务最重要的不是”跑起来”,而是”跑得克制”。

状态流转和补偿要一起看

状态流转和失败补偿不是两件事,而是同一件事的两面。

状态流转告诉你”现在在哪”;

补偿机制告诉你”出错了怎么办”。

两者配合起来,系统才真正可靠。

可以把它记成一句很实用的话:

状态负责表达进度,补偿负责兜住意外。

把状态流转想成一个小状态机

如果把这条链路简化一下,状态大致可以理解成一个小状态机:

1
2
TO_ANALYZE -> ANALYZING -> ANALYZED
\-> FAILED -> TO_RETRY

这里最重要的是:状态不是给人看的装饰,而是给系统和人共同使用的”进度描述”。

为什么状态要先改

先改状态的目的,是让外部看到”系统已经开始处理了”。这样就算后续写入慢一点,也不会让任务看起来像没启动。

为什么状态要和结果分开更新

阶段结果可能分批到达,如果把所有结果绑成一次大更新,失败时很难恢复。分开更新能让每一步更可控。

伪代码:写回 ES 的标准动作

下面这段伪代码可以帮助你理解回写流程:

1
2
3
4
5
6
7
8
9
10
11
function updateEsDocument(taskId, patch):
try:
ok = esClient.update(taskId, patch)
if ok:
return success
else:
recordFailure(taskId, patch, "update returned false")
return fail
except Exception e:
recordFailure(taskId, patch, e.message)
return fail

如果再加上状态更新,通常会是这样:

1
2
3
4
5
6
7
8
function markAnalyzing(taskId):
esClient.update(taskId, { status: "ANALYZING" })

function markAnalyzed(taskId, resultPatch):
esClient.update(taskId, merge({ status: "ANALYZED" }, resultPatch))

function markRetry(taskId):
esClient.update(taskId, { status: "TO_RETRY" })

这说明 ES 回写不是一个动作,而是一个动作序列。

失败记录应该记录什么

失败记录并不是简单记一句”失败了”,而是要尽量能帮助你后面恢复问题。

建议至少保存:

  • 任务 ID
  • 索引名
  • 操作类型(新增、更新、删除)
  • 失败时的字段片段
  • 错误摘要
  • 触发时间
  • 重试次数

有了这些信息,人工排查和自动补偿都更容易。

补偿任务应该怎么跑

补偿任务的核心不是”越快越好”,而是”可控、可查、可停”。

先筛选

不要对所有失败记录一起重试。先过滤:

  • 最近一段时间内的记录
  • 重试次数没超过阈值的记录
  • 状态仍然允许重试的记录

再分批

分批可以避免一次性把系统压垮。

再重试

重试失败后要记录新的失败原因,而不是把老记录直接覆盖掉。

伪代码:补偿任务怎么写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
function compensateFailures():
candidates = loadRetryableRecords()

for record in candidates:
if record.retryCount >= MAX_RETRY:
continue

result = tryApply(record)
if result.success:
markRecordSuccess(record)
else:
markRecordFailure(record, result.error)
if record.retryCount + 1 >= MAX_RETRY:
sendAlarm(record)

这段伪代码里最重要的三个点是:

  • 先筛选,再重试
  • 重试要计数
  • 到上限要告警

最后把这条链路串成一句话

你可以把整套设计理解成这样:

主流程负责把结果推向 ES,失败记录负责把异常留下来,补偿任务负责把没走通的地方重新接上。

这才是整条链路里最值得学习的部分:不是一次写成功,而是即使失败了,也有办法继续往前走。

学习这一段时最该注意的点

如果只看一次写入成功,系统会显得很简单;

如果把失败、重试、状态同步都考虑进去,你才会真正理解工程设计的难点。

建议重点记住这三条:

  • ES 不只是存结果,也是对外呈现状态
  • 失败记录是补偿链路的入口
  • 重试一定要受控,不能无限放大

到这里,ASR、LLM 和 ES 的整条链路就基本串起来了。可以回到 概述文章 回顾整体设计思路。