Skip to content

17 智能体执行状态机实现实战(run/attempt/subscribe/runs)

这篇把“智能体框架”拆成可以编码的状态机。

对应源码入口

  • src/agents/pi-embedded-runner/run.ts
  • src/agents/pi-embedded-runner/run/attempt.ts
  • src/agents/pi-embedded-subscribe.ts
  • src/agents/pi-embedded-subscribe.handlers.ts
  • src/agents/pi-embedded-runner/runs.ts
  • src/agents/pi-embedded-runner/run/types.ts
  • src/agents/pi-embedded-runner/run/params.ts
  • src/auto-reply/reply/agent-runner.ts
  • src/auto-reply/reply/agent-runner-execution.ts

状态机总图(建议直接照抄)

idle -> queued(session lane) -> queued(global lane) -> attempting -> streaming -> compacting(optional) -> completed/failed -> idle

补充控制态:

  • aborting(用户或超时触发)
  • waiting_compaction_retry(订阅层等待重试结束)

第一层:上层编排状态机(agent-runner)

runReplyAgent(...)runAgentTurnWithFallback(...) 是“入口总控”:

  1. 先判断是否 steer/followup,避免重复启动新 run。
  2. 做 memory flush、typing、reply pipeline 等外围能力。
  3. runWithModelFallback(...) 包住执行,内部调用 runEmbeddedPiAgent(...)(或 CLI provider 走 runCliAgent(...))。
  4. 统一处理“瞬时错误重试、模型回退、会话重置”等恢复路径。

可以理解为:agent-runner* 负责“业务编排”,run.ts/attempt.ts 负责“智能体内核执行”。

第二层:调度状态机(run.ts)

runEmbeddedPiAgent(...) 负责:

  1. 车道排队:enqueue(sessionLane)enqueue(globalLane)
  2. 模型/鉴权准备:resolveModel + auth profile 轮换。
  3. 执行循环:while(true)runEmbeddedAttempt(...)
  4. 故障分支:上下文溢出、限流、鉴权失败、thinking 降级、profile 轮换。
  5. 输出组装:buildEmbeddedRunPayloads(...)

第三层:执行状态机(attempt.ts)

runEmbeddedAttempt(...) 负责一次真实尝试:

  1. 环境与会话准备:
  • resolveSandboxContext(...)
  • createOpenClawCodingTools(...)
  • createAgentSession(...)
  1. 订阅挂载:
  • subscribeEmbeddedPiSession(...)
  • 返回 assistantTexts/toolMetas/waitForCompactionRetry/...
  1. 运行期注册 active run:
  • 构造 queueHandlequeueMessage/isStreaming/isCompacting/abort
  • setActiveEmbeddedRun(sessionId, queueHandle)
  1. 退出清理(必须 finally):
  • unsubscribe()
  • clearActiveEmbeddedRun(sessionId, queueHandle)
  • flushPendingToolResultsAfterIdle(...)
  • session.dispose()

这一步是很多项目会漏掉的关键。

第四层:事件状态机(subscribe.ts + handlers)

createEmbeddedPiSessionEventHandler(...) 分发事件:

  • message_start -> 重置消息级状态
  • message_update -> 增量流处理(delta、reasoning、block chunk)
  • message_end -> 最终文本/推理输出收敛
  • tool_execution_start/update/end -> 工具事件与摘要
  • auto_compaction_start/end -> 压缩生命周期
  • agent_start/agent_end -> 生命周期事件

订阅层核心状态字段:

ts
type EmbeddedPiSubscribeState = {
  assistantTexts: string[];
  toolMetas: Array<{ toolName?: string; meta?: string }>;
  compactionInFlight: boolean;
  pendingCompactionRetry: number;
  compactionRetryPromise: Promise<void> | null;
  unsubscribed: boolean;
};

第五层:运行注册表状态机(runs.ts)

ACTIVE_EMBEDDED_RUNS: Map<sessionId, queueHandle>

操作:

  1. setActiveEmbeddedRun:进入 processing
  2. queueEmbeddedPiMessage:仅在 isStreaming && !isCompacting 时允许 steer。
  3. abortEmbeddedPiRun:调用 handle.abort()
  4. clearActiveEmbeddedRun:仅 handle 匹配才删除,避免误删新 run。
  5. waitForEmbeddedPiRunEnd:用于外部等待 run 完结。

最小复刻代码骨架

ts
type RunState = "idle" | "queued" | "attempting" | "streaming" | "compacting" | "done" | "failed";

async function runAgentTurn(req: Req) {
  return enqueue(req.sessionLane, () =>
    enqueue(req.globalLane, async () => {
      let state: RunState = "attempting";
      const handle = registerActive(req.sessionId);
      try {
        const sub = subscribe(req.session, onEvent);
        state = "streaming";
        await req.session.prompt(req.prompt);
        await sub.waitForCompactionRetry();
        state = "done";
        return collectResult(sub);
      } catch (e) {
        state = "failed";
        throw e;
      } finally {
        unregisterActive(req.sessionId, handle);
      }
    }),
  );
}

验收标准(必须过)

  1. 同一 sessionId 同时只存在一个 active handle。
  2. abort 后不会遗留 active run。
  3. tool 事件和文本事件都可在订阅层输出。
  4. compaction 重试期间不会提前返回“完成”。
  5. handle mismatch 时不会误 clear 新 run。
  6. finally 清理路径无异常吞没。

开发避坑

  1. 不要把 run/attempt/subscribe 混成一个大函数。
  2. active run 必须“注册-清理成对”,而且清理要校验 handle。
  3. compaction wait 不能只看 compactionInFlight,还要看 pendingCompactionRetry
  4. abort/timer/listener 清理必须在 finally,不能依赖 happy path。

用工程视角拆解 AI 智能体框架