07 AI框架总原理(从 run 到 fallback 的完整骨架)
这篇回答一个核心问题:OpenClaw 的智能体为什么“能长期跑”,而不是只在 demo 里跑一次。
核心源码入口
src/agents/pi-embedded-runner/run.tssrc/agents/pi-embedded-runner/run/attempt.tssrc/agents/pi-embedded-subscribe.tssrc/agents/pi-embedded-runner/runs.tssrc/agents/model-fallback.tssrc/agents/failover-error.ts
先建立全局心智图
真实链路不是一次 model.generate(),而是多层状态机:
queue(session lane) -> queue(global lane) -> attempt -> subscribe(stream) -> wait compaction -> finalize -> (error? rotate profile / compact / fallback)
对应到代码:
- 调度层:
runEmbeddedPiAgent(...) - 执行层:
runEmbeddedAttempt(...) - 事件层:
subscribeEmbeddedPiSession(...) - 运行注册层:
setActiveEmbeddedRun(...) / clearActiveEmbeddedRun(...) - 跨模型韧性层:
runWithModelFallback(...)
模块一 调度层原理(run.ts)
runEmbeddedPiAgent(...) 不做“细节执行”,只做“宏观决策”:
- 并发隔离:
resolveSessionLane(...)+resolveGlobalLane(...),再双层enqueue。 - 预检拦截:
resolveContextWindowInfo(...)+evaluateContextWindowGuard(...),窗口太小直接阻断。 - 认证与账户轮换:内部
advanceAuthProfile(...),限流/鉴权后切账号。 - 上下文溢出恢复:
- 先
compactEmbeddedPiSessionDirect(...) - 再
truncateOversizedToolResultsInSession(...) - 超过上限返回可读错误,不死循环
- 最终输出拼装:构建 payload + meta(usage、provider、model、error)。
你可以把它理解成“总导演”。
模块二 执行层原理(attempt.ts)
runEmbeddedAttempt(...) 是“一次真实执行事务”,关键点是顺序:
- 会话卫生:
sanitizeSessionHistory(...)limitHistoryTurns(...)sanitizeToolUseResultPairing(...)
- 挂订阅:
subscribeEmbeddedPiSession(...) - Hook 注入:
runBeforeAgentStart(...)- 若返回
prependContext,拼成prependContext + prompt
- 执行 prompt:
activeSession.prompt(...) - 等压缩稳定:
waitForCompactionRetry() - 快照选择:压缩超时时
selectCompactionTimeoutSnapshot(...) - 收尾 Hook:
runAgentEnd(...) - finally 清理:
unsubscribe()、释放 active run、释放会话资源
你可以把它理解成“单次事务执行器”。
模块三 事件层原理(subscribe.ts)
订阅层负责把底层事件变成稳定的上层信号:
- 文本流:维护
assistantTexts - 工具流:维护
toolMetas、tool summary - 压缩流:维护三个关键状态:
compactionInFlightpendingCompactionRetrycompactionRetryPromise
- 对外暴露
waitForCompactionRetry(),保证 run 不会在压缩未完成时“假完成”。
这是“流式事件稳定器”。
模块四 运行注册层原理(runs.ts)
ACTIVE_EMBEDDED_RUNS 的作用是“运行时控制平面”:
- 注册:
setActiveEmbeddedRun(sessionId, handle) - steer:
queueEmbeddedPiMessage(...) - 中断:
abortEmbeddedPiRun(...) - 清理:
clearActiveEmbeddedRun(...)(带 handle 匹配,防止误删新 run)
这是“可中断、可插消息”的基础。
模块五 韧性层原理(model-fallback.ts)
runWithModelFallback(...) 是跨模型兜底:
resolveFallbackCandidates(...)先生成候选列表(主模型 + fallbacks + 去重)- 每次失败后:
coerceToFailoverError(...)归一化错误describeFailoverError(...)记录 reason/status/code
shouldRethrowAbort(...)对用户主动 Abort 直接停止,不做 fallback- 全失败时返回汇总错误:
All models failed (...)
这是“从单模型系统升级到可用系统”的关键。
从 0 到 1 复刻建议
第一版先抄 3 层:
run(调度)attempt(单次事务)subscribe(事件聚合)
第二版补 2 层:
runs(运行注册与中断)model-fallback(跨模型兜底)
最小复刻骨架
ts
async function runAgentTurn(req: Req) {
return enqueue(req.sessionLane, () =>
enqueue(req.globalLane, async () => {
const handle = registerActive(req.sessionId);
try {
const attempt = await runAttempt(req);
return attempt;
} catch (err) {
return await runWithFallback(err, req);
} finally {
clearActive(req.sessionId, handle);
}
}),
);
}自检清单
- 同一会话是否严格串行,不会消息乱序。
- 压缩重试期间是否不会提前返回“完成”。
- abort 后是否不会残留 active run。
- 模型失败时是否有结构化 attempts 轨迹。
- 账号轮换、压缩恢复、fallback 是否互不打架。