
23 Function-Level Analysis: Streaming Subscription Handlers

Core files:

  • src/agents/pi-embedded-subscribe.ts
  • src/agents/pi-embedded-subscribe.handlers.ts
  • src/agents/pi-embedded-subscribe.handlers.messages.ts
  • src/agents/pi-embedded-subscribe.handlers.tools.ts

Module role

subscribeEmbeddedPiSession turns low-level LLM streaming events (text_delta, tool_execution_*) into stable replies that upper layers can consume: block callbacks, partial callbacks, and messaging-tool deduplication.

I. Return shape of subscribeEmbeddedPiSession

ts
// src/agents/pi-embedded-subscribe.ts

export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionParams) {
  // ...
  return {
    assistantTexts,                                        // string[]: all accumulated assistant text
    toolMetas,                                             // Array<{ toolName?, meta? }>
    unsubscribe,                                           // () => void
    isCompacting: () => state.compactionInFlight || state.pendingCompactionRetry > 0,
    isCompactionInFlight: () => state.compactionInFlight,
    getMessagingToolSentTexts: () => messagingToolSentTexts.slice(),
    getMessagingToolSentTargets: () => messagingToolSentTargets.slice(),
    didSendViaMessagingTool: () => messagingToolSentTexts.length > 0,
    getLastToolError: () => state.lastToolError ? { ...state.lastToolError } : undefined,
    getUsageTotals,
    getCompactionCount: () => compactionCount,
    waitForCompactionRetry,                                // () => Promise<void>; see below
  };
}

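Key behavior of waitForCompactionRetry:

  • If state.unsubscribed is true, it rejects immediately with an AbortError.
  • While a compaction is in progress, it waits on the compaction Promise.
  • If no compaction is running, it re-checks once via queueMicrotask (to guard against a race).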
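The three rules above can be sketched as a standalone function. This is a minimal sketch, not the source implementation: the two-field CompactionState type and the makeAbortError helper are hypothetical, a non-null compactionRetryPromise is assumed to mean "compaction pending", and Promise.resolve().then(...) stands in for queueMicrotask (both schedule a microtask).

```typescript
// Hypothetical, simplified slice of the subscription state.
type CompactionState = {
  unsubscribed: boolean;
  compactionRetryPromise: Promise<void> | null;
};

function makeAbortError(): Error {
  const err = new Error("subscription was unsubscribed");
  err.name = "AbortError";
  return err;
}

function waitForCompactionRetry(state: CompactionState): Promise<void> {
  // Rule 1: after unsubscribe, fail fast with an AbortError.
  if (state.unsubscribed) return Promise.reject(makeAbortError());
  // Rule 2: a compaction retry is pending, so wait on its promise.
  if (state.compactionRetryPromise) return state.compactionRetryPromise;
  // Rule 3: nothing pending yet; re-check once on the microtask queue in case
  // a compaction starts later in the same synchronous tick.
  return new Promise<void>((resolve, reject) => {
    Promise.resolve().then(() => {
      if (state.unsubscribed) reject(makeAbortError());
      else if (state.compactionRetryPromise) state.compactionRetryPromise.then(resolve, reject);
      else resolve();
    });
  });
}
```

The deferred re-check is what catches a compaction that begins right after the call in the same tick; without it, the caller would resolve immediately and miss the retry.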

Key behavior of unsubscribe:

ts
const unsubscribe = () => {
  if (state.unsubscribed) return;
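  state.unsubscribed = true;          // mark first, then act
  if (state.compactionRetryPromise) {
    // reject (not resolve): signal cancellation explicitly so it is not mistaken for success
    // (reject and abortErr are defined outside this excerpt)
    reject?.(abortErr);
    state.compactionRetryPromise = null;
  }
  if (params.session.isCompacting) {
    params.session.abortCompaction(); // abort the in-progress compaction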
  }
  sessionUnsubscribe();
};
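A self-contained sketch of the same ordering, assuming a hypothetical CompactingSession interface and a hypothetical createSubscription factory that stands in for the real closure. It shows that a second call is a no-op and that a pending waiter is rejected with an AbortError rather than resolved.

```typescript
// Hypothetical minimal session: only the members this sketch needs.
interface CompactingSession {
  isCompacting: boolean;
  abortCompaction(): void;
}

function createSubscription(session: CompactingSession, sessionUnsubscribe: () => void) {
  const state = { unsubscribed: false, compactionRetryPromise: null as Promise<void> | null };
  let rejectRetry: ((err: Error) => void) | undefined;

  // Stand-in for the real retry bookkeeping: park a promise and keep its rejecter.
  const beginCompactionRetry = (): Promise<void> => {
    state.compactionRetryPromise = new Promise<void>((_resolve, reject) => {
      rejectRetry = reject;
    });
    return state.compactionRetryPromise;
  };

  const unsubscribe = (): void => {
    if (state.unsubscribed) return; // idempotent
    state.unsubscribed = true;      // mark first so late events observe it
    if (state.compactionRetryPromise) {
      const abortErr = new Error("unsubscribed");
      abortErr.name = "AbortError";
      rejectRetry?.(abortErr);      // reject, never resolve: cancellation is not success
      state.compactionRetryPromise = null;
      rejectRetry = undefined;
    }
    if (session.isCompacting) session.abortCompaction();
    sessionUnsubscribe();
  };

  return { state, beginCompactionRetry, unsubscribe };
}
```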

II. Event routing (handlers.ts)

ts
// src/agents/pi-embedded-subscribe.handlers.ts

export function createEmbeddedPiSessionEventHandler(ctx) {
  return (evt: EmbeddedPiSubscribeEvent) => {
    switch (evt.type) {
      case "message_start":    handleMessageStart(ctx, evt);     return;
      case "message_update":   handleMessageUpdate(ctx, evt);    return;
      case "message_end":      handleMessageEnd(ctx, evt);       return;
      case "tool_execution_start":   // async, fire-and-forget; errors handled in .catch
        handleToolExecutionStart(ctx, evt).catch((err) => {
          ctx.log.debug(`tool_execution_start handler failed: ${String(err)}`);
        }); return;
      case "tool_execution_update":  handleToolExecutionUpdate(ctx, evt); return;
      case "tool_execution_end":     // async, fire-and-forget
        handleToolExecutionEnd(ctx, evt).catch(...); return;
      case "agent_start":      handleAgentStart(ctx);             return;
      case "auto_compaction_start":  handleAutoCompactionStart(ctx); return;
      case "auto_compaction_end":    handleAutoCompactionEnd(ctx, evt); return;
      case "agent_end":        handleAgentEnd(ctx);               return;
    }
  };
}

Note: the tool_execution_start and tool_execution_end handlers are async but are not awaited (fire-and-forget), so they never block the main event stream.
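A minimal sketch of the fire-and-forget routing, using a hypothetical two-event union in place of EmbeddedPiSubscribeEvent. The synchronous case is handled before the router returns; the async handler is started but not awaited, and its failure is only logged.

```typescript
// Hypothetical two-event union standing in for EmbeddedPiSubscribeEvent.
type DemoEvent =
  | { type: "text_delta"; id: string }
  | { type: "tool_execution_start"; id: string };

function createDemoHandler(log: string[]): (evt: DemoEvent) => void {
  // Async handler: yields once (as if awaiting I/O), then may fail.
  const handleToolExecutionStart = async (evt: DemoEvent): Promise<void> => {
    await Promise.resolve();
    if (evt.id === "bad") throw new Error("boom");
    log.push(`tool_start:${evt.id}`);
  };

  return (evt) => {
    switch (evt.type) {
      case "text_delta":
        log.push(`text:${evt.id}`); // synchronous: done before the router returns
        return;
      case "tool_execution_start":
        // Started but not awaited; a rejection is caught and only logged.
        handleToolExecutionStart(evt).catch((err) => {
          log.push(`debug:${String(err)}`);
        });
        return;
    }
  };
}
```

Because the router never awaits, a text event arriving right after a tool event is processed first, and a failing tool handler cannot surface as an unhandled rejection.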

III. handleMessageStart (the reset boundary)

ts
// src/agents/pi-embedded-subscribe.handlers.messages.ts  lines 32-49

export function handleMessageStart(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & { message: AgentMessage },
) {
  const msg = evt.message;
  if (msg?.role !== "assistant") return;

  // KNOWN: text_end may arrive late or be repeated, so it is not a safe reset boundary
  // ASSUME: message_start is the only reliable "new assistant message begins" boundary
  ctx.resetAssistantMessageState(ctx.state.assistantTexts.length);
  void ctx.params.onAssistantMessageStart?.();
}

Everything that resetAssistantMessageState resets:

ts
// pi-embedded-subscribe.ts

const resetAssistantMessageState = (nextAssistantTextBaseline: number) => {
  state.deltaBuffer = "";
  state.blockBuffer = "";
  blockChunker?.reset();
  replyDirectiveAccumulator.reset();
  partialReplyDirectiveAccumulator.reset();
  state.blockState.thinking = false;
  state.blockState.final = false;
  state.blockState.inlineCode = createInlineCodeState();
  state.partialBlockState = { thinking: false, final: false, inlineCode: createInlineCodeState() };
  state.lastStreamedAssistant = undefined;
  state.lastStreamedAssistantCleaned = undefined;
  state.emittedAssistantUpdate = false;
  state.lastBlockReplyText = undefined;
  state.lastStreamedReasoning = undefined;
  state.lastReasoningSent = undefined;
  state.suppressBlockChunks = false;
  state.assistantMessageIndex += 1;
  state.lastAssistantTextMessageIndex = -1;
  state.lastAssistantTextNormalized = undefined;
  state.lastAssistantTextTrimmed = undefined;
  state.assistantTextBaseline = nextAssistantTextBaseline;
};

IV. handleMessageUpdate (deltaBuffer deduplication)

Type of deltaBuffer: string (defined in handlers.types.ts)

Core dedup logic (lines 86-112):

ts
// handlers.messages.ts

let chunk = "";
if (evtType === "text_delta") {
  chunk = delta;                            // append the delta as-is
} else if (evtType === "text_start" || evtType === "text_end") {
  if (delta) {
    chunk = delta;
  } else if (content) {
    // KNOWN: some providers resend the full content at text_end
    // Take only the new suffix so the buffer keeps growing monotonically
    if (content.startsWith(ctx.state.deltaBuffer)) {
      chunk = content.slice(ctx.state.deltaBuffer.length);   // append only the new part
    } else if (ctx.state.deltaBuffer.startsWith(content)) {
      chunk = "";                                             // prefix of what we already have: skip
    } else if (!ctx.state.deltaBuffer.includes(content)) {
      chunk = content;                                        // not in the buffer at all: take it whole
    }
  }
}

if (chunk) {
  ctx.state.deltaBuffer += chunk;   // monotonic accumulation
  ctx.blockChunker?.append(chunk);  // (or ctx.state.blockBuffer += chunk when there is no chunker)
}
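| Condition | Handling |
|---|---|
| content.startsWith(deltaBuffer) | Take the suffix: content.slice(deltaBuffer.length) |
| deltaBuffer.startsWith(content) | Skip (content is a prefix of what is already buffered) |
| !deltaBuffer.includes(content) | Append content in full |
| content appears elsewhere inside deltaBuffer | Skip (chunk stays "") |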
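The table can be expressed as a pure function, which makes the rules easy to test in isolation. nextChunk is a hypothetical name; the branch order mirrors the excerpt above, including the implicit fourth case.

```typescript
// Returns the text to append to deltaBuffer for one streaming event.
function nextChunk(
  deltaBuffer: string,
  evtType: "text_delta" | "text_start" | "text_end",
  delta: string,
  content: string,
): string {
  if (evtType === "text_delta") return delta; // deltas are always appended
  if (delta) return delta;                    // text_start/text_end carrying a delta
  if (!content) return "";
  if (content.startsWith(deltaBuffer)) return content.slice(deltaBuffer.length); // full resend: suffix only
  if (deltaBuffer.startsWith(content)) return "";  // stale prefix
  if (!deltaBuffer.includes(content)) return content; // genuinely new text
  return "";                                           // already somewhere in the buffer
}
```

Usage: three events where the provider resends the full text at text_end still produce the text exactly once.

```typescript
let buf = "";
buf += nextChunk(buf, "text_delta", "Hel", "");
buf += nextChunk(buf, "text_delta", "lo", "");
buf += nextChunk(buf, "text_end", "", "Hello world");
// buf === "Hello world"
```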

Block flush at text_end:

ts
// lines 182-190
if (evtType === "text_end" && ctx.state.blockReplyBreak === "text_end") {
  if (ctx.blockChunker?.hasBuffered()) {
    ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk });
    ctx.blockChunker.reset();
  } else if (ctx.state.blockBuffer.length > 0) {
    ctx.emitBlockChunk(ctx.state.blockBuffer);
    ctx.state.blockBuffer = "";
  }
}
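The two flush paths can be exercised with a toy chunker. ToyBlockChunker is a hypothetical stand-in that only mimics the hasBuffered/drain/reset calls seen above; its paragraph-splitting rule for non-forced drains is an assumption, chosen only to show what force: true overrides.

```typescript
// Hypothetical toy chunker: mimics only the hasBuffered/drain/reset calls used above.
class ToyBlockChunker {
  private buffer = "";

  append(text: string): void {
    this.buffer += text;
  }

  hasBuffered(): boolean {
    return this.buffer.length > 0;
  }

  // Non-forced drains emit only up to the last paragraph break (an assumed rule);
  // force: true emits everything that is buffered.
  drain(opts: { force: boolean; emit: (chunk: string) => void }): void {
    let cut = this.buffer.length;
    if (!opts.force) {
      const idx = this.buffer.lastIndexOf("\n\n");
      if (idx < 0) return; // no complete paragraph yet
      cut = idx + 2;
    }
    const chunk = this.buffer.slice(0, cut);
    this.buffer = this.buffer.slice(cut);
    if (chunk) opts.emit(chunk);
  }

  reset(): void {
    this.buffer = "";
  }
}

type FlushState = { blockReplyBreak: "text_end" | "message_end"; blockBuffer: string };

// Mirrors the text_end branch above: prefer the chunker, else the raw blockBuffer.
function flushAtTextEnd(
  state: FlushState,
  chunker: ToyBlockChunker | undefined,
  emit: (chunk: string) => void,
): void {
  if (state.blockReplyBreak !== "text_end") return;
  if (chunker?.hasBuffered()) {
    chunker.drain({ force: true, emit });
    chunker.reset();
  } else if (state.blockBuffer.length > 0) {
    emit(state.blockBuffer);
    state.blockBuffer = "";
  }
}
```

Forcing the drain at text_end is what guarantees the trailing partial paragraph is not left stranded in the chunker until the next message.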

V. handleMessageEnd (final convergence)

<final> tag handling (lines 217-238):

ts
const text = ctx.stripBlockTags(rawText, { thinking: false, final: false });
// ...
if (!cleanedText && !hasMedia) {
  // Fallback: retry after stripping <final> tags
  const rawStrippedFinal = rawText.trim().replace(/<\s*\/?\s*final\s*>/gi, "").trim();
  const rawCandidate = rawStrippedFinal || rawText.trim();
  if (rawCandidate) {
    const parsedFallback = parseReplyDirectives(stripTrailingDirective(rawCandidate));
    cleanedText = parsedFallback.text ?? rawCandidate;
  }
}
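A small sketch of the <final> stripping fallback, assuming the regex is meant to tolerate whitespace on both sides of the slash (`\s*`, not a literal `s*`). finalFallbackCandidate is a hypothetical helper name; the later parseReplyDirectives step is omitted.

```typescript
// Whitespace is tolerated around the slash and inside the brackets: <final>, </final>, < / FINAL >.
const FINAL_TAG_RE = /<\s*\/?\s*final\s*>/gi;

// Hypothetical helper mirroring the fallback: strip <final> tags, else keep the trimmed raw text.
function finalFallbackCandidate(rawText: string): string {
  const rawStrippedFinal = rawText.trim().replace(FINAL_TAG_RE, "").trim();
  return rawStrippedFinal || rawText.trim();
}
```

With the unescaped `s*`, the closing tag in `"<final> ok </ final>"` would survive, because `s*` only matches literal `s` characters and cannot absorb the space after the slash.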

Messaging-tool dedup check (lines 298-332):

ts
// Check whether a messaging tool has already sent the same text
const normalizedText = normalizeTextForComparison(text);  // lowercase + strip emoji + collapse whitespace
if (isMessagingToolDuplicateNormalized(normalizedText, ctx.state.messagingToolSentTextsNormalized)) {
  ctx.log.debug(`Skipping message_end block reply - already sent via messaging tool`);
} else {
  ctx.state.lastBlockReplyText = text;
  const splitResult = ctx.consumeReplyDirectives(text, { final: true });
  if (splitResult) {
    void onBlockReply({ text: splitResult.text, mediaUrls, audioAsVoice, ... });
  }
}
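normalizeTextForComparison is only described here (lowercase, strip emoji, collapse whitespace). The following is a hypothetical implementation of that description; using the Unicode `Extended_Pictographic` property as the emoji test is an assumption, and the real helper may differ.

```typescript
// Hypothetical implementation of the described normalization; the real helper may differ.
// \p{Extended_Pictographic} plus an optional U+FE0F variation selector approximates "emoji".
const EMOJI_RE = new RegExp("\\p{Extended_Pictographic}\\uFE0F?", "gu");

function normalizeTextForComparison(text: string): string {
  return text
    .toLowerCase()
    .replace(EMOJI_RE, "") // strip emoji
    .replace(/\s+/g, " ")  // collapse whitespace runs into one space
    .trim();
}
```

Normalizing both sides means a messaging-tool send and the final reply compare equal even if one adds an emoji or different line breaks.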

State cleanup at message_end (lines 367-374):

ts
ctx.state.deltaBuffer = "";
ctx.state.blockBuffer = "";
ctx.blockChunker?.reset();
ctx.state.blockState.thinking = false;
ctx.state.blockState.final = false;
ctx.state.blockState.inlineCode = createInlineCodeState();
ctx.state.lastStreamedAssistant = undefined;
ctx.state.lastStreamedAssistantCleaned = undefined;

VI. Messaging-tool dedup mechanism

Start phase records pending entries (handlers.tools.ts, lines 120-135):

ts
if (isMessagingTool(toolName) && isMessagingSend) {
  // key = toolCallId
  ctx.state.pendingMessagingTargets.set(toolCallId, sendTarget);
  const text = argsRecord.content ?? argsRecord.message;
  if (text) {
    ctx.state.pendingMessagingTexts.set(toolCallId, text);
  }
}

End phase commits or discards (lines 217-234):

ts
const pendingText = ctx.state.pendingMessagingTexts.get(toolCallId);
if (pendingText) {
  ctx.state.pendingMessagingTexts.delete(toolCallId);
  if (!isToolError) {
    // Success: commit to the dedup list
    ctx.state.messagingToolSentTexts.push(pendingText);
    ctx.state.messagingToolSentTextsNormalized.push(normalizeTextForComparison(pendingText));
    ctx.trimMessagingToolSent();  // keeps at most 200 entries
  }
  // Failure: just delete, never commit, so a later identical reply is not misjudged as a duplicate
}
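The start/end pairing amounts to a two-phase commit on a Map keyed by toolCallId. This sketch keeps only the text side and omits normalization and targets; the DedupState type and the onMessagingToolStart/onMessagingToolEnd names are hypothetical.

```typescript
// Hypothetical, simplified dedup state: texts only (no normalization, no targets).
type DedupState = {
  pendingMessagingTexts: Map<string, string>; // toolCallId -> text awaiting the tool result
  messagingToolSentTexts: string[];           // texts confirmed as sent
};

// Start phase: remember the text under its toolCallId, but do not commit it yet.
function onMessagingToolStart(state: DedupState, toolCallId: string, text?: string): void {
  if (text) state.pendingMessagingTexts.set(toolCallId, text);
}

// End phase: always drop the pending entry; commit it only if the tool succeeded.
function onMessagingToolEnd(state: DedupState, toolCallId: string, isToolError: boolean): void {
  const pendingText = state.pendingMessagingTexts.get(toolCallId);
  if (!pendingText) return;
  state.pendingMessagingTexts.delete(toolCallId);
  if (!isToolError) state.messagingToolSentTexts.push(pendingText);
}
```

Keying by toolCallId keeps concurrent tool calls independent: one failing send cannot cancel or commit another's text.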

Duplicate check in isMessagingToolDuplicateNormalized:

ts
// src/agents/pi-embedded-helpers/messaging-dedupe.ts

const MIN_DUPLICATE_TEXT_LENGTH = 10;  // short texts are never treated as duplicates

export function isMessagingToolDuplicateNormalized(
  normalized: string,
  normalizedSentTexts: string[],
): boolean {
  if (normalized.length < MIN_DUPLICATE_TEXT_LENGTH) return false;
  return normalizedSentTexts.some((sent) =>
    normalized.includes(sent) || sent.includes(normalized),
  );
}
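Because every check scans the whole sent list, the list is capped (trimMessagingToolSent, 200 entries). The sketch below is a hypothetical version of that trim that keeps the raw and normalized arrays aligned; the constant name MAX_MESSAGING_TOOL_SENT is an assumption, and the duplicate check is copied from the excerpt so the block runs on its own.

```typescript
const MAX_MESSAGING_TOOL_SENT = 200; // cap stated in the text; the constant name is hypothetical
const MIN_DUPLICATE_TEXT_LENGTH = 10;

// Copied from the excerpt above so this block is self-contained.
function isMessagingToolDuplicateNormalized(normalized: string, normalizedSentTexts: string[]): boolean {
  if (normalized.length < MIN_DUPLICATE_TEXT_LENGTH) return false;
  return normalizedSentTexts.some((sent) => normalized.includes(sent) || sent.includes(normalized));
}

// Hypothetical trim: drop the oldest entries from both arrays so their indices stay aligned.
function trimMessagingToolSent(sent: string[], sentNormalized: string[]): void {
  const overflow = sent.length - MAX_MESSAGING_TOOL_SENT;
  if (overflow > 0) {
    sent.splice(0, overflow);
    sentNormalized.splice(0, overflow);
  }
}
```

After 201 sends, the oldest text has been evicted, so repeating it is no longer detected as a duplicate, while the most recent texts still are.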

VII. Invoking the after_tool_call hook

ts
// src/agents/pi-embedded-subscribe.handlers.tools.ts

// Module-level Map (its lifetime spans individual tool calls)
const toolStartData = new Map<string, { startTime: number; args: unknown }>();

// Recorded at tool_execution_start (line 74)
toolStartData.set(toolCallId, { startTime: Date.now(), args });

// At tool_execution_end: compute the duration and invoke the hook (lines 284-309)
const startData = toolStartData.get(toolCallId);
toolStartData.delete(toolCallId);      // always clean up, hook or no hook, to avoid a memory leak
const durationMs = startData?.startTime != null
  ? Date.now() - startData.startTime
  : undefined;

void hookRunner.runAfterToolCall({
  toolName,
  params: toolArgs as Record<string, unknown>,
  result: sanitizedResult,
  error: isToolError ? extractToolErrorMessage(sanitizedResult) : undefined,
  durationMs,  // milliseconds; undefined if startTime is missing
}, { toolName, agentId: undefined, sessionKey: undefined }).catch(...)
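The timing bookkeeping is a start/end pairing on a Map. This sketch assumes hypothetical recordToolStart/takeToolDuration helpers and an injectable `now` parameter (not in the source) so the duration can be tested deterministically.

```typescript
// Module-level map, as in the excerpt: toolCallId -> start time and args.
const toolStartData = new Map<string, { startTime: number; args: unknown }>();

// Hypothetical helper for the tool_execution_start side; `now` is injectable for testing.
function recordToolStart(toolCallId: string, args: unknown, now: number = Date.now()): void {
  toolStartData.set(toolCallId, { startTime: now, args });
}

// Hypothetical helper for the tool_execution_end side: always deletes the entry,
// then returns elapsed milliseconds, or undefined if the start was never recorded.
function takeToolDuration(toolCallId: string, now: number = Date.now()): number | undefined {
  const startData = toolStartData.get(toolCallId);
  toolStartData.delete(toolCallId);
  return startData?.startTime != null ? now - startData.startTime : undefined;
}
```

Deleting before returning means the map shrinks back to empty after every completed call, whether or not a hook consumes the duration.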

VIII. Key fields of EmbeddedPiSubscribeState

ts
// src/agents/pi-embedded-subscribe.handlers.types.ts

type EmbeddedPiSubscribeState = {
  // Streaming accumulation
  deltaBuffer: string;                    // full text accumulated for the current message (grows monotonically)
  blockBuffer: string;                    // block buffer waiting to be flushed

  // Streaming state
  blockReplyBreak: "text_end" | "message_end";  // when block replies are flushed
  emittedAssistantUpdate: boolean;        // whether a streaming update has been emitted yet

  // Messaging-tool dedup
  messagingToolSentTexts: string[];
  messagingToolSentTextsNormalized: string[];
  pendingMessagingTexts: Map<string, string>;    // toolCallId → text (not yet committed)
  pendingMessagingTargets: Map<string, MessagingToolSend>;  // toolCallId → target

  // Compaction state
  compactionInFlight: boolean;
  pendingCompactionRetry: number;
  compactionRetryPromise: Promise<void> | null;
  unsubscribed: boolean;
};

IX. Self-check list

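  1. handleMessageStart is the state-reset point; text_end is not (it may arrive late or be repeated).
  2. deltaBuffer is a monotonically accumulating string, not a ring buffer; it is cleared at message_end.
  3. When a messaging tool fails, discard the pending entry instead of committing it, so a later identical reply is not misjudged as a duplicate.
  4. Delete the toolStartData entry even when no hook is registered, to prevent a memory leak.
  5. unsubscribe sets state.unsubscribed = true before rejecting the pending promise, to prevent a race.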

X. Development pitfalls

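  1. Purpose of emittedAssistantUpdate: it is set to true on the first streaming update. handleMessageEnd checks it to decide whether to backfill that "first update" event, so clients are not left blank when the provider does not stream.
  2. blockReplyBreak controls when blocks are flushed: "text_end" flushes after each text segment, while "message_end" flushes once after the whole message. The choice trades real-time delivery against atomicity.
  3. Messaging-tool dedup keeps at most 200 entries: trimMessagingToolSent prevents unbounded growth, but in very long conversations early texts are evicted, so a reply that repeats one of them is no longer recognized as a duplicate and may be sent twice.
  4. Tool events are asynchronous and non-blocking: start/end events are handled fire-and-forget with .catch(), so tool execution never blocks consumption of the streamed text.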

Dissecting AI agent frameworks from an engineering perspective