17 智能体执行状态机实现实战(run/attempt/subscribe/runs)
这篇把“智能体框架”拆成可以编码的状态机。
对应源码入口
src/agents/pi-embedded-runner/run.tssrc/agents/pi-embedded-runner/run/attempt.tssrc/agents/pi-embedded-subscribe.tssrc/agents/pi-embedded-subscribe.handlers.tssrc/agents/pi-embedded-runner/runs.tssrc/agents/pi-embedded-runner/run/types.tssrc/agents/pi-embedded-runner/run/params.tssrc/auto-reply/reply/agent-runner.tssrc/auto-reply/reply/agent-runner-execution.ts
状态机总图(建议直接照抄)
idle -> queued(session lane) -> queued(global lane) -> attempting -> streaming -> compacting(optional) -> completed/failed -> idle
补充控制态:
aborting(用户或超时触发)waiting_compaction_retry(订阅层等待重试结束)
第一层:上层编排状态机(agent-runner)
runReplyAgent(...) 与 runAgentTurnWithFallback(...) 是“入口总控”:
- 先判断是否
steer/followup,避免重复启动新 run。 - 做 memory flush、typing、reply pipeline 等外围能力。
- 用
runWithModelFallback(...)包住执行,内部调用runEmbeddedPiAgent(...)(或 CLI provider 走runCliAgent(...))。 - 统一处理“瞬时错误重试、模型回退、会话重置”等恢复路径。
可以理解为:agent-runner* 负责“业务编排”,run.ts/attempt.ts 负责“智能体内核执行”。
第二层:调度状态机(run.ts)
runEmbeddedPiAgent(...) 负责:
- 车道排队:
enqueue(sessionLane)再enqueue(globalLane)。 - 模型/鉴权准备:
resolveModel+ auth profile 轮换。 - 执行循环:
while(true)调runEmbeddedAttempt(...)。 - 故障分支:上下文溢出、限流、鉴权失败、thinking 降级、profile 轮换。
- 输出组装:
buildEmbeddedRunPayloads(...)。
第三层:执行状态机(attempt.ts)
runEmbeddedAttempt(...) 负责一次真实尝试:
- 环境与会话准备:
resolveSandboxContext(...)createOpenClawCodingTools(...)createAgentSession(...)
- 订阅挂载:
subscribeEmbeddedPiSession(...)- 返回
assistantTexts/toolMetas/waitForCompactionRetry/...
- 运行期注册 active run:
- 构造
queueHandle(queueMessage/isStreaming/isCompacting/abort) setActiveEmbeddedRun(sessionId, queueHandle)
- 退出清理(必须 finally):
unsubscribe()clearActiveEmbeddedRun(sessionId, queueHandle)flushPendingToolResultsAfterIdle(...)session.dispose()
这一步是很多项目会漏掉的关键。
第四层:事件状态机(subscribe.ts + handlers)
createEmbeddedPiSessionEventHandler(...) 分发事件:
message_start-> 重置消息级状态message_update-> 增量流处理(delta、reasoning、block chunk)message_end-> 最终文本/推理输出收敛tool_execution_start/update/end-> 工具事件与摘要auto_compaction_start/end-> 压缩生命周期agent_start/agent_end-> 生命周期事件
订阅层核心状态字段:
ts
type EmbeddedPiSubscribeState = {
assistantTexts: string[];
toolMetas: Array<{ toolName?: string; meta?: string }>;
compactionInFlight: boolean;
pendingCompactionRetry: number;
compactionRetryPromise: Promise<void> | null;
unsubscribed: boolean;
};第五层:运行注册表状态机(runs.ts)
ACTIVE_EMBEDDED_RUNS: Map<sessionId, queueHandle>
操作:
setActiveEmbeddedRun:进入processing。queueEmbeddedPiMessage:仅在isStreaming && !isCompacting时允许 steer。abortEmbeddedPiRun:调用handle.abort()。clearActiveEmbeddedRun:仅 handle 匹配才删除,避免误删新 run。waitForEmbeddedPiRunEnd:用于外部等待 run 完结。
最小复刻代码骨架
ts
type RunState = "idle" | "queued" | "attempting" | "streaming" | "compacting" | "done" | "failed";
async function runAgentTurn(req: Req) {
return enqueue(req.sessionLane, () =>
enqueue(req.globalLane, async () => {
let state: RunState = "attempting";
const handle = registerActive(req.sessionId);
try {
const sub = subscribe(req.session, onEvent);
state = "streaming";
await req.session.prompt(req.prompt);
await sub.waitForCompactionRetry();
state = "done";
return collectResult(sub);
} catch (e) {
state = "failed";
throw e;
} finally {
unregisterActive(req.sessionId, handle);
}
}),
);
}验收标准(必须过)
- 同一
sessionId同时只存在一个 active handle。 abort后不会遗留 active run。- tool 事件和文本事件都可在订阅层输出。
- compaction 重试期间不会提前返回“完成”。
- handle mismatch 时不会误 clear 新 run。
- finally 清理路径无异常吞没。
开发避坑
- 不要把 run/attempt/subscribe 混成一个大函数。
- active run 必须“注册-清理成对”,而且清理要校验 handle。
- compaction wait 不能只看
compactionInFlight,还要看pendingCompactionRetry。 - abort/timer/listener 清理必须在 finally,不能依赖 happy path。