16 流式订阅与回复拼装
模块目标
拆解模型流式事件如何从原始 SSE 转变为用户最终收到的 payload, 以及 block streaming(分块推送)与最终消息的去重逻辑。
核心文件
src/agents/pi-embedded-subscribe.ts— 订阅入口,构建 contextsrc/agents/pi-embedded-subscribe.handlers.ts— 事件路由 switchsrc/agents/pi-embedded-subscribe.handlers.messages.ts— 文本流处理src/agents/pi-embedded-subscribe.handlers.tools.ts— 工具事件处理src/agents/pi-embedded-block-chunker.ts— 文本分块缓冲src/auto-reply/reply/block-reply-pipeline.ts— block 推送管道src/auto-reply/reply/agent-runner-payloads.ts— 最终 payload 过滤
一、订阅状态(EmbeddedPiSubscribeState)
ts
// src/agents/pi-embedded-subscribe.ts
const state: EmbeddedPiSubscribeState = {
assistantTexts: [], // 累积的文本片段
toolMetas: [], // 工具调用摘要列表
toolMetaById: new Map(), // toolCallId → 摘要(快速查找)
toolSummaryById: new Set(), // 已发送的工具摘要 ID(去重)
lastToolError: undefined, // 最近一次工具错误
deltaBuffer: "", // 单条消息的文本增量缓冲
blockBuffer: "", // 未分块模式的回复缓冲
// ...compaction、reasoning 等字段省略
};二、事件路由(精确源码)
ts
// src/agents/pi-embedded-subscribe.handlers.ts
export function createEmbeddedPiSessionEventHandler(ctx: EmbeddedPiSubscribeContext) {
return (evt: EmbeddedPiSubscribeEvent) => {
switch (evt.type) {
case "message_start":
handleMessageStart(ctx, evt as never);
return;
case "message_update":
handleMessageUpdate(ctx, evt as never);
return;
case "message_end":
handleMessageEnd(ctx, evt as never);
return;
case "tool_execution_start":
// 异步 fire-and-forget,打字指示器等副作用,不阻塞工具摘要
handleToolExecutionStart(ctx, evt as never).catch((err) => {
ctx.log.debug(`tool_execution_start handler failed: ${String(err)}`);
});
return;
}
};
}关键细节:
tool_execution_start是异步 fire-and-forget。失败时只打 debug 日志,不冒泡。message_start/update/end是同步处理。
三、文本流处理(单调追加策略)
ts
// src/agents/pi-embedded-subscribe.handlers.messages.ts
if (evtType !== "text_delta" && evtType !== "text_start" && evtType !== "text_end") {
return;
}
let chunk = "";
if (evtType === "text_delta") {
chunk = delta;
} else if (evtType === "text_start" || evtType === "text_end") {
if (delta) {
chunk = delta;
} else if (content) {
// KNOWN: Some providers resend full content on text_end.
// We only append a suffix (or nothing) to keep output monotonic.
if (content.startsWith(ctx.state.deltaBuffer)) {
chunk = content.slice(ctx.state.deltaBuffer.length); // 只追加 suffix
} else if (ctx.state.deltaBuffer.startsWith(content)) {
chunk = ""; // 服务端返回的内容是缓冲的前缀,忽略
} else if (!ctx.state.deltaBuffer.includes(content)) {
chunk = content; // 全新内容,直接追加
}
}
}
if (chunk) {
ctx.state.deltaBuffer += chunk; // 更新单条消息缓冲
if (ctx.blockChunker) {
ctx.blockChunker.append(chunk); // 分块模式:交给 chunker
} else {
ctx.state.blockBuffer += chunk; // 非分块模式:直接累积
}
}为什么要单调追加? 部分 provider(如某些 OpenAI 兼容接口)在 text_end 事件中重发完整内容,而不是只发增量。 单调追加策略通过前缀匹配来防止内容重复。
四、EmbeddedBlockChunker(文本分块缓冲)
ts
// src/agents/pi-embedded-block-chunker.ts
export class EmbeddedBlockChunker {
#buffer = "";
readonly #chunking: BlockReplyChunking;
constructor(chunking: BlockReplyChunking) {
this.#chunking = chunking;
}
append(text: string) {
if (!text) return;
this.#buffer += text;
}
reset() { this.#buffer = ""; }
get bufferedText() { return this.#buffer; }
hasBuffered(): boolean { return this.#buffer.length > 0; }
}BlockChunker 按配置的分块策略(换行/段落/句子等)决定何时 flush,触发 block 推送。
五、BlockReplyPipeline 接口
ts
// src/auto-reply/reply/block-reply-pipeline.ts
export type BlockReplyPipeline = {
enqueue: (payload: ReplyPayload) => void; // 入队(可能触发 coalescer 缓冲)
flush: (options?: { force?: boolean }) => Promise<void>; // 强制刷新
stop: () => void; // 停止接受新 payload
hasBuffered: () => boolean; // 是否有待发 payload
didStream: () => boolean; // 是否已流式发送过内容
isAborted: () => boolean; // 是否已中止
hasSentPayload: (payload: ReplyPayload) => boolean; // 去重检查
};hasSentPayload 是最终 payload 过滤的核心方法(见第六节)。
六、去重机制(双路策略)
工具执行开始前,handleToolExecutionStart 会 flush 当前缓冲区,将文本提前发给用户。 这产生了"同一段文本可能被 block streaming 提前发送,又出现在最终 payload"的问题。
两种场景:
场景 A:有 BlockReplyPipeline
ts
// 最终 payload 过滤:用 pipeline.hasSentPayload() 排除已发送的
const filteredPayloads = dedupedPayloads.filter(
(payload) => !params.blockReplyPipeline?.hasSentPayload(payload)
);场景 B:没有 Pipeline(但启用了 blockStreaming)
工具执行前直接发送,同时记录 key:
ts
// agent-runner-execution.ts
const directlySentBlockKeys = new Set<string>();
if (params.blockStreamingEnabled && params.blockReplyPipeline) {
params.blockReplyPipeline.enqueue(blockPayload); // 走 pipeline
} else if (params.blockStreamingEnabled) {
directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload)); // 记录 key
await params.opts?.onBlockReply?.(blockPayload); // 直接发送
}最终过滤时排除已直接发送的:
ts
const filteredPayloads = params.directlySentBlockKeys?.size
? dedupedPayloads.filter(
(payload) => !params.directlySentBlockKeys!.has(createBlockReplyPayloadKey(payload)),
)
: dedupedPayloads;七、完整数据流
LLM SSE stream
│
▼
session.subscribe(handler)
│
├─ message_update → text_delta/text_start/text_end
│ │
│ ▼
│ deltaBuffer += chunk
│ │
│ ├─ blockChunker.append(chunk) [分块模式]
│ │ │
│ │ ▼
│ │ flush 触发 → BlockReplyPipeline.enqueue(payload)
│ │ │
│ │ ▼
│ │ coalescer 缓冲 → onFlush(payload) → 推给用户
│ │
│ └─ blockBuffer += chunk [非分块模式]
│
├─ tool_execution_start → flushBlockReplyBuffer() [提前 flush]
│ │
│ └─ directlySentBlockKeys.add(key) [记录直发 key]
│
└─ message_end → assistantTexts.push(text)
│
▼
buildEmbeddedRunPayloads(...) [组装最终 payload]
│
▼
buildReplyPayloads(...)
│
▼
filteredPayloads(排除 pipeline/directlySent 已发内容)
│
▼
最终发给用户八、handleToolExecutionStart 的副作用
ts
// src/agents/pi-embedded-subscribe.handlers.tools.ts
export async function handleToolExecutionStart(
ctx: EmbeddedPiSubscribeContext,
evt: AgentEvent & { toolName: string; toolCallId: string; args: unknown },
) {
// 1. Flush 待发文本(让用户先看到之前的 assistant 文字)
ctx.flushBlockReplyBuffer();
if (ctx.params.onBlockReplyFlush) {
void ctx.params.onBlockReplyFlush();
}
// 2. 推断工具元数据
const meta = extendExecMeta(toolName, args, inferToolMetaFromArgs(toolName, args));
ctx.state.toolMetaById.set(toolCallId, buildToolCallSummary(toolName, args, meta));
// 3. 发出 agent event(用于 WS 推送,让 UI 显示"正在执行工具")
emitAgentEvent({
runId: ctx.params.runId,
stream: "tool",
data: { phase: "start", name: toolName, toolCallId, args: args as Record<string, unknown> },
});
}九、自检清单
tool_execution_starthandler 是 async fire-and-forget,失败只记 debug,不影响工具执行。- 文本流使用单调追加策略,防止
text_end重发完整内容导致重复。 EmbeddedBlockChunker.#buffer私有字段,只通过bufferedTextgetter 读取。directlySentBlockKeys只在"blockStreaming 开启但无 pipeline"时使用。- 最终 payload 过滤发生在
buildReplyPayloads中,而不是 subscribe 层。 BlockReplyPipeline.hasSentPayload通过 payload key 去重,而非引用相等。
十、开发避坑
- 不要在
tool_execution_start中做阻塞操作(会延迟工具摘要显示,影响 UX)。 blockBuffer和deltaBuffer是两个不同的缓冲区:deltaBuffer:追踪单条消息的累积文本(用于单调追加判断)blockBuffer:非分块模式下的推送缓冲
- 最终 payload 的
text字段来自assistantTexts,而不是blockBuffer—— 两者最终内容相同,但blockBuffer是中间态,assistantTexts是message_end后确认的。