Skip to content

07 AI框架总原理(从 run 到 fallback 的完整骨架)

这篇回答一个核心问题:OpenClaw 的智能体为什么“能长期跑”,而不是只在 demo 里跑一次。

核心源码入口

  • src/agents/pi-embedded-runner/run.ts
  • src/agents/pi-embedded-runner/run/attempt.ts
  • src/agents/pi-embedded-subscribe.ts
  • src/agents/pi-embedded-runner/runs.ts
  • src/agents/model-fallback.ts
  • src/agents/failover-error.ts

先建立全局心智图

真实链路不是一次 model.generate(),而是多层状态机:

queue(session lane) -> queue(global lane) -> attempt -> subscribe(stream) -> wait compaction -> finalize -> (error? rotate profile / compact / fallback)

对应到代码:

  1. 调度层:runEmbeddedPiAgent(...)
  2. 执行层:runEmbeddedAttempt(...)
  3. 事件层:subscribeEmbeddedPiSession(...)
  4. 运行注册层:setActiveEmbeddedRun(...) / clearActiveEmbeddedRun(...)
  5. 跨模型韧性层:runWithModelFallback(...)

模块一 调度层原理(run.ts)

runEmbeddedPiAgent(...) 不做“细节执行”,只做“宏观决策”:

  1. 并发隔离:resolveSessionLane(...) + resolveGlobalLane(...),再双层 enqueue
  2. 预检拦截:resolveContextWindowInfo(...) + evaluateContextWindowGuard(...),窗口太小直接阻断。
  3. 认证与账户轮换:内部 advanceAuthProfile(...),限流/鉴权后切账号。
  4. 上下文溢出恢复:
  • compactEmbeddedPiSessionDirect(...)
  • truncateOversizedToolResultsInSession(...)
  • 超过上限返回可读错误,不死循环
  1. 最终输出拼装:构建 payload + meta(usage、provider、model、error)。

你可以把它理解成“总导演”。

模块二 执行层原理(attempt.ts)

runEmbeddedAttempt(...) 是“一次真实执行事务”,关键点是顺序:

  1. 会话卫生:
  • sanitizeSessionHistory(...)
  • limitHistoryTurns(...)
  • sanitizeToolUseResultPairing(...)
  1. 挂订阅:subscribeEmbeddedPiSession(...)
  2. Hook 注入:
  • runBeforeAgentStart(...)
  • 若返回 prependContext,拼成 prependContext + prompt
  1. 执行 prompt:activeSession.prompt(...)
  2. 等压缩稳定:waitForCompactionRetry()
  3. 快照选择:压缩超时时 selectCompactionTimeoutSnapshot(...)
  4. 收尾 Hook:runAgentEnd(...)
  5. finally 清理:unsubscribe()、释放 active run、释放会话资源

你可以把它理解成“单次事务执行器”。

模块三 事件层原理(subscribe.ts)

订阅层负责把底层事件变成稳定的上层信号:

  1. 文本流:维护 assistantTexts
  2. 工具流:维护 toolMetas、tool summary
  3. 压缩流:维护三个关键状态:
  • compactionInFlight
  • pendingCompactionRetry
  • compactionRetryPromise
  1. 对外暴露 waitForCompactionRetry(),保证 run 不会在压缩未完成时“假完成”。

这是“流式事件稳定器”。

模块四 运行注册层原理(runs.ts)

ACTIVE_EMBEDDED_RUNS 的作用是“运行时控制平面”:

  1. 注册:setActiveEmbeddedRun(sessionId, handle)
  2. steer:queueEmbeddedPiMessage(...)
  3. 中断:abortEmbeddedPiRun(...)
  4. 清理:clearActiveEmbeddedRun(...)(带 handle 匹配,防止误删新 run)

这是“可中断、可插消息”的基础。

模块五 韧性层原理(model-fallback.ts)

runWithModelFallback(...) 是跨模型兜底:

  1. resolveFallbackCandidates(...) 先生成候选列表(主模型 + fallbacks + 去重)
  2. 每次失败后:
  • coerceToFailoverError(...) 归一化错误
  • describeFailoverError(...) 记录 reason/status/code
  1. shouldRethrowAbort(...) 对用户主动 Abort 直接停止,不做 fallback
  2. 全失败时返回汇总错误:All models failed (...)

这是“从单模型系统升级到可用系统”的关键。

从 0 到 1 复刻建议

第一版先抄 3 层:

  1. run(调度)
  2. attempt(单次事务)
  3. subscribe(事件聚合)

第二版补 2 层:

  1. runs(运行注册与中断)
  2. model-fallback(跨模型兜底)

最小复刻骨架

ts
async function runAgentTurn(req: Req) {
  return enqueue(req.sessionLane, () =>
    enqueue(req.globalLane, async () => {
      const handle = registerActive(req.sessionId);
      try {
        const attempt = await runAttempt(req);
        return attempt;
      } catch (err) {
        return await runWithFallback(err, req);
      } finally {
        clearActive(req.sessionId, handle);
      }
    }),
  );
}

自检清单

  1. 同一会话是否严格串行,不会消息乱序。
  2. 压缩重试期间是否不会提前返回“完成”。
  3. abort 后是否不会残留 active run。
  4. 模型失败时是否有结构化 attempts 轨迹。
  5. 账号轮换、压缩恢复、fallback 是否互不打架。

用工程视角拆解 AI 智能体框架