23 函数级剖析:流式订阅处理器
核心文件:
- src/agents/pi-embedded-subscribe.ts
- src/agents/pi-embedded-subscribe.handlers.ts
- src/agents/pi-embedded-subscribe.handlers.messages.ts
- src/agents/pi-embedded-subscribe.handlers.tools.ts
模块定位
subscribeEmbeddedPiSession 负责把底层 LLM 流式事件(text_delta/tool_execution_*)稳定转换成上层可消费回复(block 回调、partial 回调、messaging tool 去重)。
一、subscribeEmbeddedPiSession 返回结构
ts
// src/agents/pi-embedded-subscribe.ts
export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionParams) {
// ...
return {
assistantTexts, // string[] 累积的所有助手文本
toolMetas, // Array<{ toolName?, meta? }>
unsubscribe, // () => void
isCompacting: () => state.compactionInFlight || state.pendingCompactionRetry > 0,
isCompactionInFlight: () => state.compactionInFlight,
getMessagingToolSentTexts: () => messagingToolSentTexts.slice(),
getMessagingToolSentTargets: () => messagingToolSentTargets.slice(),
didSendViaMessagingTool: () => messagingToolSentTexts.length > 0,
getLastToolError: () => state.lastToolError ? { ...state.lastToolError } : undefined,
getUsageTotals,
getCompactionCount: () => compactionCount,
waitForCompactionRetry: () => Promise<void>, // 见下方
};
}

waitForCompactionRetry 关键行为:
- `state.unsubscribed = true` 时立即以 `AbortError` reject
- compaction 进行中时等待 Promise
- 无 compaction 时用 `queueMicrotask` 再检查一次(防止竞态)

unsubscribe 关键行为:
ts
const unsubscribe = () => {
if (state.unsubscribed) return;
state.unsubscribed = true; // 先标记,再操作
if (state.compactionRetryPromise) {
// reject(而非 resolve)—— 明确标识取消,不误判为成功
reject?.(abortErr);
state.compactionRetryPromise = null;
}
if (params.session.isCompacting) {
params.session.abortCompaction(); // 中止进行中的 compaction
}
sessionUnsubscribe();
};

二、事件路由(handlers.ts)
ts
// src/agents/pi-embedded-subscribe.handlers.ts
export function createEmbeddedPiSessionEventHandler(ctx) {
return (evt: EmbeddedPiSubscribeEvent) => {
switch (evt.type) {
case "message_start": handleMessageStart(ctx, evt); return;
case "message_update": handleMessageUpdate(ctx, evt); return;
case "message_end": handleMessageEnd(ctx, evt); return;
case "tool_execution_start": // async,fire-and-forget,catch 处理错误
handleToolExecutionStart(ctx, evt).catch((err) => {
ctx.log.debug(`tool_execution_start handler failed: ${String(err)}`);
}); return;
case "tool_execution_update": handleToolExecutionUpdate(ctx, evt); return;
case "tool_execution_end": // async,fire-and-forget
handleToolExecutionEnd(ctx, evt).catch(...); return;
case "agent_start": handleAgentStart(ctx); return;
case "auto_compaction_start": handleAutoCompactionStart(ctx); return;
case "auto_compaction_end": handleAutoCompactionEnd(ctx, evt); return;
case "agent_end": handleAgentEnd(ctx); return;
}
};
}

注意:`tool_execution_start` 和 `tool_execution_end` 的处理是异步的,但不 await(fire-and-forget),避免阻塞主事件流。
三、handleMessageStart(重置边界)
ts
// src/agents/pi-embedded-subscribe.handlers.messages.ts 行 32-49
export function handleMessageStart(
ctx: EmbeddedPiSubscribeContext,
evt: AgentEvent & { message: AgentMessage },
) {
const msg = evt.message;
if (msg?.role !== "assistant") return;
// KNOWN: text_end 可能晚到或重复,不安全作为重置边界
// ASSUME: message_start 才是"新 assistant 消息开始"的唯一可靠边界
ctx.resetAssistantMessageState(ctx.state.assistantTexts.length);
void ctx.params.onAssistantMessageStart?.();
}

resetAssistantMessageState 重置的所有状态:
ts
// pi-embedded-subscribe.ts
const resetAssistantMessageState = (nextAssistantTextBaseline: number) => {
state.deltaBuffer = "";
state.blockBuffer = "";
blockChunker?.reset();
replyDirectiveAccumulator.reset();
partialReplyDirectiveAccumulator.reset();
state.blockState.thinking = false;
state.blockState.final = false;
state.blockState.inlineCode = createInlineCodeState();
state.partialBlockState = { thinking: false, final: false, inlineCode: createInlineCodeState() };
state.lastStreamedAssistant = undefined;
state.lastStreamedAssistantCleaned = undefined;
state.emittedAssistantUpdate = false;
state.lastBlockReplyText = undefined;
state.lastStreamedReasoning = undefined;
state.lastReasoningSent = undefined;
state.suppressBlockChunks = false;
state.assistantMessageIndex += 1;
state.lastAssistantTextMessageIndex = -1;
state.lastAssistantTextNormalized = undefined;
state.lastAssistantTextTrimmed = undefined;
state.assistantTextBaseline = nextAssistantTextBaseline;
};

四、handleMessageUpdate(deltaBuffer 去重)
deltaBuffer 类型: string(在 handlers.types.ts 中定义)
核心去重逻辑(行 86-112):
ts
// handlers.messages.ts
let chunk = "";
if (evtType === "text_delta") {
chunk = delta; // delta 直接追加
} else if (evtType === "text_start" || evtType === "text_end") {
if (delta) {
chunk = delta;
} else if (content) {
// KNOWN: 某些 provider 在 text_end 时重发完整 content
// 只取后缀,保持单调递增
if (content.startsWith(ctx.state.deltaBuffer)) {
chunk = content.slice(ctx.state.deltaBuffer.length); // 仅追加新增部分
} else if (ctx.state.deltaBuffer.startsWith(content)) {
chunk = ""; // 旧内容子集,跳过
} else if (!ctx.state.deltaBuffer.includes(content)) {
chunk = content; // 完全不重叠,全取
}
}
}
if (chunk) {
ctx.state.deltaBuffer += chunk; // 单调累积
ctx.blockChunker?.append(chunk); // 或 state.blockBuffer += chunk
}

| 条件 | 处理 |
|---|---|
| `content.startsWith(deltaBuffer)` | 取后缀 `content.slice(deltaBuffer.length)` |
| `deltaBuffer.startsWith(content)` | 跳过(content 是子集) |
| `!deltaBuffer.includes(content)` | 全量追加 |
| 其余情况(content 已出现在 deltaBuffer 中间,但不是前缀) | 跳过(chunk 保持为空) |

text_end 时的 block flush:
ts
// 行 182-190
if (evtType === "text_end" && ctx.state.blockReplyBreak === "text_end") {
if (ctx.blockChunker?.hasBuffered()) {
ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk });
ctx.blockChunker.reset();
} else if (ctx.state.blockBuffer.length > 0) {
ctx.emitBlockChunk(ctx.state.blockBuffer);
ctx.state.blockBuffer = "";
}
}

五、handleMessageEnd(最终收敛)
final 标签处理(行 217-238):
ts
const text = ctx.stripBlockTags(rawText, { thinking: false, final: false });
// ...
if (!cleanedText && !hasMedia) {
// 回退:去掉 <final> 标签后尝试
const rawStrippedFinal = rawText.trim().replace(/<\s*\/?\s*final\s*>/gi, "").trim();
const rawCandidate = rawStrippedFinal || rawText.trim();
if (rawCandidate) {
const parsedFallback = parseReplyDirectives(stripTrailingDirective(rawCandidate));
cleanedText = parsedFallback.text ?? rawCandidate;
}
}

messaging tool 去重检查(行 298-332):
ts
// 检查是否已经由 messaging tool 发送过相同文本
const normalizedText = normalizeTextForComparison(text); // 小写+去emoji+折叠空白
if (isMessagingToolDuplicateNormalized(normalizedText, ctx.state.messagingToolSentTextsNormalized)) {
ctx.log.debug(`Skipping message_end block reply - already sent via messaging tool`);
} else {
ctx.state.lastBlockReplyText = text;
const splitResult = ctx.consumeReplyDirectives(text, { final: true });
if (splitResult) {
void onBlockReply({ text: splitResult.text, mediaUrls, audioAsVoice, ... });
}
}

message_end 时清理状态(行 367-374):
ts
ctx.state.deltaBuffer = "";
ctx.state.blockBuffer = "";
ctx.blockChunker?.reset();
ctx.state.blockState.thinking = false;
ctx.state.blockState.final = false;
ctx.state.blockState.inlineCode = createInlineCodeState();
ctx.state.lastStreamedAssistant = undefined;
ctx.state.lastStreamedAssistantCleaned = undefined;

六、messaging tool 去重机制
Start 阶段记录 pending(handlers.tools.ts 行 120-135):
ts
if (isMessagingTool(toolName) && isMessagingSend) {
// key = toolCallId
ctx.state.pendingMessagingTargets.set(toolCallId, sendTarget);
const text = argsRecord.content ?? argsRecord.message;
if (text) {
ctx.state.pendingMessagingTexts.set(toolCallId, text);
}
}

End 阶段 commit 或 discard(行 217-234):
ts
const pendingText = ctx.state.pendingMessagingTexts.get(toolCallId);
if (pendingText) {
ctx.state.pendingMessagingTexts.delete(toolCallId);
if (!isToolError) {
// 成功:提交到去重列表
ctx.state.messagingToolSentTexts.push(pendingText);
ctx.state.messagingToolSentTextsNormalized.push(normalizeTextForComparison(pendingText));
ctx.trimMessagingToolSent(); // 最多保留 200 条
}
// 失败:直接 delete,不提交,避免误判后续相同文本为重复
}

isMessagingToolDuplicateNormalized 去重判定:
ts
// src/agents/pi-embedded-helpers/messaging-dedupe.ts
const MIN_DUPLICATE_TEXT_LENGTH = 10; // 短文本不做去重
export function isMessagingToolDuplicateNormalized(
normalized: string,
normalizedSentTexts: string[],
): boolean {
if (normalized.length < MIN_DUPLICATE_TEXT_LENGTH) return false;
return normalizedSentTexts.some((sent) =>
normalized.includes(sent) || sent.includes(normalized),
);
}

七、after_tool_call hook 调用
ts
// src/agents/pi-embedded-subscribe.handlers.tools.ts
// 模块级 Map(生命周期跨工具调用)
const toolStartData = new Map<string, { startTime: number; args: unknown }>();
// tool_execution_start 时记录(行 74)
toolStartData.set(toolCallId, { startTime: Date.now(), args });
// tool_execution_end 时计算并调用 hook(行 284-309)
const startData = toolStartData.get(toolCallId);
toolStartData.delete(toolCallId); // 无论是否有 hook,都要清理防内存泄漏
const durationMs = startData?.startTime != null
? Date.now() - startData.startTime
: undefined;
void hookRunner.runAfterToolCall({
toolName,
params: toolArgs as Record<string, unknown>,
result: sanitizedResult,
error: isToolError ? extractToolErrorMessage(sanitizedResult) : undefined,
durationMs, // 毫秒,可能为 undefined(若 startTime 丢失)
}, { toolName, agentId: undefined, sessionKey: undefined }).catch(...)

八、EmbeddedPiSubscribeState 关键字段
ts
// src/agents/pi-embedded-subscribe.handlers.types.ts
type EmbeddedPiSubscribeState = {
// 流式累积
deltaBuffer: string; // 当前消息累积的全量文本(单调递增)
blockBuffer: string; // 待 flush 的 block 缓冲
// 流式状态
blockReplyBreak: "text_end" | "message_end"; // block 触发时机
emittedAssistantUpdate: boolean; // 是否已发出 streaming 更新
// messaging tool 去重
messagingToolSentTexts: string[];
messagingToolSentTextsNormalized: string[];
pendingMessagingTexts: Map<string, string>; // toolCallId → text(未提交)
pendingMessagingTargets: Map<string, MessagingToolSend>; // toolCallId → target
// compaction 状态
compactionInFlight: boolean;
pendingCompactionRetry: number;
compactionRetryPromise: Promise<void> | null;
unsubscribed: boolean;
};

九、自检清单
- `handleMessageStart` 才是状态重置点,`text_end` 不是(可能晚到或重复)。
- `deltaBuffer` 是单调累积字符串,不是环形 buffer;`message_end` 时清零。
- messaging tool 失败时 discard pending,不 commit —— 防止后续相同回复误判为重复。
- `toolStartData` 无 hook 时也要 `delete`,防止内存泄漏。
- `unsubscribe` 先设 `state.unsubscribed = true` 再 reject pending promise,防止竞态。
十、开发避坑
- `emittedAssistantUpdate` 的作用:首次 streaming 更新设为 true;`handleMessageEnd` 检查此标志决定是否补发"首次事件",防止 non-streaming provider 时客户端空白。
- `blockReplyBreak` 控制 block flush 时机:`"text_end"` 时每段文字后 flush,`"message_end"` 时整条消息结束后 flush,影响实时性与原子性。
- messaging tool 去重最大 200 条:`trimMessagingToolSent` 确保不无限增长,但超长对话的早期文本会被移出去重列表,此后与这些早期文本相同的回复不再被识别为重复(漏判),可能被再次发送。
- tool 事件异步非阻塞:start/end 事件以 `.catch()` fire-and-forget 处理,确保工具执行不阻塞流式文本消费。