Skip to content

16 流式订阅与回复拼装

模块目标

拆解模型流式事件如何从原始 SSE 转变为用户最终收到的 payload, 以及 block streaming(分块推送)与最终消息的去重逻辑。

核心文件

  • src/agents/pi-embedded-subscribe.ts — 订阅入口,构建 context
  • src/agents/pi-embedded-subscribe.handlers.ts — 事件路由 switch
  • src/agents/pi-embedded-subscribe.handlers.messages.ts — 文本流处理
  • src/agents/pi-embedded-subscribe.handlers.tools.ts — 工具事件处理
  • src/agents/pi-embedded-block-chunker.ts — 文本分块缓冲
  • src/auto-reply/reply/block-reply-pipeline.ts — block 推送管道
  • src/auto-reply/reply/agent-runner-payloads.ts — 最终 payload 过滤

一、订阅状态(EmbeddedPiSubscribeState)

ts
// src/agents/pi-embedded-subscribe.ts
const state: EmbeddedPiSubscribeState = {
  assistantTexts: [],           // 累积的文本片段
  toolMetas: [],                // 工具调用摘要列表
  toolMetaById: new Map(),      // toolCallId → 摘要(快速查找)
  toolSummaryById: new Set(),   // 已发送的工具摘要 ID(去重)
  lastToolError: undefined,     // 最近一次工具错误
  deltaBuffer: "",              // 单条消息的文本增量缓冲
  blockBuffer: "",              // 未分块模式的回复缓冲
  // ...compaction、reasoning 等字段省略
};

二、事件路由(精确源码)

ts
// src/agents/pi-embedded-subscribe.handlers.ts

export function createEmbeddedPiSessionEventHandler(ctx: EmbeddedPiSubscribeContext) {
  return (evt: EmbeddedPiSubscribeEvent) => {
    switch (evt.type) {
      case "message_start":
        handleMessageStart(ctx, evt as never);
        return;
      case "message_update":
        handleMessageUpdate(ctx, evt as never);
        return;
      case "message_end":
        handleMessageEnd(ctx, evt as never);
        return;
      case "tool_execution_start":
        // 异步 fire-and-forget,打字指示器等副作用,不阻塞工具摘要
        handleToolExecutionStart(ctx, evt as never).catch((err) => {
          ctx.log.debug(`tool_execution_start handler failed: ${String(err)}`);
        });
        return;
    }
  };
}

关键细节:

  • tool_execution_start异步 fire-and-forget。失败时只打 debug 日志,不冒泡。
  • message_start/update/end 是同步处理。

三、文本流处理(单调追加策略)

ts
// src/agents/pi-embedded-subscribe.handlers.messages.ts

if (evtType !== "text_delta" && evtType !== "text_start" && evtType !== "text_end") {
  return;
}

let chunk = "";
if (evtType === "text_delta") {
  chunk = delta;
} else if (evtType === "text_start" || evtType === "text_end") {
  if (delta) {
    chunk = delta;
  } else if (content) {
    // KNOWN: Some providers resend full content on text_end.
    // We only append a suffix (or nothing) to keep output monotonic.
    if (content.startsWith(ctx.state.deltaBuffer)) {
      chunk = content.slice(ctx.state.deltaBuffer.length);  // 只追加 suffix
    } else if (ctx.state.deltaBuffer.startsWith(content)) {
      chunk = "";   // 服务端返回的内容是缓冲的前缀,忽略
    } else if (!ctx.state.deltaBuffer.includes(content)) {
      chunk = content;  // 全新内容,直接追加
    }
  }
}

if (chunk) {
  ctx.state.deltaBuffer += chunk;    // 更新单条消息缓冲
  if (ctx.blockChunker) {
    ctx.blockChunker.append(chunk);  // 分块模式:交给 chunker
  } else {
    ctx.state.blockBuffer += chunk;  // 非分块模式:直接累积
  }
}

为什么要单调追加? 部分 provider(如某些 OpenAI 兼容接口)在 text_end 事件中重发完整内容,而不是只发增量。 单调追加策略通过前缀匹配来防止内容重复。

四、EmbeddedBlockChunker(文本分块缓冲)

ts
// src/agents/pi-embedded-block-chunker.ts

export class EmbeddedBlockChunker {
  #buffer = "";
  readonly #chunking: BlockReplyChunking;

  constructor(chunking: BlockReplyChunking) {
    this.#chunking = chunking;
  }

  append(text: string) {
    if (!text) return;
    this.#buffer += text;
  }

  reset() { this.#buffer = ""; }

  get bufferedText() { return this.#buffer; }

  hasBuffered(): boolean { return this.#buffer.length > 0; }
}

BlockChunker 按配置的分块策略(换行/段落/句子等)决定何时 flush,触发 block 推送。

五、BlockReplyPipeline 接口

ts
// src/auto-reply/reply/block-reply-pipeline.ts

export type BlockReplyPipeline = {
  enqueue: (payload: ReplyPayload) => void;       // 入队(可能触发 coalescer 缓冲)
  flush: (options?: { force?: boolean }) => Promise<void>;  // 强制刷新
  stop: () => void;                               // 停止接受新 payload
  hasBuffered: () => boolean;                     // 是否有待发 payload
  didStream: () => boolean;                       // 是否已流式发送过内容
  isAborted: () => boolean;                       // 是否已中止
  hasSentPayload: (payload: ReplyPayload) => boolean; // 去重检查
};

hasSentPayload 是最终 payload 过滤的核心方法(见第六节)。

六、去重机制(双路策略)

工具执行开始前,handleToolExecutionStart 会 flush 当前缓冲区,将文本提前发给用户。 这产生了"同一段文本可能被 block streaming 提前发送,又出现在最终 payload"的问题。

两种场景:

场景 A:有 BlockReplyPipeline

ts
// 最终 payload 过滤:用 pipeline.hasSentPayload() 排除已发送的
const filteredPayloads = dedupedPayloads.filter(
  (payload) => !params.blockReplyPipeline?.hasSentPayload(payload)
);

场景 B:没有 Pipeline(但启用了 blockStreaming)

工具执行前直接发送,同时记录 key:

ts
// agent-runner-execution.ts
const directlySentBlockKeys = new Set<string>();

if (params.blockStreamingEnabled && params.blockReplyPipeline) {
  params.blockReplyPipeline.enqueue(blockPayload);       // 走 pipeline
} else if (params.blockStreamingEnabled) {
  directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload));  // 记录 key
  await params.opts?.onBlockReply?.(blockPayload);       // 直接发送
}

最终过滤时排除已直接发送的:

ts
const filteredPayloads = params.directlySentBlockKeys?.size
  ? dedupedPayloads.filter(
      (payload) => !params.directlySentBlockKeys!.has(createBlockReplyPayloadKey(payload)),
    )
  : dedupedPayloads;

七、完整数据流

LLM SSE stream


session.subscribe(handler)

    ├─ message_update → text_delta/text_start/text_end
    │      │
    │      ▼
    │  deltaBuffer += chunk
    │      │
    │      ├─ blockChunker.append(chunk)  [分块模式]
    │      │      │
    │      │      ▼
    │      │  flush 触发 → BlockReplyPipeline.enqueue(payload)
    │      │                    │
    │      │                    ▼
    │      │              coalescer 缓冲 → onFlush(payload) → 推给用户
    │      │
    │      └─ blockBuffer += chunk  [非分块模式]

    ├─ tool_execution_start → flushBlockReplyBuffer()  [提前 flush]
    │      │
    │      └─ directlySentBlockKeys.add(key)  [记录直发 key]

    └─ message_end → assistantTexts.push(text)


    buildEmbeddedRunPayloads(...)  [组装最终 payload]


    buildReplyPayloads(...)


    filteredPayloads(排除 pipeline/directlySent 已发内容)


    最终发给用户

八、handleToolExecutionStart 的副作用

ts
// src/agents/pi-embedded-subscribe.handlers.tools.ts

export async function handleToolExecutionStart(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & { toolName: string; toolCallId: string; args: unknown },
) {
  // 1. Flush 待发文本(让用户先看到之前的 assistant 文字)
  ctx.flushBlockReplyBuffer();
  if (ctx.params.onBlockReplyFlush) {
    void ctx.params.onBlockReplyFlush();
  }

  // 2. 推断工具元数据
  const meta = extendExecMeta(toolName, args, inferToolMetaFromArgs(toolName, args));
  ctx.state.toolMetaById.set(toolCallId, buildToolCallSummary(toolName, args, meta));

  // 3. 发出 agent event(用于 WS 推送,让 UI 显示"正在执行工具")
  emitAgentEvent({
    runId: ctx.params.runId,
    stream: "tool",
    data: { phase: "start", name: toolName, toolCallId, args: args as Record<string, unknown> },
  });
}

九、自检清单

  1. tool_execution_start handler 是 async fire-and-forget,失败只记 debug,不影响工具执行。
  2. 文本流使用单调追加策略,防止 text_end 重发完整内容导致重复。
  3. EmbeddedBlockChunker.#buffer 私有字段,只通过 bufferedText getter 读取。
  4. directlySentBlockKeys 只在"blockStreaming 开启但无 pipeline"时使用。
  5. 最终 payload 过滤发生在 buildReplyPayloads 中,而不是 subscribe 层。
  6. BlockReplyPipeline.hasSentPayload 通过 payload key 去重,而非引用相等。

十、开发避坑

  1. 不要在 tool_execution_start 中做阻塞操作(会延迟工具摘要显示,影响 UX)。
  2. blockBufferdeltaBuffer 是两个不同的缓冲区:
    • deltaBuffer:追踪单条消息的累积文本(用于单调追加判断)
    • blockBuffer:非分块模式下的推送缓冲
  3. 最终 payload 的 text 字段来自 assistantTexts,而不是 blockBuffer—— 两者最终内容相同,但 blockBuffer 是中间态,assistantTextsmessage_end 后确认的。

用工程视角拆解 AI 智能体框架